Consuming Subscribed Data with Kafka Client (Avro)

Last updated: 2023-02-06 16:30:10

    Overview

    In data subscription (Kafka Edition, where the current Kafka server version is 2.6.0), you can consume the subscribed data through a Kafka client on v0.11 or later. This document provides client consumption demos for Java, Go, and Python so that you can quickly test the data consumption process and understand how to parse the data.
    When configuring the subscription task, you can select the format of the subscribed data: ProtoBuf, Avro, or JSON. ProtoBuf and Avro are binary formats and offer higher consumption efficiency, while JSON is a lightweight, easier-to-use text format. The reference demo varies by the selected data format.
    This document provides a demo of the Avro format. The demo already contains the Avro protocol file, so you don't need to download it separately.
    Notes
    Currently, data consumption over the Avro protocol is supported only for TencentDB for MySQL and TDSQL-C for MySQL.

    Notes

    The demo only prints out the consumed data and does not contain any usage instructions. You need to write your own data processing logic based on the demo. You can also use Kafka clients in other languages to consume and parse data.
    Currently, data subscription to Kafka for consumption can be implemented over the Tencent Cloud private network but not the public network. In addition, the subscribed database instance and the data consumer must be in the same region.
    The Kafka service built into DTS has an upper limit on the size of a single message. If a single row of data in the source database exceeds 10 MB, the row may be discarded.

    Downloading a consumption demo

    The demos for TencentDB for MySQL and TDSQL-C for MySQL are available in the following languages:
    Go
    Java
    Python

    Instructions for the Java demo

    Compilation environment: Maven or Gradle, and JDK 8. You can choose either package management tool; the following takes Maven as an example.
    Runtime environment: Tencent Cloud CVM (which can access the private network address of the Kafka server only if it is in the same region as the subscribed instance) with JRE 8 installed.
    Directions:
    1. Create a data subscription task (NewDTS) as instructed in Creating MySQL or TDSQL for MySQL Data Subscription.
    2. Create one or multiple consumer groups. For more information, see Adding Consumer Group.
    3. Download the Java demo and decompress it.
    4. Access the decompressed directory. The Maven model and pom.xml files are placed there for your use as needed. Package the project with Maven by running mvn clean package.
    5. Run the demo. After packaging the project with Maven, go to the target folder and run java -jar consumerDemo-avro-1.0-SNAPSHOT.jar --brokers xxx --topic xxx --group xxx --user xxx --password xxx --trans2sql. A minimal sketch of how these parameters map to a Kafka consumer follows these steps.
    brokers is the private network access address for data subscription to Kafka, and topic is the subscription topic. Both can be viewed on the Subscription details page as instructed in Viewing Subscription Details.
    group, user, and password are the name, account, and password of the consumer group, which can be viewed on the Consumption Management page as instructed in Managing Consumer Group.
    trans2sql indicates whether to enable conversion to SQL statements. In the Java code, if this parameter is carried, the conversion will be enabled.
    Notes
    If trans2sql is carried, javax.xml.bind.DatatypeConverter.printHexBinary() will be used to convert byte values to hex values. Use JDK 1.8 to avoid incompatibility: the javax.xml.bind package was deprecated in Java 9 and removed in Java 11. If you don't need SQL conversion, comment this parameter out.
    6. Observe consumption.
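    For reference, below is a minimal sketch (not the demo itself) of how the --brokers, --topic, --group, --user, and --password parameters map to a plain Kafka Java consumer. The SASL/PLAIN security settings are an assumption based on typical password-authenticated Kafka access; check the demo source for the exact configuration.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class SubscribeSketch {
        public static void main(String[] args) {
            // Hypothetical values; substitute the --brokers/--topic/--group/--user/--password arguments.
            String brokers = "10.0.0.1:9092", topic = "topic-subs-xxx", group = "consumer-group-1";
            String user = "account", password = "password";

            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            // Assumption: the DTS Kafka endpoint authenticates consumers with SASL/PLAIN over plaintext.
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put(SaslConfigs.SASL_JAAS_CONFIG,
                    "org.apache.kafka.common.security.plain.PlainLoginModule required"
                            + " username=\"" + user + "\" password=\"" + password + "\";");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList(topic));
                while (true) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        // record.value() holds an Avro-encoded Record; see "Key demo logic description" below.
                        System.out.printf("offset=%d, %d bytes%n", record.offset(), record.value().length);
                    }
                }
            }
        }
    }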
    

    Instructions for the Go demo

    Compilation environment: Go 1.12 or later, with the Go module environment configured.
    Runtime environment: Tencent Cloud CVM (which can access the private network address of the Kafka server only if it is in the same region as the subscribed instance).
    Directions:
    1. Create a data subscription task (NewDTS) as instructed in Creating MySQL or TDSQL for MySQL Data Subscription.
    2. Create one or multiple consumer groups. For more information, see Adding Consumer Group.
    3. Download the Go demo and decompress it.
    4. Access the decompressed directory and run go build -o subscribe ./main/main.go to generate the executable file subscribe.
    5. Run ./subscribe --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql=true.
    brokers is the private network access address for data subscription to Kafka, and topic is the subscription topic. Both can be viewed on the Subscription details page as instructed in Viewing Subscription Details.
    group, user, and password are the name, account, and password of the consumer group, which can be viewed on the Consumption Management page as instructed in Managing Consumer Group.
    trans2sql indicates whether to enable conversion to SQL statements.
    6. Observe consumption.
    

    Instructions for the Python 3 demo

    Compilation and runtime environment: Tencent Cloud CVM (which can access the private network address of the Kafka server only if it is in the same region as the subscribed instance) with Python 3 and pip3 (for dependency installation) installed.
    Use pip3 to install the dependency packages:
    pip install flag
    pip install kafka-python
    pip install avro
    Directions:
    1. Create a data subscription task (NewDTS) as instructed in Creating MySQL or TDSQL for MySQL Data Subscription.
    2. Create one or multiple consumer groups. For more information, see Adding Consumer Group.
    3. Download the Python 3 demo and decompress it.
    4. Run python main.py --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql=1.
    brokers is the private network access address for data subscription to Kafka, and topic is the subscription topic. Both can be viewed on the Subscription details page as instructed in Viewing Subscription Details.
    group, user, and password are the name, account, and password of the consumer group, which can be viewed on the Consumption Management page as instructed in Managing Consumer Group.
    trans2sql indicates whether to enable conversion to SQL statements.
    5. Observe consumption.
    

    Key demo logic description

    Files in the demo are as described below, with the Java demo as an example.
    consumerDemo-avro-java\src\main\resources\avro-tools-1.8.2.jar: The tool used to generate Avro protocol code.
    consumerDemo-avro-java\src\main\java\com\tencent\subscribe\avro: The directory where the Avro tool generates code.
    consumerDemo-avro-java\src\main\resources\Record.avsc: The protocol definition file.
    14 structures (called schemas in Avro) are defined in Record.avsc. The main data structure is Record, which represents a data record in the binlog. The Record structure is as follows; the other data structures can be viewed in Record.avsc.
    {
        "namespace": "com.tencent.subscribe.avro",  // The last schema in `Record.avsc`, with `name` displayed as `Record`.
        "type": "record",
        "name": "Record",  // `name` is displayed as `Record`, indicating the format of the data consumed from Kafka.
        "fields": [
            {
                "name": "id",  // `id` indicates a globally incremental ID. The record fields are explained below.
                "type": "long",
                "doc": "unique id of this record in the whole stream"
            },
            {
                "name": "version",  // `version` indicates the protocol version.
                "type": "int",
                "doc": "protocol version"
            },
            {
                "name": "messageType",  // The message type.
                "aliases": [
                    "operation"
                ],
                "type": {
                    "namespace": "com.tencent.subscribe.avro",
                    "name": "MessageType",
                    "type": "enum",
                    "symbols": [
                        "INSERT",
                        "UPDATE",
                        "DELETE",
                        "DDL",
                        "BEGIN",
                        "COMMIT",
                        "HEARTBEAT",
                        "CHECKPOINT",
                        "ROLLBACK"
                    ]
                }
            },
            {
                ......
            }
        ]
    }
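    Once the Avro classes are generated from Record.avsc (the demo ships them under com.tencent.subscribe.avro), the raw value of each consumed Kafka message can be decoded into a Record object. Below is a minimal sketch using Avro's standard SpecificDatumReader API:

    import java.io.IOException;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import com.tencent.subscribe.avro.Record;

    public class RecordDecoder {
        private final SpecificDatumReader<Record> reader = new SpecificDatumReader<>(Record.class);

        // Decode the raw bytes of one Kafka message value into an Avro Record.
        public Record decode(byte[] value) throws IOException {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(value, null);
            return reader.read(null, decoder);
        }
    }

    This assumes each message value holds exactly one Avro-encoded Record; if a message were segmented (see the total and index fields below), the segments would have to be reassembled first.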
    Fields in a record are as explained below:
    id: The globally incremental ID.
    version: The protocol version, which is currently v1.
    messageType: The message type. Enumerated values: "INSERT", "UPDATE", "DELETE", "DDL", "BEGIN", "COMMIT", "HEARTBEAT", "CHECKPOINT", "ROLLBACK".
    fileName: The name of the binlog file where the current record is located.
    position: The end offset of the current record in the binlog, in the format of End_log_pos@binlog file number. For example, if the current record is in the file mysql-bin.000004 and its end offset is 2196, the value of this field will be 2196@4.
    safePosition: The start offset of the current transaction in the binlog, in the same format as above.
    timestamp: The time when the data was written to the binlog, which is a UNIX timestamp in seconds.
    gtid: The current GTID, such as c7c98333-6006-11ed-bfc9-b8cef6e1a231:9.
    transactionId: The transaction ID, which is generated only for COMMIT events.
    serverId: The server ID of the source database, which can be viewed by running SHOW VARIABLES LIKE 'server_id';.
    threadId: The ID of the session that committed the current transaction, which can be viewed by running SHOW PROCESSLIST;.
    sourceType: The source database type, which currently can only be MySQL.
    sourceVersion: The source database version, which can be viewed by running SELECT VERSION();.
    schemaName: The database name.
    tableName: The table name.
    objectName: The object name, in the format of database name.table name.
    columns: The definitions of the columns in the table.
    oldColumns: The data of the row before DML execution. If the message is an INSERT message, this array is null. Each element is one of 12 types: Integer, Character, Decimal, Float, Timestamp, DateTime, TimestampWithTimeZone, BinaryGeometry, TextGeometry, BinaryObject, TextObject, or EmptyObject. For more information, see the definitions in the demo.
    newColumns: The data of the row after DML execution. If the message is a DELETE message, this array is null. Each element is one of the same 12 types as in oldColumns.
    sql: The DDL SQL statement.
    executionTime: The DDL execution duration in seconds.
    heartbeatTimestamp: The timestamp of the heartbeat message in seconds, which is present only for heartbeat messages.
    syncedGtid: The collection of GTIDs parsed by DTS, in the format of c7c98333-6006-11ed-bfc9-b8cef6e1a231:1-13.
    fakeGtid: Whether the current GTID is forged. If gtid_mode is not enabled, DTS will forge a GTID.
    pkNames: The primary key names. If the table in the source database has a primary key, this field is carried in the DML message; otherwise, it is not.
    readerTimestamp: The time when DTS processed the current data record, which is a UNIX timestamp in milliseconds.
    tags: The status_vars in QueryEvent. For more information, see binary_log::Query_event Class Reference.
    total: The total number of message segments if the message is segmented. This field is invalid in the current version (version = 1) and is reserved for extension.
    index: The index of this message segment if the message is segmented. This field is invalid in the current version (version = 1) and is reserved for extension.
    The field describing column attributes in a record is Field, which includes the following four attributes (a usage sketch follows this list):
    name: The column name.
    dataTypeNumber: The type of the data recorded in the binlog. For values, see COM_QUERY Response.
    isKey: Whether the current column is part of the primary key.
    originalType: The type defined in the DDL.
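    As an illustration, the sketch below walks a decoded Record and pairs each column definition with its post-DML value. The accessor names (getColumns(), getNewColumns(), getName(), and so on) are assumptions modeled on Avro's generated getters; check the classes generated from Record.avsc for the exact API.

    import java.util.List;
    import com.tencent.subscribe.avro.Field;
    import com.tencent.subscribe.avro.Record;

    public class RecordPrinter {
        // Print the column definitions of a DML record together with the row data after execution.
        public static void print(Record record) {
            List<Field> columns = record.getColumns();        // column definitions
            List<Object> newValues = record.getNewColumns();  // row data after DML; null for DELETE
            if (columns == null || newValues == null) {
                return;
            }
            for (int i = 0; i < columns.size(); i++) {
                Field f = columns.get(i);
                System.out.printf("%s (originalType=%s, isKey=%s) = %s%n",
                        f.getName(), f.getOriginalType(), f.getIsKey(), newValues.get(i));
            }
        }
    }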

    Database field mappings

    The following lists the mappings between database (such as MySQL) field types and the data types defined in the Avro protocol (a usage sketch follows the list).
    Type in MySQL: Corresponding Type in Avro
    MYSQL_TYPE_NULL: EmptyObject
    MYSQL_TYPE_INT8: Integer
    MYSQL_TYPE_INT16: Integer
    MYSQL_TYPE_INT24: Integer
    MYSQL_TYPE_INT32: Integer
    MYSQL_TYPE_INT64: Integer
    MYSQL_TYPE_BIT: Integer
    MYSQL_TYPE_YEAR: DateTime
    MYSQL_TYPE_FLOAT: Float
    MYSQL_TYPE_DOUBLE: Float
    MYSQL_TYPE_VARCHAR: Character
    MYSQL_TYPE_STRING: Character. If the original type is binary, this type corresponds to BinaryObject.
    MYSQL_TYPE_VAR_STRING: Character. If the original type is varbinary, this type corresponds to BinaryObject.
    MYSQL_TYPE_TIMESTAMP: Timestamp
    MYSQL_TYPE_DATE: DateTime
    MYSQL_TYPE_TIME: DateTime
    MYSQL_TYPE_DATETIME: DateTime
    MYSQL_TYPE_TIMESTAMP_NEW: Timestamp
    MYSQL_TYPE_DATE_NEW: DateTime
    MYSQL_TYPE_TIME_NEW: DateTime
    MYSQL_TYPE_DATETIME_NEW: DateTime
    MYSQL_TYPE_ENUM: TextObject
    MYSQL_TYPE_SET: TextObject
    MYSQL_TYPE_DECIMAL: Decimal
    MYSQL_TYPE_DECIMAL_NEW: Decimal
    MYSQL_TYPE_JSON: TextObject
    MYSQL_TYPE_BLOB: BinaryObject
    MYSQL_TYPE_TINY_BLOB: BinaryObject
    MYSQL_TYPE_MEDIUM_BLOB: BinaryObject
    MYSQL_TYPE_LONG_BLOB: BinaryObject
    MYSQL_TYPE_GEOMETRY: BinaryObject
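    To show how this mapping might be applied, the sketch below renders a single column value as a SQL literal, in the spirit of the demo's --trans2sql option: text-like types are quoted, and binary types are hex-encoded with DatatypeConverter.printHexBinary() as noted in the Java demo instructions. The union type names and their getValue() accessors are assumptions based on the 12 element types listed earlier; compare with the demo source before relying on this branching.

    import javax.xml.bind.DatatypeConverter;

    public class SqlValueFormatter {
        // Render a single element of oldColumns/newColumns as a SQL literal.
        // The Avro union element types are generated from Record.avsc; they are fully
        // qualified here because Character and Integer clash with java.lang.
        public static String toSqlLiteral(Object value) {
            if (value == null || value instanceof com.tencent.subscribe.avro.EmptyObject) {
                return "NULL";
            }
            if (value instanceof com.tencent.subscribe.avro.Integer) {
                // Numeric types are emitted as-is (assumed accessor: getValue()).
                return ((com.tencent.subscribe.avro.Integer) value).getValue().toString();
            }
            if (value instanceof com.tencent.subscribe.avro.Character) {
                // Text types are single-quoted; real code must also escape embedded quotes.
                return "'" + ((com.tencent.subscribe.avro.Character) value).getValue() + "'";
            }
            if (value instanceof com.tencent.subscribe.avro.BinaryObject) {
                // Binary types are hex-encoded (assumed accessor: getValue() returning a ByteBuffer).
                byte[] bytes = ((com.tencent.subscribe.avro.BinaryObject) value).getValue().array();
                return "0x" + DatatypeConverter.printHexBinary(bytes);
            }
            // The remaining union types (Decimal, Float, Timestamp, DateTime, ...) would be
            // handled analogously.
            return value.toString();
        }
    }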
    