CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)]  -- Specifies the parameters for writing to the target database.
AS DATABASE <source_catalog>.<source_database>  -- source_database is the source database to be synchronized.
INCLUDING { ALL TABLES | TABLE 'table_name' }
-- INCLUDING ALL TABLES indicates that all tables in the database are synchronized.
-- INCLUDING TABLE 'table' synchronizes only the specified tables in the database and supports regular expressions such as 'order_.*'.
-- When synchronizing multiple tables, you can use the format INCLUDING TABLE 'tableA|tableB|tableC'.
[EXCLUDING TABLE 'table_name']
-- EXCLUDING TABLE 'table' excludes the specified tables in the database from synchronization and supports regular expressions such as 'order_.*'.
-- When excluding multiple tables, you can use the format EXCLUDING TABLE 'tableA|tableB|tableC'.
[/*+ `OPTIONS`('key1'='val1', 'key2'='val2', ...) */]
-- (Optional) Specifies parameters for reading the source, such as the range of source server IDs or how debezium timestamp fields are parsed.
| Parameter | Description |
| --- | --- |
| target_database | The name of the target database to be written to. |
| database_comment | The comment of the database to be written. |
| WITH parameters | Parameters for writing to the target database. Currently, these parameters are passed through to the downstream sink tables. |
| <source_catalog>.<source_database> | Declares the database in the source catalog to be synchronized. |
| INCLUDING ALL TABLES | Synchronizes all tables in the source database. |
| INCLUDING TABLE | Synchronizes only the specified tables in the database and supports regular expressions such as 'order_.*'. When synchronizing multiple tables, you can use the format INCLUDING TABLE 'tableA|tableB'. |
| EXCLUDING TABLE | Excludes the specified tables in the database from synchronization and supports regular expressions such as 'order_.*'. When excluding multiple tables, you can use the format EXCLUDING TABLE 'tableA|tableB' (see the example after this table). |
| OPTIONS | Optional. Specifies parameters that override how the source is read, such as the range of source server IDs. |
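For instance, a minimal sketch combining INCLUDING and EXCLUDING with regular expressions. The catalog my_mysql, the source database app_db, and the order_* table names are hypothetical; substitute your own:

-- Synchronize every table whose name starts with order_, except the
-- hypothetical temporary and backup tables.
create catalog my_mysql with (...);
create database if not exists sink_db
with (...)
as database `my_mysql`.`app_db`
including table 'order_.*'
excluding table 'order_tmp|order_bak'
/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai') */;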
create catalog my_mysql with (...);
create database if not exists sink_db
with (
  'connector' = 'doris',
  'table.identifier' = 'db1.$tableName_doris',
  ...
)
as database `my_mysql`.`db1`  -- The AS DATABASE clause is required; replace db1 with the actual source database.
including all tables
/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai') */;  -- Declares the time zone used to parse timestamp fields.
-- You must set table.optimizer.deterministic-operator-uid-for-cdas=true to enable the lossless table addition capability.
SET table.optimizer.deterministic-operator-uid-for-cdas=true;
-- Optional: enable multi-source reuse to reduce resource consumption and improve stability.
SET table.optimizer.mysql-cdc-source.merge.enabled=true;
-- Replace the following with the actual CDAS statement, but retain at least the OPTIONS clause with 'scan.newly-added-table.enabled' = 'true'.
create catalog my_mysql with (...);
create database if not exists sink_db
with (...)
as database <source_catalog>.<source_database>  -- The AS DATABASE clause is required.
including all tables
/*+ `OPTIONS`('scan.newly-added-table.enabled' = 'true', 'scan.lastchunk.optimize.enable' = 'true') */;
| Parameter | Description |
| --- | --- |
| oceanus.source.include-metadata.fields | The metadata fields of the source table to be synchronized, in the format 'table_name:table_name;meta.batch_id:batch_id'. Metadata field definitions are separated by semicolons (;). Each definition takes the form metadataColumn:alias, where metadataColumn is the actual metadata column and alias is the name under which it is written out (see the examples below). |
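As a minimal sketch of the metadataColumn:alias format, the following hint exposes only two metadata fields; the aliases src_table and change_ts are illustrative, not required names. The full example that follows lists the complete set of supported metadata columns:

-- Expose only the source table name and the operation timestamp,
-- renamed to the hypothetical aliases src_table and change_ts.
create database if not exists sink_db
with (...)
as database `my_mysql`.`test`
including all tables
/*+ `OPTIONS`('oceanus.source.include-metadata.fields' = 'table_name:src_table;op_ts:change_ts') */;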
SET table.optimizer.mysql-cdc-source.merge.enabled=true;
create catalog my_mysql with (
  'type' = 'jdbc',
  'default-database' = 'test',
  'username' = 'xx',
  'password' = 'xxx',
  'base-url' = 'xxx'
);
create database if not exists print_sink_db
comment 'test_sink'
with (
  'connector' = 'print',
  'print-identifier' = '$tableName'
)
as database `my_mysql`.`test`
including all tables
/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai', 'oceanus.source.include-metadata.fields' = 'table_name:table_name;database_name:database_name;op_ts:op_ts;meta.table_name:meta_table_name;meta.database_name:meta_database_name;meta.op_ts:meta_op_ts;meta.op_type:meta_op_type;meta.batch_id:meta_batch_id;meta.is_ddl:meta_id_ddl;meta.mysql_type:meta_mysql_type;meta.update_before:meta_update_before;meta.pk_names:meta_pk_names;meta.sql:meta_sql;meta.sql_type:meta_sql_type;meta.ts:meta_ts') */;
create catalog my_mysql with (
  'type' = 'jdbc',
  'default-database' = 'test',
  'username' = 'XXX',
  'password' = 'XXX',
  'base-url' = 'jdbc:mysql://ip:port'
  -- 'jdbc.properties.tinyInt1isBit' = 'false' -- A JDBC parameter that specifies whether to interpret tinyint(1) as a boolean. The default value is true.
  -- If a table field uses tinyint(1), we recommend setting jdbc.properties.tinyInt1isBit to false.
);
Other JDBC connection parameters can be passed through to the driver with the jdbc.properties.* prefix.

SET table.optimizer.mysql-cdc-source.merge.enabled=true;
create catalog my_mysql with (
  'type' = 'jdbc',
  'default-database' = 'test',
  'username' = 'root',
  'password' = 'XXX',
  'base-url' = 'jdbc:mysql://ip:port'
);
create database if not exists sink_db
comment 'test_sink'
with (
  'connector' = 'hudi',
  'path' = 'hdfs://namenode:8020/user/hive/warehouse/$tableName_mor'
)
as database `my_mysql`.`trade_log`
including all tables
/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai') */;  -- Declares the time zone used to parse timestamp fields.
SET table.optimizer.mysql-cdc-source.merge.enabled=true;
create catalog my_mysql with (
  'type' = 'jdbc',
  'default-database' = 'test',
  'username' = 'root',
  'password' = 'XXX',
  'base-url' = 'jdbc:mysql://ip:port'
);
create database if not exists sink_db
comment 'test_sink'
with (
  'connector' = 'clickhouse',
  'url' = 'clickhouse://172.11.11.11:8123',
  -- If the ClickHouse cluster is not configured with a username and password, you can omit them.
  -- 'username' = 'root', -- The username of the ClickHouse cluster
  -- 'password' = 'root', -- The password of the ClickHouse cluster
  'database-name' = 'testdb', -- The destination database for data writing
  'table-name' = 'test_table1', -- The destination table for data writing
  'sink.batch-size' = '1000',
  'table.collapsing.field' = 'Sign'
)
as database `my_mysql`.`test_db`
including all tables
/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai') */;
SET table.optimizer.mysql-cdc-source.merge.enabled=true;
create catalog my_mysql with (
  'type' = 'jdbc',
  'default-database' = 'test',
  'username' = 'root',
  'password' = 'XXX',
  'base-url' = 'jdbc:mysql://ip:port'
);
create database if not exists sink_db
comment 'test_sink'
with (
  'connector' = 'doris',
  'table.identifier' = 'trade_log.$tableName',
  'username' = 'admin',
  'password' = 'xxx',
  'sink.batch.size' = '500',
  'sink.batch.interval' = '1s',
  'fenodes' = 'ip:port'
)
as database `my_mysql`.`trade_log`
including all tables
/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai') */;  -- Declares the time zone used to parse timestamp fields.
SET table.optimizer.mysql-cdc-source.merge.enabled=true;
create catalog my_mysql with (
  'type' = 'jdbc',
  'default-database' = 'test',
  'username' = 'root',
  'password' = 'XXX',
  'base-url' = 'jdbc:mysql://ip:port'
);
create database if not exists sink_db
comment 'test_sink'
with (
  'connector' = 'hive',
  'hive-version' = '2.3.6',
  'hive-database' = 'test_100',
  'hive-table' = '$tableName',
  'sink.partition-commit.policy.kind' = 'metastore'
)
as database `my_mysql`.`trade_log`
including all tables
/*+ `OPTIONS`('append-mode' = 'true', 'server-time-zone' = 'Asia/Shanghai') */;
-- Because the hive sink does not support change data, the hint here converts the original cdc change data into an append stream for distribution.

SET table.optimizer.mysql-cdc-source.merge.enabled=true;
-- Register a mysql catalog
create catalog my_mysql with (
  'type' = 'jdbc',
  'default-database' = 'test',
  'username' = 'root',
  'password' = 'XXX',
  'base-url' = 'jdbc:mysql://ip:port'
);
-- Register a hive catalog
create catalog my_hive with (
  'type' = 'hive',
  'default-database' = 'default',
  'hive-version' = '2.3.5'
);
create database if not exists `my_hive`.`trade_log`
as database `my_mysql`.`trade_log`
including all tables
/*+ `OPTIONS`('append-mode' = 'true', 'server-time-zone' = 'Asia/Shanghai') */;
-- Because the hive sink does not support change data, the hint here converts the original cdc change data into an append stream for distribution.
SET table.optimizer.mysql-cdc-source.merge.enabled=true;
SET table.optimizer.deterministic-operator-uid-for-cdas=true;
-- Register a mysql catalog
create catalog `my_mysql` with (
  'type' = 'jdbc',
  'default-database' = 'testdb', -- Database name
  'username' = '${username}', -- mysql username
  'password' = '${password}', -- mysql password
  'base-url' = 'jdbc:mysql://ip:3306'
);
-- Full database synchronization
create database if not exists `my_dlc_database`
comment 'test db sync'
with (
  'connector' = 'iceberg-inlong', -- The fixed value
  'catalog-database' = 'test', -- The name of the database where the DLC internal table resides
  'catalog-table' = 'my_$tableName', -- DLC internal table name. $tableName is automatically replaced with the name of the table to be synchronized. The DLC table must be created in advance.
  'default-database' = 'test', -- The name of the database where the DLC internal table resides
  'catalog-name' = 'HYBRIS', -- The fixed value
  'catalog-impl' = 'org.apache.inlong.sort.iceberg.catalog.hybris.DlcWrappedHybrisCatalog', -- The fixed value
  'qcloud.dlc.managed.account.uid' = '100026378089', -- The fixed value, which is the uid of the DLC management account
  'qcloud.dlc.secret-id' = '${secret_id}', -- The secretId of the DLC user, obtained from https://console.tencentcloud.com/cam/capi
  'qcloud.dlc.secret-key' = '${secret_key}', -- The secretKey of the DLC user, obtained from https://console.tencentcloud.com/cam/capi
  'qcloud.dlc.region' = 'ap-guangzhou', -- The region of DLC. The value must be in the 'ap-region' format.
  'qcloud.dlc.jdbc.url' = 'jdbc:dlc:dlc.internal.tencentcloudapi.com?task_type=SparkSQLTask&database_name=test&datasource_connection_name=DataLakeCatalog&region=ap-guangzhou&data_engine_name=${engine_name}', -- The JDBC access URL of DLC. For the format, see https://www.tencentcloud.com/document/product/1342/61547?from_cn_redirect=1
  'uri' = 'dlc.internal.tencentcloudapi.com', -- The fixed value
  'user.appid' = '${appid}', -- The appid of the DLC user
  'request.identity.token' = '100026378089' -- The fixed value, which is the token for DLC internal table access
)
as database `my_mysql`.`test`
including table 'user.*'
excluding table 'user_info_20230530|user_behavior_tmp'
/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai', 'scan.newly-added-table.enabled' = 'true', 'scan.lastchunk.optimize.enable' = 'true') */;
You can use the pipeline.task-name-length parameter to limit the length of task names, which can significantly improve job stability and log readability (applicable to Flink 1.13 and Flink 1.14).

The parameter takes effect when set in the job configuration:

SET pipeline.task-name-length=80;