tencent cloud

Feedback

Java SDK

Last updated: 2024-05-13 15:50:52

    Overview

    TDMQ for CKafka is a distributed stream processing platform used to build real-time data pipelines and streaming applications. It offers high throughput, low latency, scalability, and fault tolerance.
    Kafka Clients: These are Kafka's built-in clients, implemented in Java. They serve as clients for Kafka's standard production and consumption protocols.
    This document describes the key parameters, best practices, and FAQs about the aforementioned Java clients.

    Producer Practice

    Version Selection

    The compatibility between Kafka clients and clusters is very important. Generally, newer versions of clients are compatible with older versions of clusters, but the reverse may not necessarily be true. Typically, the version of the CKafka instance's broker is clear after deployment, so you can just choose the matching client version based on the broker's version.
    In the Java ecosystem, Spring Kafka is widely used. The correspondence between Spring Kafka versions and Kafka Broker versions can be found on the official Spring website under Version Correspondence.
    Producer parameters and optimization

    Producer Parameters

    When writing to Kafka using the Kafka Client, you need to configure the following key parameters. The parameters and their default values are:
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    public class KafkaProducerExample {
    
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "test-topic";
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    // Create Kafka producer configuration.
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // List of Kafka cluster addresses, formatted as host1:port1,host2:port2. The producer uses this list to locate the cluster and establish a connection.
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
    // Set key parameters and default values of producers.
    props.put(ProducerConfig.ACKS_CONFIG, "1");// acks: Represents the level of message confirmation, with the default value of 1. 0 means no wait for confirmation; 1 means waiting for the leader replica to be written; all or -1 means waiting for all replicas to be written.
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// batch.size, the batch sending size in bytes. The producer bundles multiple messages into a batch for sending to improve performance. The default size is 16,384 bytes.
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// buffer.memory, the memory size in bytes used by the producer for caching messages waiting to be sent. The default is 33,554,432 bytes, which is 32 MB.
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "");// client.id, the client ID. This ID can be used to identify the message source in server logs.
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");// compression.type, the message compression type. The default is none, with valid values including none, gzip, snappy, lz4, and zstd.
    props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 540000);// connections.max.idle.ms, the maximum idle time for connections, in milliseconds. Idle connections exceeding this time will be closed. The default is 540,000 ms.
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);// delivery.timeout.ms, the maximum delivery time for messages, in milliseconds. Messages not acknowledged within this time will be considered failed. The default is 120,000 ms.
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);// enable.idempotence, determines whether Idempotence is enabled. If true, the producer will ensure each message is sent only once, even in the cases of network errors or retries.
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "");// interceptor.classes, a list of interceptor classes. The producer will call these interceptors before and after sending messages.
    props.put(ProducerConfig.LINGER_MS_CONFIG, 0);// linger.ms, the delay for sending messages, in milliseconds. The producer will wait for a period of time in order to batch more messages together for sending.
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);// max.block.ms, the maximum blocking time for the producer when accessing metadata or cache space, in milliseconds.
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);// max.in.flight.requests.per.connection, the maximum number of unacknowledged requests per connection.
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);// max.request.size, the maximum size of a request, in bytes.
    props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 300000);//metadata.max.age.ms, the maximum age of metadata, in milliseconds. Metadata older than this age will be refreshed.
    props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "");// metric.reporters, a list of metric reporter classes. The producer will use these reporters to report metric information.
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner");// partitioner.class, the class of the partitioner. The producer uses this partitioner to decide which partition each message should be sent to.
    props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 32768);// receive.buffer.bytes, the size of the receive buffer, in bytes.
    props.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072);// send.buffer.bytes, the size of the send buffer, in bytes.
    props.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000);// reconnect.backoff.max.ms, the maximum reconnection interval, in milliseconds.
    props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 50);// reconnect.backoff.ms, the reconnection interval, in milliseconds.
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);// request.timeout.ms, the request timeout, in milliseconds.
    props.put(ProducerConfig.RETRIES_CONFIG, 2147483647);// retries, the number of retries upon send failure.
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);// retry.backoff.ms, the retry interval, in milliseconds.
    props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);// transaction.timeout.ms, the transaction timeout, in milliseconds.
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);// transactional.id, transaction ID. If this parameter is set, the producer will enable the transaction feature.
    props.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, "default");// client.dns.lookup, DNS lookup policy. Valid values are default, use_all_dns_ips, and resolve_canonical_bootstrap_servers_only.
    
    // Create a producer.
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
    // Send the message.
    for (int i = 0; i < 100; i++) {
    String key = "key-" + i;
    String value = "value-" + i;
    
    // Create a message record.
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
    
    // Send the message.
    producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception == null) {
    System.out.println("Message sent successfully:key=" + key + ", value=" + value);
    } else {
    System.err.println("Message sending failed:" + exception.getMessage());
    }
    }
    });
    }
    
    // Close the producer.
    producer.close();
    }
    }

    Parameter Description and Optimization

    acks Parameter Optimization

    The acks parameter is used to control the confirmation mechanism when the producer sends messages. Its default value is 1, which means that after the message is sent to the leader broker, it returns upon the leader's confirmation of the message being written. The acks parameter also has the following optional values:
    0: Do not wait for any confirmation, return directly.
    1: Wait for the leader replica to confirm the write before returning.
    -1 or all: Wait for the Leader replica and the relevant follower replicas to confirm the write before returning.
    In cross availability zone scenarios, and for topics with a higher number of replicas, the choice of acks parameter affects the message's reliability and throughput. Therefore:
    In some online business message scenarios, where throughput requirements are not high, you can set the acks parameter to -1 to ensure that the message is received and confirmed by all replicas before returning. This improves message reliability but sacrifices write throughput and performance, and the latency will increase.
    In scenarios involving big data, such as log collection, or offline computing, where high throughput (i.e., the volume of data written to Kafka per second) is required, you can set the acks to 1 to improve throughput.

    Batch Parameter Optimization

    By default, for transmitting the same volume of data, a single request's network transmission can effectively reduce related computation and network resources compared to multiple requests, thereby improving the overall write throughput.
    Therefore, this parameter can be set to optimize the client's message sending throughput capabilities. In high throughput scenarios, you can set this parameter in combination with computation:
    batch.size: Default is 16 K.
    linger.ms: Default is 0. You can appropriately increase the delay, such as setting it to 100 ms, to aggregate more messages for batch sending.
    buffer.memory: Default is 32 MB. For high-traffic producers, you can set it larger if there is sufficient heap memory, such as setting it to 256 MB.

    Transaction Parameter Optimization

    
    put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);// enable.idempotence, determines whether Idempotence is enabled. If true, the producer will ensure each message is sent only once, even in the cases of network errors or retries. // Whether Idempotence is required. It should be set to true in transaction scenarios
    put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);// max.in.flight.requests.per.connection, the maximum number of unacknowledged requests per connection. For transaction scenarios, do not make it exceed 5.
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);// transactional.id, transaction ID. If this parameter is set, the producer will enable the transaction feature.
    props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);// transaction.timeout.ms, the timeout for a transaction, in milliseconds. The transaction time can be extended as appropriate.
    Note that the transaction will incur additional computing resources, because it guarantee exactly once semantics.
    For transaction scenarios, it is appropriate to increase the transaction timeout to tolerate jitter brought on by write latency in high throughput scenarios.

    Compression Parameter Optimization

    The Kafka Java Client supports the following Compression Parameters:
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");// compression.type, the message compression type. The default is none, with valid values including none, gzip, snappy, lz4, zstd.
    
    Currently, it supports the following Compression configurations:
    none: No compression algorithm.
    gzip: Compress by GZIP algorithm.
    snappy: Compress by Snappy algorithm.
    lz4: Compress by LZ4 algorithm.
    zstd: Compress by ZSTD algorithm.
    To use compressed messages in Kafka Java Client, you need to set the compression.type parameter when creating a producer. For example, to use the LZ4 compression algorithm, you can set compression.type to lz4.
    Kafka message compression is an optimization method that uses compute to save bandwidth. Although Kafka message compression and decompression occur on the client side, the broker performs verification actions on compressed messages, leading to extra computation cost. As the increased compute leads to high broker CPU usage, reducing the processing capability for other requests, the overall performance drop, especially for gzip compression. The server-side verification computation cost of such compressed messages can be very high. For some cases, the cost is not worth the benefit. We recommend the following method to avoid broker verification in such cases:
    1. Independently compress message data on the producer side to generate packed data compression: messageCompression, and store the compression method in the message's key:
    {"Compression","lz4"}
    2. On the producer side, send messageCompression as a normal message.
    3. On the consumer side, read the message key, access the compression method used and performs decompression independently.

    Creating Producer Instance

    If your application needs higher throughput, you can use asynchronous sending to increase the speed of message sending. At the same time, batch message sending can be utilized to reduce network overhead and IO consumption. If the application requires higher reliability, synchronous sending can ensure message delivery success. Meanwhile, ACK confirmation mechanism and transaction mechanism can be used to ensure the reliability and consistency of messages. For specific parameter optimization, see producer parameters and optimization.

    Synchronous Sending

    An example of synchronous sending in the Kafka Java Client:
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    public class KafkaProducerSyncExample {
    
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "test-topic";
    
    public static void main(String[] args) {
    // Create Kafka producer configuration.
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
    // Set producer parameters.
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 3);
    
    // Create a producer.
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
    // Synchronously send messages.
    for (int i = 0; i < 10; i++) {
    String key = "sync-key-" + i;
    String value = "sync-value-" + i;
    
    // Create a message record.
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
    
    try {
    // Send messages and wait for results.
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("Synchronous sending succeeded:key=" + key + ", value=" + value);
    } catch (InterruptedException | ExecutionException e) {
    System.err.println("Synchronous sending failed:" + e.getMessage());
    }
    }
    
    // Close the producer.
    producer.close();
    }
    }

    Asynchronous Sending

    Asynchronous sending: When messages are sent asynchronously, the current thread won't be blocked, and the producer throughput is higher. However, message results need to be handled through a callback function. Example:
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProducerAsyncExample {
    
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "test-topic";
    
    public static void main(String[] args) {
    // Create Kafka producer configuration.
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
    // Set producer parameters.
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 3);
    
    // Create a producer.
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
    // Send messages asynchronously.
    for (int i = 0; i < 10; i++) {
    String key = "async-key-" + i;
    String value = "async-value-" + i;
    
    // Create a message record.
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
    
    // Send the message and set the callback function.
    producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception == null) {
    System.out.println("Asynchronous sending succeeded:key=" + key + ", value=" + value);
    } else {
    System.err.println("Asynchronous sending failed:" + exception.getMessage());
    }
    }
    });
    }
    
    // Close the producer.
    producer.close();
    }
    }
    
    

    Consumer Practice

    Consumer Parameters

    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaConsumerDemo {
    
    public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // "bootstrap.servers", address of the Kafka cluster, no default value
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // "key.deserializer", method of deserializing the message key, no default value.
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // "value.deserializer", method of deserializing the message value, no default value.
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // "group.id", consumer group ID, no default value.
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // "auto.offset.reset", action when the offset does not exist. "latest" means to start consuming from the newest message. Default value is "latest".
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // "enable.auto.commit", whether to automatically commit offsets. Default value is "true".
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // "auto.commit.interval.ms", interval for automatically committing offsets, in milliseconds, with the default value being "5000".
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); // "session.timeout.ms", the session timeout for consumer group members, in milliseconds, with the default value being "10000".
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); // "max.poll.records", the maximum number of messages to fetch in one poll, with the default value being "500".
    properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // "max.poll.interval.ms", the maximum allowed time between polls, in milliseconds, with the default value being "300000".
    properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1"); // "fetch.min.bytes", the minimum amount of data the server should return. If it is set to more than 1, the server will wait until the accumulated data is greater than this value. Default value is "1".
    properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800"); // "fetch.max.bytes", the maximum amount of data the server can return, in bytes, with the default value being "52428800".
    properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500"); // "fetch.max.wait.ms", the maximum wait time for satisfying the fetch.min.bytes condition, in milliseconds, with the default value being "500".
    properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"); // "heartbeat.interval.ms", heartbeat interval, in milliseconds, with the default value being "3000".
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-client-id"); // "client.id", client ID, no default value.
    properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"); // "request.timeout.ms", client request timeout period, in milliseconds, with the default value being "30000".
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList("test-topic"));
    
    try {
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    }
    } finally {
    consumer.close();
    }
    }
    }

    Parameter Optimization

    1. When using Kafka consumers, we can optimize performance by adjusting some parameters. Here are some common parameter optimization methods:
    fetch.min.bytes: If you don't know the minimum message size, we recommend you to set this parameter to 1. You can set this value to the minimum message size if you know it.
    max.poll.records: This parameter can be adjusted based on the processing capacity of the application. If your application can handle more records, you can set this value to a larger number to reduce the frequency of poll operations.
    auto.commit.interval.ms: This parameter can be adjusted according to the needs of your application. Generally, for scenarios with automatic offset commits, it is recommended to set it to the default value of 5,000 ms. Note that excessively frequent offset commits can affect performance and additionally consume broker's computational resources.
    client.id: You can set a unique ID for each consumer to distinguish between different consumers in monitoring and logs.
    The above are some common parameter optimization methods, but the optimal settings might vary based on the features and requirements of your application. When optimize parameters, remember to always conduct performance testing to ensure the result matches your expectation.
    2. For issues of frequent rebalance and consumption thread blocking, see the following parameter optimization instructions:
    session.timeout.ms: For versions before v0.10.2, increase this parameter value appropriately to make it greater than the time it takes to consume a batch of data and not exceed 30 s. The recommended value is 25 s. For v0.10.2 and later versions, use the default value of 10 s.
    max.poll.records: Decrease this value to make it significantly less than the product of <the number of messages consumed per second per thread> x <the number of consumption threads> x <max.poll.interval.ms> as recommended.
    max.poll.interval.ms: This value should be greater than <max.poll.records> / (<the number of messages consumed per second per thread> x <the number of consumption threads>).

    Creating Consumer Instance

    The Kafka Java Client provides a subscription model to create consumers, where it offers two ways to commit the offset: manually and automatically.

    Auto-Commit Offsets

    Auto-commit offsets: Consumers automatically commit their offsets after pulling messages, eliminating the need for manual operation. This method's advantage is its simplicity and ease of use, but it may lead to duplicate message consumption or loss. Note that the auto-commit interval, auto.commit.interval.ms, should not be set too short; otherwise, it may lead to relatively high broker CPU utilization, affecting the processing of other requests.
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaConsumerAutoCommitExample {
    
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "test-topic";
    private static final String GROUP_ID = "test-group";
    
    public static void main(String[] args) {
    // Create Kafka consumer configuration.
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
    // Enable auto-commit offset.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Auto-commit interval, in units of 5000 milliseconds.
    
    // Create a consumer.
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
    // Subscribe to topics.
    consumer.subscribe(Collections.singletonList(TOPIC));
    
    // Consume messages.
    try {
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
    System.out.printf("Consume message: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
    record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    }
    } finally {
    // Close the consumer.
    consumer.close();
    }
    }
    }
    
    

    Manual-Commit Offsets

    Manual-commit offsets: After processing messages, consumers need to manually commit their offsets. The advantage of this method is that it allows for precise control over offset commit, avoiding duplicate message consumption or loss. However, it should be noted that manual commit can lead to high broker CPU usage, affecting performance. As message volume increases, CPU consumption will be significantly high, affecting other features of the broker. Therefore, it is recommended to commit offset after a certain number of messages.
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaConsumerManualCommitExample {
    
    public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList("test-topic"));
    
    int count = 0;
    try {
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    count++;
    if (count % 10 == 0) {
    consumer.commitSync();
    System.out.println("Committed offsets.");
    }
    }
    }
    } finally {
    consumer.close();
    }
    }
    }

    Assign Consumption

    The Kafka Java Client's assign consumption mode allows consumers to directly specify the partitions for subscription, rather than automatically assigning partitions through topic subscription. This mode is suitable for scenarios where manual control of consumed partitions is needed, such as implementing specific cloud load balancer policy, or skipping certain partitions in some cases. The general process involves using the assign method to manually specify the partitions consumed by the consumer, setting the starting offset for consumption with the seek method, and then executing the consumption logic. For example:
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class KafkaConsumerAssignAndSeekApp {
    public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("enable.auto.commit", "false");
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
    String topic = "my-topic";
    TopicPartition partition0 = new TopicPartition(topic, 0);
    TopicPartition partition1 = new TopicPartition(topic, 1);
    consumer.assign(Arrays.asList(partition0, partition1));
    
    // Set the starting offset for consumption.
    long startPosition0 = 10L;
    long startPosition1 = 20L;
    consumer.seek(partition0, startPosition0);
    consumer.seek(partition1, startPosition1);
    
    try {
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    consumer.commitSync(); // Manually submit the offset
    }
    } finally {
    consumer.close();
    }
    }
    }

    Production and Consumption FAQs of Kafka Java Client

    Kafka Java Producer is unable to send messages successfully
    First, check if the IP and port of the Kafka cluster can be connected normally. If not, resolve the connection issues first.
    Next, verify the correct configuration of the access point and whether the version matches the broker version. Send demo according to the best practices.
    
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support