-- CDAS (CREATE DATABASE AS) syntax template: synchronize a whole source database
-- into a target database, optionally filtering the table set.
CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)]             -- options for the target (sink) database
AS DATABASE <source_catalog>.<source_database> -- source_database is the database being synchronized
INCLUDING { ALL TABLES | TABLE 'table_name' }
-- INCLUDING ALL TABLES: synchronize every table in the source database
-- INCLUDING TABLE 'table': synchronize specific tables; regular expressions such as 'order_.*' are supported;
--   to include several tables, write INCLUDING TABLE 'tableA|tableB|tableC'
[EXCLUDING TABLE 'table_name']
-- EXCLUDING TABLE 'table': exclude specific tables; regular expressions such as 'order_.*' are supported;
--   to exclude several tables, write EXCLUDING TABLE 'tableA|tableB|tableC'
[/*+ `OPTIONS`('key1'='val1', 'key2'='val2', ... ) */]
-- (optional) source-side read options, e.g. the source serverId range or the
-- time zone used when parsing debezium timestamp fields
参数 | 解释 |
target_database | 待写入的目标数据库名 |
database_comment | 待写入的数据库注释 |
WITH参数 | 指明写入目标库的参数,目前会被翻译成下游 sink 表的描述参数 |
<source_catalog>.<source_database> | 声明源 catalog 中需要同步的数据库 |
INCLUDING ALL TABLES | 同步源库中的所有表 |
INCLUDING TABLE | 同步数据库中特定的表,支持正则表达式,如 'order_.*' ; 同步多张表时,可以写成 INCLUDING TABLE 'tableA|tableB'格式 |
EXCLUDING TABLE | 表示不同步数据库中特定的表,支持正则表达式,如 'order_.*'; 排除多张表时,可以写成 EXCLUDING TABLE 'tableA|tableB'格式 |
OPTIONS | 可选,指明读取 Source 时覆盖的参数,如指定 source serverId 的范围等 |
create catalog my_mysql with (...);

-- NOTE(review): the AS DATABASE <catalog>.<db> clause required by the CDAS
-- syntax appears to be missing from this example — confirm against the template.
create database if not exists sink_db
with (
    'connector' = 'doris',
    'table.identifier' = 'db1.$tableName_doris'
    ...
)
including all tables
/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai') */; -- time zone used when parsing timestamp fields
-- table.optimizer.deterministic-operator-uid-for-cdas=true is required to
-- enable lossless addition of newly created tables.
SET table.optimizer.deterministic-operator-uid-for-cdas=true;

-- Optional: enable multi-source reuse to reduce resource usage and improve stability.
SET table.optimizer.mysql-cdc-source.merge.enabled=true;

-- Replace with your actual CDAS statement, but keep at least the
-- 'scan.newly-added-table.enabled' = 'true' entry in OPTIONS.
create catalog my_mysql with (...);

create database if not exists sink_db
with (...)
including all tables
/*+ `OPTIONS`(
    'scan.newly-added-table.enabled' = 'true',
    'scan.lastchunk.optimize.enable' = 'true'
) */;
参数 | 解释 |
oceanus.source.include-metadata.fields | 需要同步的 source 表的元字段,格式为 'table_name:table_name;meta.batch_id:batch_id', 元数据字段定义通过分号;分隔,每个元数据字段格式为 metadataColumn:alias, 第一部分为实际对应的元数据 column,第二部分为重命名后的值。 |
-- Example: synchronize database `test` to a print sink, carrying source
-- metadata columns (renamed via metadataColumn:alias pairs).
SET table.optimizer.mysql-cdc-source.merge.enabled=true;

create catalog my_mysql with (
    'type' = 'jdbc',
    'default-database' = 'test',
    'username' = 'xx',
    'password' = 'xxx',
    'base-url' = 'xxx'
);

create database if not exists print_sink_db
comment 'test_sink'
with (
    'connector' = 'print',
    'print-identifier' = '$tableName'
)
as database `my_mysql`.`test`
including all tables
/*+ `OPTIONS`(
    'server-time-zone' = 'Asia/Shanghai',
    'oceanus.source.include-metadata.fields'='table_name:table_name;database_name:database_name;op_ts:op_ts;meta.table_name:meta_table_name;meta.database_name:meta_database_name;meta.op_ts:meta_op_ts;meta.op_type:meta_op_type;meta.batch_id:meta_batch_id;meta.is_ddl:meta_id_ddl;meta.mysql_type:meta_mysql_type;meta.update_before:meta_update_before;meta.pk_names:meta_pk_names;meta.sql:meta_sql;meta.sql_type:meta_sql_type;meta.ts:meta_ts'
)*/;
-- Register a MySQL (JDBC) catalog as the CDAS source.
create catalog my_mysql with (
    'type' = 'jdbc',
    'default-database' = 'test',
    'username' = 'XXX',
    'password' = 'XXX',
    'base-url' = 'jdbc:mysql://ip:port'
    -- 'jdbc.properties.tinyInt1isBit' = 'false' -- JDBC option: whether tinyInt is read as bool; default true.
    -- If a table contains tinyint(1) columns, setting jdbc.properties.tinyInt1isBit to false is recommended.
);
-- jdbc.properties.*: pass-through parameters for the underlying JDBC driver.
SET table.optimizer.mysql-cdc-source.merge.enabled=true;

create catalog my_mysql with (
    'type' = 'jdbc',
    'default-database' = 'test',
    'username' = 'root',
    'password' = 'XXX',
    'base-url' = 'jdbc:mysql://ip:port'
);

create database if not exists sink_db
comment 'test_sink'
with (
    'connector' = 'hudi',
    'path' = 'hdfs://namenode:8020/user/hive/warehouse/$tableName_mor'
)
as database `my_mysql`.`trade_log`
including all tables
/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai') */; -- time zone used when parsing timestamp fields
SET table.optimizer.mysql-cdc-source.merge.enabled=true;

create catalog my_mysql with (
    'type' = 'jdbc',
    'default-database' = 'test',
    'username' = 'root',
    'password' = 'XXX',
    'base-url' = 'jdbc:mysql://ip:port'
);

create database if not exists sink_db
comment 'test_sink'
with (
    -- NOTE(review): ClickHouse connectors usually take 'connector' = 'clickhouse'
    -- plus a separate URL option — confirm this value against the connector docs.
    'connector' = 'clickhouse://172.11.11.11:8123',
    -- username/password may be omitted when the ClickHouse cluster has no auth configured
    --'username' = 'root',          -- ClickHouse cluster user name
    --'password' = 'root',          -- ClickHouse cluster password
    'database-name' = 'testdb',     -- target database for writes
    'table-name' = 'test_table1',   -- target table for writes
    'sink.batch-size' = '1000',
    'table.collapsing.field' = 'Sign'
)
as database `my_mysql`.`test_db`
including all tables
/*+ OPTIONS('server-time-zone' = 'Asia/Shanghai') */;
SET table.optimizer.mysql-cdc-source.merge.enabled=true;

create catalog my_mysql with (
    'type' = 'jdbc',
    'default-database' = 'test',
    'username' = 'root',
    'password' = 'XXX',
    'base-url' = 'jdbc:mysql://ip:port'
);

create database if not exists sink_db
comment 'test_sink'
with (
    'connector' = 'doris',
    'table.identifier' = 'trade_log.$tableName',
    'username' = 'admin',
    'password' = 'xxx',
    'sink.batch.size' = '500',
    'sink.batch.interval' = '1s',
    'fenodes' = 'ip:port'
)
as database `my_mysql`.`trade_log`
including all tables
/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai') */; -- time zone used when parsing timestamp fields
SET table.optimizer.mysql-cdc-source.merge.enabled=true;

create catalog my_mysql with (
    'type' = 'jdbc',
    'default-database' = 'test',
    'username' = 'root',
    'password' = 'XXX',
    'base-url' = 'jdbc:mysql://ip:port'
);

create database if not exists sink_db
comment 'test_sink'
with (
    'connector' = 'hive',
    'hive-version' = '2.3.6',
    'hive-database' = 'test_100',
    'hive-table' = '$tableName',
    'sink.partition-commit.policy.kind' = 'metastore'
)
as database `my_mysql`.`trade_log`
including all tables
/*+ `OPTIONS`('append-mode' = 'true', 'server-time-zone' = 'Asia/Shanghai') */;
-- The hive sink cannot accept changelog data, so this hint converts the raw
-- CDC changelog into an append-only stream before it is emitted downstream.

SET table.optimizer.mysql-cdc-source.merge.enabled=true;

-- Register the MySQL catalog (source side)
create catalog my_mysql with (
    'type' = 'jdbc',
    'default-database' = 'test',
    'username' = 'root',
    'password' = 'XXX',
    'base-url' = 'jdbc:mysql://ip:port'
);

-- Register the Hive catalog (sink side)
create catalog my_hive with (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-version' = '2.3.5'
);

create database if not exists `my_hive`.`trade_log`
as database `my_mysql`.`trade_log`
including all tables
/*+ `OPTIONS`('append-mode' = 'true', 'server-time-zone' = 'Asia/Shanghai') */;
-- The hive sink cannot accept changelog data, so this hint converts the raw
-- CDC changelog into an append-only stream before it is emitted downstream.
SET table.optimizer.mysql-cdc-source.merge.enabled = true;
SET table.optimizer.deterministic-operator-uid-for-cdas=true;

-- Register the MySQL catalog (source side)
create catalog `my_mysql` with (
    'type' = 'jdbc',
    'default-database' = 'testdb',        -- database name
    'username' = '${username}',           -- MySQL user name
    'password' = '${password}',           -- MySQL password
    'base-url' = 'jdbc:mysql://ip:3306'
);

-- Whole-database synchronization to DLC
create database if not exists `my_dlc_database`
comment 'test db sync'
with (
    'connector' = 'iceberg-inlong',       -- fixed value
    'catalog-database' = 'test',          -- database holding the DLC managed tables
    'catalog-table' = 'my_$tableName',    -- DLC table name; $tableName is replaced with each synchronized table name; DLC tables must exist beforehand
    'default-database' = 'test',          -- database holding the DLC managed tables
    'catalog-name' = 'HYBRIS',            -- fixed value
    'catalog-impl' = 'org.apache.inlong.sort.iceberg.catalog.hybris.DlcWrappedHybrisCatalog', -- fixed value
    'qcloud.dlc.managed.account.uid' = '100026378089', -- fixed value: uid of the DLC managed account
    'qcloud.dlc.secret-id' = '${secret_id}',   -- DLC user's secretId, from https://console.tencentcloud.com/cam/capi
    'qcloud.dlc.secret-key' = '${secret_key}', -- DLC user's secretKey, from https://console.tencentcloud.com/cam/capi
    'qcloud.dlc.region' = 'ap-guangzhou',      -- DLC region; must use the ap-<region> format
    'qcloud.dlc.jdbc.url' = 'jdbc:dlc:dlc.internal.tencentcloudapi.com?task_type=SparkSQLTask&database_name=test&datasource_connection_name=DataLakeCatalog&region=ap-guangzhou&data_engine_name=${engine_name}', -- DLC JDBC endpoint; format documented at https://www.tencentcloud.com/document/product/1342/61547?from_cn_redirect=1
    'uri' = 'dlc.internal.tencentcloudapi.com', -- fixed value
    'user.appid' = '${appid}',                  -- DLC user's appid
    'request.identity.token' = '100026378089'   -- fixed value: token for DLC managed-table access
)
as database `my_mysql`.`test`
including table 'user.*'
excluding table 'user_info_20230530|user_behavior_tmp'
/*+ `OPTIONS`(
    'server-time-zone' = 'Asia/Shanghai',
    'scan.newly-added-table.enabled' = 'true',
    'scan.lastchunk.optimize.enable' = 'true'
) */;
可以通过设置 pipeline.task-name-length 参数来限制 taskName 的长度,能极大地提高作业稳定性和日志可读性。(适用 Flink-1.13 和 Flink-1.14 版本)。
可以在作业的配置中生效:set pipeline.task-name-length=80;
文档反馈