tencent cloud

流计算 Oceanus

动态与公告
产品动态
产品简介
产品概述
产品优势
应用场景
购买指南
计费概述
计费模式
退费说明
调整配置费用说明
快速入门
从零开始上手
创建独享集群
创建 SQL 作业
创建 JAR 作业
创建 ETL 作业
创建 Python 作业
操作指南
作业管理
作业开发
作业监控
作业日志
事件与诊断
元数据管理
快照管理
作业调优
依赖管理
集群管理
权限管理
SQL 开发指南
开发指南概述
术语和数据类型
DDL 数据定义语句
DML 数据操作语句
MySQL CDC 多 Source 复用
上下游开发指南
SET 控制语句
运算符和内置函数
标识符与保留字
Python 开发指南
ETL 开发指南
概述
ETL 作业术语表
上下游开发指南
常见问题
联系我们

数据库 TDSQL-MySQL

PDF
聚焦模式
字号
最后更新时间: 2023-11-08 14:53:51

介绍

tdsql-subscribe connector 是针对腾讯云 TDSQL-MySQL 数据订阅的专有 connector,通过 数据订阅 功能接入 TDSQL-MySQL 的增量 binlog 数据,使用前请确保数据订阅任务已经配置成功。
注意
数据库 TDSQL Connector 目前处于 Beta 版本,如有需求请您 工单 联系我们。

版本说明

Flink 版本
说明
1.11
不支持
1.13
支持
1.14
不支持
1.16
不支持

使用范围

tdsql-subscribe connector 支持用作数据源表(Source),不可以作为数据流的目的表(Sink)。

DDL 定义

当 tdsql-subscribe connector 作为 source 时,with 参数大部分与 Kafka connector 参数类似,连接参数都可以在订阅任务中找到。 值得注意的是,在使用 tdsql-subscribe connector 时,format 必须指定为 protobuf格式,因为数据订阅中,发送到 Kafka 的消息格式为 protobuf; 相较于通常使用的 Kafka connector,tdsql-subscribe connector 会多了一些认证信息,认证信息也是源于订阅任务。

用作数据源(Source)

protobuf 格式输入

CREATE TABLE `DataInput` (
`id` INT,
`name` VARCHAR,
`age` INT
) WITH (
'connector' = 'tdsql-subscribe', -- 注意选择对应的内置 Connector
'tdsql.database.name' = 'test_case_2022_06_0*', -- 对订阅消息进行过滤,消费数据库名满足 test_case_2022_06_0* 正则的订阅数据
'tdsql.table.name' = 'test_0*', -- 对订阅消息进行过滤,消费数据表满足 test_0* 正则的订阅数据
'topic' = 'topic-subs-5xop97nffk-tdsqlshard-xxx', -- 替换为订阅任务消费的 Topic
'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一种
'properties.bootstrap.servers' = 'guangzhou-kafka-2.cdb-dts.tencentcs.com.cn:3212', -- 替换为您的订阅任务 Kafka 连接地址
'properties.group.id' = 'consumer-grp-subs-xxx-kk',
'format' = 'protobuf', -- 只能是protobuf格式
'properties.security.protocol'='SASL_PLAINTEXT', -- 认证协议
'properties.sasl.mechanism'='SCRAM-SHA-512', -- 认证方式
'properties.sasl.jaas.config'='org.apache.kafka.common.security.scram.ScramLoginModule required username="account-subs-xxx-username" password="psw";' --用户名和密码
);
CREATE TABLE `jdbc_upsert_sink_table` (
id INT PRIMARY KEY NOT ENFORCED,
name STRING,
age INT
) WITH (
-- 指定数据库连接参数
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.28.28.138:3306/testdb', -- 请替换为您的实际 MySQL 连接参数
'table-name' = 'sink', -- 需要写入的数据表
'username' = 'user', -- 数据库访问的用户名(需要提供 INSERT 权限)
'password' = 'psw' -- 数据库访问的密码
);
INSERT INTO jdbc_upsert_sink_table SELECT * FROM DataInput;

WITH 参数

参数值
必填
默认值
描述
connector
固定值为 'tdsql-subscribe'
topic
要读的 Kafka Topic 名
properties.bootstrap.servers
逗号分隔的 Kafka Bootstrap 地址
properties.group.id
Kafka 消费时的 Group ID
format
Kafka 消息的输入格式。目前只支持 protobuf
scan.startup.mode
group-offsets
Kafka consumer 的启动模式。
可以是 latest-offsetearliest-offsetspecific-offsetsgroup-offsetstimestamp 的任何一种'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',使用 'specific-offsets' 启动模式时需要指定每个 partition 对应的 offsets。
'scan.startup.timestamp-miles' = '1631588815000',使用 'timestamp' 启动模式时需要指定启动的时间戳(单位毫秒)
scan.startup.specific-offsets
如果 scan.startup.mode 的值为'specific-offsets',则必须使用本参数指定具体起始读取的偏移量。例如 'partition:0,offset:42;partition:1,offset:300'
scan.startup.timestamp-millis
如果scan.startup.mode 的值为'timestamp',则必须使用本参数来指定开始读取的时间点(毫秒为单位的 Unix 时间戳)
tdsql.database.name
tdsql 数据库名称,配置此参数,可以消费 tdsql 指定数据库的 binlog,前提是订阅任务包含了此数据库的 binlog。参数支持正则,例如 test_case_2022_06_0*
tdsql.table.name
tdsql 数据表名称,配置此参数,可以消费 tdsql 指定数据表的 binlog,前提是订阅任务包含了此数据表的 binlog。参数支持正则,例如 test_0*test_1,test_2
说明
如果需要配置 tdsql.database.nametdsql.table.name 参数,订阅任务建议配置 订阅全实例 ,如果有多个 Oceanus 任务消费不同 tdsql 数据库表时,多个 Oceanus 任务需要使用订阅任务的不同消费组,可在订阅任务中创建不同消费组。

注意事项

1. 如果订阅任务配置了多库多表或同库多表,需要保证表结构是相同的,才能正确接入订阅任务的数据。
2. 源表中文编码目前仅支持 utf8gbk 两种。

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈