The transaction feature of Kafka is designed to support atomic operations in distributed environments. It enables producers to guarantee message integrity and consistency when sending messages, particularly in scenarios where multiple messages must be processed as a whole. The main concepts and features of Kafka transactions are described below.
Related Concepts of Transactions
Basic Concepts of Transactions
Atomicity: All operations in a transaction either succeed or fail. Kafka ensures that messages sent in a transaction are either successfully written to the topic or not written at all.
Consistency: The status of data should remain consistent before and after a transaction is executed.
Isolation: Operations between transactions are independent of each other. The execution of one transaction should not affect the execution of other transactions.
Durability: Once a transaction is committed, its results are permanent and will not be lost even in the event of a system crash.
Workflow of Transactions
The workflow of Kafka transactions mainly consists of the following steps (a minimal sketch of the call pattern follows the list):
1. Start a transaction: The producer calls the initTransactions() method once to register its transactional.id, then calls the beginTransaction() method to start a transaction before sending messages.
2. Send messages: The producer can send multiple messages to one or more topics, and these messages will be marked as transactional messages.
3. Commit or abort the transaction:
Commit the transaction: If all messages are successfully sent, the producer calls the commitTransaction() method to commit the transaction, and all messages will be written to Kafka.
Abort the transaction: If an error occurs during the sending process, the producer can call the abortTransaction() method to abort the transaction, and no messages will be written.
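The following is a minimal sketch of this call pattern, assuming a hypothetical topic name and transactional.id; it shows one producer running several consecutive transactions, while the required producer configuration is covered in the next section and a full runnable single-transaction example appears under Transaction Usage Examples below.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class TransactionWorkflowSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "workflow-sketch-id"); // hypothetical transactional.id
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions(); // step 1: register the transactional.id, once per producer instance
            for (int batch = 0; batch < 3; batch++) {
                producer.beginTransaction(); // step 1: start a new transaction for this batch
                try {
                    for (int i = 0; i < 5; i++) {
                        // step 2: send transactional messages; they stay hidden from read_committed consumers for now
                        producer.send(new ProducerRecord<>("my-topic", "batch-" + batch, "value-" + i));
                    }
                    producer.commitTransaction(); // step 3: commit, all five messages become visible together
                } catch (Exception e) {
                    producer.abortTransaction(); // step 3: abort, none of the five messages are exposed
                }
            }
        }
    }
}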
Configuration of Transactions
To use the transaction feature of Kafka, you need to set the following parameters in the producer configuration:
transactional.id: Each transaction producer requires a unique ID, which is used to identify all messages of a transaction.
acks: Set it to all to ensure that messages are acknowledged by all in-sync replicas.
enable.idempotence: Set it to true to enable idempotence, ensuring that retries do not produce duplicate messages.
Limits on Transactions
Performance overhead: Using transactions incurs additional performance overhead because it requires extra coordination and acknowledgments.
Transaction timeout: Kafka limits how long a transaction may remain open (the producer setting transaction.timeout.ms, 60 seconds by default). If a transaction is neither committed nor aborted within this time, it is automatically aborted. A configuration snippet follows this list.
Consumer handling: When processing transactional messages, consumers (configured with isolation.level=read_committed) can only see these messages after the transaction is committed.
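As a small sketch, the transaction timeout can be adjusted by adding one more setting to the producer Properties shown in the example below; the 5-minute value is only an illustration and must not exceed the broker-side transaction.max.timeout.ms (15 minutes by default).
// Raise the transaction timeout from the 60-second default to 5 minutes (example value only).
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "300000");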
Transaction Usage Examples
Producer example:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class TransactionalProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Transactional producers require a unique transactional.id; idempotence and acks=all keep sends exactly-once.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Register the transactional.id with the transaction coordinator; called once per producer instance.
        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);
                RecordMetadata metadata = producer.send(record).get();
                System.out.printf("Sent message: key=%s, value=%s, partition=%d, offset=%d%n",
                        record.key(), record.value(), metadata.partition(), metadata.offset());
            }
            // All 10 messages become visible to read_committed consumers only after the commit succeeds.
            producer.commitTransaction();
            System.out.println("Transaction committed successfully.");
        } catch (Exception e) {
            // Abort so that none of the messages in this transaction are exposed to consumers.
            // Note: for unrecoverable errors such as ProducerFencedException, abortTransaction() will also fail
            // and the producer should simply be closed.
            producer.abortTransaction();
            System.err.println("Transaction aborted due to an error: " + e.getMessage());
        } finally {
            producer.close();
        }
    }
}
Consumer example:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class TransactionalConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // read_committed hides messages that belong to open or aborted transactions.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
Kafka Transaction Management
In Kafka, transaction management involves multiple components and data structures to ensure the atomicity and consistency of transactions. The memory usage of transaction information is mainly related to the following aspects:
Transaction ID and Producer ID
Transaction ID: Each transactional producer is identified by a unique transaction ID (transactional.id). The transaction ID, typically a string, is specified in the producer configuration and identifies all messages sent within that producer's transactions.
Producer ID: Each producer is assigned a unique producer ID by the broker when it connects to Kafka and initializes. This ID is used to identify the producer's messages and to ensure message ordering and idempotence.
Note:
Initializing transactional producer IDs in large numbers or at high frequency may cause memory overflow, overloading the server and affecting stability.
Transaction Status Management
Kafka uses an internal topic, the transaction state log (__transaction_state), to manage the status of transactions. This log records the status of each transaction (such as in progress, committed, or aborted) and the partitions involved in the transaction. Management of the transaction state log involves the following aspects:
Data structure in memory: Kafka maintains a data structure (such as a hash table or map) in memory to store information about currently active transactions, including the transaction ID, producer ID, transaction status, and timestamp.
Persistent storage: Transaction status logs are persisted to the disk to ensure that the transaction status can be recovered during a Kafka server restart or failure recovery.
Memory Usage of Transaction Information
The memory usage of transaction information primarily depends on the following two factors:
Number of active transactions: The number of currently active transactions directly affects memory usage. Each active transaction consumes a certain amount of space in memory.
Transaction metadata: The metadata (such as the transaction ID, producer ID, and status) of each transaction also consumes memory. The specific memory usage depends on the size of the metadata.
Transaction Cleanup
To prevent excessive memory usage, Kafka periodically checks for and cleans up expired transactional IDs based on the configured expiration time (the broker setting transactional.id.expiration.ms). By default, the metadata of an idle transactional ID is retained for 7 days and deleted after it expires.
Common FullGC/OOM Issues in Transactions
From the transaction management described above, it can be seen that transaction information consumes a significant amount of memory. The two factors that most directly affect this memory usage are the number of transaction IDs and the number of producer IDs.
The number of transaction IDs refers to the number of transactions that clients initialize and commit to the broker. This is strongly correlated with the frequency at which clients commit new transactions.
The producer ID refers to the producer state information that the broker stores for each topic partition. The number of producer IDs is therefore strongly correlated with the number of partitions on the broker.
In transaction scenarios, transaction IDs are tightly bound to producer IDs. If the producer bound to a given transaction ID sends messages to every partition on a broker, the theoretical maximum number of producer ID entries on that broker is the product of the number of transaction IDs and the number of partitions. Assuming the instance has t transaction IDs and the broker has p partitions, the maximum number of producer IDs is: t x p.
Note:
Therefore, assuming the number of transaction IDs in a broker is t, the average memory usage per transaction is tb, the number of partitions in a broker is p, and the average memory usage per producer ID is pb, the total memory usage of transaction information in this broker is calculated as: t x tb + t x p x pb.
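For illustration, a back-of-the-envelope estimate with purely hypothetical numbers: if t = 1,000 transaction IDs, p = 500 partitions, and both tb and pb are roughly 200 bytes, the worst case is 1,000 x 200 + 1,000 x 500 x 200 bytes, or about 0.2 MB plus roughly 100 MB. The t x p term dominates, which is why one transaction ID fanning out across many partitions is the main driver of memory growth.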
It can be seen that memory usage may surge in the following two scenarios:
The client frequently initializes and commits new transaction IDs, for example during instance initialization.
The same transaction ID sends data to many partitions, so the number of producer IDs grows multiplicatively (the t x p term above) and can easily exhaust memory.
Note:
Therefore, the above two scenarios should be avoided as much as possible, whether you use Flink clients or self-implemented transactional producers. For Flink, for example, you can appropriately reduce the checkpoint frequency to decrease how often new transaction IDs are generated from the transaction ID prefix plus a computed random string. In addition, try to ensure that the same transaction ID sends data to the same partition.
Transaction Usage Notes for Flink
For Flink, the following optimization methods can be used to ensure that transaction information does not surge:
Optimize client parameters: Increase the checkpoint interval in Flink. (For details, see Community Issues.) A sketch follows below.
Optimize the partitioner: Set sink.partitioner to the fixed mode for Flink production tasks, so that each subtask keeps writing to a fixed partition.
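As a minimal sketch (assuming a Flink DataStream job; the 5-minute interval is only an example value), lengthening the checkpoint interval looks like this:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CheckpointIntervalSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Fewer checkpoints mean fewer new transaction IDs created by exactly-once Kafka sinks.
        // 300000 ms (5 minutes) is only an example; balance it against your latency requirements.
        env.enableCheckpointing(300_000L);
        // ... define sources, transformations, and the Kafka sink here, then call env.execute() ...
    }
}
For Table/SQL jobs, the checkpoint interval is typically controlled by the execution.checkpointing.interval option, and the sink.partitioner connector option mentioned above controls partitioning.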