tencent cloud

弹性 MapReduce

动态与公告
产品动态
产品公告
安全公告
产品简介
产品概述
产品优势
产品架构
产品功能
应用场景
约束与限制
技术支持范围
产品发行版
购买指南
EMR on CVM 计费说明
EMR on TKE 计费说明
EMR Serverless HBase 计费说明
快速入门
EMR on CVM 快速入门
EMR on TKE 快速入门
EMR on CVM 操作指南
规划集群
管理权限
配置集群
管理集群
管理服务
监控告警
智能管家
EMR on TKE 操作指南
EMR on TKE 简介
配置集群
管理集群
管理服务
监控运维
应用分析
EMR Serverless HBase 操作指南
EMR Serverless HBase 产品简介
配额与限制
规划实例
管理实例
监控告警
开发指南
EMR 开发指南
Hadoop开发指南
Spark 开发指南
HBASE开发指南
Phoenix on Hbase 开发指南
Hive 开发指南
Presto开发指南
Sqoop 开发指南
Hue 开发指南
Oozie 开发指南
Flume 开发指南
Kerberos 开发指南
Knox 开发指南
Alluxio 开发指南
Kylin 开发指南
Livy 开发指南
Kyuubi 开发指南
Zeppelin 开发指南
Hudi 开发指南
Superset 开发指南
Impala 开发指南
Druid 开发指南
Tensorflow 开发指南
Kudu 开发指南
Ranger 开发指南
Kafka 开发指南
Iceberg 开发指南
StarRocks 开发指南
Flink 开发指南
JupyterLab 开发指南
MLflow 开发指南
实践教程
EMR on CVM 运维实践
数据迁移实践
自定义伸缩实践教程
API 文档
History
Introduction
API Category
Cluster Resource Management APIs
Cluster Services APIs
User Management APIs
Data Inquiry APIs
Scaling APIs
Configuration APIs
Other APIs
Serverless HBase APIs
YARN Resource Scheduling APIs
Making API Requests
Data Types
Error Codes
常见问题
EMR on CVM常见问题
服务等级协议
联系我们

从 Kafka 实时摄入数据

PDF
聚焦模式
字号
最后更新时间: 2025-01-03 15:02:25
本文介绍如何使用 Apache Druid Kafka Indexing Service 实时消费 Kafka 数据。开始本节前,类似 Hadoop 集群,需要确保 Kafka 集群和 Druid 集群之间能够正常通信。
说明
两个集群在同一个 VPC 下,或两个集群在不同 VPC,但两个 VPC 之间能够正常通信(如通过云联网或者对等连接)。
如有必要需要将 Kafka 集群的 Host 信息配置到 Druid 集群中。

命令行方式

1. 首先在 Kafka 集群启动 kafka broker。
./bin/kafka-server-start.sh config/server.properties
1. 创建一个kafka topic,名为 mytopic。
./bin/kafka-topics.sh --create --zookeeper {kafka_zk_ip}:2181 --replication-factor 1 --partitions 1 --topic mytopic
输出:
Created topic "mytopic".
{kafka_zk_ip}:2181为 kafka 集群的 zookeeper 地址。
1. 在 Druid 集群上准备一个数据描述文件 kafka-mytopic.json。
{
"type": "kafka",
"dataSchema": {
"dataSource": "mytopic-kafka",
"parser": {
"type": "string",
"parseSpec": {
"timestampSpec": {
"column": "time",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": ["url", "user"]
},
"format": "json"
}
},
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "hour",
"queryGranularity": "none"
},
"metricsSpec": [{
"type": "count",
"name": "views"
},
{
"name": "latencyMs",
"type": "doubleSum",
"fieldName": "latencyMs"
}
]
},
"ioConfig": {
"topic": "mytopic",
"consumerProperties": {
"bootstrap.servers": "{kafka_ip}:9092",
"group.id": "kafka-indexing-service"
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H"
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": "100000"
}
}
{kafka_ip}:9092为您 Kafka 集群的 bootstrap.servers IP 和端口。
1. 在 Druid 集群的 Master 节点上添加 Kafka supervisor。
curl -XPOST -H 'Content-Type: application/json' -d @kafka-mytopic.json http://{druid_master_ip}:8090/druid/indexer/v1/supervisor
输出:
{"id":"mytopic-kafka"}
{druid_master_ip}:8090为 overlord 进程部署的节点,一般是 Master 节点。
1. 在 Kafka 集群上开启一个 console producer。
./bin/kafka-console-producer.sh --broker-list {kafka_ip}:9092 --topic mytopic
{kafka_ip}:9092为您 Kafka 集群的 bootstrap.servers IP 和端口。
1. 在 druid 集群准备一个查询文件,命名为 query-mytopic.json。
{
"queryType" : "search",
"dataSource" : "mytopic-kafka",
"intervals" : ["2020-03-13T00:00:00.000/2020-03-20T00:00:00.000"],
"granularity" : "all",
"searchDimensions": [
"url",
"user"
],
"query": {
"type": "insensitive_contains",
"value": "roni"
}
}
1. 在 kafka 上实时输入一些数据。
{"time": "2020-03-19T09:57:58Z", "url": "/foo/bar", "user": "brozo", "latencyMs": 62}
{"time": "2020-03-19T16:57:59Z", "url": "/", "user": "roni", "latencyMs": 15}
{"time": "2020-03-19T17:50:00Z", "url": "/foo/bar", "user": "roni", "latencyMs": 25}
时间戳生成命令:
python -c 'import datetime; print(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))'
1. 在 Druid 集群上查询。
curl -XPOST -H 'Content-Type: application/json' -d @query-mytopic.json http://{druid_ip}:8082/druid/v2/?pretty
{druid_ip}:8082为您 Druid 集群的 broker 节点,一般在 Master 或 Router 节点上。 查询结果:
[ {
"timestamp" : "2020-03-19T16:00:00.000Z",
"result" : [ {
"dimension" : "user",
"value" : "roni",
"count" : 2
} ]
} ]

Web 可视化方式

您可通过 Druid Web UI 控制台可视化方式,从 Kafka 集群摄入数据并查询,详细可参考 通过 data loader 加载 Kafka 数据

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈