tencent cloud

数据传输服务

动态与公告
产品动态
公告
产品简介
产品概述
数据迁移功能描述
数据同步功能描述
数据订阅(Kafka 版)功能描述
产品优势
支持的地域
规格说明
购买指南
计费概述
变更配置说明
欠费说明
退费说明
快速入门
数据迁移操作指导
数据同步操作指导
数据订阅操作指导(Kafka 版)
准备工作
业务评估
网络准备
添加 DTS IP 地址至对接数据库白名单
DTS 服务权限准备
数据库及权限准备
配置自建 MySQL 系的 Binlog
数据迁移
数据迁移支持的数据库
云数据库跨账号实例间迁移
迁移至 MySQL 系列
迁移至 PostgreSQL
迁移至 MongoDB
迁移至 SQL Server
迁移至腾讯云分布式缓存数据库
任务管理
数据同步
数据同步支持的数据库
云数据库跨账号实例间同步
同步至 MySQL 系列
同步至 PostgreSQL
同步至 MongoDB
同步至 Kafka
任务管理
数据订阅(Kafka 版)
数据订阅支持的数据库
MySQL 系列数据订阅
TDSQL PostgreSQL 数据订阅
MongoDB 数据订阅
任务管理
消费管理
前置校验不通过处理方法
检查项汇总
割接说明
监控与告警
支持的监控指标
告警通知功能
通过控制台配置指标告警和事件告警
通过 API 配置指标告警和事件告警
运维管理
配置系统维护时间
任务状态扭转说明
实践教程
本地数据库同步上云
构建双向同步数据结构
构建多对一同步数据结构
构建多活数据中心
数据同步冲突策略如何选择
使用 CLB 代理将其他账号下的数据库迁移至本账号下
通过云联网方式迁移自建数据库至腾讯云数据库
DTS 性能调优最佳实践
常见问题
数据迁移
数据同步
数据订阅 Kafka 版常见问题
数据订阅正则表达式
错误处理
常见错误处理
连通性测试不通过
校验项结果不通过或者出现警告
云联网接入配置源数据时无法选择子网
迁移慢或者进度卡住
数据同步有延时
数据订阅延迟过高
数据消费异常
API 文档
History
Introduction
API Category
Making API Requests
(NewDTS) Data Migration APIs
Data Sync APIs
Data Consistency Check APIs
(NewDTS) Data Subscription APIs
Data Types
Error Codes
DTS API 2018-03-30
相关协议
服务等级协议

ProtoBuf Demo 说明

PDF
聚焦模式
字号
最后更新时间: 2023-11-21 20:36:25

Demo 关键逻辑讲解

消息生产逻辑

下文首先对消息生产逻辑进行简要说明,有助于用户理解消费逻辑。 我们采用 Protobuf 进行序列化,各语言 Demo 中均附带有 Protobuf 定义文件。文件中定义了几个关键结构:Envelope 是最终发送的 Kafka 消息结构;Entry 是单个订阅事件结构;Entries 是 Entry 的集合。主要数据结构关系如下所示:



生产过程如下:
1. 拉取 Binlog 消息,将每个 Binlog Event 编码为一个 Entry 结构体。
message Entry { //Entry 是单个订阅事件结构,一个事件相当于 MySQL 的一个 binlog event
Header header = 1; //事件头
Event event = 2; //事件体
}


message Header {
int32 version = 1; //Entry 协议版本
SourceType sourceType = 2; //源库的类型信息,包括 MySQL,Oracle 等类型
MessageType messageType = 3; //消息的类型,也就是 Event 的类型,包括 BEGIN、COMMIT、DML 等
uint32 timestamp = 4; //Event 在原始 binlog 中的时间戳
int64 serverId = 5; //源的 serverId
string fileName = 6; //源 binlog 的文件名称
uint64 position = 7; //事件在源 binlog 文件中的偏移量
string gtid = 8; //当前事务的 gtid
string schemaName = 9; //变更影响的 schema
string tableName = 10; //变更影响的 table
uint64 seqId = 11; //全局递增序列号
uint64 eventIndex = 12; //如果大的 event 分片,每个分片从0开始编号,当前版本无意义,留待后续扩展用
bool isLast = 13; //当前 event 是否 event 分片的最后一块,是则为 true,当前版本无意义,留待后续扩展用
repeated KVPair properties = 15;
}


message Event {
BeginEvent beginEvent = 1; //binlog 中的 begin 事件
DMLEvent dmlEvent = 2; //binlog 中的 dml 事件
CommitEvent commitEvent = 3; //binlog 中的 commit 事件
DDLEvent ddlEvent = 4; //binlog 中的 ddl 事件
RollbackEvent rollbackEvent = 5; //rollback 事件,当前版本无意义
HeartbeatEvent heartbeatEvent = 6; //源库定时发送的心跳事件
CheckpointEvent checkpointEvent = 7; //订阅后台添加的 checkpoint 事件,每10秒自动生成一个,用于 Kafka 生产和消费位点管理
repeated KVPair properties = 15;
}
2. 为减少消息量,将多个 Entry 合并,合并后的结构为 Entries,Entries.items 字段即为 Entry 顺序列表。合并的数量以合并后不超过 Kafka 单个消息大小限制为标准。对单个 Event 就已超过大小限制的,则不再合并,Entries 中只有唯一 Entry 。
message Entries {
repeated Entry items = 1; //entry list
}
3. 对 Entries 进行 Protobuf 编码得到二进制序列。
4. 将 Entries 的二进制序列放入 Envelope 的 data 字段。当存在单个 Binlog Event 过大时,二进制序列可能超过 Kafka 单个消息大小限制,此时我们会将其分割为多段,每段装入一个 Envelope。 Envelope.total 和 Envelope.index 分别记录总段数和当前 Envelope 的序号(从0开始)。
message Envelope {
int32 version = 1; //protocol version, 决定了 data 内容如何解码
uint32 total = 2;
uint32 index = 3;
bytes data = 4; //当前 version 为1, 表示 data 中数据为 Entries 被 PB 序列化之后的结果
repeated KVPair properties = 15;
}
5. 对上一步生成的一个或多个 Envelope 依次进行 Protobuf 编码,然后投递到 Kafka 分区。同一个 Entries 分割后的多个 Envelope 顺序投递到同一个分区。

消息消费逻辑

下文对消费逻辑进行简要说明。我们提供的三种语言的 Demo 均遵循相同的流程。
1. 创建 Kafka 消费者。
2. 启动消费。
3. 依次消费原始消息,并根据消息中的分区找到分区对应的 partitionMsgConsumer 对象,由该对象对消息进行处理。
4. partitionMsgConsumer 将原始消息反序列化为 Envelope 结构。
// 将 Kafka 消息的 Value 值转换为 Envelope
envelope := subscribe.Envelope{}
err := proto.Unmarshal(msg.Value, &envelope)
5. partitionMsgConsumer 根据 Envelope 中记录的 index 和 total 连续消费一条或者多条消息,直到 Envlope.index 等于 Envelope.total-1(参见上面消费生产逻辑,表示收到了一个完整的 Entries )。
6. 将收到的连续多条 Envelope 的 data 字段顺序组合到一起。将组合后的二进制序列用 Protobuf 解码为 Entries 。
if envelope.Index == 0 {
pmc.completeMsg = envelope
} else {
// 对进行过拆分的 Entries 二进制序列做拼接
pmc.completeMsg.Data = append(pmc.completeMsg.Data, envelope.Data...)
}
if envelope.Index < envelope.Total-1 {
return nil
}
// 将 Envelope.Data 反序列化为 Entries
entries := subscribe.Entries{}
err = proto.Unmarshal(pmc.completeMsg.Data, &entries)

7. 对 Entries.items 依次处理,打印原始 Entry 结构或者转化为 SQL 语句。
8. 当消费到 Checkpoint 消息时,做一次 Kafka 位点提交。Checkpoint 消息是订阅后台定时写入 Kafka 的特殊消息,每10秒一个。

数据库字段映射和存储

本节介绍数据库字段类型和序列化协议中定义的数据类型之间的映射关系。 源数据库(如MySQL)字段值在 Protobuf 协议中用如下所示的 Data 结构来存储。
message Data {
DataType dataType = 1;
string charset = 2; //DataType_STRING 的编码类型, 值存储在 bv 里面
string sv = 3; //DataType_INT8/16/32/64/UINT8/16/32/64/Float32/64/DataType_DECIMAL 的字符串值
bytes bv = 4; //DataType_STRING/DataType_BYTES 的值
}
其中 DataType 字段代表存储的字段类型,可取枚举值如下图所示。
enum DataType {
NIL = 0; //值为 NULL
INT8 = 1;
INT16 = 2;
INT32 = 3;
INT64 = 4;
UINT8 = 5;
UINT16 = 6;
UINT32 = 7;
UINT64 = 8;
FLOAT32 = 9;
FLOAT64 = 10;
BYTES = 11;
DECIMAL = 12;
STRING = 13;
NA = 14; //值不存在 (N/A)
}
其中 bv 字段存储 STRING 和 BYTES 类型的二进制表示,sv 字段存储 INT8/16/32/64/UINT8/16/32/64/DECIMAL 类型的字符串表示,charset 字段存储 STRING 的编码类型。
MySQL/TDSQL 原始类型与 DataType 映射关系如下(对 UNSIGNED 修饰的 MYSQL_TYPE_INT8/16/24/32/64 分别映射为 UINT8/16/32/32/64):
说明
DATETIMEDATETIME 类型不支持时区。
TIMESTAMP 类型支持时区,该类型字段表示:存储时,系统会从当前时区转换为 UTC(Universal Time Coordinated)进行存储;查询时,系统会从 UTC 转换为当前时区进行查询。
综上,如下表中 "MYSQL_TYPE_TIMESTAMP" 和 "MYSQL_TYPE_TIMESTAMP_NEW" 字段会携带时区信息,用户在消费数据时可自行转换。(例如,DTS 输出的时间格式是带时区的字符串"2021-05-17 07:22:42 +00:00",其中,"+00:00"表示 UTC 时间,用户在解析和转换的时候需要考虑时区信息。)
MySQL 字段类型(TDSQL 支持与 MySQL 相同的类型)
对应的 Protobuf DataType 枚举值
MYSQL_TYPE_NULL
NIL
MYSQL_TYPE_INT8
INT8
MYSQL_TYPE_INT16
INT16
MYSQL_TYPE_INT24
INT32
MYSQL_TYPE_INT32
INT32
MYSQL_TYPE_INT64
INT64
MYSQL_TYPE_BIT
INT64
MYSQL_TYPE_YEAR
INT64
MYSQL_TYPE_FLOAT
FLOAT32
MYSQL_TYPE_DOUBLE
FLOAT64
MYSQL_TYPE_VARCHAR
STRING
MYSQL_TYPE_STRING
STRING
MYSQL_TYPE_VAR_STRING
STRING
MYSQL_TYPE_TIMESTAMP
STRING
MYSQL_TYPE_DATE
STRING
MYSQL_TYPE_TIME
STRING
MYSQL_TYPE_DATETIME
STRING
MYSQL_TYPE_TIMESTAMP_NEW
STRING
MYSQL_TYPE_DATE_NEW
STRING
MYSQL_TYPE_TIME_NEW
STRING
MYSQL_TYPE_DATETIME_NEW
STRING
MYSQL_TYPE_ENUM
STRING
MYSQL_TYPE_SET
STRING
MYSQL_TYPE_DECIMAL
DECIMAL
MYSQL_TYPE_DECIMAL_NEW
DECIMAL
MYSQL_TYPE_JSON
BYTES
MYSQL_TYPE_BLOB
BYTES
MYSQL_TYPE_TINY_BLOB
BYTES
MYSQL_TYPE_MEDIUM_BLOB
BYTES
MYSQL_TYPE_LONG_BLOB
BYTES
MYSQL_TYPE_GEOMETRY
BYTES



帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈