To print data in Canal format, add the `trans2canal` parameter to the demo startup parameters. Currently, this feature is supported only in Java.

Compile the Java demo with `mvn clean package`, enter the `target` directory, and run the following command:

```shell
java -jar consumerDemo-json-1.0-SNAPSHOT.jar --brokers xxx --topic xxx --group xxx --user xxx --password xxx --trans2sql --trans2canal
```
Parameter description:
- `broker` is the private network access address for data subscription to Kafka.
- `topic` is the subscription topic, which can be viewed on the Subscription details page as instructed in Viewing Subscription Details.
- `group`, `user`, and `password` are the name, account, and password of the consumer group, which can be viewed on the Consumption Management page as instructed in Managing Consumer Group.
- `trans2sql` indicates whether to enable conversion to SQL statements. In the Java demo, the conversion is enabled if this parameter is carried.
- `trans2canal` indicates whether to print the data in Canal format. The conversion is enabled if this parameter is carried.

For the Go demo, run `go build -o subscribe ./main/main.go` to generate the executable file `subscribe`, then run:

```shell
./subscribe --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql=true
```
Parameter description:
- `broker` is the private network access address for data subscription to Kafka.
- `topic` is the subscription topic, which can be viewed on the Subscription details page as instructed in Viewing Subscription Details.
- `group`, `user`, and `password` are the name, account, and password of the consumer group, which can be viewed on the Consumption Management page as instructed in Managing Consumer Group.
- `trans2sql` indicates whether to enable conversion to SQL statements.

For the Python demo, use `pip3` to install the dependency packages:

```shell
pip install flag
pip install kafka-python
```

Then run:

```shell
python main.py --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql=1
```
Parameter description:
- `broker` is the private network access address for data subscription to Kafka.
- `topic` is the subscription topic, which can be viewed on the Subscription details page as instructed in Viewing Subscription Details.
- `group`, `user`, and `password` are the name, account, and password of the consumer group, which can be viewed on the Consumption Management page as instructed in Managing Consumer Group.
- `trans2sql` indicates whether to enable conversion to SQL statements.

Record definition file: in the demo for Java, the path of the definition file is `consumerDemo-json-java\src\main\java\json\FlatRecord.java`.
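To make the field descriptions below concrete, here is a minimal Python sketch that parses one such JSON record with the standard library. The field names follow the Record definition, but the sample values are hypothetical and only a handful of the fields are shown:

```python
import json

# A hypothetical JSON record as it might arrive on the Kafka topic;
# the values below are made up for illustration only.
raw = '''{
    "id": 100,
    "version": 1,
    "messageType": "INSERT",
    "fileName": "mysql-bin.000004",
    "position": "2196@4",
    "schemaName": "testdb",
    "tableName": "users",
    "oldColumns": null,
    "newColumns": ["1", "alice"]
}'''

record = json.loads(raw)

# For an INSERT message, oldColumns is null and newColumns holds the new row.
if record["messageType"] == "INSERT":
    print(record["schemaName"], record["tableName"], record["newColumns"])
```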
| Field in Record | Description |
| --- | --- |
| id | The globally incremental ID. |
| version | The protocol version, which is currently v1. |
| messageType | The message type. Enumerated values: "INSERT", "UPDATE", "DELETE", "DDL", "BEGIN", "COMMIT", "HEARTBEAT", "CHECKPOINT". |
| fileName | The name of the binlog file where the current record is located. |
| position | The end offset of the current record in the binlog in the format of `End_log_pos@binlog file number`. For example, if the current record is in the file mysql-bin.000004 and the end offset is 2196, the value of this parameter will be `2196@4`. |
| safePosition | The start offset of the current transaction in the binlog, in the same format as described above. |
| timestamp | The time when the data was written to the binlog, which is a UNIX timestamp in seconds. |
| gtid | The current GTID, such as c7c98333-6006-11ed-bfc9-b8cef6e1a231:9. |
| transactionId | The transaction ID, which is generated only for COMMIT events. |
| serverId | The server ID of the source database, which can be viewed by running `SHOW VARIABLES LIKE 'server_id';`. |
| threadId | The ID of the session that committed the current transaction, which can be viewed by running `SHOW PROCESSLIST;`. |
| sourceType | The source database type, which currently can only be MySQL. |
| sourceVersion | The source database version, which can be viewed by running `SELECT VERSION();`. |
| schemaName | The database name. |
| tableName | The table name. |
| objectName | The object name in the format of `database name.table name`. |
| columns | The definitions of the columns in the table. |
| oldColumns | The data of the row before DML execution. If the message is an INSERT message, this array will be null. |
| newColumns | The data of the row after DML execution. If the message is a DELETE message, this array will be null. |
| sql | The DDL SQL statement. |
| executionTime | The DDL execution duration in seconds. |
| heartbeatTimestamp | The timestamp of the heartbeat message in seconds, which is present only for heartbeat messages. |
| syncedGtid | The collection of GTIDs parsed by DTS, in the format of `c7c98333-6006-11ed-bfc9-b8cef6e1a231:1-13`. |
| fakeGtid | Whether the current GTID is forged. If gtid_mode is not enabled, DTS will forge a GTID. |
| pkNames | If the table in the source database has a primary key, this parameter is carried in the DML message; otherwise, it is not. |
| readerTimestamp | The time when DTS processed the current data record, which is a UNIX timestamp in milliseconds. |
| tags | |
| total | The total number of message segments if the message is segmented. This field is invalid in the current version (version = 1) and is reserved for extension. |
| index | The index of message segments if the message is segmented. This field is invalid in the current version (version = 1) and is reserved for extension. |
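The `position` field described above packs two values into one string. A small, hypothetical helper to split `End_log_pos@binlog file number` back apart; the six-digit zero-padded file name follows MySQL's usual binlog naming, consistent with the `mysql-bin.000004` example in the table (the `basename` default is an assumption):

```python
def parse_position(position: str, basename: str = "mysql-bin"):
    """Split 'End_log_pos@file-number' into (end_offset, binlog_file_name)."""
    end_log_pos, file_number = position.split("@")
    # MySQL numbers binlog files with six zero-padded digits, e.g. mysql-bin.000004.
    file_name = f"{basename}.{int(file_number):06d}"
    return int(end_log_pos), file_name

print(parse_position("2196@4"))  # → (2196, 'mysql-bin.000004')
```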
String types such as `varchar` are all converted to UTF-8 encoding. Time and date types are output in the format `yyyy-dd-mm hh:MM:ss.milli`. `binary` and `blob` types are output as strings equal to their hex values, such as "0xfff".
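As a quick sanity check of the hex convention above, the following sketch (a hypothetical helper, not part of the demos) decodes a value like "0xfff" back into an integer and raw bytes; note that an odd number of hex digits needs a leading zero before `bytes.fromhex` will accept it:

```python
def decode_hex_value(value: str) -> bytes:
    """Turn a '0x...'-style string, as emitted for binary/blob columns, into bytes."""
    digits = value[2:] if value.startswith("0x") else value
    if len(digits) % 2:          # bytes.fromhex requires an even number of digits
        digits = "0" + digits
    return bytes.fromhex(digits)

print(int("0xfff", 16))           # → 4095
print(decode_hex_value("0xfff"))  # → b'\x0f\xff'
```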