tencent cloud

数据传输服务

动态与公告
产品动态
公告
产品简介
产品概述
数据迁移功能描述
数据同步功能描述
数据订阅(Kafka 版)功能描述
产品优势
支持的地域
规格说明
购买指南
计费概述
变更配置说明
欠费说明
退费说明
快速入门
数据迁移操作指导
数据同步操作指导
数据订阅操作指导(Kafka 版)
准备工作
业务评估
网络准备
添加 DTS IP 地址至对接数据库白名单
DTS 服务权限准备
数据库及权限准备
配置自建 MySQL 系的 Binlog
数据迁移
数据迁移支持的数据库
云数据库跨账号实例间迁移
迁移至 MySQL 系列
迁移至 PostgreSQL
迁移至 MongoDB
迁移至 SQL Server
迁移至腾讯云分布式缓存数据库
任务管理
数据同步
数据同步支持的数据库
云数据库跨账号实例间同步
同步至 MySQL 系列
同步至 PostgreSQL
同步至 MongoDB
同步至 Kafka
任务管理
数据订阅(Kafka 版)
数据订阅支持的数据库
MySQL 系列数据订阅
TDSQL PostgreSQL 数据订阅
MongoDB 数据订阅
任务管理
消费管理
前置校验不通过处理方法
检查项汇总
割接说明
监控与告警
支持的监控指标
告警通知功能
通过控制台配置指标告警和事件告警
通过 API 配置指标告警和事件告警
运维管理
配置系统维护时间
任务状态扭转说明
实践教程
本地数据库同步上云
构建双向同步数据结构
构建多对一同步数据结构
构建多活数据中心
数据同步冲突策略如何选择
使用 CLB 代理将其他账号下的数据库迁移至本账号下
通过云联网方式迁移自建数据库至腾讯云数据库
DTS 性能调优最佳实践
常见问题
数据迁移
数据同步
数据订阅 Kafka 版常见问题
数据订阅正则表达式
错误处理
常见错误处理
连通性测试不通过
校验项结果不通过或者出现警告
云联网接入配置源数据时无法选择子网
迁移慢或者进度卡住
数据同步有延时
数据订阅延迟过高
数据消费异常
API 文档
History
Introduction
API Category
Making API Requests
(NewDTS) Data Migration APIs
Data Sync APIs
Data Consistency Check APIs
(NewDTS) Data Subscription APIs
Data Types
Error Codes
DTS API 2018-03-30
相关协议
服务等级协议

数据消费操作指导

PDF
聚焦模式
字号
最后更新时间: 2024-07-08 18:59:27

操作场景

数据同步到 Kafka 后,您可以通过0.11版本及以上的 Kafka 客户端 进行消费订阅数据,本文为您提供了 Java、Go、Python 语言的客户端消费 Demo 示例,方便您快速测试消费数据的流程,了解数据格式解析的方法。

注意事项

1. Demo 并不包含消费数据的用法演示,仅对数据做了打印处理,您需要在此基础上自行编写数据处理逻辑,您也可以使用其他语言的 Kafka 客户端消费并解析数据。
2. 目标 Ckafka 中消息大小设置的上限需要大于源库表中单行数据的最大值,以便数据可以正常同步到目标端。
3. 在同步指定库/表对象(非源实例全部),并且采用 Kafka 单分区的场景中,DTS 解析增量数据后,仅将同步对象的数据写入 Kafka Topic 中,其他非同步对象的数据会转成空事务写入 Kafka Topic,所以在消费数据时会出现空事务。空事务的 Begin/Commit 消息中保留了事务的 GTID 信息,可以保证 GTID 的连续性和完整性。同时,在 MySQL/TDSQL-C MySQL 的消费 Demo 中,多个空事务也做了压缩处理以减少消息数量。
4. 为了保证数据可重入,DTS 同步到 Kafka 链路引入 Checkpoint 机制。消息写入 Kafka Topic 时,一般每10秒会插入一个 Checkpoint,用来标识数据同步的位点,在任务中断后再重启识别断点位置,实现断点续传。另外,消费端遇到 Checkpoint 消息会做一次 Kafka 消费位点提交,以便及时更新消费位点。
5. 数据格式选择 JSON 时,如果您使用过或者熟悉开源订阅工具 Canal,可以选择将这里消费出来的 JSON 格式数据转换成 Canal 工具兼容的数据格式,再进行后续处理,我们的 Demo 中已经提供了相关支持,在启动 Demo 的参数中添加参数 trans2canal 即可实现。目前该功能仅限 Java 语言支持。

消费 Demo 下载

在配置同步任务中,您可以选择不同的数据格式,Avro 和 JSON。Avro 采用二进制格式,消费效率更高,JSON 采用轻量级的文本格式,更加简单易用。选择的数据格式不同,参考的 Demo 示例也不同。
如下提供的 Demo 示例,均已包含对应的 Avro/JSON 协议文件,无需另外下载。
Demo 中的逻辑讲解及关键参数说明,请参考 Demo 说明
Demo 语言
Avro 格式
JSON 格式
Go
地址
地址
Java
地址
地址
Python
地址
地址

Java Demo 使用说明

编译环境:Maven 或者 Gradle 包管理工具,JDK8。用户可自行选择打包工具,如下以 Maven 为例进行介绍。 操作步骤:
1. 下载 Java Demo,然后解压该文件。
2. 进入解压后的目录,为方便使用,目录下分别放置了 Maven 模型文件、pom.xml 文件,用户根据需要选用。 使用 Maven 进行打包:mvn clean package。
3. 运行 Demo。 使用 Maven 打包后,进入目标文件夹 target,运行如下代码:java -jar consumerDemo-avro-1.0-SNAPSHOT.jar --brokers xxx --topic xxx --group xxx --trans2sql
broker 为 CKafka 的访问地址,topic 为同步任务中设置的 topic 名称,如果是多 topic 需要分别消费。这两个可通过数据同步 > 操作 > 查看获取。
group为消费组名称,用户可提前在 Ckafka 中创建消费组,也可以在此处自定义输入。
trans2sql 表示是否转换为 SQL 语句,java 代码中,携带该参数表示转换为 SQL 语句,不携带则不转换。
trans2canal 表示是否转换为 Canal 格式打印出来,携带该参数表示转换为 Canal 格式,不携带则不转换。
说明:
携带 trans2sql 时,将使用 javax.xml.bind.DatatypeConverter.printHexBinary() 将 byte 值转成16进制,请使用 JDK1.8 版本及以上避免不兼容。如果不需要转 SQL,可以注释此处代码。
4. 观察消费情况。



Golang Demo 使用说明

编译环境:Golang 1.12 及以上版本,配置好 Go Module 环境。 操作步骤:
1. 下载 Golang Demo,然后解压该文件。
2. 进入解压后的目录,运行 go build -o subscribe ./main/main.go,生成可执行文件 subscribe。
3. 运行 ./subscribe --brokers=xxx --topic=xxx --group=xxx --trans2sql=true
broker 为 CKafka 的访问地址,topic 为同步任务中设置的 topic 名称,如果是多 topic 需要分别消费。这两个可通过 数据同步>操作>查看 获取。
group为消费组名称,用户可提前在 Ckafka 中创建消费组,也可以在此处自定义输入。
trans2sql 表示是否转换为 SQL 语句。
4. 观察消费情况。



Python3 Demo 使用说明

编译运行环境:安装 Python3,pip3(用于依赖包安装)。 使用 pip3 安装依赖包:
pip install flag
pip install kafka-python
pip install avro
操作步骤:
1. 下载 Python3 Demo ,然后解压该文件。
2. 运行 python main.py --brokers=xxx --topic=xxx --group=xxx --trans2sql=1
broker 为 CKafka 的访问地址,topic 为同步任务中设置的 topic 名称,如果是多 topic 需要分别消费。这两个可通过数据同步 > 操作 > 查看获取。
group为消费组名称,用户可提前在 Ckafka 中创建消费组,也可以在此处自定义输入。
trans2sql 表示是否转换为 SQL 语句。
3. 观察消费情况。


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈