import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerExample {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "test-topic";

    public static void main(String[] args) {
        // Create the Kafka producer configuration.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // bootstrap.servers, a list of Kafka cluster addresses in the format host1:port1,host2:port2. The producer uses this list to locate the cluster and establish connections.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Set the key producer parameters (shown with their default values).
        props.put(ProducerConfig.ACKS_CONFIG, "1"); // acks, the message acknowledgment level: 0 means do not wait for acknowledgment; 1 means wait for the leader replica to write; all (or -1) means wait for all in-sync replicas to write. Default: 1 (all since Kafka 3.0).
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // batch.size, the batch size in bytes. The producer packs multiple messages into a batch to improve throughput. Default: 16384 (16 KB).
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // buffer.memory, the memory in bytes the producer uses to buffer unsent messages. Default: 33554432 (32 MB).
        props.put(ProducerConfig.CLIENT_ID_CONFIG, ""); // client.id, the client ID, used to identify the source of messages in server logs.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); // compression.type, the message compression type: none, gzip, snappy, lz4, or zstd. Default: none (no compression).
        props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 540000); // connections.max.idle.ms, the maximum connection idle time in milliseconds; idle connections exceeding it are closed. Default: 540000 (540 s).
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // delivery.timeout.ms, the maximum delivery time in milliseconds; messages still unacknowledged after this time are reported as failed. Default: 120000 (120 s).
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); // enable.idempotence, whether to enable idempotency. If enabled, the producer ensures each message is written exactly once, even across network errors and retries. Default: false (true since Kafka 3.0).
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ""); // interceptor.classes, the list of interceptor classes invoked before and after sending messages.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0); // linger.ms, the time in milliseconds to wait before sending, so additional messages can be batched together. Default: 0.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000); // max.block.ms, the maximum time in milliseconds the producer blocks when fetching metadata or waiting for buffer space.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // max.in.flight.requests.per.connection, the maximum number of unacknowledged requests allowed per connection.
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576); // max.request.size, the maximum request size in bytes.
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 300000); // metadata.max.age.ms, the maximum metadata age in milliseconds; older metadata is refreshed.
        props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ""); // metric.reporters, the list of metric reporter classes the producer uses to report metrics.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"); // partitioner.class, the partitioner the producer uses to decide which partition each message is sent to.
        props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 32768); // receive.buffer.bytes, the size of the TCP receive buffer in bytes.
        props.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072); // send.buffer.bytes, the size of the TCP send buffer in bytes.
        props.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000); // reconnect.backoff.max.ms, the maximum reconnection backoff in milliseconds.
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 50); // reconnect.backoff.ms, the initial reconnection backoff in milliseconds.
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // request.timeout.ms, the request timeout in milliseconds.
        props.put(ProducerConfig.RETRIES_CONFIG, 2147483647); // retries, the number of retries for failed sends.
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100); // retry.backoff.ms, the retry interval in milliseconds.
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000); // transaction.timeout.ms, the transaction timeout in milliseconds.
        // props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // transactional.id, unset by default; setting it enables the transactional feature. Note: Properties rejects null values, so leave this line out entirely rather than passing null.
        props.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, "default"); // client.dns.lookup, the DNS lookup policy: default, use_all_dns_ips, or resolve_canonical_bootstrap_servers_only.

        // Create a producer.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Send messages.
        for (int i = 0; i < 100; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            // Create a message record.
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
            // Send the message with a completion callback.
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Message sent successfully: key=" + key + ", value=" + value);
                    } else {
                        System.err.println("Message sending failed: " + exception.getMessage());
                    }
                }
            });
        }

        // Close the producer (drains any buffered messages first).
        producer.close();
    }
}
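Because send() is asynchronous, records may still sit in the client buffer when the loop ends; close() drains them, but an explicit flush() inside a try-with-resources block makes the intent clearer. A minimal sketch, assuming the same props, TOPIC, and serializers as above (KafkaProducer implements Closeable):

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>(TOPIC, "key", "value"));
    producer.flush(); // Block until all buffered records are sent and acknowledged.
} // close() is called automatically here, even if an exception is thrown.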
// Key parameters for transactional scenarios:
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // enable.idempotence, whether to enable idempotency; must be true in transactional scenarios so each message is written exactly once, even across network errors and retries.
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // max.in.flight.requests.per.connection, the maximum number of unacknowledged requests allowed per connection; must not exceed 5 in transactional scenarios.
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // transactional.id, setting it enables the transactional feature. A concrete, non-null ID is required: Properties rejects null values.
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000); // transaction.timeout.ms, the transaction timeout in milliseconds, which can be extended if transactions run long.
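With these parameters set, the producer must still drive the transaction lifecycle explicitly. The sketch below is illustrative rather than part of the original example: it assumes a producer built from the properties above, reuses the test-topic name, and follows the error-handling pattern from the KafkaProducer Javadoc.

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

producer.initTransactions(); // Registers transactional.id with the coordinator; call once per producer.
try {
    producer.beginTransaction();
    // Every send between begin and commit belongs to one atomic transaction.
    producer.send(new ProducerRecord<>("test-topic", "tx-key", "tx-value"));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors: the producer cannot recover and must be closed.
    producer.close();
} catch (KafkaException e) {
    // Transient error: abort, after which the transaction may be retried.
    producer.abortTransaction();
}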
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); // compression.type, message compression type. Default is none (no compression). Options include none, gzip, snappy, lz4, zstd.
{"Compression","lz4"}
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerSyncExample {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "test-topic";

    public static void main(String[] args) {
        // Create the Kafka producer configuration.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Set producer properties.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

        // Create a producer.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Send messages synchronously.
        for (int i = 0; i < 10; i++) {
            String key = "sync-key-" + i;
            String value = "sync-value-" + i;
            // Create a message record.
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
            try {
                // Send the message and block until the broker acknowledges it.
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("Synchronous sending successful: key=" + key + ", value=" + value
                        + ", partition=" + metadata.partition() + ", offset=" + metadata.offset());
            } catch (InterruptedException | ExecutionException e) {
                System.err.println("Synchronous sending failed: " + e.getMessage());
            }
        }

        // Close the producer.
        producer.close();
    }
}
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerAsyncExample {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "test-topic";

    public static void main(String[] args) {
        // Create the Kafka producer configuration.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Set producer properties.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

        // Create a producer.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Send messages asynchronously.
        for (int i = 0; i < 10; i++) {
            String key = "async-key-" + i;
            String value = "async-value-" + i;
            // Create a message record.
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
            // Send the message and register a completion callback.
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Asynchronous sending successful: key=" + key + ", value=" + value);
                    } else {
                        System.err.println("Asynchronous sending failed: " + exception.getMessage());
                    }
                }
            });
        }

        // Close the producer (flushes any outstanding sends).
        producer.close();
    }
}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerDemo {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // bootstrap.servers, the address of the Kafka cluster; no default value.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // key.deserializer, the deserializer for message keys; no default value.
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // value.deserializer, the deserializer for message values; no default value.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // group.id, the consumer group ID; no default value.
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // auto.offset.reset, the behavior when no committed offset exists; "latest" starts consumption from the newest message. Default: latest.
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // enable.auto.commit, whether to commit offsets automatically. Default: true.
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // auto.commit.interval.ms, the auto-commit interval in milliseconds. Default: 5000.
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); // session.timeout.ms, the session timeout for consumer group members in milliseconds. Default: 10000 (45000 since Kafka 3.0).
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); // max.poll.records, the maximum number of messages returned by a single poll. Default: 500.
        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // max.poll.interval.ms, the maximum allowed interval between two poll operations, in milliseconds. Default: 300000.
        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1"); // fetch.min.bytes, the minimum amount of data the server should return; if set greater than 1, the server waits until that much data has accumulated. Default: 1.
        properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800"); // fetch.max.bytes, the maximum amount of data the server should return, in bytes. Default: 52428800.
        properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500"); // fetch.max.wait.ms, the maximum time the server blocks waiting for fetch.min.bytes of data, in milliseconds. Default: 500.
        properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"); // heartbeat.interval.ms, the heartbeat interval in milliseconds. Default: 3000.
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-client-id"); // client.id, the client ID; no default value.
        properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"); // request.timeout.ms, the client request timeout in milliseconds. Default: 30000.

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerAutoCommitExample {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "test-topic";
    private static final String GROUP_ID = "test-group";

    public static void main(String[] args) {
        // Create the Kafka consumer configuration.
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Enable automatic offset commits.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Auto-commit interval: 5000 ms.

        // Create a consumer.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Subscribe to a topic.
        consumer.subscribe(Collections.singletonList(TOPIC));

        // Consume messages.
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } finally {
            // Close the consumer.
            consumer.close();
        }
    }
}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerManualCommitExample {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Disable auto-commit; offsets are committed manually below.

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("test-topic"));

        int count = 0;
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    count++;
                    // Commit synchronously after every 10 processed records.
                    if (count % 10 == 0) {
                        consumer.commitSync();
                        System.out.println("Committed offsets.");
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
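Note that a parameterless commitSync() commits the latest offsets returned by the previous poll(), which may lie ahead of the record actually processed if a batch is interrupted midway. For per-record control, offsets can also be committed explicitly. A minimal sketch, where record is the loop variable from the example above; the committed offset is record.offset() + 1 because it names the next record to read:

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;

// Commit the position immediately after the record that was just processed.
consumer.commitSync(Collections.singletonMap(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1)));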
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerAssignAndSeekApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "assign-seek-group"); // A group.id is still required to commit offsets, even with manual partition assignment.
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        String topic = "my-topic";
        TopicPartition partition0 = new TopicPartition(topic, 0);
        TopicPartition partition1 = new TopicPartition(topic, 1);
        consumer.assign(Arrays.asList(partition0, partition1));

        // Set the starting consumption offsets.
        long startPosition0 = 10L;
        long startPosition1 = 20L;
        consumer.seek(partition0, startPosition0);
        consumer.seek(partition1, startPosition1);

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                consumer.commitSync(); // Manually commit offsets.
            }
        } finally {
            consumer.close();
        }
    }
}
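Besides seeking to fixed offsets, the consumer can translate timestamps into offsets with offsetsForTimes(). A minimal sketch reusing consumer, partition0, and partition1 from the example above; the one-hour lookback window is an arbitrary assumption:

import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import java.util.HashMap;
import java.util.Map;

// Find, per partition, the earliest offset whose timestamp is at or after one hour ago.
long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000L;
Map<TopicPartition, Long> query = new HashMap<>();
query.put(partition0, oneHourAgo);
query.put(partition1, oneHourAgo);

for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(query).entrySet()) {
    if (entry.getValue() != null) { // null when no record at or after the timestamp exists.
        consumer.seek(entry.getKey(), entry.getValue().offset());
    }
}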