This article describes how to import data from Kafka into a ClickHouse cluster.
Kafka is a widely used open-source messaging middleware. A common use case is to run Kafka as a data bus that collects data from services (for example, business, subscription, and payment data) and then feeds reports or data applications. ClickHouse has a built-in Kafka engine, which makes it easy to integrate ClickHouse with Kafka.
The standard process for importing data from Kafka into ClickHouse is:

1. Create a Kafka engine table in ClickHouse to consume data from the Kafka topic.
2. Create a target table (typically of the MergeTree family) to store the data.
3. Create a materialized view to move data from the Kafka engine table into the target table.

After completing these three steps, data is continuously imported from Kafka into the ClickHouse cluster.
ClickHouse provides a Kafka engine that serves as a connector (a data stream) to a Kafka cluster. Create the Kafka engine table as shown below:

```sql
CREATE TABLE source
(
    `ts` DateTime,
    `tag` String,
    `message` String
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = '172.19.0.47:9092',
    kafka_topic_list = 'tag',
    kafka_group_name = 'clickhouse',
    kafka_format = 'JSONEachRow',
    kafka_skip_broken_messages = 1,
    kafka_num_consumers = 2;
```
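Note that a Kafka engine table is a stream, not storage: a direct SELECT consumes messages on behalf of the configured consumer group, so rows read this way are not delivered to the materialized view again. For a quick sanity check you can still peek at the stream; on recent ClickHouse versions this additionally requires enabling a session setting, as sketched below:

```sql
-- Reading from the Kafka engine table directly consumes the messages.
-- On newer ClickHouse versions, direct SELECT is gated behind this setting.
SET stream_like_engine_allow_direct_select = 1;

SELECT * FROM source LIMIT 5;
```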
| Parameter | Required | Description |
| --------- | -------- | ----------- |
| kafka_broker_list | Yes | List of Kafka brokers, separated by commas |
| kafka_topic_list | Yes | List of Kafka topics, separated by commas |
| kafka_group_name | Yes | Kafka consumer group name |
| kafka_format | Yes | Kafka message format. For the formats supported by ClickHouse, see Formats for Input and Output Data |
| kafka_skip_broken_messages | No | An integer greater than or equal to 0, indicating the number of malformed messages to tolerate when parsing. If more errors occur, the background consumer threads stop; the materialized view then restarts the background threads to resume listening for data |
| kafka_num_consumers | No | Number of consumers for a single Kafka engine table. A higher value can increase consumption throughput, but it should not exceed the number of partitions in the topic |
| kafka_schema | No | Schema definition, required when kafka_format needs one (for example, Cap'n Proto) |
| kafka_max_block_size | No | Maximum block size allowed when writing Kafka data to the target table. When a block exceeds this size, the data is flushed to disk |
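For reference, the JSONEachRow format expected by the source table above is one JSON object per line, with keys matching the table columns (`ts`, `tag`, `message`). A minimal Python sketch of building such a payload (the sample values are illustrative, not from the article):

```python
import json

def to_json_each_row(rows):
    """Serialize rows as JSONEachRow: one compact JSON object per line."""
    return "\n".join(json.dumps(row, separators=(",", ":")) for row in rows)

# Hypothetical sample rows matching the source table's columns.
rows = [
    {"ts": "2021-01-01 00:00:00", "tag": "tag1", "message": "hello"},
    {"ts": "2021-01-01 00:00:01", "tag": "tag2", "message": "world"},
]
payload = to_json_each_row(rows)
print(payload)
```

Each line of `payload` is a standalone JSON object, which is exactly how ClickHouse parses a JSONEachRow message from Kafka.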
Next, create a ClickHouse target table to store the data consumed from Kafka. Here a MergeTree table is used:

```sql
CREATE TABLE target
(
    `ts` DateTime,
    `tag` String
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(ts)
ORDER BY tag;
```
Finally, create a materialized view that connects the Kafka engine table and the target table:

```sql
CREATE MATERIALIZED VIEW source_mv TO target AS
SELECT ts, tag
FROM source;
```

After completing the above three steps, you can query the data from Kafka in the target table.

In this process, the materialized view acts as an intermediate pipeline that writes the data stream represented by the Kafka engine table into the target table. In fact, one data stream can be attached to multiple materialized views to import the data from Kafka into multiple target tables simultaneously. You can also detach a data stream from a target table or re-attach it later.
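For example, to pause the import and later resume it without dropping anything, you can detach and re-attach the materialized view (`source_mv` is the view created above):

```sql
-- Stop moving data from the Kafka engine table into the target table.
DETACH TABLE source_mv;

-- Re-attach the view to resume consumption from the consumer group's
-- last committed offsets.
ATTACH TABLE source_mv;
```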