TDMQ for CKafka (CKafka) is a high-throughput distributed messaging system widely used in numerous enterprise-level applications. High data reliability is one of its key features. This document describes the multi-faceted mechanisms ensuring high data reliability in CKafka.
Multiple Replicas and Leader Election Mechanism
Multiple Replicas
A multi-replica design enhances system availability and reliability. For CKafka, 3 replicas are recommended for message topics, and at least 2 replicas should be used in production environments.
Typically, replicas are evenly distributed across all broker nodes in the cluster. The replica assignment algorithm is as follows:
1. Sort all brokers (assuming there are n brokers in total) and the partitions to be assigned.
2. Assign the i-th partition to the (i mod n)-th broker.
3. Assign the j-th replica of the i-th partition to the ((i + j) mod n)-th broker.
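Below is a minimal sketch of this round-robin placement, written in Java purely for illustration (the class and method names are hypothetical; this is not CKafka's actual implementation):
import java.util.ArrayList;
import java.util.List;

public class ReplicaAssignment {
    /**
     * Assigns replicas of each partition to brokers in a round-robin fashion:
     * replica j of partition i is placed on broker (i + j) mod n.
     */
    public static List<List<Integer>> assign(int partitionCount, int replicaCount, List<Integer> sortedBrokerIds) {
        int n = sortedBrokerIds.size();
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            List<Integer> replicas = new ArrayList<>();
            for (int j = 0; j < replicaCount; j++) {
                // Broker index for replica j of partition i.
                replicas.add(sortedBrokerIds.get((i + j) % n));
            }
            assignment.add(replicas);
        }
        return assignment;
    }
}
For example, with 3 brokers (0, 1, 2) and 3 replicas per partition, partition 0 is placed on brokers 0, 1, 2 and partition 1 on brokers 1, 2, 0.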
Leader Election Mechanism
CKafka dynamically maintains an in-sync replica set (ISR) in ZooKeeper. All replicas in the ISR are guaranteed to have caught up with the leader. Only members of the ISR are eligible to be elected as leader.
Assuming there are f + 1 replicas in the ISR, a partition can tolerate failures of f replicas without losing committed messages (for example, an ISR of 3 replicas can tolerate the failure of 2 replicas).
Assuming there are 2f + 1 replicas (including the leader and followers), it must be ensured that f + 1 replicas have replicated the message before the message is committed. To guarantee the proper election of a new leader, the number of failed replicas must not exceed f.
Leader Switch Principle
When a topic is created, the CKafka broker cluster assigns a leader for each partition, distributing the partitions of the current topic evenly across each broker.
However, after a period of usage, partitions may become unevenly distributed across brokers, or clients may encounter exceptions such as BrokerNotAvailableError and NotLeaderForPartitionError during production or consumption.
This is usually caused by a partition leader switch. Typical scenarios are as follows:
When an unexpected event, such as a network interruption, program crash, or hardware fault, occurs on the broker where a partition leader resides and the broker can no longer communicate with the controller, the affected topic partition undergoes a leader switch and the leader migrates to a follower replica.
When auto.leader.rebalance.enable = true is set for the Kafka cluster and a rebalance is performed automatically, or when brokers are added or removed and a rebalance is triggered manually, the resulting partition rebalance also causes a leader switch.
When a leader switch occurs due to an unexpected broker outage:
If the client sets acks = all and min.insync.replicas > 1, the message will not be lost, because it has been replicated to both the leader and at least one follower replica before it is acknowledged.
If the client sets acks = 1, messages produced within the replica.lag.time.max.ms window may not yet have been synchronized to the follower replicas, potentially causing message loss.
When a leader switch is triggered by manual/automatic rebalancing (such as during instance upgrades, switching from single-availability zone (AZ) deployment to cross-AZ deployment, or instance migration) while brokers are functioning normally, it does not cause message loss for the following reasons:
If the client sets acks = all and min.insync.replicas > 1, the message will not be lost, because it has been replicated to both the leader and at least one follower replica before it is acknowledged.
If the client sets acks = 1, offsets in the partition are synchronized automatically during the leader switch, so messages are not lost.
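Errors such as NotLeaderForPartitionError during a leader switch are usually transient. The following is a rough sketch assuming the Apache Kafka Java client (the broker address, topic name, and retry values are illustrative): with retries enabled the client retries such retriable errors on its own, and a send callback can distinguish them from permanent failures.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.StringSerializer;

public class LeaderSwitchAwareSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ckafka-broker:9092"); // illustrative address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, 3);  // retry transient errors such as a leader switch

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
                if (exception == null) {
                    // Acknowledged by the broker; partition/offset identify where the message was stored.
                    System.out.printf("partition=%d, offset=%d%n", metadata.partition(), metadata.offset());
                } else if (exception instanceof RetriableException) {
                    // Transient error (for example, a leader switch in progress). The client has already
                    // retried internally up to the retries limit; consider re-sending from here.
                    System.err.println("Retriable send error: " + exception.getMessage());
                } else {
                    // Non-retriable error: handle it on the application side.
                    System.err.println("Send failed: " + exception.getMessage());
                }
            });
        }
    }
}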
acks Parameter Configuration
1. acks = all (or -1)
When the producer sets the acks parameter to all (or -1), it indicates that the producer needs to wait for all ISR replicas to acknowledge receipt of the message before the message is considered successfully sent.
This configuration provides the highest data reliability but may increase message sending latency. For example, in a partition with 3 replicas, after the producer sends a message, it needs to wait for both the leader replica and the two follower replicas to acknowledge receipt of the message before the message is considered successfully sent.
2. acks = 1
When the acks parameter is set to 1, the producer considers the message successfully sent as long as the leader replica acknowledges receipt of the message. The synchronization status of follower replicas does not affect the producer's judgment.
This configuration strikes a balance between data reliability and message sending latency. If the leader replica fails after the message is acknowledged and the follower replicas have not fully synchronized the message, it may result in message loss.
3. acks = 0
When the acks parameter is set to 0, the producer can consider the message successfully sent without waiting for acknowledgment from any replicas. This configuration offers the lowest message sending latency but also the lowest data reliability. If a broker node fails before receiving the message, the message will be lost.
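For reference, a minimal sketch of how the three acks levels are set with the Apache Kafka Java client (only one of the three lines would be active in a real configuration; values are examples, not recommendations):
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class AcksOptions {
    public static Properties producerProps() {
        Properties props = new Properties();
        // acks = all (or -1): wait until all in-sync replicas acknowledge the message. Highest reliability.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // acks = 1: only the leader's acknowledgment is required. Balanced reliability and latency.
        // props.put(ProducerConfig.ACKS_CONFIG, "1");
        // acks = 0: do not wait for any acknowledgment. Lowest latency, lowest reliability.
        // props.put(ProducerConfig.ACKS_CONFIG, "0");
        return props;
    }
}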
Relationship Between the Number of Replicas, acks, and min.insync.replicas
Key Rules and Configuration Suggestions
1. Correlation between the number of replicas and min.insync.replicas
Mandatory constraint: min.insync.replicas must be less than or equal to the number of replicas; otherwise, the broker rejects write requests sent with acks = all.
Recommended configuration:
Production environment: Number of replicas = 3 & min.insync.replicas = 2 (balancing disaster recovery and performance)
Financial scenario: Number of replicas = 5 & min.insync.replicas = 3 (strong consistency required)
2. Association rules between acks and min.insync.replicas
Effective conditions for acks = all:
min.insync.replicas must be set to at least 2; otherwise, when only the leader remains in the ISR, acks = all effectively degrades to acks = 1, and data durability cannot be guaranteed.
Potential risks for acks = 1:
If the leader replica fails and the follower replicas have not synchronized the message, the message may be lost (compensation requires retries and idempotence).
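As an illustrative sketch only (assuming topics are created through the Apache Kafka Java AdminClient; in CKafka the replica count and min.insync.replicas can also be set in the console, and the broker address, topic name, and partition count below are placeholders), the recommended production combination of 3 replicas and min.insync.replicas = 2 could be applied like this:
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateReliableTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ckafka-broker:9092"); // illustrative address

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 replicas combined with min.insync.replicas = 2: the recommended production setting above.
            NewTopic topic = new NewTopic("my-topic", 6, (short) 3) // 6 partitions, 3 replicas (illustrative)
                    .configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}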
Combining Common Parameters Such as the Number of Replicas, acks, and min.insync.replicas with Scenarios to Ensure Data Reliability
| Number of Replicas | acks | min.insync.replicas | Behavior | Reliability |
|---|---|---|---|---|
| Any value | 0 | Any value | The producer does not wait for acknowledgment. Messages may be lost before being written to any replica. | Lowest: no data reliability guarantee. |
| 2 | all | 1 | Effectively degrades to acks = 1 (the ISR may contain only the leader). Failure of one broker is tolerated. | Low: data may be lost when the leader fails. |
| 3 | 1 | 1 | The producer only requires the leader's acknowledgment. The ISR may contain only the leader. Failure of two brokers is tolerated. | Medium: data may be lost when the leader fails before any follower has synchronized it. |
| 3 | all | 2 | The producer waits for acknowledgment from at least 2 replicas in the ISR. The ISR contains the leader and at least 1 follower. Failure of one broker is tolerated. | High: data is not lost as long as at least 2 replicas in the ISR are alive. |
| 3 | all | 3 | The producer waits for acknowledgment from all 3 replicas. The ISR must contain all replicas. Failure of 0 brokers is tolerated. | Highest: strongest durability, but no broker failure is tolerated for writes. |
| 5 | all | 3 | The producer waits for acknowledgment from 3 replicas. The ISR contains the leader and at least 2 followers. Failure of two brokers is tolerated. | High: data is safe as long as most replicas are alive. |
Data Loss Scenarios and Solutions
This section describes the factors that affect the data reliability of CKafka from the producer, server (CKafka), and consumer perspectives, respectively, and provides corresponding solutions.
How Do I Deal with Data Loss on the Producer?
Causes of Data Loss
When a producer sends data to CKafka, the data may be lost because of network jitter, so CKafka never receives it. Other possible causes:
The network load is high or the disk is busy, and the producer does not have a retry mechanism.
The disk exceeds the limit of the purchased specification. For example, if the disk specification of an instance is 9000 GB and the capacity is not expanded in time after the disk is full, data cannot be written to CKafka.
Sudden or sustained peak traffic exceeds the limit of the purchased specification. For example, if the peak throughput specification of an instance is 100 MB/s and the capacity is not expanded in time after the peak throughput has exceeded the limit for a long time, this will slow down data writing to CKafka. If the producer has a queuing timeout mechanism, data may fail to be written to CKafka.
Solutions
Enable the failure retry mechanism on the producer for important data.
For disk usage, set monitoring and alarm policies when configuring instances to prevent such cases.
When the disk is full, promptly upgrade the configuration in the console (non-exclusive CKafka instances support smooth upgrades with zero downtime and allow separate disk upgrades) or reduce disk usage by shortening the message retention period.
To minimize message loss on the producer, tune the buffer size through buffer.memory and batch.size (in bytes). A bigger buffer is not always better: if the producer crashes for any reason, the more data in the buffer, the more garbage needs to be collected and the slower the recovery. Always monitor the number of messages produced and the average message size (CKafka provides diverse monitoring metrics).
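For reference, the buffer settings mentioned above map to the following Apache Kafka Java client properties (the values shown are the client defaults, not tuning recommendations):
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class BufferTuning {
    public static Properties bufferProps() {
        Properties props = new Properties();
        // Total memory (in bytes) the producer may use to buffer records waiting to be sent.
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // 32 MB (the client default)
        // Maximum size (in bytes) of a batch of records sent to one partition.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // 16 KB (the client default)
        return props;
    }
}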
Configure producer acks.
When a producer sends data to the leader, the data reliability level can be set via request.required.acks and min.insync.replicas.
Recommended Parameter Values
The following parameter values are for reference only; adjust them based on your actual business situation.
Retry mechanism: message.send.max.retries=3;retry.backoff.ms=10000;
Guaranteed high reliability: request.required.acks=-1;min.insync.replicas=2;
Guaranteed high performance: request.required.acks=0;
Reliability + performance: request.required.acks=1;
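The parameter names above come from the legacy producer configuration; in the current Apache Kafka Java client, message.send.max.retries corresponds to retries and request.required.acks corresponds to acks. A minimal sketch of the high-reliability profile follows (min.insync.replicas = 2 is a topic/server-side setting, not a producer property):
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class HighReliabilityProducerProps {
    public static Properties build() {
        Properties props = new Properties();
        // Retry mechanism (legacy: message.send.max.retries=3; retry.backoff.ms=10000).
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 10000);
        // High reliability (legacy: request.required.acks=-1), used together with
        // min.insync.replicas=2 configured on the topic.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return props;
    }
}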
How Do I Deal with Data Loss on the Server (CKafka)?
Causes of Data Loss
The leader of the partition crashes before follower replicas complete the backup. Even if a new leader is elected, the data is lost because it was not backed up in time.
Apache Kafka features an asynchronous disk writing mechanism. Data is first stored in the PageCache. If the broker gets disconnected, restarts, or fails before the data is formally written to the disk, the data in PageCache is lost because it has not been written to the disk in time.
Disk failure causes the loss of data that has been written to the disk.
Solutions
Since Apache Kafka has multiple replicas, it is recommended that you use replicas to ensure data integrity. With multiple replicas in place, data loss occurs only when all the replicas and brokers fail simultaneously, offering significantly higher reliability than single-replica configurations. Therefore, CKafka requires that at least two replicas be configured for a topic and supports configuring 3 replicas for a topic.
The CKafka service configures appropriate values for log.flush.interval.messages and log.flush.interval.ms to control how data is flushed to the disk.
CKafka implements specialized technical measures for disks to ensure that data reliability remains unaffected even in the event of partial disk damage.
Recommended Parameter Values
Whether a replica that is not in sync can be elected as leader: unclean.leader.election.enable=false // Disable it so that out-of-sync replicas cannot become the leader.
How Do I Deal with Data Loss on the Consumer?
Causes of Data Loss
The offset is committed before the data is actually consumed. If the consumer crashes midway but the offset has already been committed, the consumer will miss that data, and the offset of the consumer group needs to be reset to recover it.
A gap between consumption and production speeds has existed for too long, while the message retention period is too short. As a result, messages are deleted upon expiration before being consumed in time.
Solutions
Configure the parameter auto.commit.enable appropriately; when it is set to true, offsets are committed automatically. Scheduled commits are recommended to avoid committing offsets too frequently.
Monitor the consumer status and adjust the data retention period appropriately. Monitor the current consumption offset and the number of unconsumed messages, and configure alarms to prevent messages from being deleted upon expiration due to slow consumption.
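As one possible sketch (assuming the Apache Kafka Java consumer, where the corresponding parameter is named enable.auto.commit; the broker address, topic, and group ID are placeholders), committing offsets only after records have actually been processed avoids the "offset committed before consumption" loss described above:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommitAfterProcessing {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ckafka-broker:9092"); // illustrative address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit manually after processing

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // business logic; commit only after this has succeeded
                }
                consumer.commitSync(); // commit offsets after the batch has been processed
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("offset=%d, value=%s%n", record.offset(), record.value());
    }
}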
Troubleshooting Scheme for Data Loss
Printing Partitions and Offsets Locally for Troubleshooting
The code for printing this information is as follows:
// Send the message and synchronously wait for the result.
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Topic, messageKey, messageStr));
// get() blocks until the broker acknowledges the message (or throws an exception if the send fails).
RecordMetadata recordMetadata = future.get();
// Print where the message was stored on the server.
log.info("partition: {}", recordMetadata.partition());
log.info("offset: {}", recordMetadata.offset());
If the partition and offset can be printed, it indicates that the sent message has been correctly saved on the server. You can then use a message query tool to query messages at relevant offsets.
If the partition and offset cannot be printed, it indicates that the message was not saved on the server, and the client needs to retry.
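If future.get() throws an exception instead, the send did not succeed. The following is a rough retry sketch (the method name and retry count are illustrative; when retries is configured on the producer, the client also retries transient errors internally):
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendWithRetry {
    public static RecordMetadata sendWithRetry(Producer<String, String> producer,
                                               ProducerRecord<String, String> record,
                                               int maxAttempts) throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                Future<RecordMetadata> future = producer.send(record);
                RecordMetadata metadata = future.get(); // throws if the broker did not accept the message
                System.out.printf("partition=%d, offset=%d%n", metadata.partition(), metadata.offset());
                return metadata;
            } catch (ExecutionException e) {
                System.err.printf("Attempt %d failed: %s%n", attempt, e.getCause());
            }
        }
        return null; // all attempts failed; the message was not saved on the server
    }
}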