
-- Source side (MySQL): the database and table whose row changes the
-- mysql-cdc connector captures (database-name 'cdc_database',
-- table-name 'cdc_source').
CREATE DATABASE cdc_database;

CREATE TABLE cdc_database.cdc_source (
    id    BIGINT,
    class VARCHAR(128),
    score INT,
    PRIMARY KEY (id)
);
-- Lake side: create the database plus the two tc_iceberg tables used by
-- the pipeline.
CREATE DATABASE cdc_database;

-- Receives the rows synchronized from the source table (same columns).
CREATE TABLE cdc_database.cdc_sink (
    id    LONG,
    class STRING,
    score INT,
    PRIMARY KEY (id)
) USING tc_iceberg;

-- Holds one aggregated row per class.
CREATE TABLE cdc_database.cdc_compute (
    class     STRING,
    avg_score INT,
    PRIMARY KEY (class)
) USING tc_iceberg;
-- Flink SQL job 1: stream the MySQL table cdc_database.cdc_source into the
-- lake table cdc_database.cdc_sink.

-- Catalog through which Flink addresses the lake tables.
CREATE CATALOG tc_iceberg_catalog WITH (
    'type' = 'mixed_iceberg',
    'catalog-type' = 'hive',
    'uri' = 'thrift://xxx:xxx', -- Enter the Catalog access address exposed by DLC external access.
    'table-formats' = 'MIXED_ICEBERG'
);

-- Flink-side definition of the MySQL table read through the mysql-cdc connector.
CREATE TABLE mysql_cdc_source (
    id    BIGINT,
    class STRING,
    score INT,
    PRIMARY KEY (id) NOT ENFORCED -- If the database table to be synchronized defines a primary key, the key also needs to be defined here.
) WITH (
    'connector' = 'mysql-cdc',        -- Fixed value 'mysql-cdc'.
    'hostname' = 'xxx',               -- IP address of the database.
    'port' = 'xxx',                   -- Port for database access.
    'username' = 'xxx',               -- Username for database access (SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD permissions are required).
    'password' = 'xxx',               -- Password for database access.
    'database-name' = 'cdc_database', -- Database to be synchronized.
    'table-name' = 'cdc_source'       -- Data table name to be synchronized.
);

-- Continuously copy every captured change into the sink table.
INSERT INTO tc_iceberg_catalog.cdc_database.cdc_sink
SELECT
    id,
    class,
    score
FROM mysql_cdc_source;
-- Flink SQL job 2: compute the average score per class and write it to
-- cdc_database.cdc_compute.

-- Catalog through which Flink addresses the lake tables.
CREATE CATALOG tc_iceberg_catalog WITH (
    'type' = 'mixed_iceberg',
    'catalog-type' = 'hive',
    'uri' = 'thrift://xxx:xxx', -- Enter the Catalog access address exposed by DLC external access.
    'table-formats' = 'MIXED_ICEBERG'
);

-- NOTE(review): the original statement read from
-- tc_iceberg_catalog.cdc_database.cdc_source, but cdc_source is only defined
-- as the MySQL table; the lake table populated by the ingestion job is
-- cdc_sink, so the aggregation reads from it. Confirm against your setup.
INSERT INTO tc_iceberg_catalog.cdc_database.cdc_compute
SELECT
    class,
    AVG(score) AS avg_score
FROM tc_iceberg_catalog.cdc_database.cdc_sink
GROUP BY class;
# Submit the packaged demo JAR, with FlinkSQLDemo as the entry class.
./flink run \
    --class com.tencent.dlc.tciceberg.flink.FlinkSQLDemo \
    /data/jars/flink-demo-1.0-SNAPSHOT.jar
-- Test changes applied to the MySQL source table: four inserts, one delete
-- and one update, so that every change type flows through the CDC pipeline.
INSERT INTO cdc_database.cdc_source (id, class, score) VALUES (1, 'class1', 80);
INSERT INTO cdc_database.cdc_source (id, class, score) VALUES (2, 'class1', 85);
INSERT INTO cdc_database.cdc_source (id, class, score) VALUES (3, 'class2', 85);
INSERT INTO cdc_database.cdc_source (id, class, score) VALUES (4, 'class2', 90);

DELETE FROM cdc_database.cdc_source
WHERE id = 1;

UPDATE cdc_database.cdc_source
SET score = 100
WHERE id = 3;
-- Inspect the synchronized rows.
SELECT *
FROM cdc_database.cdc_sink;

-- Inspect the per-class aggregation result.
SELECT *
FROM cdc_database.cdc_compute;
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Hadoop-style configuration: file system implementations and DLC credentials. -->
<configuration>
    <property>
        <name>fs.lakefs.impl</name>
        <value>org.apache.hadoop.fs.lakefs.CosFileSystem</value>
    </property>
    <property>
        <name>fs.cosn.impl</name>
        <value>org.apache.hadoop.fs.CosFileSystem</value>
    </property>
    <!-- Correctly configured availability zone -->
    <property>
        <name>fs.cosn.bucket.region</name>
        <value>ap-xxx</value>
    </property>
    <property>
        <name>fs.cosn.posix_bucket.fs.impl</name>
        <value>org.apache.hadoop.fs.CosFileSystem</value>
    </property>
    <property>
        <name>fs.cosn.credentials.provider</name>
        <value>org.apache.hadoop.fs.auth.DlcCloudCredentialsProvider</value>
    </property>
    <property>
        <name>qcloud.dlc.endpoint</name>
        <value>dlc.tencentcloudapi.com</value>
    </property>
    <!-- NOTE(review): this value is a credentials-provider class name although
         the key ends in ".region"; confirm whether a region such as ap-xxx is
         intended here. -->
    <property>
        <name>fs.cosn.posix_bucket.fs.userinfo.region</name>
        <value>org.apache.hadoop.fs.auth.DlcCloudCredentialsProvider</value>
    </property>
    <!-- Configure user's Secret ID -->
    <property>
        <name>fs.cosn.posix_bucket.fs.userinfo.secretId</name>
        <value>xxx</value>
    </property>
    <!-- Configure user's Secret KEY -->
    <property>
        <name>fs.cosn.posix_bucket.fs.userinfo.secretKey</name>
        <value>xxx</value>
    </property>
</configuration>
<!-- Maven POM fragment: Flink dependencies for the demo job. All share
     ${flink.version} and use "provided" scope. -->
<properties>
    <flink.version>1.16.3</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
public class FlinkSQLDemo {public static void main(String[] args) {// Create an execution environment and configure checkpointStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(60000);env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");env.getCheckpointConfig().setCheckpointTimeout(60000);env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// Replace with the SQL to be executed.String sql = "SQL to be excuted...";tEnv.executeSql(sourceSql);}}
Was this page helpful?
You can also Contact sales or Submit a Ticket for help.
Help us improve! Rate your documentation experience in 5 mins.
Feedback