Consuming Subscribed Data with Kafka Client (JSON)

Last updated: 2023-09-05 14:34:05
    In data subscription (Kafka Edition), the current Kafka server version is 2.6.0, and you can consume the subscribed data through a Kafka client of version 0.11 or later (available at DOWNLOAD). This document provides client consumption demos for Java, Go, and Python so that you can quickly test the process of data consumption and understand the method of data parsing.
    When configuring the subscription task, you can select different formats of subscribed data, including ProtoBuf, Avro, and JSON. ProtoBuf and Avro adopt the binary format with a higher consumption efficiency, while JSON adopts the easier-to-use lightweight text format. The reference demo varies by the selected data format.
    This document provides a demo of the JSON format. The demo already contains the JSON protocol file, so you don't need to download it separately.
    Note
    Currently, data consumption over the JSON protocol is supported only for TencentDB for MySQL, TDSQL-C for MySQL, and TencentDB for MongoDB.

    Note

    The demo only prints out the consumed data and does not contain any usage instructions. You need to write your own data processing logic based on the demo. You can also use Kafka clients in other languages to consume and parse data.
    Currently, data subscription to Kafka for consumption can be implemented over the Tencent Cloud private network but not the public network. In addition, the subscribed database instance and the data consumer must be in the same region.
    The Kafka service built into DTS has an upper limit on the size of a single message. If a single row of data in the source database exceeds 10 MB, that row may be discarded.
    If you have used or are familiar with the open-source subscription tool Canal, you can choose to convert the consumed JSON data to a Canal-compatible data format for subsequent processing. The demo already supports this feature, and you can implement it by adding the trans2canal parameter in the demo startup parameters. Currently, this feature is supported only in Java.

    Downloading Consumption Demos

    Demos are provided in the following languages, with separate download packages for TencentDB for MySQL/TDSQL-C for MySQL and for TencentDB for MongoDB:
    Go
    Java
    Python

    Instructions for Java Demo

    Compiling environment: Maven and JDK 8. You can choose a desired package management tool; the following takes Maven as an example. Runtime environment: Tencent Cloud CVM with JRE 8 installed (it can access the private network address of the Kafka server only if it is in the same region as the subscribed instance). The steps are as follows:
    1. Create a data subscription task (NewDTS) as instructed in Creating MySQL or TDSQL for MySQL Data Subscription.
    2. Create one or multiple consumer groups. For more information, see Adding Consumer Group.
    3. Download the Java demo and decompress it.
    4. Access the decompressed directory, where the Maven model file pom.xml is provided for your use as needed. Package the project with Maven by running mvn clean package.
    5. Run the demo. After packaging the project with Maven, go to the target folder and run the following command (a minimal consumer sketch is provided after these steps): java -jar consumerDemo-json-1.0-SNAPSHOT.jar --brokers xxx --topic xxx --group xxx --user xxx --password xxx --trans2sql --trans2canal
    broker is the private network access address for data subscription to Kafka, and topic is the subscription topic, which can be viewed on the Subscription details page as instructed in Viewing Subscription Details.
    group, user, and password are the name, account, and password of the consumer group, which can be viewed on the Consumption Management page as instructed in Managing Consumer Group.
    trans2sql indicates whether to enable conversion to SQL statements. In the Java code, if this parameter is carried, the conversion will be enabled.
    trans2canal indicates whether to print the data in Canal format. If this parameter is carried, the conversion will be enabled.
    6. Observe consumption.
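    For reference, the sketch below shows a minimal Kafka consumer for this scenario: it connects to the private network address with SASL authentication and prints each JSON record. The security settings used here (SASL_PLAINTEXT with the PLAIN mechanism) and the user name format are assumptions for illustration only; the downloaded demo contains the authoritative connection configuration.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SimpleJsonConsumer {
        public static void main(String[] args) {
            // Replace with the values from the Subscription details and Consumption Management pages.
            String brokers = "xxx";   // private network access address
            String topic = "xxx";     // subscription topic
            String group = "xxx";     // consumer group name
            String user = "xxx";      // consumer group account
            String password = "xxx";  // consumer group password

            Properties props = new Properties();
            props.put("bootstrap.servers", brokers);
            props.put("group.id", group);
            props.put("enable.auto.commit", "true");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Assumed security settings; check the downloaded demo for the exact mechanism and user name format.
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                            + "username=\"" + user + "\" password=\"" + password + "\";");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList(topic));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        // Each value is one JSON-serialized record; real code should parse and process it.
                        System.out.printf("partition=%d offset=%d value=%s%n",
                                record.partition(), record.offset(), record.value());
                    }
                }
            }
        }
    }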
    

    Instructions for Go Demo

    Compiling environment: Go 1.12 or later, with the Go module environment configured. Runtime environment: Tencent Cloud CVM (which can access the private network address of the Kafka server only if it is in the same region as the subscribed instance). The steps are as follows:
    1. Create a data subscription task (NewDTS) as instructed in Creating MySQL or TDSQL for MySQL Data Subscription.
    2. Create one or multiple consumer groups. For more information, see Adding Consumer Group.
    3. Download the Go demo and decompress it.
    4. Access the decompressed directory and run go build -o subscribe ./main/main.go to generate the executable file subscribe.
    5. Run ./subscribe --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql=true.
    broker is the private network access address for data subscription to Kafka, and topic is the subscription topic, which can be viewed on the Subscription details page as instructed in Viewing Subscription Details.
    group, user, and password are the name, account, and password of the consumer group, which can be viewed on the Consumption Management page as instructed in Managing Consumer Group.
    trans2sql indicates whether to enable conversion to SQL statements.
    6. Observe consumption.
    

    Instructions for Python3 Demo

    Compilation and runtime environment: Tencent Cloud CVM (which can access the private network address of the Kafka server only if it is in the same region as the subscribed instance). Python3 and pip3 have been installed for dependency package installation. Use pip3 to install the dependency packages:
    pip install flag
    pip install kafka-python
    The steps are as follows:
    1. Create a data subscription task (NewDTS) as instructed in Creating MySQL or TDSQL for MySQL Data Subscription.
    2. Create one or multiple consumer groups. For more information, see Adding Consumer Group.
    3. Download the Python3 demo and decompress it.
    4. Run python main.py --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql=1.
    broker is the private network access address for data subscription to Kafka, and topic is the subscription topic, which can be viewed on the Subscription details page as instructed in Viewing Subscription Details.
    group, user, and password are the name, account, and password of the consumer group, which can be viewed on the Consumption Management page as instructed in Managing Consumer Group.
    trans2sql indicates whether to enable conversion to SQL statements.
    5. Observe consumption.
    

    Protocol File Description

    The demo for each programming language uses JSON for serialization and contains a Record definition file. In the demo for Java, the path of the definition file is consumerDemo-json-java\src\main\java\json\FlatRecord.java.

    Fields in the record

    
    id: The globally incremental ID.
    version: The protocol version, which is v1 currently.
    messageType: The message type. Enumerated values: "INSERT", "UPDATE", "DELETE", "DDL", "BEGIN", "COMMIT", "HEARTBEAT", "CHECKPOINT".
    fileName: The name of the binlog file where the current record is located.
    position: The end offset of the current record in the binlog, in the format of End_log_pos@binlog file number. For example, if the current record is in file mysql-bin.000004 and the end offset is 2196, the value of this field will be 2196@4.
    safePosition: The start offset of the current transaction in the binlog, which is in the same format as described above.
    timestamp: The time when the data was written to the binlog, which is a UNIX timestamp in seconds.
    gtid: The current GTID, such as c7c98333-6006-11ed-bfc9-b8cef6e1a231:9.
    transactionId: The transaction ID, which is generated only for COMMIT events.
    serverId: The server ID of the source database, which can be viewed by running SHOW VARIABLES LIKE 'server_id'.
    threadId: The ID of the session that committed the current transaction, which can be viewed by running SHOW PROCESSLIST.
    sourceType: The source database type, which currently can only be MySQL.
    sourceVersion: The source database version, which can be viewed by running SELECT VERSION().
    schemaName: The database name.
    tableName: The table name.
    objectName: In the format of database name.table name.
    columns: The definitions of columns in the table.
    oldColumns: The data of the row before DML execution. If the message is an INSERT message, the array will be null.
    newColumns: The data of the row after DML execution. If the message is a DELETE message, the array will be null.
    sql: The DDL SQL statement.
    executionTime: The DDL execution duration in seconds.
    heartbeatTimestamp: The timestamp of the heartbeat message in seconds, which is present only for heartbeat messages.
    syncedGtid: The collection of GTIDs parsed by DTS, in the format of c7c98333-6006-11ed-bfc9-b8cef6e1a231:1-13.
    fakeGtid: Whether the current GTID is forged. If gtid_mode is not enabled, DTS will forge a GTID.
    pkNames: If the table in the source database has a primary key, this field will be carried in the DML message; otherwise, it will not be carried.
    readerTimestamp: The time when DTS processed the current data record, which is a UNIX timestamp in milliseconds.
    tags: The status_vars in QueryEvent. For more information, see binary_log::Query_event Class Reference.
    total: The total number of message segments if the message is segmented. This field is invalid in the current version (version=1) and is reserved for extension.
    index: The index of message segments if the message is segmented. This field is invalid in the current version (version=1) and is reserved for extension.

    MySQL column attributes in the record

    name: The column name.
    dataTypeNumber: The type of the data recorded in the binlog. For values, see the MySQL official documentation.
    isKey: Whether the current column is the primary key.
    originalType: The type defined in DDL.
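    As an illustration of how these fields fit together, the sketch below parses a hand-written JSON record with Jackson and reads some of the fields described above. The sample payload and the layout of columns, oldColumns, and newColumns are illustrative assumptions only, and the Jackson dependency is assumed; the FlatRecord.java definition in the downloaded demo is the authoritative reference for the actual record shape.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class RecordParseExample {
        public static void main(String[] args) throws Exception {
            // Illustrative record only; the real shape is defined in FlatRecord.java in the demo.
            String json = "{"
                    + "\"id\": 1024,"
                    + "\"version\": 1,"
                    + "\"messageType\": \"UPDATE\","
                    + "\"schemaName\": \"test_db\","
                    + "\"tableName\": \"user\","
                    + "\"pkNames\": [\"id\"],"
                    + "\"columns\": [{\"name\": \"id\", \"isKey\": true, \"originalType\": \"int(11)\", \"dataTypeNumber\": 3},"
                    + "              {\"name\": \"name\", \"isKey\": false, \"originalType\": \"varchar(64)\", \"dataTypeNumber\": 15}],"
                    + "\"oldColumns\": [\"1\", \"alice\"],"
                    + "\"newColumns\": [\"1\", \"bob\"]"
                    + "}";

            JsonNode record = new ObjectMapper().readTree(json);
            System.out.println("type   = " + record.get("messageType").asText());
            System.out.println("object = " + record.get("schemaName").asText() + "." + record.get("tableName").asText());

            // Column definitions plus the row data before and after the DML.
            JsonNode columns = record.get("columns");
            for (int i = 0; i < columns.size(); i++) {
                JsonNode col = columns.get(i);
                System.out.printf("%s (%s, key=%b): %s -> %s%n",
                        col.get("name").asText(),
                        col.get("originalType").asText(),
                        col.get("isKey").asBoolean(),
                        record.get("oldColumns").get(i).asText(),
                        record.get("newColumns").get(i).asText());
            }
        }
    }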

    MySQL data type conversion logic

    In the JSON protocol, all MySQL data types are converted to strings.
    String types such as varchar are all converted to UTF-8 encoding.
    Numeric types are all converted to strings equal to the value, such as "3.0".
    Time types are output in the format of yyyy-mm-dd hh:mm:ss.milli.
    Timestamp types are output as the number of milliseconds.
    Binary types such as binary and blob are output as strings equal to their hex values, such as "0xfff".
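    Because every value arrives as a string, downstream code typically converts it back to a typed value based on the column's originalType. The helpers below are a rough sketch of such a conversion, assuming only the string formats listed above (decimal strings for numbers, "0x..." hex strings for binary values, millisecond counts for timestamps); adjust them to your own schema and processing logic.

    import java.math.BigDecimal;
    import java.time.Instant;

    public class ValueConversionExample {

        static BigDecimal toNumber(String value) {
            // Numeric types arrive as plain decimal strings, e.g. "3.0".
            return new BigDecimal(value);
        }

        static byte[] toBytes(String value) {
            // Binary types arrive as hex strings prefixed with "0x".
            String hex = value.startsWith("0x") ? value.substring(2) : value;
            if (hex.length() % 2 != 0) {
                hex = "0" + hex; // tolerate odd-length hex such as "0xfff"
            }
            byte[] bytes = new byte[hex.length() / 2];
            for (int i = 0; i < bytes.length; i++) {
                bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
            }
            return bytes;
        }

        static Instant toInstant(String value) {
            // Timestamp types arrive as a number of milliseconds.
            return Instant.ofEpochMilli(Long.parseLong(value));
        }

        public static void main(String[] args) {
            System.out.println(toNumber("3.0"));            // 3.0
            System.out.println(toBytes("0xfff").length);    // 2 (padded to "0fff")
            System.out.println(toInstant("1690000000000")); // 2023-07-22T04:26:40Z
        }
    }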