ProtoBuf Demo Description (Flink)

Last updated: 2024-07-08 16:52:02

Key Logic Description

Message production logic

This section describes the message production logic to help you better understand the consumption logic. The demo uses Protobuf for serialization and contains a Protobuf definition file. The file defines three key structures: Envelope is the final Kafka message structure; Entry is the structure of a single subscription event; Entries is a collection of Entry structures. They nest as follows: an Envelope carries serialized Entries in its data field, and Entries holds a list of Entry structures in its items field.

The production process is as follows:
1. Pull binlog messages and encode each binlog event into an Entry.
message Entry { // An `Entry` is the structure of an individual subscription event. An event is similar to a binlog event in MySQL.
    Header header = 1; // The event header
    Event event = 2; // The event body
}

message Header {
    int32 version = 1; // The protocol version of the `Entry`
    SourceType sourceType = 2; // The source database type, such as MySQL or Oracle
    MessageType messageType = 3; // The message type, i.e., event type, such as BEGIN, COMMIT, and DML
    uint32 timestamp = 4; // The event timestamp in the source binlog
    int64 serverId = 5; // The `serverId` of the source database
    string fileName = 6; // The filename of the source binlog
    uint64 position = 7; // The event offset in the source binlog file
    string gtid = 8; // The GTID of the current transaction
    string schemaName = 9; // The modified schema
    string tableName = 10; // The modified table
    uint64 seqId = 11; // The globally incremental serial number
    uint64 eventIndex = 12; // If a large event is sharded, the shard number starts from 0. This parameter has no meaning in the current version and is reserved for future use.
    bool isLast = 13; // Whether the current shard is the last shard of a sharded event; if so, the value is `true`. This parameter has no meaning in the current version and is reserved for future use.
    repeated KVPair properties = 15;
}

message Event {
    BeginEvent beginEvent = 1; // The BEGIN event in the binlog
    DMLEvent dmlEvent = 2; // The DML event in the binlog
    CommitEvent commitEvent = 3; // The COMMIT event in the binlog
    DDLEvent ddlEvent = 4; // The DDL event in the binlog
    RollbackEvent rollbackEvent = 5; // The rollback event. This parameter has no meaning in the current version.
    HeartbeatEvent heartbeatEvent = 6; // The heartbeat event regularly sent by the source database
    CheckpointEvent checkpointEvent = 7; // The checkpoint event added by the subscription backend. It is generated automatically every 10 seconds and is used for Kafka production and consumption offset management.
    repeated KVPair properties = 15;
}
2. Multiple Entry structures are merged to reduce the number of messages. After the merge, the binlog events are held in an Entries structure, whose Entries.items field is the ordered list of Entry structures. The total size of the merged Entry structures should stay below the size limit of a single Kafka message. If a single binlog event already exceeds the size limit, no merging takes place, so the Entries structure contains only one Entry.
message Entries {
    repeated Entry items = 1; // `Entry` list
}
3. Encode Entries with Protobuf to generate a binary sequence.
4. Put the binary sequence in the data field of an Envelope. If a single binlog event is oversized, the binary sequence may exceed the size limit of a single Kafka message. In this case, the binary sequence is split into multiple segments, and each segment is put in its own Envelope.
message Envelope {
    int32 version = 1; // The protocol version, which determines how the data content is decoded.
    uint32 total = 2;
    uint32 index = 3;
    bytes data = 4; // Here, `version` is 1, indicating that the data is `Entries` serialized in the Protobuf format.
    repeated KVPair properties = 15;
}
5. Encode one or multiple Envelope structures generated in the previous step in sequence and deliver the Envelope structures to Kafka partitions. Multiple Envelope structures in the same Entries are delivered to the same partition in sequence.
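The splitting in steps 4 and 5 can be sketched as follows. This is a minimal illustration that uses only the Java standard library; it is not the DTS producer code. The `Segment` class is a hypothetical stand-in for `Envelope`, `maxSegmentBytes` is a hypothetical size limit, and reading `total` as the number of segments and `index` as the zero-based position of a segment is an assumption inferred from the field names, since the definition file does not comment on them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Envelope: only the fields needed for splitting.
final class Segment {
    final int total;   // assumed: number of segments the payload was split into
    final int index;   // assumed: zero-based position of this segment
    final byte[] data; // one slice of the serialized Entries bytes

    Segment(int total, int index, byte[] data) {
        this.total = total;
        this.index = index;
        this.data = data;
    }
}

final class EnvelopeSplitter {
    // Split a serialized payload into segments of at most maxSegmentBytes each.
    static List<Segment> split(byte[] payload, int maxSegmentBytes) {
        if (maxSegmentBytes <= 0) {
            throw new IllegalArgumentException("maxSegmentBytes must be positive");
        }
        // Ceiling division; an empty payload still yields one (empty) segment.
        int total = Math.max(1, (payload.length + maxSegmentBytes - 1) / maxSegmentBytes);
        List<Segment> segments = new ArrayList<>(total);
        for (int i = 0; i < total; i++) {
            int from = i * maxSegmentBytes;
            int to = Math.min(payload.length, from + maxSegmentBytes);
            segments.add(new Segment(total, i, Arrays.copyOfRange(payload, from, to)));
        }
        return segments;
    }
}
```

Under these assumptions, a payload that fits within the limit yields a single segment with `total` equal to 1, which matches the `1 == envelope.getTotal()` check used on the consumer side.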

Message consumption logic

This section describes the message consumption logic.
1. To use Flink to consume messages, you need to create a FlinkKafkaConsumer, specify a consumption topic, and customize a message deserializer based on the Protobuf protocol.
// Create a FlinkKafkaConsumer
FlinkKafkaConsumer<RecordMsgObject> consumer =
    new FlinkKafkaConsumer<>(topic, new DeserializeProtobufToRecordMsgObject(), props);
2. Run DeserializeProtobufToRecordMsgObject to deserialize the original message to a RecordMsgObject object.

// Customize a message deserializer to deserialize the original message to a `RecordMsgObject` object
@Override
public RecordMsgObject deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    RecordMsgObject obj = new RecordMsgObject();
    obj.topic = record.topic();
    obj.partition = record.partition();
    obj.offset = record.offset();
    obj.partitionSeq = getPartitionSeq(record);
    obj.key = new String(record.key());
    obj.headers = record.headers();
    // Here the binary value of `Envelope` will be received
    obj.value = record.value();
    return obj;
}
3. Group the received messages by partition with keyBy, and process each group with SubscribeMsgProcess.
// Group the received messages by partition
stream.keyBy(RecordMsgObject::getPartition)
    .process(new SubscribeMsgProcess(trans2sql)).setParallelism(1);
4. Use Protobuf to decode the binary sequence received in SubscribeMsgProcess into Envelope.
// Obtain an `Envelope` by deserialization. By default, this demo assumes that a single `Envelope` holds the data of a whole binlog event.
SubscribeProtobufData.Envelope envelope = SubscribeProtobufData.Envelope.parseFrom(record.value);
if (1 != envelope.getVersion()) {
    throw new IllegalStateException(String.format("unsupported version: %d", envelope.getVersion()));
}
Note
By default, this demo assumes that a single binlog event fits within a single Kafka message. If a binlog event exceeds the Kafka message size when you consume with this demo, the split Envelope structures must be concatenated, using Flink's advanced "State Processor API" feature, so that the message body is complete. You need to handle this based on your business scenario. For more information, see Flink Documentation.
5. Use Protobuf to decode the binary sequence in the data field of the received Envelope into Entries.
// Deserialize `Entries`
ByteString envelopeData = envelope.getData();
SubscribeProtobufData.Entries entries;
if (1 == envelope.getTotal()) {
    entries = SubscribeProtobufData.Entries.parseFrom(envelopeData.toByteArray());
} else {
    entries = SubscribeProtobufData.Entries.parseFrom(shardMsgMap.get(shardId).toByteArray());
    shardMsgMap.remove(shardId);
}
6. Process Entries.items in sequence, and print the original Entry structure or convert it into a SQL statement.
// Traverse each `Entry` and print the SQL statement based on the type of `Entry`
for (SubscribeProtobufData.Entry entry : entries.getItemsList()) {
    onEntry(record.partition, record.offset, ps, entry, trans2sql);
}
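For the case where one oversized binlog event is spread over several Envelope structures, the concatenation could look roughly like the sketch below. It is an illustration under stated assumptions, not part of the demo: `EnvelopeAssembler` is a hypothetical helper, segments of the same Entries are assumed to arrive in order on one partition (as the production logic describes), and `index` is assumed to be zero-based. The buffer here lives in plain memory; in a real Flink job it would need to be kept in managed state to survive restarts.

```java
import java.io.ByteArrayOutputStream;
import java.util.Optional;

// Hypothetical helper: buffers the segments of one split payload for a single partition.
final class EnvelopeAssembler {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private int expectedIndex = 0;

    // Feed one segment; returns the full payload once the last segment has arrived.
    Optional<byte[]> accept(int total, int index, byte[] data) {
        if (index != expectedIndex) {
            // Assumed: segments arrive in order with zero-based indexes.
            int expected = expectedIndex;
            reset();
            throw new IllegalStateException(
                "expected segment " + expected + " but got " + index);
        }
        buffer.write(data, 0, data.length);
        expectedIndex++;
        if (expectedIndex < total) {
            return Optional.empty(); // more segments to come
        }
        byte[] payload = buffer.toByteArray();
        reset();
        return Optional.of(payload);
    }

    // Drop any partial payload and start over with segment 0.
    private void reset() {
        buffer.reset();
        expectedIndex = 0;
    }
}
```

Once `accept` returns a value, the resulting byte array is what would be handed to `Entries.parseFrom` in step 5.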

Table API & Flink SQL

This demo only demonstrates the Flink client mode that uses the DataStream API and doesn't apply to scenarios that use Table API & Flink SQL. There are two ways to use the Table API & Flink SQL client mode:
1. Convert DataStream into Table. For more information, see DataStream API Integration.
2. Customize a connector based on Table API & Flink SQL. For more information, see User-defined Sources & Sinks.

Database Field Mapping and Storage

This section describes the mappings between database field types and data types defined in the Protobuf protocol. A field value in the source database is structured as follows in the Protobuf protocol.
message Data {
    DataType dataType = 1;
    string charset = 2; // The encoding (string) type of DataType_STRING, with the value stored in `bv`
    string sv = 3; // The string value of DataType_INT8/16/32/64/UINT8/16/32/64/Float32/64/DataType_DECIMAL
    bytes bv = 4; // The value of DataType_STRING/DataType_BYTES
}
The field DataType refers to the type of stored fields. The values are as enumerated below:
enum DataType {
    NIL = 0; // The value is `NULL`
    INT8 = 1;
    INT16 = 2;
    INT32 = 3;
    INT64 = 4;
    UINT8 = 5;
    UINT16 = 6;
    UINT32 = 7;
    UINT64 = 8;
    FLOAT32 = 9;
    FLOAT64 = 10;
    BYTES = 11;
    DECIMAL = 12;
    STRING = 13;
    NA = 14; // The value does not exist (N/A).
}
The bv field stores the binary representation of STRING and BYTES values; the sv field stores the string representation of INT8/16/32/64, UINT8/16/32/64, FLOAT32/64, and DECIMAL values; the charset field stores the encoding of STRING values.
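As an illustration of how a consumer might turn these fields into Java values, here is a minimal sketch. It is not part of the demo: `DataDecoder` and its local `DataType` enum are hypothetical and only mirror the Protobuf enum so the code compiles without the generated classes. The exact format of the `charset` value is not specified here, so the sketch assumes that names starting with `utf8` (for example `utf8mb4`) mean UTF-8 and that any other name is one Java recognizes.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

final class DataDecoder {
    // Local mirror of the Protobuf DataType enum (same order, so ordinals match the field values).
    enum DataType {
        NIL, INT8, INT16, INT32, INT64, UINT8, UINT16, UINT32, UINT64,
        FLOAT32, FLOAT64, BYTES, DECIMAL, STRING, NA
    }

    // Assumption: charset names starting with "utf8" denote UTF-8; others are valid Java charset names.
    static Charset toJavaCharset(String name) {
        if (name == null || name.isEmpty() || name.toLowerCase().startsWith("utf8")) {
            return StandardCharsets.UTF_8;
        }
        return Charset.forName(name);
    }

    // Convert one field value into a Java object based on its DataType.
    static Object decode(DataType type, String charset, String sv, byte[] bv) {
        switch (type) {
            case NIL:
            case NA:
                return null; // NULL and "does not exist" are not distinguished in this sketch
            case INT8: case INT16: case INT32: case INT64:
            case UINT8: case UINT16: case UINT32:
                return Long.parseLong(sv); // all of these fit in a signed 64-bit long
            case UINT64:
                return new BigInteger(sv); // may exceed Long.MAX_VALUE
            case FLOAT32: case FLOAT64:
                return Double.parseDouble(sv);
            case DECIMAL:
                return new BigDecimal(sv); // keeps the exact decimal digits
            case STRING:
                return new String(bv, toJavaCharset(charset));
            case BYTES:
                return bv;
            default:
                throw new IllegalArgumentException("unknown type: " + type);
        }
    }
}
```

Returning `BigInteger` for UINT64 and `BigDecimal` for DECIMAL is a design choice made to avoid silent overflow or rounding; a consumer with narrower requirements could map them differently.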
The mappings between the original TDSQL for MySQL types and DataType are shown below (MYSQL_TYPE_INT8/16/24/32/64 with the UNSIGNED modifier are mapped to UINT8/16/32/32/64 respectively):
Note
DATE, TIME, and DATETIME types don't support time zone.
The TIMESTAMP type supports time zone. Values of this type are converted from the current time zone to Coordinated Universal Time (UTC) for storage, and converted back from UTC to the current time zone on query.
The MYSQL_TYPE_TIMESTAMP and MYSQL_TYPE_TIMESTAMP_NEW fields carry the time zone information, which you can convert on your own when consuming data. For example, the format of the time data output by DTS is a string with time zone, such as 2021-05-17 07:22:42 +00:00, where +00:00 indicates the UTC time. You need to take into account the time zone information when parsing and converting the data.
| TDSQL for MySQL Field Type | Protobuf DataType Value |
| --- | --- |
| MYSQL_TYPE_NULL | NIL |
| MYSQL_TYPE_INT8 | INT8 |
| MYSQL_TYPE_INT16 | INT16 |
| MYSQL_TYPE_INT24 | INT32 |
| MYSQL_TYPE_INT32 | INT32 |
| MYSQL_TYPE_INT64 | INT64 |
| MYSQL_TYPE_BIT | INT64 |
| MYSQL_TYPE_YEAR | INT64 |
| MYSQL_TYPE_FLOAT | FLOAT32 |
| MYSQL_TYPE_DOUBLE | FLOAT64 |
| MYSQL_TYPE_VARCHAR | STRING |
| MYSQL_TYPE_STRING | STRING |
| MYSQL_TYPE_VAR_STRING | STRING |
| MYSQL_TYPE_TIMESTAMP | STRING |
| MYSQL_TYPE_DATE | STRING |
| MYSQL_TYPE_TIME | STRING |
| MYSQL_TYPE_DATETIME | STRING |
| MYSQL_TYPE_TIMESTAMP_NEW | STRING |
| MYSQL_TYPE_DATE_NEW | STRING |
| MYSQL_TYPE_TIME_NEW | STRING |
| MYSQL_TYPE_DATETIME_NEW | STRING |
| MYSQL_TYPE_ENUM | STRING |
| MYSQL_TYPE_SET | STRING |
| MYSQL_TYPE_DECIMAL | DECIMAL |
| MYSQL_TYPE_DECIMAL_NEW | DECIMAL |
| MYSQL_TYPE_JSON | BYTES |
| MYSQL_TYPE_BLOB | BYTES |
| MYSQL_TYPE_TINY_BLOB | BYTES |
| MYSQL_TYPE_MEDIUM_BLOB | BYTES |
| MYSQL_TYPE_LONG_BLOB | BYTES |
| MYSQL_TYPE_GEOMETRY | BYTES |
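Because TIMESTAMP values arrive as strings that carry a time zone offset, a consumer usually has to parse them before converting. The sketch below shows one way to do this with `java.time`. The `DtsTimestamp` class is hypothetical, and the pattern assumes the exact layout of the example value `2021-05-17 07:22:42 +00:00`, with no fractional seconds; adjust the pattern if your data differs.

```java
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

final class DtsTimestamp {
    // Pattern for strings such as "2021-05-17 07:22:42 +00:00" (assumed layout).
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss xxx");

    // Parse the string, keeping the offset it carries.
    static OffsetDateTime parse(String text) {
        return OffsetDateTime.parse(text, FORMAT);
    }

    // Express the same instant in the zone the consumer cares about.
    static ZonedDateTime inZone(String text, String zoneId) {
        return parse(text).atZoneSameInstant(ZoneId.of(zoneId));
    }
}
```

For example, `DtsTimestamp.inZone("2021-05-17 07:22:42 +00:00", "+08:00")` represents the same instant with a local time of 15:22:42.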

