Connecting to Legacy Self-Built Kafka

Last updated: 2024-01-09 15:02:48
CKafka is compatible with the Producer and Consumer APIs of Apache Kafka 0.9 and above (the versions currently available for purchase are v0.10.2, v1.1.1, v2.4.1, v2.8.1, and v3.2.3). To connect to a self-built Kafka cluster of an earlier version (such as v0.8), you need to partially rewrite your API calls. This document compares the Producer and Consumer APIs of Kafka 0.8 and earlier with the newer APIs, and describes how to rewrite them.

    Kafka Producer

    Overview

Kafka 0.8.1 introduced a rewritten Producer API (referred to below as the New Producer API). This client is officially recommended because it delivers better performance and more features, and it is the version the community continues to maintain.

    Comparison between new producer API and old producer API

New Producer API Demo
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
// Broker address list; replace with your actual access address.
props.put("bootstrap.servers", "localhost:4242");
// Wait for all in-sync replicas to acknowledge each write.
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(0), Integer.toString(0)));
producer.close();
Old Producer API Demo
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
// Example key and value; the original demo left these undefined.
String ip = "192.168.1.1";
String msg = "hello";
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
producer.close();
As the preceding code shows, basic usage is the same in the new and old versions except for some parameter settings, so the cost of partially rewriting the API is low.

    Compatibility description

The 0.8.x Producer API can connect to CKafka without any rewriting. However, we recommend using the New Producer API.
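As a minimal sketch (the address below is a placeholder, not a real CKafka endpoint), pointing an existing 0.8.x producer at CKafka is usually just a matter of changing the broker list to the access address shown in your CKafka console:
// Placeholder address; replace with the access address of your CKafka instance.
props.put("metadata.broker.list", "ckafka-host:9092");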

    Kafka Consumer

    Overview

Open-source Apache Kafka 0.8 provides two types of Consumer API:
High Level Consumer API (hides configuration details such as offset management)
Simple Consumer API (supports adjusting parameter configuration yourself)
Kafka 0.9.x introduced the New Consumer API, which combines the capabilities of the two old consumer APIs (v0.8) while reducing the load on ZooKeeper.
The following sections describe how to migrate from the old consumer API (v0.8) to the new consumer API (v0.9).

    Comparison between new consumer API and old consumer API

    Old consumer API (v0.8)

    High Level Consumer API (Demo)
The High Level Consumer API can meet general requirements if you only care about the data and not the message offset. Built on the consumer group abstraction, it hides offset management, handles Broker failures, and balances load across Consumers, so developers can get started with the Consumer client quickly.
Consider the following when you use the High Level Consumer API (a threading sketch follows this list):
If the number of consumer threads is greater than the number of partitions, some consumer threads will not receive any data.
If the number of partitions is greater than the number of threads, some threads will consume more than one partition.
Changes in partitions or consumers trigger rebalancing.
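The following is a minimal sketch of the thread-to-partition relationship described above. The ZooKeeper address, topic name (page_visits), and thread count are assumed for illustration; error handling and shutdown are omitted.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "test");
ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

// Request 3 consumer threads for the topic. If the topic has fewer than
// 3 partitions, some threads receive no data; if it has more, some
// threads consume several partitions.
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("page_visits", 3);
Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);

ExecutorService pool = Executors.newFixedThreadPool(3);
for (final KafkaStream<byte[], byte[]> stream : streams.get("page_visits")) {
    pool.submit(new Runnable() {
        public void run() {
            for (MessageAndMetadata<byte[], byte[]> msg : stream) {
                System.out.println(Thread.currentThread().getName() + ": " + new String(msg.message()));
            }
        }
    });
}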
Low Level Consumer API (Demo)
The Low Level Consumer API is recommended if you need the message offset, want semantics such as repeated consumption or skipping messages, or want to consume specific partitions with stronger delivery guarantees. In exchange, you must handle offsets and Broker exceptions yourself. When using the Low Level Consumer API, you need to (see the sketch after this list):
Track and maintain the offset to control the consumption progress.
Find the leader of the topic's partitions and handle partition changes.
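A minimal Low Level (Simple) Consumer sketch for Kafka 0.8 follows. The broker address, topic, and partition are placeholders, and leader discovery and error handling are omitted for brevity.
import java.nio.ByteBuffer;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

// Connect directly to the partition leader (placeholder broker1:9092).
SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "demo-client");
long offset = 0L; // You track and persist the offset yourself.
FetchRequest req = new FetchRequestBuilder()
    .clientId("demo-client")
    .addFetch("my-topic", 0, offset, 100000) // topic, partition, offset, maxBytes
    .build();
FetchResponse resp = consumer.fetch(req);
for (MessageAndOffset mo : resp.messageSet("my-topic", 0)) {
    ByteBuffer payload = mo.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println("offset=" + mo.offset() + ", value=" + new String(bytes));
    offset = mo.nextOffset(); // Advance the locally managed offset.
}
consumer.close();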

    New consumer API (v0.9)

Kafka 0.9.x introduced the New Consumer API, which combines the capabilities of the two old consumer APIs: it provides consumer coordination (as the High Level API did) as well as lower-level access for custom consumption policies. The New Consumer also simplifies the consumer client and introduces a central coordinator, which solves the herd effect and split-brain problems caused by each consumer connecting to ZooKeeper separately and reduces the load on ZooKeeper.
    Advantages:
Introduces a coordinator
The v0.8 High Level Consumer suffers from the herd effect and split-brain problems. Moving failure detection and rebalancing logic into a highly available central coordinator solves both problems and greatly reduces the load on ZooKeeper.
Allows you to assign partitions (illustrated in the sketch after this list)
If you need to keep some local state tied to particular partitions, you can pin the partition-to-consumer mapping. Another scenario is associating a Consumer with Brokers in the same region.
Allows you to manage offsets
You can manage offsets as needed to implement repeated consumption, skipped consumption, and other semantics.
Triggers callbacks after rebalancing, as specified by you
Provides a non-blocking Consumer API
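A minimal sketch of these capabilities follows, using assumed topic ("foo"), partition, and offset values. Note that assign() and subscribe() are mutually exclusive on the same consumer instance.
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// Manual assignment: consume a specific partition, bypassing group balancing.
TopicPartition tp = new TopicPartition("foo", 0);
consumer.assign(Arrays.asList(tp));
// Manual offset management: seek to an arbitrary offset to re-read or skip messages.
consumer.seek(tp, 42L);

// Alternatively, subscribe with a rebalance callback (do not combine with assign()):
// consumer.subscribe(Arrays.asList("foo"), new ConsumerRebalanceListener() {
//     public void onPartitionsRevoked(Collection<TopicPartition> partitions) { /* commit offsets */ }
//     public void onPartitionsAssigned(Collection<TopicPartition> partitions) { /* restore state */ }
// });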

Comparison between new consumer API and old consumer API

| Type | Version | Automatic Offset Storage | Manual Offset Management | Automatic Exception Handling | Automatic Rebalance Handling | Automatic Leader Search | Pros and Cons |
|---|---|---|---|---|---|---|---|
| High Level Consumer | Earlier than v0.9 | Yes | No | Yes | Yes | Yes | Herd effect and split-brain problems |
| Simple Consumer | Earlier than v0.9 | No | Yes | No | No | No | You must handle various exceptions yourself |
| New Consumer | v0.9 and later | Yes | Yes | Yes | Yes | Yes | Mature and recommended for current versions |

    Transforming old consumer to new consumer

New Consumer
// The main configuration difference is that ZooKeeper parameters are replaced by the Kafka broker address.
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Compared with the old consumer, the new consumer is easier to create.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
Old Consumer (High Level)
// The old consumer requires a ZooKeeper connection.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "test");
props.put("auto.commit.enable", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
ConsumerConfig config = new ConsumerConfig(props);

// A connector must be created first.
ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);

// Create message streams.
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("foo", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);

// Obtain the data.
KafkaStream<byte[], byte[]> stream = streams.get("foo").get(0);
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while (iterator.hasNext()) {
    MessageAndMetadata<byte[], byte[]> msg = iterator.next();
    System.out.println("group " + props.get("group.id")
        + ", partition " + msg.partition() + ", "
        + new String(msg.message()));
}
Compared with the old consumer, the new consumer requires less code and takes Kafka broker addresses instead of ZooKeeper parameters. The new consumer also exposes settings for its interaction with the coordinator, whose defaults are suitable for general use.

    Compatibility description

Both CKafka and recent open-source Kafka versions support the rewritten New Consumer API, which hides the consumer client's interaction with ZooKeeper (ZooKeeper is no longer exposed to users). The new consumer solves the herd effect and split-brain problems caused by direct interaction with ZooKeeper, and integrates the capabilities of the old consumers, making consumption more reliable.