Automatically Resetting Offset

Last updated: 2024-01-09 14:47:32
    This document describes the concept and usage of the auto.offset.reset parameter.

    What is auto.offset.reset?

    The auto.offset.reset parameter defines where consumption starts when no valid offset is available for the partition to be consumed. It determines how the offset is initialized if the broker has no committed offset for the consumer group (for example, on initial consumption, or after a committed offset has expired because it was kept for more than seven days), and how the offset is reset if the OFFSET_OUT_OF_RANGE error occurs.
    The auto.offset.reset parameter has the following valid values:
    earliest: Reset to the minimum offset in the partition.
    latest: Reset to the maximum offset in the partition. This is the default value.
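    none: Do not reset the offset. The consumer throws an exception instead: NoOffsetForPartitionException if no committed offset is found, or OffsetOutOfRangeException if the offset is out of range.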
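The three values above can be modeled in a few lines of plain Java. This is only a sketch of the decision, not the Kafka client's actual implementation: the class `ResetPolicySketch`, the method `resolveStartOffset`, and the range 100 to 300 are hypothetical names and values chosen for illustration, and the sketch treats "maximum offset" as the partition's log end offset.

```java
import java.util.Properties;

final class ResetPolicySketch {

    // Hypothetical helper: returns the offset that consumption would start from
    // when no valid offset is available, given the configured policy.
    static long resolveStartOffset(String policy, long logStartOffset, long logEndOffset) {
        switch (policy) {
            case "earliest":
                return logStartOffset; // minimum offset in the partition
            case "latest":
                return logEndOffset;   // maximum offset in the partition
            case "none":
                // No automatic reset: the caller has to choose an offset itself.
                throw new IllegalStateException("No valid offset and auto.offset.reset=none");
            default:
                throw new IllegalArgumentException("Unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // "auto.offset.reset" is the literal key behind ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
        props.setProperty("auto.offset.reset", "earliest");
        long start = resolveStartOffset(props.getProperty("auto.offset.reset"), 100L, 300L);
        System.out.println("Start offset: " + start);
    }
}
```

With `none`, the sketch throws instead of returning an offset, which corresponds to the case where the application must set the offset manually.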

    When will OFFSET_OUT_OF_RANGE occur?

    This error indicates that the offset committed by the client is outside the valid offset range of the partition on the broker. For example, if LogStartOffset and LogEndOffset of partition 1 in topicA are 100 and 300 respectively, but the offset committed by the client is less than 100 or greater than 300, the broker returns this error, and the offset is reset according to auto.offset.reset.
    This error may occur in the following cases:
    If the client commits an offset but does not consume for a period of time, the messages at that offset are deleted from the broker by log rolling after the message retention period set for the topic has elapsed. In this case, if the client uses that offset again after the deletion, this error will occur.
    If the client commits an abnormal offset due to issues such as SDK bugs and network packet loss, this error will occur.
    If there are out-of-sync replicas on the broker and a leader switch occurs, follower replicas will be truncated. In this case, if the offset committed by the client falls in the truncated range, this error will occur. For more information on leader switch, see Client > What is leader switch?
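The range rule and two of the cases above can be checked with a small model. This is a minimal sketch under stated assumptions: `OffsetRangeSketch` and `isOutOfRange` are hypothetical names, the range 100 to 300 comes from the example in the text, and the shifted boundaries (200 and 250) are made-up values used only to illustrate retention and truncation.

```java
final class OffsetRangeSketch {

    // Hypothetical helper mirroring the rule in the text: an offset is valid
    // only if logStartOffset <= offset <= logEndOffset.
    static boolean isOutOfRange(long offset, long logStartOffset, long logEndOffset) {
        return offset < logStartOffset || offset > logEndOffset;
    }

    public static void main(String[] args) {
        // Example from the text: the valid range is 100 to 300.
        System.out.println("offset 99:  " + isOutOfRange(99, 100, 300));
        System.out.println("offset 150: " + isOutOfRange(150, 100, 300));

        // Retention: old messages are deleted and the log start moves from 100 to 200,
        // so a committed offset of 150 is no longer valid.
        System.out.println("offset 150 after retention: " + isOutOfRange(150, 200, 300));

        // Truncation after a leader switch: the log end drops from 300 to 250,
        // so a committed offset of 280 falls in the truncated range.
        System.out.println("offset 280 after truncation: " + isOutOfRange(280, 100, 250));
    }
}
```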

    auto.offset.reset=none Description

    Background

    Use this setting if you don't want the offset to be reset automatically, for example because your business cannot tolerate the large-scale repeated consumption that an automatic reset can cause.
    Note:
    In this case, the consumer group reports an error on its first consumption because no offset can be found. Therefore, you need to manually set the offset in catch.

    Note

    After auto.offset.reset is set to none, automatic offset reset can be avoided; however, as the automatic reset mechanism is disabled, when a new partition is added, the client doesn't know where to start consuming the new partition, and an exception will occur. In this case, you need to manually set a consumer group offset and start consuming.

    Instructions

    During consumption, if you set auto.offset.reset to none for the consumer, you need to catch the exception NoOffsetForPartitionException and set the offset in catch on your own. You can select one of the following methods based on your actual business needs:
    Specify the offset. You need to maintain the offset yourself, which is convenient for retries.
    Start consumption from the beginning.
    Start consumption from the latest offset.
    Obtain and set the offset based on the timestamp.
    Below is the sample code:
    package com.tencent.tcb.operation.ckafka.plain;
    
    import com.google.common.collect.Lists;
    import com.tencent.tcb.operation.ckafka.JavaKafkaConfigurer;
    import java.time.Instant;
    import java.time.temporal.ChronoUnit;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Properties;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.config.SaslConfigs;
    
    public class KafkaPlainConsumerDemo {
    
    public static void main(String[] args) {
    // Set the path of the JAAS configuration file.
    JavaKafkaConfigurer.configureSaslPlain();
    
    // Load `kafka.properties`.
    Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
    
    Properties props = new Properties();
    // Set the access point. Obtain the access point of the corresponding topic in the console.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
    
    // Set the access protocol.
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    // Set the PLAIN mechanism.
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    // Set the session timeout.
    // If the broker does not receive a heartbeat from the consumer within this interval, it determines that the consumer is not alive, removes the consumer from the consumer group, and triggers rebalancing. This example uses 30s.
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
    // Set the maximum number of messages that can be polled at a time.
    // Do not set this parameter to an excessively large value. If polled messages are not all consumed before the next poll starts, load balancing is triggered and lagging occurs.
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
    // Set the method for deserializing messages.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
    // Set the consumer group for the current consumer instance. You need to apply for a consumer group in the console first.
    // The instances in the same consumer group consume messages in load balancing mode.
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
    
    // Set the offset reset policy. Note: If `auto.offset.reset` is set to `none`, the consumer group reports an error on its first consumption because no offset can be found. Therefore, you need to manually set the offset in `catch`.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
    // Construct a consumer object. This generates a consumer instance.
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    // Set one or more topics to which the consumer group subscribes.
    // We recommend that you configure consumer instances with the same `GROUP_ID_CONFIG` value to subscribe to the same topics.
    List<String> subscribedTopics = new ArrayList<String>();
    // If you want to subscribe to multiple topics, add the topics here.
    // You must create the topics in the console in advance.
    String topicStr = kafkaProperties.getProperty("topic");
    String[] topics = topicStr.split(",");
    for (String topic : topics) {
    subscribedTopics.add(topic.trim());
    }
    consumer.subscribe(subscribedTopics);
    // Consume messages in loop.
    while (true){
    try {
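    // `poll(Duration)` requires Kafka client 2.0 or later; `poll(long)` is deprecated.
    ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(1000));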
    // All messages must be consumed before the next poll, and the total duration cannot exceed the timeout interval specified by `SESSION_TIMEOUT_MS_CONFIG`. We recommend that you create a separate thread to consume messages and then return the result in async mode.
    for (ConsumerRecord<String, String> record : records) {
    System.out.println(
    String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
    }
    } catch (NoOffsetForPartitionException e) {
    System.out.println(e.getMessage());
    
    // If you set `auto.offset.reset` to `none`, you need to catch the exception and set the offset on your own. The four samples below are alternatives: keep only the one that fits your actual business needs.
    // For brevity, the samples handle only partition 0 and assume that `topicStr` is a single topic name.
    // Sample 1. Specify the offset. You need to maintain the offset yourself, which is convenient for retries.
    // The begin and end offsets give the valid range for the offset passed to `seek`. Replace 0 with the offset that you maintain.
    Map<Integer, Long> partitionBeginOffsetMap = getPartitionOffset(consumer, topicStr, true);
    Map<Integer, Long> partitionEndOffsetMap = getPartitionOffset(consumer, topicStr, false);
    consumer.seek(new TopicPartition(topicStr, 0), 0);
    
    // Sample 2. Start consumption from the beginning.
    consumer.seekToBeginning(Lists.newArrayList(new TopicPartition(topicStr, 0)));
    
    // Sample 3. Start consumption from the latest offset.
    consumer.seekToEnd(Lists.newArrayList(new TopicPartition(topicStr, 0)));
    
    // Sample 4. Obtain and set the offset based on the timestamp. For example, reset the offset to 10 minutes ago.
    Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
    Long value = Instant.now().minus(10, ChronoUnit.MINUTES).toEpochMilli();
    timestampsToSearch.put(new TopicPartition(topicStr, 0), value);
    Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer
    .offsetsForTimes(timestampsToSearch);
    for (Entry<TopicPartition, OffsetAndTimestamp> entry : topicPartitionOffsetAndTimestampMap
    .entrySet()) {
    TopicPartition topicPartition = entry.getKey();
    OffsetAndTimestamp entryValue = entry.getValue();
    // `offsetsForTimes` returns null for a partition that has no message at or after the timestamp.
    if (entryValue != null) {
    consumer.seek(topicPartition, entryValue.offset());
    }
    }
    
    
    }
    }
    }
    
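    /**
    * Get the earliest or latest offset of each partition in the topic.
    * @param consumer the consumer used to query the offsets
    * @param topicStr the topic name
    * @param beginOrEnd true to get the earliest offsets; false to get the latest offsets
    * @return a map from partition number to offset
    */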
    private static Map<Integer, Long> getPartitionOffset(KafkaConsumer<String, String> consumer, String topicStr,
    boolean beginOrEnd) {
    Collection<PartitionInfo> partitionInfos = consumer.partitionsFor(topicStr);
    List<TopicPartition> tp = new ArrayList<>();
    Map<Integer, Long> map = new HashMap<>();
    partitionInfos.forEach(str -> tp.add(new TopicPartition(topicStr, str.partition())));
    Map<TopicPartition, Long> topicPartitionLongMap;
    if (beginOrEnd) {
    topicPartitionLongMap = consumer.beginningOffsets(tp);
    } else {
    topicPartitionLongMap = consumer.endOffsets(tp);
    }
    topicPartitionLongMap.forEach((key, offset) -> {
    int partition = key.partition();
    map.put(partition, offset);
    });
    return map;
    }
    
    }
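When seeking by timestamp, `offsetsForTimes` maps a partition to null if no message has a timestamp at or after the requested one, so a fallback is needed for such partitions. The following is a minimal sketch of that decision in plain Java, without the Kafka client: `TimestampSeekSketch` and `chooseSeekOffsets` are hypothetical names, partitions are represented by their numbers, and falling back to the end offset is an assumption made for illustration (falling back to the beginning is equally possible, depending on your business needs).

```java
import java.util.HashMap;
import java.util.Map;

final class TimestampSeekSketch {

    // Hypothetical helper: for each partition, use the offset found by timestamp,
    // or fall back to the partition's end offset when the lookup result is null.
    static Map<Integer, Long> chooseSeekOffsets(Map<Integer, Long> offsetsByTime,
                                                Map<Integer, Long> endOffsets) {
        Map<Integer, Long> result = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            Long byTime = offsetsByTime.get(entry.getKey());
            result.put(entry.getKey(), byTime != null ? byTime : entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> offsetsByTime = new HashMap<>();
        offsetsByTime.put(0, 120L);  // a message at or after the timestamp exists
        offsetsByTime.put(1, null);  // no such message in partition 1

        Map<Integer, Long> endOffsets = new HashMap<>();
        endOffsets.put(0, 300L);
        endOffsets.put(1, 250L);

        System.out.println(chooseSeekOffsets(offsetsByTime, endOffsets));
    }
}
```

In a real consumer, each resulting value would be passed to `consumer.seek` for the corresponding partition.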
    