| Parameter | Description |
| --- | --- |
| bootstrap.servers | Access network address. Copy it from the Network column in the Access Mode section on the instance details page in the console. |
| topic | Topic name. Copy it from the Topic Management page in the console. |
| | Consumer group ID, which you can customize. After the demo runs successfully, you can see the consumer on the Consumer Group page. |
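To illustrate how these parameters are read, the sketch below parses properties-format text with `java.util.Properties`. The class name `ConfigSketch`, the helper `parse`, and the address `broker.example.com:9092` are assumptions made for illustration; replace the values with the ones copied from the console.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ConfigSketch {

    // Parses properties-format text (key=value per line) into a Properties object.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values (assumptions), not values from a real instance.
        String text = "bootstrap.servers=broker.example.com:9092\n"
                + "topic=ckafka-topic-demo\n";
        Properties props = parse(text);
        System.out.println(props.getProperty("bootstrap.servers"));
        System.out.println(props.getProperty("topic"));
    }
}
```

In the demo itself the same keys are read from a configuration file on the classpath rather than from an inline string.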

`CKafkaConfigurer` loads the configuration file:

    import java.io.InputStream;
    import java.util.Properties;

    public class CKafkaConfigurer {

        private static Properties properties;

        public static synchronized Properties getCKafkaProperties() {
            if (properties != null) {
                return properties;
            }
            // Obtain the content of the configuration file.
            // Pass the name of your configuration file to getResourceAsStream.
            Properties kafkaProperties = new Properties();
            try (InputStream in = CKafkaConfigurer.class.getClassLoader().getResourceAsStream("")) {
                kafkaProperties.load(in);
            } catch (Exception e) {
                System.out.println("getCKafkaProperties error: " + e);
            }
            properties = kafkaProperties;
            return kafkaProperties;
        }
    }

`CKafkaProducerDemo` sends messages:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class CKafkaProducerDemo {

        public static void main(String[] args) {
            // Load the configuration file.
            Properties kafkaProperties = CKafkaConfigurer.getCKafkaProperties();

            Properties properties = new Properties();
            // Set the access point. Obtain the access point of the topic via the console.
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
            // Set the method for serializing Kafka messages. `StringSerializer` is used in this demo.
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            // Set the maximum request wait time.
            properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
            // Set the number of retries for the client.
            properties.put(ProducerConfig.RETRIES_CONFIG, 5);
            // Set the retry interval for the client.
            properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);

            // Construct a Kafka message.
            String topic = kafkaProperties.getProperty("topic"); // Topic of the message. Enter the topic you created in the console.
            String value = "this is ckafka msg value"; // Message content.

            // Construct a producer object; try-with-resources closes it on exit.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
                // Batch obtaining future objects can speed up the process, but the batch size should not be too large.
                List<Future<RecordMetadata>> futureList = new ArrayList<>(128);
                for (int i = 0; i < 10; i++) {
                    // Send the message and obtain a future object.
                    ProducerRecord<String, String> kafkaMsg = new ProducerRecord<>(topic, value + ": " + i);
                    Future<RecordMetadata> metadataFuture = producer.send(kafkaMsg);
                    futureList.add(metadataFuture);
                }
                producer.flush();
                for (Future<RecordMetadata> future : futureList) {
                    // Wait for the result of each future object.
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                }
            } catch (Exception e) {
                // If the sending still fails after client internal retries, the system needs to report and handle the error.
                System.out.println("error occurred: " + e);
            }
        }
    }

Run `CKafkaProducerDemo` to send the message. The output is similar to the following:

    Produce ok:ckafka-topic-demo-0@198
    Produce ok:ckafka-topic-demo-0@199
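
The producer demo submits every message first and only then waits on the returned futures, so the sends overlap instead of blocking one by one. The sketch below illustrates that submit-then-wait pattern with the JDK alone: `fakeSend` is a hypothetical stand-in for `producer.send`, and `BatchFutureSketch` and `sendBatch` are illustrative names, so no Kafka client is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BatchFutureSketch {

    // Hypothetical stand-in for producer.send(): returns a future immediately
    // and completes it on a background thread.
    static Future<String> fakeSend(ExecutorService pool, String topic, int i) {
        return pool.submit(() -> topic + "-0@" + i);
    }

    // Submits all messages first, then waits for every result in submission order.
    public static List<String> sendBatch(String topic, int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                futures.add(fakeSend(pool, topic, i));
            }
            List<String> results = new ArrayList<>(count);
            for (Future<String> future : futures) {
                results.add(future.get()); // blocks until this send has completed
            }
            return results;
        } catch (Exception e) {
            throw new IllegalStateException("send failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String result : sendBatch("ckafka-topic-demo", 3)) {
            System.out.println("Produce ok:" + result);
        }
    }
}
```

Keeping the batch small, as the demo's comment advises, bounds how many unfinished sends are held in memory at once.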

`CKafkaConsumerDemo` creates a consumer that subscribes to messages:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class CKafkaConsumerDemo {

        public static void main(String[] args) {
            // Load the configuration file.
            Properties kafkaProperties = CKafkaConfigurer.getCKafkaProperties();

            Properties props = new Properties();
            // Set the access point. Obtain the access point of the topic via the console.
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
            // Set the maximum interval between two polls.
            // If the consumer does not return a heartbeat message within the interval, the broker determines
            // that the consumer is not alive, removes it from the consumer group, and triggers rebalancing.
            // The default value is 30s.
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
            // Set the maximum number of messages that can be polled at a time.
            // Do not set this parameter to an excessively large value. If polled messages are not consumed
            // before the next poll, load balancing is triggered and lagging occurs.
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
            // Set the method for deserializing messages.
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            // The instances in the same consumer group consume messages in load balancing mode.
            // Pass the property key that holds your consumer group ID to getProperty.
            props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty(""));

            // Create a consumer object, which means generating a consumer instance.
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            // Set one or more topics to which the consumer group subscribes.
            // You are advised to configure consumer instances with the same `GROUP_ID_CONFIG` value
            // to subscribe to the same topics.
            List<String> subscribedTopics = new ArrayList<>();
            // To subscribe to multiple topics, separate them with commas in the `topic` property.
            // You must create the topics in the console in advance.
            String topicStr = kafkaProperties.getProperty("topic");
            String[] topics = topicStr.split(",");
            for (String topic : topics) {
                subscribedTopics.add(topic.trim());
            }
            consumer.subscribe(subscribedTopics);

            // Consume messages in a loop.
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    // All messages must be consumed before the next poll, and the total duration cannot
                    // exceed the timeout interval specified by `SESSION_TIMEOUT_MS_CONFIG`.
                    // You are advised to create a separate thread to consume messages and then return
                    // the result in async mode.
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                    }
                } catch (Exception e) {
                    System.out.println("consumer error! " + e);
                }
            }
        }
    }

Run `CKafkaConsumerDemo` to consume messages. The output is similar to the following:

    Consume partition:0 offset:298
    Consume partition:0 offset:299
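
The consumer demo accepts a comma-separated list in the `topic` property and trims each entry before subscribing. A minimal sketch of that parsing step follows; `TopicListSketch` and `parseTopics` are hypothetical names, and unlike the demo this version also drops empty entries (for example, those left by a trailing or doubled comma).

```java
import java.util.ArrayList;
import java.util.List;

public class TopicListSketch {

    // Splits a comma-separated topic string, trims each name, and skips blank entries.
    public static List<String> parseTopics(String topicStr) {
        List<String> topics = new ArrayList<>();
        if (topicStr == null) {
            return topics;
        }
        for (String topic : topicStr.split(",")) {
            String trimmed = topic.trim();
            if (!trimmed.isEmpty()) {
                topics.add(trimmed);
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        // prints [topic-a, topic-b, topic-c]
        System.out.println(parseTopics("topic-a, topic-b ,,topic-c"));
    }
}
```

The resulting list can be passed directly to `consumer.subscribe`, which accepts a collection of topic names.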
