CKafka is compatible with the production and consumption APIs of Kafka 0.9 and later (versions currently available include 2.4.1, 2.8.1, and 3.2.3). To connect a self-built Kafka cluster running an older version (such as 0.8), the corresponding API calls must be modified. This document compares Kafka 0.8 with later versions from both the producer and consumer perspectives, and describes how to migrate.
Kafka Producer
Overview
In Kafka version 0.8.1, the Producer API was rewritten. This client is the officially recommended version, offering better performance and more features, and the community maintains only the new Producer API.
Comparison of Old and New Versions of the Producer API
Demo of the new version Producer API
// New Producer API (package org.apache.kafka.clients.producer).
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:4242");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(0), Integer.toString(0)));
producer.close();
Demo of the old version Producer API
// Old Producer API (package kafka.javaapi.producer).
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
String ip = "192.168.0.1";  // example message key
String msg = "page visit";  // example message value
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
producer.close();
As the demos show, the old and new versions are used in essentially the same way; only some configuration parameters differ, so the cost of migration is low.
Compatibility Notes
For Kafka, the Producer API of version 0.8.x can successfully connect to CKafka without modifications. It is recommended to use the new Kafka Producer API.
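The renamed configuration keys between the two producer demos above can be captured in a small lookup table. The helper below is a hypothetical sketch of that mapping written in plain Java (the class and method names are illustrative and not part of Kafka; the correspondences are approximate, e.g. the old `serializer.class` is split into `key.serializer` and `value.serializer` in the new API):

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerConfigMigration {
    // Approximate mapping of old (0.8) producer keys to new-API equivalents,
    // based on the two demos above. Hypothetical helper, not a Kafka API.
    private static final Map<String, String> OLD_TO_NEW = new HashMap<>();
    static {
        OLD_TO_NEW.put("metadata.broker.list", "bootstrap.servers");
        OLD_TO_NEW.put("serializer.class", "value.serializer");
        OLD_TO_NEW.put("request.required.acks", "acks");
    }

    // Returns the new-API key, or the input unchanged if no rename is known.
    public static String migrateKey(String oldKey) {
        return OLD_TO_NEW.getOrDefault(oldKey, oldKey);
    }

    public static void main(String[] args) {
        System.out.println(migrateKey("metadata.broker.list")); // bootstrap.servers
        System.out.println(migrateKey("compression.codec"));    // unchanged (unknown key)
    }
}
```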
Kafka Consumer
Overview
The open-source Apache Kafka 0.8 provides two Consumer APIs, namely:
High Level Consumer API (shields offset management and other configuration details)
Simple Consumer API (allows specifying offsets and other configuration details)
Kafka 0.9.x introduced the New Consumer, which combines the capabilities of both Consumer APIs of the Old Consumer (version 0.8) while reducing the load on ZooKeeper.
The following sections therefore describe how to migrate from the 0.8 Consumer to the 0.9 New Consumer.
Comparison of Old and New Versions of Consumer API
Consumer API in Version 0.8
High Level Consumer API (See Demo)
If you only need the data and do not want to handle message offsets yourself, the High Level Consumer API meets general consumption requirements. It revolves around the logical concept of the Consumer Group: it shields offset management and provides broker failure handling and consumer load balancing, so developers can get started with the consumer client quickly.
When using the High Level Consumer, note the following:
If the number of consuming threads exceeds the number of partitions, the extra threads receive no data.
If the number of partitions exceeds the number of threads, some threads consume multiple partitions.
Changes to partitions or consumers trigger a rebalance.
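The thread-to-partition relationship above can be illustrated with a small assignment sketch in plain Java (no Kafka dependency; the class and method names are illustrative, and round-robin is used here only as a simple example of spreading partitions across threads):

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentDemo {
    // Assigns partitions 0..numPartitions-1 to threads round-robin.
    // Returns one partition list per thread; threads beyond the partition
    // count end up with an empty list, i.e. they would be idle.
    public static List<List<Integer>> assign(int numPartitions, int numThreads) {
        List<List<Integer>> result = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            result.get(p % numThreads).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions, 5 threads: threads 3 and 4 get nothing.
        System.out.println(assign(3, 5));
        // 5 partitions, 3 threads: some threads consume 2 partitions.
        System.out.println(assign(5, 3));
    }
}
```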
Low Level Consumer API (See Demo)
If you care about message offsets and need features such as message replay, skip reading, or consuming specific partitions with stronger consumption semantics, the Low Level Consumer API is recommended. However, you must then handle offsets and broker failures yourself.
When using the Low Level Consumer, note the following:
You must track and persist offsets yourself to control consumption progress.
You must locate the leader of each topic partition and handle partition leadership changes.
New Consumer API in Version 0.9
Kafka 0.9.x introduced the New Consumer, which integrates the features of the two Consumer APIs from the Old Consumer, while providing consumer coordination (high-level API) and lower-level access, and enabling the construction of custom consumption policies. The New Consumer also simplifies the consumer client by introducing a central coordinator to solve the Herd Effect and Split Brain issues caused by separate connections to ZooKeeper, while also reducing the load on ZooKeeper.
Strengths:
Introduction of Coordinator
The 0.8 High Level Consumer suffers from the Herd Effect and Split Brain problems. Moving failure detection and rebalance logic into a highly available central Coordinator solves both problems and significantly reduces the load on ZooKeeper.
Allows self-assignment of partitions
This is useful when a consumer keeps local state for each partition and the partition assignment must therefore stay fixed, or when the consumer must be tied to specific brokers (for example, brokers in the same region).
Allows self-management of Offset
You can manage the Offset as needed to achieve semantics such as message replay and skip reading.
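Self-managed offsets can be as simple as keeping a per-partition offset map that you persist yourself; rewinding the stored position replays messages on the next fetch, and advancing it skips them. The class below is a hypothetical in-memory sketch of that idea (not a Kafka API):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetStore {
    // Maps "topic-partition" to the next offset to consume.
    private final Map<String, Long> offsets = new HashMap<>();

    // Record that everything before nextOffset has been consumed.
    public void commit(String topicPartition, long nextOffset) {
        offsets.put(topicPartition, nextOffset);
    }

    // The next offset to fetch; 0 if nothing was committed yet.
    public long position(String topicPartition) {
        return offsets.getOrDefault(topicPartition, 0L);
    }

    // Message replay: rewind the stored position by n messages.
    public void rewind(String topicPartition, long n) {
        offsets.put(topicPartition, Math.max(0L, position(topicPartition) - n));
    }

    // Skip reading: jump forward past n messages.
    public void skip(String topicPartition, long n) {
        offsets.put(topicPartition, position(topicPartition) + n);
    }
}
```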
Triggers a user-specified callback after rebalancing
Non-blocking Consumer API
Feature Comparison Between Old and New Consumer API Versions
| Consumer API | Description |
| --- | --- |
| Old High Level Consumer API | Suffers from the Herd Effect and Split Brain problems. |
| Old Low Level Consumer API | Requires handling multiple exceptions yourself. |
| New Consumer API | Mature; recommended for the current version. |
Converting an Old Consumer to a New Consumer
New Consumer
// The main configuration change: the ZooKeeper address is replaced by the Kafka broker address.
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Compared to the Old Consumer, creating a consumer here is simpler.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Old Consumer (High Level)
// The Old Consumer requires a ZooKeeper connection.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "test");
props.put("auto.commit.enable", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
ConsumerConfig config = new ConsumerConfig(props);
// Connector creation is required.
ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);
// Create a message stream.
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("foo", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> streams =
connector.createMessageStreams(topicCountMap);
// Obtain data.
KafkaStream<byte[], byte[]> stream = streams.get("foo").get(0);
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
MessageAndMetadata<byte[], byte[]> msg = null;
while (iterator.hasNext()) {
msg = iterator.next();
System.out.println(
" group " + props.get("group.id") +
", partition " + msg.partition() + ", " +
new String(msg.message()));
}
As the demos show, the New Consumer simplifies the code. The main change is replacing the ZooKeeper address with the Kafka broker address (bootstrap.servers). The New Consumer also adds configuration parameters for interacting with the Coordinator; in general, the defaults are sufficient.
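When migrating, it can help to verify that a New Consumer configuration no longer carries the old ZooKeeper key and contains the required new keys. The checker below is a hypothetical sketch in plain Java (the class name and the rules it encodes are illustrative, based on the two consumer demos above; it is not part of Kafka):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ConsumerConfigChecker {
    // Returns a list of migration problems found in the given config:
    // a leftover Old Consumer ZooKeeper key, plus any missing New Consumer keys.
    public static List<String> check(Properties props) {
        List<String> problems = new ArrayList<>();
        if (props.containsKey("zookeeper.connect")) {
            problems.add("remove zookeeper.connect; use bootstrap.servers instead");
        }
        for (String required : new String[] {
                "bootstrap.servers", "group.id",
                "key.deserializer", "value.deserializer"}) {
            if (!props.containsKey(required)) {
                problems.add("missing " + required);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties oldStyle = new Properties();
        oldStyle.put("zookeeper.connect", "localhost:2181");
        oldStyle.put("group.id", "test");
        // Prints the leftover key plus the three missing New Consumer keys.
        check(oldStyle).forEach(System.out::println);
    }
}
```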
Compatibility Notes
CKafka is consistent with high-version open-source Kafka and supports the rewritten New Consumer API, hiding the interaction between the consumer client and ZooKeeper (ZooKeeper is no longer exposed to users). The New Consumer solves the Herd Effect and Split Brain problems caused by direct interaction with ZooKeeper and incorporates the capabilities of the Old Consumer, making consumption more reliable.