initTransactions() 方法来初始化事务。commitTransaction() 方法来提交事务,所有消息将被写入到 Kafka。abortTransaction() 方法来中止事务,所有消息将不会被写入。transactional.id:每个事务性生产者都需要一个唯一的标识符。这个 ID 用于标识事务的所有消息。acks:设置为 all 以确保所有副本都确认消息。enable.idempotence:设置为 true 以启用幂等性,确保消息不会被重复发送。import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;public class TransactionalProducerDemo {public static void main(String[] args) {// Kafka 配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka broker 地址props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // 事务 IDprops.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 启用幂等性// 创建 Kafka 生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 初始化事务producer.initTransactions();try {// 开始事务producer.beginTransaction();// 发送消息for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);RecordMetadata metadata = producer.send(record).get(); // 发送消息并等待确认System.out.printf("Sent message: key=%s, value=%s, partition=%d, offset=%d%n",record.key(), record.value(), metadata.partition(), metadata.offset());}// 提交事务producer.commitTransaction();System.out.println("Transaction committed successfully.");} catch (Exception e) {// 如果发生异常,回滚事务producer.abortTransaction();System.err.println("Transaction aborted due to an error: " + e.getMessage());} finally {// 关闭生产者producer.close();}}}
import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.ConsumerRecords;import java.time.Duration;import java.util.Collections;import java.util.Properties;public class TransactionalConsumerDemo {public static void main(String[] args) {// Kafka 配置Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka broker 地址props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // 消费者组 IDprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // 只读取已提交的事务消息// 创建 Kafka 消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("my-topic"));try {while (true) {// 拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",record.key(), record.value(), record.partition(), record.offset());}}} catch (Exception e) {e.printStackTrace();} finally {// 关闭消费者consumer.close();}}}
Kafka 使用一个称为「事务状态日志」(transaction state log)的内部主题来管理事务的状态。这个日志记录了每个事务的状态(如进行中、已提交、已中止)以及与该事务相关的消息。事务状态日志的管理涉及以下几个方面:在与 Flink 等流处理框架集成时,需要合理设置 checkpoint 间隔(详情可参见社区 ISSUE),并将 sink.partitioner 设置为 Fixed 模式。