Why Is the Consumption Offset Not Synchronized to the Server in a Timely Manner?
Issue Symptom
The consumption offset of CKafka is not updated to the server in a timely manner, so the consumption speed cannot be viewed in the console.
Preliminary Preparations
A CKafka cluster evaluates and displays the consumption speed and consumption offset based on how frequently the client commits offsets. Therefore, when you encounter this issue, check the client's offset commit status first.
Troubleshooting Guide
Step 1: Checking Whether Production Data Is Being Written
If neither a consumption backlog nor a consumption speed is displayed, check the topic and production monitoring metrics to see whether data is being written.
1. Go to the console and choose Instance List > ID/Name > Topic List to check whether the end offset of the corresponding topic partition is updated.
2. Go to the console and choose Instance List > ID/Name > Monitoring > Topic to check whether metrics such as the production traffic and the total disk space used by messages in the topic are greater than 0.
If no real-time data is being written to the topic, there are two possible causes:
If you create a consumer group and set auto.offset.reset=latest, data will be consumed from the latest offset. Since no real-time data is written to the topic, no offsets will be committed.
If the topic data has exceeded the retention period and you set auto.offset.reset=earliest, data will be consumed from the beginning. Since the expired data has already been deleted, no data can be consumed and no offsets will be committed.
In both cases, no offset commit occurs, so the server does not update the consumption offset.
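For reference, the following minimal sketch shows where auto.offset.reset is set on a native Kafka Java consumer; the broker address, group ID, and deserializer choices are placeholder examples, not values from your environment:
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "your-ckafka-broker:9092");   // placeholder broker address
props.put("group.id", "demo-consumer-group");                // placeholder consumer group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// latest: start from the newest offset; earliest: start from the oldest retained offset
props.put("auto.offset.reset", "latest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);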
Step 2: Checking Client Configurations
For a native Kafka client, check the consumer configuration to see whether automatic offset committing (enable.auto.commit) is enabled.
Native Kafka Java Client Examples
One method is to check the client configuration directly. The other is to search for ConsumerConfig in the program logs: the Kafka client prints its full consumer configuration only during KafkaConsumer initialization, so this entry might not be found if the log retention period is short.
If the client's consumption pattern allows automatic offset committing, enable this parameter so that the client's consumption offset is updated to the server.
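Building on the consumer configuration sketch above, enabling automatic commits looks roughly like the following; the 5000 ms interval is an example value, not a recommendation:
// Let the Kafka client commit offsets automatically in the background
props.put("enable.auto.commit", "true");
// Example commit interval; tune it to your latency and duplication tolerance
props.put("auto.commit.interval.ms", "5000");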
If offsets must be committed manually, for example when there are stricter requirements on avoiding duplicate consumption or data loss and enable.auto.commit is therefore disabled, check whether the client program performs a synchronous or asynchronous offset commit during consumption, such as consumer.commitSync().
while (true) {
    ConsumerRecords<String, String> consumerRecords = consumer.poll(500);
    for (ConsumerRecord<String, String> record : consumerRecords) {
        System.out.printf("Receive messages: partition:%d, offset:%d, time:%d, key:%s, value:%s\n",
                record.partition(), record.offset(), record.timestamp(), record.key(), record.value());
    }
    // Synchronously commit the offsets of the records returned by the last poll()
    consumer.commitSync();
}
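If asynchronous committing better suits your throughput requirements, a sketch along the following lines can be used instead; the callback here only logs failures, so adapt the error handling to your own needs:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(500);
    for (ConsumerRecord<String, String> record : records) {
        // process the record here
    }
    // Asynchronously commit offsets; the callback reports commit failures
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            System.err.println("Offset commit failed: " + exception.getMessage());
        }
    });
}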
After troubleshooting and adjusting the native Kafka client as described above, check again whether the client consumption offset has been updated to the server.
Flink Client (Commonly Used in Kafka Streaming Scenarios)
The Flink consumption client accesses Kafka through Flink's Kafka Source API, combined with the fault tolerance mechanisms of the Flink framework, such as checkpoints and savepoints. This results in two scenarios: offset handling by the underlying Kafka client through enable.auto.commit, and offset handling by Flink checkpoints.
Kafka Source commits the current offset when a checkpoint is completed to ensure that the checkpoint status of Flink is consistent with the committed offsets on the Kafka broker.
If checkpointing is disabled, Kafka Source relies on the Kafka consumer's internal logic to commit offsets periodically and automatically. This automatic commit feature is controlled by two Kafka consumer configuration items: enable.auto.commit and auto.commit.interval.ms. Note that if you do not explicitly set enable.auto.commit when using the Flink Kafka Source, the Flink framework sets it to false, which disables automatic offset commits. Therefore, if Flink checkpointing is disabled, you need to enable this parameter manually.
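As a reference, the sketch below shows how these properties could be passed to Flink's KafkaSource builder. The broker address, topic, and group ID are placeholders, and the commit interval is an example value; verify the exact builder API against your Flink version.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Scenario 1: with checkpointing enabled, Kafka Source commits offsets when a checkpoint completes
// env.enableCheckpointing(60000);

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("your-ckafka-broker:9092")   // placeholder broker address
        .setTopics("demo-topic")                          // placeholder topic
        .setGroupId("demo-consumer-group")                // placeholder consumer group
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // Scenario 2: with checkpointing disabled, explicitly enable automatic commits
        .setProperty("enable.auto.commit", "true")
        .setProperty("auto.commit.interval.ms", "5000")
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");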