tencent cloud

腾讯云数据仓库 TCHouse-D

产品简介
产品概述
基本概念
集群架构
产品优势
应用场景
购买指南
计费概述
续费说明
到期与欠费说明
退费说明
配置变更计费说明
快速入门
通过控制台使用腾讯云数据仓库 TCHouse-D
通过客户端使用腾讯云数据仓库 TCHouse-D
操作指南
集群操作
监控和告警配置
账户权限管理
数据管理
查询管理
配置管理
节点管理
日志分析
SQL 工作区
开启资源隔离
开发指南
数据表设计
数据导入
数据导出
基础功能
查询优化
生态扩展功能
API 文档
History
Introduction
API Category
Making API Requests
Cluster Operation APIs
Database and Table APIs
Cluster Information Viewing APIs
Hot-Cold Data Layering APIs
Database and Operation Audit APIs
User and Permission APIs
Resource Group Management APIs
Data Types
Error Codes
云上生态
为子账号授予 CAM 策略
查询加速腾讯云 DLC
实践教程
基本功能使用
高级特性使用
资源规格选型及调优建议
命名规范及库表限制
表设计与数据导入
查询调优
建议规避的用法
通过 JDBC 方式公网访问 TCHouse-D
性能测试
TPC-H 性能测试
SSB 性能测试
TPC-DS 性能测试
常见问题
常见操作问题
常见报错
联系我们
词汇表
产品协议
服务等级协议
隐私政策
数据处理和安全协议

从 Kafka 导入数据

PDF
聚焦模式
字号
最后更新时间: 2024-06-27 11:04:55
用户可以通过提交例行导入作业,直接订阅 Kafka 中的消息数据,以近实时的方式进行数据同步。
Doris 自身能够保证不丢不重的订阅 Kafka 中的消息,即 Exactly-Once 消费语义。

订阅 Kafka 消息

订阅 Kafka 消息使用了 Doris 中的例行导入(Routine Load)功能。 用户首先需要创建一个例行导入作业。作业会通过例行调度,不断地发送一系列的任务,每个任务会消费一定数量 Kafka 中的消息。 请注意以下使用限制:
1. 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。
2. 支持的消息格式如下:
csv 文本格式。每一个 message 为一行,且行尾不包含换行符。
Json 格式。
3. 仅支持 Kafka 0.10.0.0(含) 以上版本。

访问 SSL 认证的 Kafka 集群

例行导入功能支持无认证的 Kafka 集群,以及通过 SSL 认证的 Kafka 集群。 访问 SSL 认证的 Kafka 集群需要用户提供用于认证 Kafka Broker 公钥的证书文件(ca.pem)。如果 Kafka 集群同时开启了客户端认证,则还需提供客户端的公钥(client.pem)、密钥文件(client.key),以及密钥密码。这里所需的文件需要先通过 CREAE FILE 命令上传到 Plao 中,并且 catalog 名称为 kafkaCREATE FILE 命令的具体帮助可以参见 CREATE FILE 命令手册。这里给出示例:

上传文件

CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
上传完成后,可以通过 SHOW FILES 命令查看已上传的文件。

创建例行导入作业

创建例行导入任务的具体命令,请参阅 ROUTINE LOAD 命令手册。这里给出示例:
1. 访问无认证的 Kafka 集群。
CREATE ROUTINE LOAD demo.my_first_routine_load_job ON test_1
COLUMNS TERMINATED BY ","
PROPERTIES
(
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.group.id" = "xxx",
"property.client.id" = "xxx",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
max_batch_interval/max_batch_rows/max_batch_size 用于控制一个子任务的运行周期。一个子任务的运行周期由最长运行时间、最多消费行数和最大消费数据量共同决定。
2. 访问 SSL 认证的 Kafka 集群。
CREATE ROUTINE LOAD demo.my_first_routine_load_job ON test_1
COLUMNS TERMINATED BY ",",
PROPERTIES
(
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
)
FROM KAFKA
(
"kafka_broker_list"= "broker1:9091,broker2:9091",
"kafka_topic" = "my_topic",
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg"
);

查看导入作业状态

查看作业状态的具体命令和示例请参阅 SHOW ROUTINE LOAD 命令文档。
查看某个作业的任务运行状态的具体命令和示例请参阅 SHOW ROUTINE LOAD TASK 命令文档。
只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。

修改作业属性

用户可以修改已经创建的作业的部分属性。具体说明请参阅 ALTER ROUTINE LOAD 命令手册。

作业控制

用户可以通过 STOP/PAUSE/RESUME 三个命令来控制作业的停止,暂停和重启。 具体命令请参阅:STOP ROUTINE LOADPAUSE ROUTINE LOADRESUME ROUTINE LOAD 命令文档。

更多帮助

关于 ROUTINE LOAD 的更多详细语法和最佳实践,请参阅 ROUTINE LOAD 命令手册。

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈