tencent cloud

Elastic MapReduce

Release Notes and Announcements
Release Notes
Announcements
Security Announcements
Product Introduction
Overview
Strengths
Architecture
Features
Use Cases
Constraints and Limits
Technical Support Scope
Product release
Purchase Guide
EMR on CVM Billing Instructions
EMR on TKE Billing Instructions
EMR Serverless HBase Billing Instructions
Getting Started
EMR on CVM Quick Start
EMR on TKE Quick Start
EMR on CVM Operation Guide
Planning Cluster
Administrative rights
Configuring Cluster
Managing Cluster
Managing Service
Monitoring and Alarms
TCInsight
EMR on TKE Operation Guide
Introduction to EMR on TKE
Configuring Cluster
Cluster Management
Service Management
Monitoring and Ops
Application Analysis
EMR Serverless HBase Operation Guide
EMR Serverless HBase Product Introduction
Quotas and Limits
Planning an Instance
Managing an Instance
Monitoring and Alarms
Development Guide
EMR Development Guide
Hadoop Development Guide
Spark Development Guide
Hbase Development Guide
Phoenix on Hbase Development Guide
Hive Development Guide
Presto Development Guide
Sqoop Development Guide
Hue Development Guide
Oozie Development Guide
Flume Development Guide
Kerberos Development Guide
Knox Development Guide
Alluxio Development Guide
Kylin Development Guide
Livy Development Guide
Kyuubi Development Guide
Zeppelin Development Guide
Hudi Development Guide
Superset Development Guide
Impala Development Guide
Druid Development Guide
TensorFlow Development Guide
Kudu Development Guide
Ranger Development Guide
Kafka Development Guide
Iceberg Development Guide
StarRocks Development Guide
Flink Development Guide
JupyterLab Development Guide
MLflow Development Guide
Practical Tutorial
Practice of EMR on CVM Ops
Data Migration
Practical Tutorial on Custom Scaling
API Documentation
History
Introduction
API Category
Cluster Resource Management APIs
Cluster Services APIs
User Management APIs
Data Inquiry APIs
Scaling APIs
Configuration APIs
Other APIs
Serverless HBase APIs
YARN Resource Scheduling APIs
Making API Requests
Data Types
Error Codes
FAQs
EMR on CVM
Service Level Agreement
Contact Us

Ingesting Data from Kafka in Real Time

PDF
Focus Mode
Font Size
Last updated: 2025-01-03 15:02:25
This document describes how to consume Kafka data in real time by using the Apache Druid Kafka indexing service. Before performing operations described in this document, just like in a Hadoop cluster, you need to make sure that the Kafka and Druid clusters can properly communicate with each other.
Note
The two clusters should be in the same VPC. If they are in different VPCs, the two VPCs should be able to communicate with each other (through CCN or Peering Connection, for example).
If necessary, you should configure the Druid cluster with the host information of the Kafka cluster.

Using Command Line

1. Start the Kafka broker in the Kafka cluster.
./bin/kafka-server-start.sh config/server.properties
2. Create a Kafka topic named mytopic.
./bin/kafka-topics.sh --create --zookeeper {kafka_zk_ip}:2181 --replication-factor 1 --partitions 1 --topic mytopic
Output:
Created topic "mytopic".
{kafka_zk_ip}:2181 is the ZooKeeper address of the Kafka cluster.
3. Prepare a data description file kafka-mytopic.json in the Druid cluster.
{
"type": "kafka",
"dataSchema": {
"dataSource": "mytopic-kafka",
"parser": {
"type": "string",
"parseSpec": {
"timestampSpec": {
"column": "time",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": ["url", "user"]
},
"format": "json"
}
},
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "hour",
"queryGranularity": "none"
},
"metricsSpec": [{
"type": "count",
"name": "views"
},
{
"name": "latencyMs",
"type": "doubleSum",
"fieldName": "latencyMs"
}
]
},
"ioConfig": {
"topic": "mytopic",
"consumerProperties": {
"bootstrap.servers": "{kafka_ip}:9092",
"group.id": "kafka-indexing-service"
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H"
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": "100000"
}
}
{kafka_ip}:9092 is the IP and port of the bootstrap.servers in your Kafka cluster.
4. Add the Kafka supervisor on the master nodes in the Druid cluster.
curl -XPOST -H 'Content-Type: application/json' -d @kafka-mytopic.json http://{druid_master_ip}:8090/druid/indexer/v1/supervisor
Output:
{"id":"mytopic-kafka"}
{druid_master_ip}:8090 is the node where the overload process is deployed, which is generally a master node.
5. Enable a console producer in the Kafka cluster.
./bin/kafka-console-producer.sh --broker-list {kafka_ip}:9092 --topic mytopic
{kafka_ip}:9092 is the IP and port of the bootstrap.servers in your Kafka cluster.
6. Prepare a query file named query-mytopic.json in the Druid cluster.
{
"queryType" : "search",
"dataSource" : "mytopic-kafka",
"intervals" : ["2020-03-13T00:00:00.000/2020-03-20T00:00:00.000"],
"granularity" : "all",
"searchDimensions": [
"url",
"user"
],
"query": {
"type": "insensitive_contains",
"value": "roni"
}
}
7. Enter some data on Kafka in real time.
{"time": "2020-03-19T09:57:58Z", "url": "/foo/bar", "user": "brozo", "latencyMs": 62}
{"time": "2020-03-19T16:57:59Z", "url": "/", "user": "roni", "latencyMs": 15}
{"time": "2020-03-19T17:50:00Z", "url": "/foo/bar", "user": "roni", "latencyMs": 25}
Timestamp generation command:
python -c 'import datetime; print(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))'
8. Perform a query in the Druid cluster.
curl -XPOST -H 'Content-Type: application/json' -d @query-mytopic.json http://{druid_ip}:8082/druid/v2/?pretty
{druid_ip}:8082 is the broker node of your Druid cluster, which generally resides on a master or router node. Query result:
[ {
"timestamp" : "2020-03-19T16:00:00.000Z",
"result" : [ {
"dimension" : "user",
"value" : "roni",
"count" : 2
} ]
} ]

Using Web-Based Visualization

You can ingest data from a Kafka cluster and perform queries in the Druid Web UI Console in a visualized manner. For detailed directions, please see Loading data with the data loader.

Help and Support

Was this page helpful?

Help us improve! Rate your documentation experience in 5 mins.

Feedback