Tencent Cloud


Kafka Data Import

Last updated: 2022-03-02 12:12:06

    This document describes how to consume data from Kafka to Cloud Data Warehouse in real time.


    The Kafka data source cluster and the target Cloud Data Warehouse cluster must be in the same VPC.


    1. Log in to the Cloud Data Warehouse cluster and create a Kafka consumption table.
      CREATE TABLE queue (
           timestamp UInt64,
           level String,
           message String
      ) ENGINE = Kafka
        SETTINGS kafka_broker_list = 'localhost:9092',
                 kafka_topic_list = 'topic',
                 kafka_group_name = 'group',
                 kafka_format = 'JSONEachRow',
                 kafka_num_consumers = 1,
                 kafka_max_block_size = 65536,
                 kafka_skip_broken_messages = 0,
                 kafka_auto_offset_reset = 'latest';

    The common parameters are described as follows:

    | Name | Required | Description |
    | --- | --- | --- |
    | kafka_broker_list | Yes | Comma-separated list of Kafka brokers. We recommend using `IP:port` instead of domain names to avoid possible DNS resolution issues. |
    | kafka_topic_list | Yes | Comma-separated list of Kafka topics. |
    | kafka_group_name | Yes | Kafka consumer group name. |
    | kafka_format | Yes | Kafka message format. For the formats ClickHouse supports, see the parameters in Formats for Input and Output Data. |
    | kafka_row_delimiter | No | Row delimiter used to split data rows. The default is `\n`, but you can set another value to match the format the data was written in. |
    | kafka_num_consumers | No | Number of consumers for a single Kafka engine table. Increasing this value raises consumption throughput, but it must not exceed the number of partitions in the corresponding topic. |
    | kafka_max_block_size | No | Maximum block size for writing Kafka data to the target table (65536 by default). Once a block reaches this size, the data is flushed. |
    | kafka_skip_broken_messages | No | Number of messages with parsing errors to tolerate. If the number of broken messages exceeds this value (`N`), the background thread stops. The default is 0. |
    | kafka_commit_every_batch | No | Kafka commit frequency: `0` commits only after the data of a whole block is written; `1` commits after the data of each batch is written. |
    | kafka_auto_offset_reset | No | The offset from which to start reading Kafka data; `earliest` or `latest`. |
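With `kafka_format = 'JSONEachRow'`, each Kafka message is expected to be a single-line JSON object whose keys match the consumption table's columns. A minimal producer-side sketch of this serialization (the sample `rows` are hypothetical; a real producer would send each line as a message through a Kafka client):

```python
import json

# Sample rows matching the `queue` table schema
# (timestamp UInt64, level String, message String) -- hypothetical data.
rows = [
    {"timestamp": 1646000000, "level": "info", "message": "service started"},
    {"timestamp": 1646000001, "level": "error", "message": "connection refused"},
]

def to_json_each_row(rows):
    """Serialize rows as JSONEachRow: one compact JSON object per line."""
    return "\n".join(json.dumps(row, separators=(",", ":")) for row in rows)

payload = to_json_each_row(rows)
print(payload)
```

Each line of `payload` would be produced as one Kafka message on the topic the consumption table reads from.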
    2. Create a ClickHouse local table (target table).
    • If your cluster has one replica:

      CREATE TABLE daily ON CLUSTER default_cluster (
         day Date,
         level String,
         total UInt64
      ) ENGINE = SummingMergeTree()
      ORDER BY (day, level);
    • If your cluster has two replicas:

      CREATE TABLE daily ON CLUSTER default_cluster (
         day Date,
         level String,
         total UInt64
      ) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/test/test/{shard}', '{replica}')
      ORDER BY (day, level);
    • Create a distributed table:

      CREATE TABLE daily_dis ON CLUSTER default_cluster
      AS default.daily
      ENGINE = Distributed('default_cluster', 'default', 'daily', rand());
    3. Create a materialized view to sync the data consumed by the Kafka consumption table to the ClickHouse target table.
      CREATE MATERIALIZED VIEW consumer TO daily
      AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() AS total
      FROM queue GROUP BY day, level;
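The materialized view fires on every block inserted into `queue`: it converts the UNIX timestamp to a date and counts rows per `(day, level)` pair before writing to `daily`. The equivalent transformation, sketched in Python for illustration (the sample rows are hypothetical):

```python
import datetime
from collections import Counter

# Sample consumed rows (hypothetical), matching the `queue` table schema.
rows = [
    {"timestamp": 1646000000, "level": "info"},
    {"timestamp": 1646000000, "level": "error"},
    {"timestamp": 1646000050, "level": "info"},
]

# Equivalent of:
#   SELECT toDate(toDateTime(timestamp)) AS day, level, count() AS total
#   FROM queue GROUP BY day, level
totals = Counter(
    (datetime.date.fromtimestamp(r["timestamp"]), r["level"]) for r in rows
)
```

Because the target table uses SummingMergeTree, partial counts written from different blocks are later merged by summing `total` for identical `(day, level)` keys.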
    4. Query the data.
      SELECT level, sum(total) FROM daily GROUP BY level;


    If you want to stop receiving topic data or change the conversion logic, detach the materialized view, then attach it again:

     DETACH TABLE consumer;
     ATTACH TABLE consumer;
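For example, detach/attach alone pauses and resumes consumption; to change the conversion logic, the view has to be recreated (a sketch in the same SQL style; the added `WHERE` filter is a hypothetical example):

```sql
-- Pause consumption from the topic:
DETACH TABLE consumer;

-- ... messages accumulate in Kafka but are not consumed ...

-- Resume consumption with the same logic:
ATTACH TABLE consumer;

-- To change the conversion logic instead, recreate the view
-- (the filter on `level` below is a hypothetical example):
DROP VIEW consumer;
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() AS total
FROM queue WHERE level != 'debug' GROUP BY day, level;
```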