Why Is Consumption Progress Not Synchronized to the Server in Time
Symptom
The CKafka consumption progress is not updated on the server in time, and the consumption speed cannot be viewed in the console.
Background
The CKafka server estimates consumption speed and progress based on how frequently the client commits its consumer offsets. Therefore, when this problem occurs, prioritize troubleshooting the client's consumption behavior and offset committing.
Troubleshooting Guide
Step 1: Check Whether Production Data Is Being Written
If neither message accumulation nor consumption speed is observed, check the Topic and production monitoring metrics to confirm whether data is actually being written.
1. In the console, go to Instance List > ID/Name > Topic List, and check whether the end offset of the relevant partitions is updating.
2. In the console, go to Instance List > ID/Name > Monitor > Topic, and check whether the production traffic and the total size of messages on disk are greater than 0.
If no real-time data is being written to the Topic, there are two possible cases:
A consumer group was newly created with auto.offset.reset=latest, so it starts consuming from the latest offset. Since no real-time data is being written, no offset commit is generated;
The Topic data has exceeded the retention period. With auto.offset.reset=earliest, consumption starts from the beginning, but because the data has already been deleted by the retention policy, consumption fails and no offset commit is generated.
In neither case is an offset committed, so the server does not update the consumption progress.
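For reference, the following is a minimal consumer configuration sketch showing where auto.offset.reset is set; the broker address, consumer group, and topic names are placeholders.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "your-broker:9092");   // placeholder broker address
props.put("group.id", "your-consumer-group");         // placeholder consumer group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// latest: start from the newest offset when the group has no committed offset;
// earliest: start from the earliest offset still retained in the log.
props.put("auto.offset.reset", "latest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("your-topic"));  // placeholder topic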
Step 2: Check Client Configuration
For the native Kafka client, check the consumer configuration to see whether automatic offset committing (enable.auto.commit) is enabled.
Native Kafka Java Client Example
A direct way is to check the client configuration; alternatively, search for ConsumerConfig in the program logs. Note that this log is printed only when the KafkaConsumer is initialized, so it may not be found if the log retention period is short.
Based on the client's consumption characteristics, if automatic offset committing is acceptable, turn on this parameter and the client's consumption progress will be reported to the server.
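Assuming the same Properties-based setup as in the sketch above, automatic committing is controlled by the following two settings (the values shown are the Kafka client defaults):
// Commit offsets automatically in the background.
props.put("enable.auto.commit", "true");
// Interval between automatic offset commits, in milliseconds.
props.put("auto.commit.interval.ms", "5000");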
If offsets must be committed manually, for example because of stricter requirements around duplicate consumption or data loss, and enable.auto.commit is therefore turned off, check whether the client program actually commits offsets synchronously or asynchronously during consumption, for example by calling consumer.commitSync():
while (true) {
    // Pull messages; newer client versions use consumer.poll(Duration.ofMillis(500)) instead.
    ConsumerRecords<String, String> consumerRecords = consumer.poll(500);
    for (ConsumerRecord<String, String> record : consumerRecords) {
        System.out.printf("receive message: partition:%d, offset:%d, time:%d, key:%s, value:%s\n",
                record.partition(), record.offset(), record.timestamp(), record.key(), record.value());
    }
    // Synchronously commit the offsets of the records returned by the last poll.
    consumer.commitSync();
}
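If asynchronous committing better fits the throughput requirements, the same loop can commit via commitAsync with a callback. This is only a sketch, assuming the same consumer as above; the callback simply logs commit failures so that stalled server-side progress can be traced.
while (true) {
    ConsumerRecords<String, String> consumerRecords = consumer.poll(500);
    for (ConsumerRecord<String, String> record : consumerRecords) {
        // process the message here
    }
    // Asynchronously commit the offsets returned by the last poll;
    // the callback reports failures so silent commit problems are visible.
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            System.err.println("offset commit failed: " + exception.getMessage());
        }
    });
}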
After completing the above checks and adjustments on the native Kafka client, observe again whether the client's consumption progress is updated to the server.
Flink Client Example (Common in Kafka Streaming Scenarios)
The Flink consumer client integrates with Kafka through the Flink Kafka Source API and combines it with Flink's fault-tolerance mechanisms, namely checkpoints and savepoints. Two cases need to be distinguished: how the underlying Kafka client's enable.auto.commit is handled, and how offsets are committed by Flink checkpoints.
When checkpointing is enabled, Kafka Source commits the current consumer offset upon checkpoint completion, so that the offset committed on the Kafka broker stays consistent with the Flink checkpoint state.
If checkpointing is not enabled, Kafka Source relies on the Kafka consumer's internal periodic auto-commit logic, which is configured by the two consumer settings enable.auto.commit and auto.commit.interval.ms. Note that if you do not explicitly set enable.auto.commit when using the Flink Kafka Source, the Flink framework overrides it to enable.auto.commit=false, disabling automatic offset commits. Therefore, if Flink checkpointing is not enabled, you need to set this parameter manually so that offsets are committed and the server can show consumption progress.
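For reference, a minimal sketch of a Flink Kafka Source setup (using the KafkaSource connector API; broker, topic, and group names are placeholders). With checkpointing enabled, offsets are committed when a checkpoint completes; the two auto-commit properties shown take effect only when checkpointing is disabled.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// With checkpointing enabled, Kafka Source commits offsets when each checkpoint completes.
env.enableCheckpointing(60000);

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("your-broker:9092")        // placeholder broker address
        .setTopics("your-topic")                        // placeholder topic
        .setGroupId("your-consumer-group")              // placeholder consumer group
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // Take effect only when checkpointing is NOT enabled: fall back to the
        // Kafka consumer's periodic auto-commit so the broker still sees progress.
        .setProperty("enable.auto.commit", "true")
        .setProperty("auto.commit.interval.ms", "5000")
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "ckafka-source");
env.execute("consumption-progress-demo");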