Java SDK

Last updated: 2026-01-20 17:10:14

Background

TDMQ CKafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. It provides features such as high throughput, low latency, scalability, and fault tolerance.
Kafka Clients: Kafka's built-in Java client, which implements Kafka's standard production and consumption protocols.
This document introduces the key parameters, practical tutorials, and common issues of this Java client.

Producer Practices

Version Selection

Kafka client and cluster compatibility is crucial. Generally, newer client versions are compatible with older cluster versions, but the reverse may not hold true. Since the Broker version of a CKafka instance remains fixed after deployment, you can directly select a client version that matches the Broker version.
In the Java ecosystem, Spring Kafka is widely used. For the correspondence between Spring Kafka versions and Kafka Broker versions, refer to the Version Compatibility on the official Spring website.

Producer Parameters and Tuning

Producer Parameters

When using the Kafka Client to write messages to Kafka, the following key parameters need to be configured. The parameters and their default values are as follows:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerExample {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";

public static void main(String[] args) throws ExecutionException, InterruptedException {
// Create the Kafka producer configuration.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // List of Kafka cluster addresses in the format host1:port1,host2:port2. Producers will use this list to locate the cluster and establish connections.
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// Set the producer key parameters and their default values.
props.put(ProducerConfig.ACKS_CONFIG, "1"); // acks, default value is 1, the level of message acknowledgment. 0 means do not wait for acknowledgment; 1 means wait for the Leader replica to write; all or -1 means wait for all replicas to write.
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // batch.size, the batch size in bytes. The producer packs multiple messages into a batch for sending to improve performance. The default size is 16384 bytes.
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // buffer.memory, the memory size in bytes that the producer uses to buffer unsent messages. The default is 33554432, which is 32MB.
props.put(ProducerConfig.CLIENT_ID_CONFIG, ""); // client.id, Client ID. This ID can be used to identify the source of messages in server logs.
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); // compression.type, message compression type. Default is none (no compression). Options include none, gzip, snappy, lz4, zstd.
props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 540000); // connections.max.idle.ms, the maximum idle time for connections in milliseconds. Idle connections exceeding this time will be closed. Default is 540s.
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // delivery.timeout.ms, the maximum delivery time for messages in milliseconds. Unacknowledged messages exceeding this time will be considered failed. Default is 120s.
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); // enable.idempotence, whether to enable idempotency. If enabled, the producer ensures that each message is sent exactly once, even in cases of network errors or retries.
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ""); // interceptor.classes, interceptor class list. The producer will invoke these interceptors before and after sending messages.
props.put(ProducerConfig.LINGER_MS_CONFIG, 0); // linger.ms, the time to wait in milliseconds before sending. The producer waits for this period to allow additional messages to be batched together.
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000); // max.block.ms, the maximum blocking time in milliseconds for the producer when obtaining metadata or buffer space.
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // max.in.flight.requests.per.connection, the maximum number of unacknowledged requests allowed per connection.
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576); // max.request.size, the maximum request size in bytes
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 300000); // metadata.max.age.ms, the maximum age for metadata in milliseconds. Metadata older than this time will be refreshed.
props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ""); // metric.reporters, metric reporter class list. The producer uses these reporters to report metrics.
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"); // partitioner.class, the partitioner that determines which partition each message is sent to. If not set, the client's built-in default partitioner is used; RoundRobinPartitioner is shown here as an example.
props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 32768); // receive.buffer.bytes, the size of the receive buffer in bytes.
props.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072); // send.buffer.bytes, the size of the send buffer in bytes.
props.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000); // reconnect.backoff.max.ms, the maximum backoff interval for reconnection in milliseconds.
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 50); // reconnect.backoff.ms, the reconnection backoff interval in milliseconds.
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // request.timeout.ms, the request timeout in milliseconds.
props.put(ProducerConfig.RETRIES_CONFIG, 2147483647); // retries, the number of retries for failed sends. Default is 2147483647 (Integer.MAX_VALUE).
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100); // retry.backoff.ms, the retry interval in milliseconds.
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000); // transaction.timeout.ms, the transaction timeout in milliseconds.
// transactional.id has no default. Set it, e.g. props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"), to enable the transactional feature. Note that Properties.put() does not accept a null value.
props.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, "default"); // client.dns.lookup, DNS lookup policy. The options are default, use_all_dns_ips, resolve_canonical_bootstrap_servers_only.

// Create a producer.
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Send messages.
for (int i = 0; i < 100; i++) {
String key = "key-" + i;
String value = "value-" + i;

// Create a message record.
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);

// Send messages.
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("Message sent successfully: key=" + key + ", value=" + value);
} else {
System.err.println("Message sending failed: " + exception.getMessage());
}
}
});
}

// Close the producer.
producer.close();
}
}

Parameter Description and Tuning

About acks Parameter Optimization
The acks parameter controls the acknowledgment mechanism when producers send messages. Its default value is 1, meaning the producer returns after the message is sent to the Leader Broker and the Leader acknowledges the message has been written. The acks parameter also supports the following optional values:
0: Returning immediately without waiting for any acknowledgment.
1: Returning after the Leader replica acknowledges the write.
-1 or all: Returning after the Leader replica and relevant Follower replicas acknowledge the write.
As indicated, in cross-AZ scenarios and for topics with a high number of replicas, the value of the acks parameter affects message reliability and throughput. Therefore:
In scenarios involving online business messages where throughput requirements are not high, you can set the acks parameter to -1 to ensure messages are returned only after being received and acknowledged by all replicas, thereby enhancing message reliability. However, this comes at the cost of reduced write throughput and performance, resulting in increased latency.
In scenarios such as log collection, big data, or offline computing where high throughput (i.e., the amount of data written to Kafka per second) is required, setting acks to 1 can improve throughput.
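For reference, a minimal configuration sketch for each case, using the same props object as the producer example above (values are illustrative):

// Online business messages: prioritize reliability.
props.put(ProducerConfig.ACKS_CONFIG, "-1"); // equivalent to "all": wait for all relevant replicas to acknowledge the write
// Log collection, big data, or offline computing: prioritize throughput.
props.put(ProducerConfig.ACKS_CONFIG, "1"); // only the Leader replica needs to acknowledge the write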
Optimizing Batch-Related Parameters
When transmitting the same amount of data, using a single request for network transmission instead of multiple requests effectively reduces the related computational and network resource consumption, thereby increasing overall write throughput.
Therefore, the batch-related parameters below can be configured to optimize the client's message-sending throughput. In high-throughput scenarios, tune them together based on your message size and traffic (a configuration sketch follows this list):
batch.size: defaults to 16K.
linger.ms: defaults to 0. You can appropriately increase the wait time (e.g., set to 100ms) to aggregate more messages for batch sending.
buffer.memory: defaults to 32MB. For high-throughput Producers, it can be set larger (e.g., 256MB) if heap memory is sufficient.
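As an illustration only, and assuming sufficient heap memory (the exact values depend on message size and traffic), a throughput-oriented batching configuration might look like this:

props.put(ProducerConfig.BATCH_SIZE_CONFIG, 131072); // batch.size: e.g. 128 KB instead of the 16 KB default
props.put(ProducerConfig.LINGER_MS_CONFIG, 100); // linger.ms: wait up to 100 ms so more messages can be batched together
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 268435456); // buffer.memory: e.g. 256 MB for high-throughput producers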
Optimizing Transaction Parameters

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // enable.idempotence, whether to enable idempotency. Transactions require it, so set it to true in transactional scenarios; when enabled, the producer ensures that each message is written exactly once, even in cases of network errors or retries.
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // max.in.flight.requests.per.connection, the maximum number of unacknowledged requests allowed per connection. This value should not exceed 5 in transactional scenarios.
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // transactional.id, the transactional ID (example value; it has no default). Setting this parameter enables the transactional feature.
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000); // transaction.timeout.ms, the transaction timeout in milliseconds, which can be appropriately extended.
It should be emphasized that transactions incur additional computational costs to ensure exactly once semantics for messages.
For transactional scenarios, the transaction timeout can be appropriately increased to tolerate fluctuations caused by write latency in high-throughput scenarios.
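The parameters above only enable the feature; the transactional send flow itself is driven through the producer API (initTransactions, beginTransaction, commitTransaction/abortTransaction). A minimal sketch, assuming the bootstrap address and topic from the earlier examples and an illustrative transactional ID:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerTransactionExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // required for transactions
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // example transactional ID
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // register the transactional ID with the cluster

        try {
            producer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test-topic", "txn-key-" + i, "txn-value-" + i));
            }
            producer.commitTransaction(); // all records in the transaction become visible atomically
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            producer.close(); // fatal errors: the producer can no longer be used
        } catch (KafkaException e) {
            producer.abortTransaction(); // recoverable errors: none of the records are exposed to consumers
        }
        producer.close();
    }
}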
About Compression Parameter Optimization
Kafka Java Client supports the following compression parameters:
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); // compression.type, message compression type. Default is none (no compression). Options include none, gzip, snappy, lz4, zstd.

The following compression configurations are currently supported:
none: No compression algorithm is used.
gzip: Uses the GZIP compression algorithm.
snappy: Uses the Snappy compression algorithm.
lz4: Uses the LZ4 compression algorithm.
zstd: Uses the ZSTD compression algorithm.
To use compressed messages in the Kafka Java client, set the compression.type parameter when creating the producer. For example, to use the LZ4 compression algorithm, set compression.type to lz4.
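For example (configuration fragment only):

props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // compression.type: compress batches with LZ4 on the producer side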
Kafka compressed messages are an optimization that trades computation for bandwidth. Although compression and decompression occur on the client side, Brokers incur additional computational costs for validating compressed messages. This is particularly significant with gzip compression, where server-side validation costs are substantial. In some cases, the trade-off may not be worthwhile, as increased computation leads to high CPU utilization on Brokers, reducing their capacity to handle other requests and degrading overall performance. To mitigate this, consider bypassing Broker validation using the following approach:
1. On the Producer side, compress each message independently to produce the compressed payload, messageCompression, and store the compression method in the message's key:
{"Compression","lz4"}
2. At the Producer end, send messageCompression as a normal message.
3. On the Consumer side, read the message key to obtain the compression method used and independently decompress.
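A minimal sketch of this approach follows. To keep the example self-contained it compresses with GZIP from the JDK (the same structure applies to lz4 or zstd with the corresponding library), records the algorithm in the message key, and sends the compressed bytes as a normal message; the consumer reads the key and decompresses accordingly:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.zip.GZIPOutputStream;

public class ClientSideCompressionExample {

    // Compress the payload on the client side so that the Broker only sees ordinary (uncompressed) messages.
    static byte[] compress(String payload) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); // the value is raw compressed bytes

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            byte[] messageCompression = compress("value-0");
            // Store the compression method in the key so the consumer knows how to decompress the value.
            producer.send(new ProducerRecord<>("test-topic", "{\"Compression\":\"gzip\"}", messageCompression));
        }
        // Consumer side: deserialize the value with ByteArrayDeserializer, read the key, and decompress with GZIPInputStream.
    }
}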

Create Producer Instance

If the application requires higher throughput, asynchronous sending can be used to increase message delivery speed. Simultaneously, messages can be sent in batches to reduce network overhead and IO consumption. If the application demands higher reliability, synchronous sending should be used to ensure successful message delivery. Additionally, the ACK confirmation mechanism and transaction mechanism can be employed to guarantee message reliability and consistency. For specific parameter tuning, refer to Producer Parameters and Tuning.

Synchronous sending

In the Kafka Java Client, the sample code for synchronous sending is as follows:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerSyncExample {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";

public static void main(String[] args) {
// Create the Kafka producer configuration.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// Set producer properties.
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);

// Create a producer.
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Send messages synchronously
for (int i = 0; i < 10; i++) {
String key = "sync-key-" + i;
String value = "sync-value-" + i;

// Create a message record.
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);

try {
// Send a message and wait for the result.
RecordMetadata metadata = producer.send(record).get();
System.out.println("Synchronous sending successful: key=" + key + ", value=" + value);
} catch (InterruptedException | ExecutionException e) {
System.err.println("Synchronous sending failed: " + e.getMessage());
}
}

// Close the producer.
producer.close();
}
}

Asynchronous sending

Asynchronous sending: When sending messages asynchronously, the current thread is not blocked, and the producer achieves higher throughput. However, send results need to be handled through a callback function. The sample is as follows:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerAsyncExample {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";

public static void main(String[] args) {
// Create the Kafka producer configuration.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// Set producer properties.
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);

// Create a producer.
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Send messages asynchronously.
for (int i = 0; i < 10; i++) {
String key = "async-key-" + i;
String value = "async-value-" + i;

// Create a message record.
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);

// Send a message and set the callback function.
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("Async sending successful: key=" + key + ", value=" + value);
} else {
System.err.println("Asynchronous sending failed: " + exception.getMessage());
}
}
});
}

// Close the producer.
producer.close();
}
}



Consumer Practices

Consumer Parameters


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerDemo {

public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // "bootstrap.servers", the address of the Kafka cluster, with no default value.
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // "key.deserializer", the deserialization method for message keys, with no default value.
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // "value.deserializer", the deserialization method for message values, with no default value.
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // "group.id", the consumer group ID, with no default value.
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // "auto.offset.reset", the behavior when the offset does not exist. "latest" means starting consumption from the latest message. Default value is "latest"
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // "enable.auto.commit", whether to automatically commit offsets. Default value is "true"
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // "auto.commit.interval.ms", the interval for auto-committing offsets in milliseconds, default value is "5000"
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); // "session.timeout.ms", the session timeout for consumer group members in milliseconds, default value is "10000"
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); // "max.poll.records", the maximum number of messages returned in a single poll, default value is "500"
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // "max.poll.interval.ms", the maximum allowed interval between two poll operations in milliseconds, default value is "300000"
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1"); // "fetch.min.bytes", the minimum amount of data the server should return. If set greater than 1, the server will wait until the accumulated data volume exceeds this value. Default value is "1"
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800"); // "fetch.max.bytes", the maximum amount of data the server should return in bytes, default value is "52428800"
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500"); // "fetch.max.wait.ms", the maximum time the server will block waiting for data to meet the condition specified by fetch.min.bytes, in milliseconds, default value is "500"
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"); // "heartbeat.interval.ms", the heartbeat interval in milliseconds, default value is "3000"
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-client-id"); // "client.id", the client ID, with no default value.
properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"); // "request.timeout.ms", the client request timeout in milliseconds, default value is "30000"

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("test-topic"));

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}

Parameter optimization

1. When using Kafka consumers, performance can be optimized by adjusting certain parameters. Below are some common parameter tuning solutions:
fetch.min.bytes: If the minimum message size is uncertain, this parameter is recommended to be set to 1. If you know the minimum message size, you can set this value to the minimum message size.
max.poll.records: This parameter can be adjusted based on the processing capacity of your application. If your application can handle more records, you can set this parameter to a larger value to reduce the number of poll operations.
auto.commit.interval.ms: This parameter can be adjusted according to your application requirements. For automatic offset commit scenarios, it is recommended to keep the default value of 5000ms. Note that excessively frequent offset commits may impact performance and consume additional computational resources on the Broker.
client.id: A unique ID can be set for each consumer to distinguish different consumers in monitoring and logs.
The above are some common parameter tuning solutions; however, the optimal settings may vary depending on your application's specific characteristics and requirements. When tuning parameters, always remember to conduct performance testing to ensure your configuration achieves the desired results.
2. For issues such as frequent rebalances and consumer thread blocking, refer to the following parameter tuning instructions (an illustrative configuration fragment follows this list):
session.timeout.ms: For versions prior to v0.10.2, it is appropriate to increase this parameter value. It should exceed the time taken to consume a batch of data but not exceed 30s, with 25s recommended. For v0.10.2 and later versions, retain the default value of 10s.
max.poll.records: Reduce this parameter value. It is recommended to be significantly less than <number of messages consumed per second per thread> * <number of consumption threads> * <max.poll.interval.ms, in seconds>.
max.poll.interval.ms: This value, in seconds, should be greater than <max.poll.records> / (<number of messages consumed per second per thread> * <number of consumption threads>).
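As an illustrative fragment only (the actual values must be derived from your measured per-thread consumption rate), a configuration aimed at reducing rebalances might look like this, using the properties object from the consumer example above:

properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "25000"); // session.timeout.ms: e.g. 25s for clients prior to v0.10.2; keep the 10s default otherwise
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100"); // max.poll.records: small enough that a batch is processed well within max.poll.interval.ms
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // max.poll.interval.ms: must exceed the time needed to process one poll batch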

Create Consumer Instance

Kafka Java Client provides a subscription model for creating consumers, offering both manual and automatic offset commit methods.

Automatic Offset Commit

Automatic offset commit: After polling messages, the consumer automatically commits offsets without manual intervention. This approach offers simplicity and ease of use but may lead to duplicate message consumption or message loss. Note that the auto.commit.interval.ms should not be set too short, as this may cause high CPU usage on the Broker, affecting the processing of other requests.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerAutoCommitExample {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";
private static final String GROUP_ID = "test-group";

public static void main(String[] args) {
// Create Kafka consumer configuration.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Enable automatic offset commit
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Automatic commit interval: 5000 ms

// Create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// Subscribe to a Topic.
consumer.subscribe(Collections.singletonList(TOPIC));

// Consumption message
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Consumed message: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
} finally {
// Close the consumer.
consumer.close();
}
}
}



Manual Offset Commit

Manual offset commit: After processing messages, the consumer needs to manually commit offsets. The advantage of this approach is precise control over offset commits, preventing duplicate message consumption or message loss. However, note that overly frequent manual offset commits can cause high CPU usage on the Broker, impacting performance. As message volume increases, high CPU consumption may affect other features of the Broker. Therefore, it is recommended to commit offsets at certain message intervals.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerManualCommitExample {

public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("test-topic"));

int count = 0;
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
count++;
if (count % 10 == 0) {
consumer.commitSync();
System.out.println("Committed offsets.");
}
}
}
} finally {
consumer.close();
}
}
}

Assign Consumption

Kafka Java Client's assign consumption model allows consumers to directly specify subscribed partitions instead of automatically assigning partitions through topic subscription. This model is suitable for scenarios requiring manual control over consumption partitions, such as implementing specific load balancing policies or skipping certain partitions under specific circumstances. The general process involves using the assign method to manually specify consumer partitions, the seek method to set the starting consumption offset, and then executing the consumption logic. An example is as follows:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerAssignAndSeekApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

String topic = "my-topic";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));

// Set the starting consumption offset
long startPosition0 = 10L;
long startPosition1 = 20L;
consumer.seek(partition0, startPosition0);
consumer.seek(partition1, startPosition1);

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitSync(); // Manually commit offsets
}
} finally {
consumer.close();
}
}
}

Kafka Java Client Producer and Consumer Common Issues

Kafka Java Producer fails to send messages
First, check if the Kafka cluster's IP and port can be connected normally. If not, resolve the communication issue first.
Next, check whether the access point is correctly configured and whether the version matches the Broker version. You can refer to the sending demo in the practical tutorial.
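As a minimal sketch of such a connectivity check (assuming the kafka-clients AdminClient is available, and replacing localhost:9092 with your access point), listing topics is a quick way to verify that the client can reach the cluster before debugging producer logic:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class KafkaConnectivityCheck {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // replace with your access point
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);

        try (AdminClient admin = AdminClient.create(props)) {
            // If this call succeeds, the access point is reachable and the bootstrap address is valid.
            System.out.println("Topics: " + admin.listTopics().names().get(10, TimeUnit.SECONDS));
        } catch (Exception e) {
            System.err.println("Cannot reach the Kafka cluster: " + e.getMessage());
        }
    }
}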