A single producer produces messages to a single partition sequentially.
For a single client instance (a producer object created with the new keyword), the total number of TCP connections established between it and all servers ranges from 1 to n (where n is the number of brokers).
Each Java producer manages TCP connections as follows:
1. A Sender thread is started when a KafkaProducer instance is created, and it establishes TCP connections to all brokers listed in bootstrap.servers.
2. When the metadata of the KafkaProducer instance is updated, TCP connections to all brokers in the cluster are established again.
3. If the connections.max.idle.ms parameter on the producer is set to a value above 0, the TCP connections established in step 1 will be closed automatically when idle. The parameter value is 9 minutes by default; that is, if no requests are sent through a TCP connection within 9 minutes, Kafka automatically closes the connection. If you set the parameter to -1, the TCP connections established in step 1 are never closed and become "zombie" connections.
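As a minimal sketch of a producer that participates in this connection lifecycle (the broker addresses and serializer classes below are placeholders), connections.max.idle.ms is set explicitly here only to illustrate the idle-close behavior described in step 3:

Properties props = new Properties();
// Placeholder broker list; the Sender thread connects to these brokers when the producer is created.
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Close TCP connections that stay idle for 9 minutes (the default); setting -1 disables closing.
props.put("connections.max.idle.ms", "540000");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);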
After sending a message, most clients return a Callback or a Future. A successful callback indicates that the message was sent successfully.
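For example, a callback-based send might look like the following sketch, where topic, messageKey, messageStr, and log are assumed to be defined elsewhere:

producer.send(new ProducerRecord<>(topic, messageKey, messageStr), (metadata, exception) -> {
    if (exception == null) {
        // Successful callback: the broker has saved the message at this partition and offset.
        log.info("sent to partition {} at offset {}", metadata.partition(), metadata.offset());
    } else {
        // The send failed; handle or retry the message.
        log.error("send failed", exception);
    }
});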
You can also check in the console whether a message was sent successfully by using the following method:
Print the partition information returned by the send method:
// Send the message and block until the broker acknowledges it.
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, messageKey, messageStr));
RecordMetadata recordMetadata = future.get();
// The partition and offset confirm where the message was saved on the server.
log.info("partition: {}", recordMetadata.partition());
log.info("offset: {}", recordMetadata.offset());
If the partition and offset information can be printed, the message has been correctly saved on the server. You can then use the message query tool to query the message at that offset.
If the partition and offset information cannot be printed, the message has not been saved on the server and the client needs to retry.
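A rough sketch of such a retry loop, assuming the same producer, topic, and log objects as above and a hypothetical maxRetries bound, could look like this:

int maxRetries = 3;  // hypothetical retry bound, for illustration only
for (int attempt = 1; attempt <= maxRetries; attempt++) {
    try {
        // Block until the broker acknowledges the message.
        RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, messageKey, messageStr)).get();
        log.info("saved at partition {}, offset {}", metadata.partition(), metadata.offset());
        break;  // partition and offset were returned, so the message is saved
    } catch (java.util.concurrent.ExecutionException e) {
        log.warn("send attempt {} failed, retrying", attempt, e);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // restore the interrupt flag and stop retrying
        break;
    }
}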
When a topic is created, the Kafka broker cluster specifies a leader for each partition, and the topic partitions are evenly distributed to each broker.
As time elapses, the partitions may become unevenly distributed across brokers,
and the client may throw exceptions such as BrokerNotAvailableError
and NotLeaderForPartitionError
during the production or consumption process.
Generally, such issues occur because the partition leader has been switched as described in the following scenarios:
- When a broker is disconnected unexpectedly, a leader switch will occur; that is, the current topic partition leader will be replaced by a new leader elected from among the follower partitions.
- When auto.leader.rebalance.enable = true is set to automatically trigger rebalancing, or when rebalancing is manually triggered by an increase or decrease in the number of brokers, a leader switch will also occur due to partition rebalancing.
When the leader is switched due to the broker's accidental disconnection:
- If acks = all and min.insync.replicas > 1, messages won't get lost, as they have been acknowledged by both the leader and follower partitions.
- If acks = 1, some messages may get lost, as they may fail to be synced to the follower partitions within the specified replica.lag.time.max.ms.
Messages won't get lost if the broker works normally and the leader is switched due to rebalancing triggered manually or automatically by an instance upgrade, a switchover from single-AZ to multi-AZ deployment, or instance migration. This is because:
- If acks = all and min.insync.replicas > 1, messages won't get lost, as they have been acknowledged by both the leader and follower partitions.
- If acks = 1, messages won't get lost, as the partition offset will be automatically synced when the leader is switched.
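To put this into producer configuration, a minimal sketch is shown below; the broker addresses are placeholders, and min.insync.replicas is a topic/broker setting rather than a producer one:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");  // placeholder broker list
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Require acknowledgment from all in-sync replicas before a send is considered successful.
props.put("acks", "all");
// On the topic or broker side, set min.insync.replicas to 2 or higher so that an acknowledged
// message also exists on at least one follower when the leader is switched.
KafkaProducer<String, String> producer = new KafkaProducer<>(props);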