tencent cloud

数据传输服务

动态与公告
产品动态
公告
产品简介
产品概述
数据迁移功能描述
数据同步功能描述
数据订阅(Kafka 版)功能描述
产品优势
支持的地域
规格说明
购买指南
计费概述
变更配置说明
欠费说明
退费说明
快速入门
数据迁移操作指导
数据同步操作指导
数据订阅操作指导(Kafka 版)
准备工作
业务评估
网络准备
添加 DTS IP 地址至对接数据库白名单
DTS 服务权限准备
数据库及权限准备
配置自建 MySQL 系的 Binlog
数据迁移
数据迁移支持的数据库
云数据库跨账号实例间迁移
迁移至 MySQL 系列
迁移至 PostgreSQL
迁移至 MongoDB
迁移至 SQL Server
迁移至腾讯云分布式缓存数据库
任务管理
数据同步
数据同步支持的数据库
云数据库跨账号实例间同步
同步至 MySQL 系列
同步至 PostgreSQL
同步至 MongoDB
同步至 Kafka
任务管理
数据订阅(Kafka 版)
数据订阅支持的数据库
MySQL 系列数据订阅
TDSQL PostgreSQL 数据订阅
MongoDB 数据订阅
任务管理
消费管理
前置校验不通过处理方法
检查项汇总
割接说明
监控与告警
支持的监控指标
告警通知功能
通过控制台配置指标告警和事件告警
通过 API 配置指标告警和事件告警
运维管理
配置系统维护时间
任务状态扭转说明
实践教程
本地数据库同步上云
构建双向同步数据结构
构建多对一同步数据结构
构建多活数据中心
数据同步冲突策略如何选择
使用 CLB 代理将其他账号下的数据库迁移至本账号下
通过云联网方式迁移自建数据库至腾讯云数据库
DTS 性能调优最佳实践
常见问题
数据迁移
数据同步
数据订阅 Kafka 版常见问题
数据订阅正则表达式
错误处理
常见错误处理
连通性测试不通过
校验项结果不通过或者出现警告
云联网接入配置源数据时无法选择子网
迁移慢或者进度卡住
数据同步有延时
数据订阅延迟过高
数据消费异常
API 文档
History
Introduction
API Category
Making API Requests
(NewDTS) Data Migration APIs
Data Sync APIs
Data Consistency Check APIs
(NewDTS) Data Subscription APIs
Data Types
Error Codes
DTS API 2018-03-30
相关协议
服务等级协议

Avro Demo 说明

PDF
聚焦模式
字号
最后更新时间: 2025-09-28 10:42:09

Demo 关键逻辑讲解

Demo 中的文件说明如下,以 Java Demo 为例进行介绍。
consumerDemo-avro-java\\src\\main\\resources\\avro-tools-1.8.2.jar:用来生成 Avro 协议相关代码的工具。
consumerDemo-avro-java\\src\\main\\java\\com\\tencent\\subscribe\\avro:Avro 工具生成代码的目录。
consumerDemo-avro-java\\src\\main\\resources\\Record.avsc:协议定义文件。
Record.avsc 中我们定义了14个结构(Avro 中叫做 schema),其中主要的数据结构为 Record,用于表示 binlog 中的一条数据,Record 的结构如下,其他数据结构可以在 Record.avsc 中查看:
{
"namespace": "com.tencent.subscribe.avro", //Record.avsc 中的最后1个 schema,"name" 显示为 "Record"
"type": "record",
"name": "Record", //"name" 显示为 "Record",表示从 kafka 中消费的数据格式
"fields": [
{
"name": "id", //id 表示全局递增 ID,更多 record 取值解释如下表
"type": "long",
"doc": "unique id of this record in the whole stream"
},
{
"name": "version", //version 表示协议版本
"type": "int",
"doc": "protocol version"
},
{
"name": "messageType", //消息类型
"aliases": [
"operation"
],
"type": {
"namespace": "com.tencent.subscribe.avro",
"name": "MessageType",
"type": "enum",
"symbols": [
"INSERT",
"UPDATE",
"DELETE",
"DDL",
"BEGIN",
"COMMIT",
"HEARTBEAT",
"CHECKPOINT",
"ROLLBACK"          
"STATEMENT",         
 "INIT_DDL",          
  "INIT_INSERT"
]
}
},
{
……
},
}
Record 中的字段类型解释如下:
Record 中的字段名称
说明
id
全局递增 ID。
version
协议版本,当前版本为1。
messageType
消息类型,枚举值:"INSERT","UPDATE","DELETE","DDL","BEGIN","COMMIT","HEARTBEAT","CHECKPOINT","STATEMENT","INIT_DDL","INIT_INSERT"。
其中,"INIT_DDL"表示全量数据的 DDL,"DDL"表示增量数据的 DDL,"INIT_INSERT"表示全量数据的 INSERT,"INSERT"表示增量数据的 INSERT。
fileName
当前 record 所在的 binlog 文件名。
position
当前 record 的在 binlog 中结束的偏移量,格式为 End_log_pos@binlog 文件编号。例如,当前 record 位于文件 mysql-bin.000004 中,结束偏移量为2196,则其值为"2196@4"。
safePosition
当前事务在 binlog 中开始的偏移量,格式同上。
timestamp
写 binlog 的时间,unix 时间戳,秒级。
binlog 记录的事务中对应 event header 里面的 timestamp,源端长事务操作可能会导致 timestamp 与 readerTimestamp 有时间差,这种属于正常情况。
gtid
当前的 gtid,如:c7c98333-6006-11ed-bfc9-b8cef6e1a231:9。
transactionId
事务 ID,只有 commit 事件才会生成事务 ID。
serverId
源库 serverId,查看源库 server_id 参考 SHOW VARIABLES LIKE 'server_id'。
threadId
提交当前事务的会话 ID,参考 SHOW processlist;。
sourceType
源库的数据库类型,当前版本只有 MySQL。
sourceVersion
源库版本,查看源库版本参考select version();
schemaName
库名。
tableName
表名。
objectName
格式为:库名.表名。
columns
表中各列的定义。
oldColumns
DML 执行前该行的数据,如果是 insert 消息,该数组为 null。数组中元素共有12种类型:Integer,Character,Decimal,Float,Timestamp,DateTime,TimestampWithTimeZone,BinaryGeometry,TextGeometry,BinaryObject,TextObject,EmptyObject,详见 demo 中定义。
newColumns
DML 执行后该行的数据,如果是 delete 消息,该数组为 null。数组中元素共有12种类型:Integer,Character,Decimal,Float,Timestamp,DateTime,TimestampWithTimeZone,BinaryGeometry,TextGeometry,BinaryObject,TextObject,EmptyObject,详见 demo 中定义。
sql
DDL 的 SQL 语句。
executionTime
DDL 执行时长,单位为秒。
heartbeatTimestamp
心跳消息的时间戳,秒级。只有 heartbeat 消息才有该字段。
syncedGtid
DTS 已解析 GTID 集合,格式形如:c7c98333-6006-11ed-bfc9-b8cef6e1a231:1-13。
fakeGtid
是否为构造的假 GTID,如未开启 gtid_mode,则 DTS 会构造一个 GTID。
pkNames
如果源库的表设有主键,则 DML 消息中会携带该参数,否则不会携带。
readerTimestamp
DTS 处理这条数据的时间,unix 时间戳,单位为毫秒数。
tags
QueryEvent 中的 status_vars,详细参考 QueryEvent
total
如果消息分片,记录分片总数。当前版本 (version = 1) 无意义,预留扩展。
index
如果消息分片,记录当前分片的索引。当前版本 (version = 1) 无意义,预留扩展。
Record 中描述列属性的字段为 "Field",包含如下四个属性:
name:列名。
dataTypeNumber:是 binlog 中记录的数据类型。取值详见 MySQL
isKey:是否主键。
originalType:DDL 中定义的数据类型。

数据库字段映射关系

对于时间类型,转换逻辑如下。
datetime:如果源库有精度,DTS 对源库全量及增量数据的精度解析与源库保持一致。如果源库没有精度,DTS 解析的精度为0。
示例:源库 INSERT 数据 datetime 值为 2024-10-24 12:34:56.123456,消费到的数据为 2024-10-24 12:34:56.123456
time:DTS 解析的精度一定大于等于源端精度,必要时会补0~6位精度。
timestamp:DTS 对源库全量及增量数据的精度解析为毫秒级,即3位精度,对于 timestamp(4)/timestamp(5)/timestamp(6),会丢失毫秒之后的精度。
示例:源库 INSERT 数据 timestamp 值为 2024-10-24 12:34:56.123456,消费到的数据为 2024-10-24 12:34:56.123
说明:
建议用户在消费数据时,不必关注源库的精度,消费程序中对时间类型的字段解析0~6位精度的格式都进行兼容即可。
如下为数据库(如 MySQL)字段类型和 Avro 协议中定义的数据类型之间的映射关系。
MySQL 类型
对应 Avro 中的类型
MYSQL_TYPE_NULL
EmptyObject
MYSQL_TYPE_INT8
Integer
MYSQL_TYPE_INT16
Integer
MYSQL_TYPE_INT24
Integer
MYSQL_TYPE_INT32
Integer
MYSQL_TYPE_INT64
Integer
MYSQL_TYPE_BIT
Integer
MYSQL_TYPE_YEAR
DateTime
MYSQL_TYPE_FLOAT
Float
MYSQL_TYPE_DOUBLE
Float
MYSQL_TYPE_VARCHAR
Character
MYSQL_TYPE_STRING
Character,如果原类型为 binary,则对应 BinaryObject
MYSQL_TYPE_VAR_STRING
Character,如果原类型为 varbinary,则对应 BinaryObject
MYSQL_TYPE_TIMESTAMP
Timestamp
MYSQL_TYPE_DATE
DateTime
MYSQL_TYPE_TIME
DateTime
MYSQL_TYPE_DATETIME
DateTime
MYSQL_TYPE_TIMESTAMP_NEW
Timestamp
MYSQL_TYPE_DATE_NEW
DateTime
MYSQL_TYPE_TIME_NEW
DateTime
MYSQL_TYPE_DATETIME_NEW
DateTime
MYSQL_TYPE_ENUM
TextObject
MYSQL_TYPE_SET
TextObject
MYSQL_TYPE_DECIMAL
Decimal
MYSQL_TYPE_DECIMAL_NEW
Decimal
MYSQL_TYPE_JSON
TextObject
MYSQL_TYPE_BLOB
BinaryObject
MYSQL_TYPE_TINY_BLOB
BinaryObject
MYSQL_TYPE_MEDIUM_BLOB
BinaryObject
MYSQL_TYPE_LONG_BLOB
BinaryObject
MYSQL_TYPE_GEOMETRY
BinaryObject

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈