tencent cloud

Data Lake Compute

Native table (TC-Iceberg) builds a real-time lakehouse

Last updated: 2026-05-27 11:12:04

Overview

With native tables (TC-Iceberg), you can build a complete near-real-time lakehouse. This covers near-real-time ingestion of change data capture (CDC) data into the lake and near-real-time data pipelines built on that data.

Prerequisites

Data Lake Compute (DLC) is properly enabled, user permission configurations are completed, and managed storage is enabled.
Data optimization is configured for the DLC database. For details, refer to Enabling Data Optimization.
External access to DLC-managed storage is properly enabled.
The VPC used for external access is bound to the Oceanus or EMR compute resources.
A MySQL database is prepared. We recommend TencentDB for MySQL. For the purchase process, refer to TencentDB for MySQL purchase methods.
A database account is created with the SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, and RELOAD permissions required by the mysql-cdc connector.
Binlog is enabled on the MySQL server, binlog_format is set to ROW, and binlog_row_image is set to FULL.
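As a quick pre-flight check, the binlog requirements above can be compared against the server's system variables. The sketch below is hypothetical (BinlogCheck is not part of any Tencent Cloud, MySQL, or Flink API). It takes the variable values you would read with SHOW VARIABLES and lists any that do not match. It assumes that an enabled binlog is reported as log_bin = ON.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: compares MySQL system variables against the binlog
// settings the CDC task needs (binlog enabled, ROW format, FULL row image).
final class BinlogCheck {

    // Required values, in the order they are reported.
    static final Map<String, String> REQUIRED = new LinkedHashMap<>();

    static {
        REQUIRED.put("log_bin", "ON");
        REQUIRED.put("binlog_format", "ROW");
        REQUIRED.put("binlog_row_image", "FULL");
    }

    // Returns one message per variable that is missing or has the wrong value.
    static List<String> problems(Map<String, String> serverVariables) {
        List<String> found = new ArrayList<>();
        for (Map.Entry<String, String> required : REQUIRED.entrySet()) {
            String actual = serverVariables.get(required.getKey());
            if (actual == null) {
                found.add(required.getKey() + " is not reported");
            } else if (!actual.equalsIgnoreCase(required.getValue())) {
                found.add(required.getKey() + " is " + actual + ", expected " + required.getValue());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Values as they might be read with:
        // SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image');
        Map<String, String> vars = Map.of("log_bin", "ON", "binlog_format", "MIXED", "binlog_row_image", "FULL");
        problems(vars).forEach(System.out::println); // prints: binlog_format is MIXED, expected ROW
    }
}
```

An empty result means the three settings match. Values are compared case-insensitively because MySQL may report them in either case.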

Operation Process

This practice builds a complete near-real-time lakehouse with two real-time tasks. The first task synchronizes MySQL data into a TC-Iceberg table (cdc_sink). The second task reads the data from that table, aggregates it, and writes the results into a second TC-Iceberg table (cdc_compute). You can develop the real-time tasks on the Oceanus stream computing platform or on self-managed Flink. The process is as follows:


Creating Database Tables

Log in to the MySQL database and execute the following SQL statements to initialize the source database table:
CREATE DATABASE cdc_database;
CREATE TABLE cdc_database.cdc_source(id BIGINT, class VARCHAR(128), score INT, PRIMARY KEY(id));
Log in to the DLC console. In Data Explore, run the following SQL statements to create the target database and tables:
CREATE DATABASE cdc_database;
CREATE TABLE cdc_database.cdc_sink(id LONG, class STRING, score INT, PRIMARY KEY(id)) using tc_iceberg;
CREATE TABLE cdc_database.cdc_compute(class STRING, avg_score INT, PRIMARY KEY(class)) using tc_iceberg;
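The target table mirrors the MySQL source columns using the DLC types chosen above: BIGINT becomes LONG, VARCHAR(128) becomes STRING, and INT stays INT. The sketch below derives the TC-Iceberg DDL from the source columns with a hypothetical TcIcebergDdl class. Its mapping covers only these three types and is illustrative, not an official or complete type-mapping reference.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: derives a TC-Iceberg CREATE TABLE statement from MySQL
// column types, using the same type choices as cdc_sink above.
final class TcIcebergDdl {

    // Maps the MySQL types used in this practice to the DLC types used above.
    static String toDlcType(String mysqlType) {
        String t = mysqlType.trim().toUpperCase(Locale.ROOT);
        if (t.equals("BIGINT")) {
            return "LONG";
        }
        if (t.equals("INT")) {
            return "INT";
        }
        if (t.startsWith("VARCHAR")) {
            return "STRING";
        }
        throw new IllegalArgumentException("No mapping for MySQL type " + mysqlType);
    }

    // Builds: CREATE TABLE db.table(col TYPE, ..., PRIMARY KEY(pk)) using tc_iceberg;
    static String createTable(String qualifiedName, Map<String, String> mysqlColumns, String primaryKey) {
        StringJoiner columns = new StringJoiner(", ", qualifiedName + "(", ")");
        for (Map.Entry<String, String> column : mysqlColumns.entrySet()) {
            columns.add(column.getKey() + " " + toDlcType(column.getValue()));
        }
        columns.add("PRIMARY KEY(" + primaryKey + ")");
        return "CREATE TABLE " + columns + " using tc_iceberg;";
    }

    public static void main(String[] args) {
        Map<String, String> source = new LinkedHashMap<>(); // keeps column order
        source.put("id", "BIGINT");
        source.put("class", "VARCHAR(128)");
        source.put("score", "INT");
        System.out.println(createTable("cdc_database.cdc_sink", source, "id"));
    }
}
```

For the cdc_source columns, the output is the same statement used above to create cdc_sink.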

Uploading Task Dependencies

Using the Oceanus Stream Computing Platform

1. Download dependencies:
Hive library: hive-exec-2.3.9.jar.
Metadata acceleration bucket dependency: chdfs_hadoop_plugin_network-2.8.jar.
Cloud Object Storage (COS) access configuration: hdfs-site.xml. Refer to Example 1 at the end of the document.
2. Upload dependencies to the dependency management module in Oceanus.

Using Self-Managed Flink

1. Download dependencies:
COS access configuration: hdfs-site.xml. Refer to Example 1 at the end of the document.
2. Log in to the Flink cluster and upload the prepared dependencies to the flink/lib directory.

Writing Tasks

Using the Oceanus Stream Computing Platform

1. Create two Flink SQL jobs on the Oceanus stream computing platform, and configure the job parameters as follows:
Add all the uploaded dependencies above.
Select Flink-1.16 as the Flink version.
Enable checkpointing and set the checkpoint interval to 60 seconds.
2. Replace the content of the two jobs with the Near-Real-Time CDC Synchronization SQL and the Real-Time Aggregation SQL below, respectively.

Using Self-Managed Flink

1. Create a Maven project named "flink-demo" through IntelliJ IDEA.
2. Add relevant dependencies in the pom file. For detailed dependencies, refer to Example 2.
3. Write the Java job code. For the complete code, see Example 3.
Create two entry classes, one per job, and replace the SQL statements to be executed with the Near-Real-Time CDC Synchronization SQL and the Real-Time Aggregation SQL below, respectively.
4. Compile and package the flink-demo project through IntelliJ IDEA. The JAR package flink-demo-1.0-SNAPSHOT.jar will be generated in the target folder of the project.

Near-Real-Time CDC Synchronization SQL

CREATE CATALOG tc_iceberg_catalog WITH (
'type'='mixed_iceberg',
'catalog-type'='hive',
'uri'='thrift://xxx:xxx', -- Enter the Catalog access address exposed by DLC external access.
'table-formats'='MIXED_ICEBERG'
);

CREATE TABLE mysql_cdc_source (
id BIGINT,
class STRING,
score INT,
PRIMARY KEY (id) NOT ENFORCED -- If the database table to be synchronized defines a primary key, the key also needs to be defined here.
) WITH (
'connector' = 'mysql-cdc', -- Fixed value 'mysql-cdc'.
'hostname' = 'xxx', -- IP address of the database.
'port' = 'xxx', -- Port for database access.
'username' = 'xxx', -- Username for database access (SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD permissions are required).
'password' = 'xxx', -- Password for database access.
'database-name' = 'cdc_database', -- Database to be synchronized.
'table-name' = 'cdc_source' -- Data table name to be synchronized.
);

INSERT INTO tc_iceberg_catalog.cdc_database.cdc_sink SELECT * FROM mysql_cdc_source;
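The aggregation job below needs the same catalog definition, so you may prefer to render it from one place. The following is a minimal sketch built around a hypothetical CatalogDdl class. The option keys and values are copied from the statement above. It assumes that the address exposed by DLC external access has the form thrift://host:port. The statement is returned without a trailing semicolon so it can be passed to a single executeSql() call.

```java
import java.net.URI;

// Hypothetical helper: renders the CREATE CATALOG statement used by both jobs
// from the Thrift address exposed by DLC external access.
final class CatalogDdl {

    static String createCatalog(String name, String metastoreUri) {
        URI uri = URI.create(metastoreUri);
        // Reject anything that is not thrift://host:port before it reaches Flink.
        if (!"thrift".equals(uri.getScheme()) || uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("Expected thrift://host:port, got " + metastoreUri);
        }
        return "CREATE CATALOG " + name + " WITH (\n"
                + "  'type'='mixed_iceberg',\n"
                + "  'catalog-type'='hive',\n"
                + "  'uri'='" + metastoreUri + "',\n"
                + "  'table-formats'='MIXED_ICEBERG'\n"
                + ")";
    }

    public static void main(String[] args) {
        // 10.0.0.1:9083 is a placeholder, not a real DLC address.
        System.out.println(createCatalog("tc_iceberg_catalog", "thrift://10.0.0.1:9083"));
    }
}
```

Validating the address up front turns a leftover placeholder such as thrift://xxx:xxx into an immediate, readable error instead of a failure at job startup.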

Real-Time Aggregation SQL

CREATE CATALOG tc_iceberg_catalog WITH (
'type'='mixed_iceberg',
'catalog-type'='hive',
'uri'='thrift://xxx:xxx', -- Enter the Catalog access address exposed by DLC external access.
'table-formats'='MIXED_ICEBERG'
);

INSERT INTO tc_iceberg_catalog.cdc_database.cdc_compute
SELECT class, avg(score) AS avg_score
FROM tc_iceberg_catalog.cdc_database.cdc_sink GROUP BY class;
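Because the upstream table keeps changing, the GROUP BY query above is a continuously updated aggregation. When a row is deleted or updated upstream, its old score is retracted from its class before any new score is added, and the new average replaces the previous row for that class in cdc_compute. The sketch below models the accumulator for a single class with a hypothetical RetractableAvg class. It truncates the average to an integer, on the assumption that avg() over an INT column returns an INT, which matches the INT type of avg_score.

```java
// Models one group of a retractable AVG aggregation, e.g. the rows of a single class.
// Hypothetical class; Flink's real accumulator is internal to the planner.
final class RetractableAvg {
    private long sum;
    private long count;

    // Called for insert / update-after records: add the new value to the group.
    void accumulate(int score) {
        sum += score;
        count++;
    }

    // Called for update-before / delete records: remove the old value from the group.
    void retract(int score) {
        sum -= score;
        count--;
    }

    // Current average truncated to an integer, or null when the group is empty
    // (an empty group means its row is removed from the result table).
    Integer average() {
        return count == 0 ? null : (int) (sum / count);
    }

    public static void main(String[] args) {
        RetractableAvg class1 = new RetractableAvg();
        class1.accumulate(80);                // id 1 inserted
        class1.accumulate(85);                // id 2 inserted
        System.out.println(class1.average()); // prints 82 (165 / 2, truncated)
        class1.retract(80);                   // id 1 deleted
        System.out.println(class1.average()); // prints 85
    }
}
```

Keeping a running sum and count, rather than the rows themselves, is what lets the average be corrected from a single retraction record.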

Starting Tasks

Using the Oceanus Stream Computing Platform

Save each job, publish its draft, and wait for it to start successfully. Then open the Flink UI and confirm that the task is running normally.

Using Self-Managed Flink

1. Log in to one of the instances in the Flink cluster and upload flink-demo-1.0-SNAPSHOT.jar to the /data/jars/ directory (create the directory if it does not exist).
2. On the same instance, go to the flink/bin directory and run the following command to submit a job. Submit one job per entry class, setting --class to the fully qualified name of that class:
./flink run --class com.tencent.dlc.tciceberg.flink.FlinkSQLDemo /data/jars/flink-demo-1.0-SNAPSHOT.jar

Verifying Data

1. Log in to the MySQL database and run the following statements to insert, delete, and update test data:
INSERT INTO cdc_database.cdc_source VALUES(1, 'class1', 80);
INSERT INTO cdc_database.cdc_source VALUES(2, 'class1', 85);
INSERT INTO cdc_database.cdc_source VALUES(3, 'class2', 85);
INSERT INTO cdc_database.cdc_source VALUES(4, 'class2', 90);
DELETE FROM cdc_database.cdc_source WHERE id = 1;
UPDATE cdc_database.cdc_source SET score = 100 where id = 3;
2. Log in to the DLC console, click Data Explore, and query the target table data with the following SQL statements:
SELECT * FROM cdc_database.cdc_sink;
SELECT * FROM cdc_database.cdc_compute;
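The test statements reach the jobs as a changelog. Each INSERT arrives as an insert record, the UPDATE as an update-before/update-after pair, and the DELETE as a delete record, and cdc_sink applies them as upserts and deletes by its primary key. The sketch below replays that changelog on an in-memory table to show the rows the two queries should eventually return. Change and ExpectedResults are hypothetical classes, and the kind codes mirror the short strings of Flink's RowKind (+I, -U, +U, -D). This is a simplified model, not the TC-Iceberg write path. Written data may only become visible after a checkpoint completes, so allow at least one checkpoint interval before querying.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Replays the verification statements as a CDC changelog on an in-memory,
// primary-keyed table, then derives the expected cdc_sink and cdc_compute rows.
final class ExpectedResults {

    static final class Change {
        final String kind; // "+I", "-U", "+U", or "-D"
        final long id;     // primary key
        final String clazz;
        final int score;

        Change(String kind, long id, String clazz, int score) {
            this.kind = kind;
            this.id = id;
            this.clazz = clazz;
            this.score = score;
        }
    }

    // The six MySQL statements above, as the changelog the mysql-cdc source emits.
    static final List<Change> LOG = List.of(
            new Change("+I", 1, "class1", 80),
            new Change("+I", 2, "class1", 85),
            new Change("+I", 3, "class2", 85),
            new Change("+I", 4, "class2", 90),
            new Change("-D", 1, "class1", 80),   // DELETE ... WHERE id = 1
            new Change("-U", 3, "class2", 85),   // UPDATE ... WHERE id = 3 (old row)
            new Change("+U", 3, "class2", 100)); // UPDATE ... WHERE id = 3 (new row)

    // Applies the changelog by primary key: +I/+U upsert, -U/-D remove.
    static Map<Long, Change> applyToSink(List<Change> log) {
        Map<Long, Change> sink = new TreeMap<>();
        for (Change c : log) {
            if (c.kind.equals("+I") || c.kind.equals("+U")) {
                sink.put(c.id, c);
            } else if (c.kind.equals("-U") || c.kind.equals("-D")) {
                sink.remove(c.id);
            } else {
                throw new IllegalArgumentException("Unknown kind " + c.kind);
            }
        }
        return sink;
    }

    // GROUP BY class with a truncated integer average, as written to cdc_compute.
    static Map<String, Integer> aggregate(Map<Long, Change> sink) {
        Map<String, long[]> sumAndCount = new TreeMap<>();
        for (Change row : sink.values()) {
            long[] acc = sumAndCount.computeIfAbsent(row.clazz, k -> new long[2]);
            acc[0] += row.score;
            acc[1]++;
        }
        Map<String, Integer> result = new TreeMap<>();
        sumAndCount.forEach((clazz, acc) -> result.put(clazz, (int) (acc[0] / acc[1])));
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Change> sink = applyToSink(LOG);
        sink.values().forEach(r -> System.out.println(r.id + ", " + r.clazz + ", " + r.score));
        System.out.println(aggregate(sink)); // prints {class1=85, class2=95}
    }
}
```

Following this model, cdc_sink should hold ids 2, 3, and 4 (with id 3 at score 100), and cdc_compute should hold class1 with 85 and class2 with 95.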

Complete Sample Code


Example 1


hdfs-site.xml configurations required for accessing TC-Iceberg managed storage:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>

<property>
<name>fs.lakefs.impl</name>
<value>org.apache.hadoop.fs.lakefs.CosFileSystem</value>
</property>

<property>
<name>fs.cosn.impl</name>
<value>org.apache.hadoop.fs.CosFileSystem</value>
</property>

<!-- Region of the COS bucket, for example ap-guangzhou -->
<property>
<name>fs.cosn.bucket.region</name>
<value>ap-xxx</value>
</property>

<property>
<name>fs.cosn.posix_bucket.fs.impl</name>
<value>org.apache.hadoop.fs.CosFileSystem</value>
</property>

<property>
<name>fs.cosn.credentials.provider</name>
<value>org.apache.hadoop.fs.auth.DlcCloudCredentialsProvider</value>
</property>

<property>
<name>qcloud.dlc.endpoint</name>
<value>dlc.tencentcloudapi.com</value>
</property>

<property>
<name>fs.cosn.posix_bucket.fs.userinfo.region</name>
<value>ap-xxx</value>
</property>

<!-- Configure user's Secret ID -->
<property>
<name>fs.cosn.posix_bucket.fs.userinfo.secretId</name>
<value>xxx</value>
</property>

<!-- Configure user's Secret KEY -->
<property>
<name>fs.cosn.posix_bucket.fs.userinfo.secretKey</name>
<value>xxx</value>
</property>

</configuration>
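Before you upload hdfs-site.xml, check that no placeholder values remain. The sketch below is a hypothetical helper that parses the file with the JDK's built-in DOM parser and lists the properties whose value is still xxx or ends with -xxx. Treating those two patterns as placeholders is an assumption based on the values used in this example.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper: finds <property> entries in a Hadoop-style configuration
// file whose <value> is still a placeholder such as "xxx" or "ap-xxx".
final class PlaceholderCheck {

    static List<String> unfilled(String xml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        List<String> names = new ArrayList<>();
        NodeList properties = doc.getElementsByTagName("property");
        for (int i = 0; i < properties.getLength(); i++) {
            Element property = (Element) properties.item(i);
            // Assumes every <property> has exactly one <name> and one <value>.
            String name = property.getElementsByTagName("name").item(0).getTextContent().trim();
            String value = property.getElementsByTagName("value").item(0).getTextContent().trim();
            if (value.equals("xxx") || value.endsWith("-xxx")) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        String xml = "<configuration>"
                + "<property><name>fs.cosn.bucket.region</name><value>ap-xxx</value></property>"
                + "<property><name>qcloud.dlc.endpoint</name><value>dlc.tencentcloudapi.com</value></property>"
                + "</configuration>";
        System.out.println(unfilled(xml)); // prints [fs.cosn.bucket.region]
    }
}
```

To check the real file, pass its contents, for example Files.readString(Path.of("hdfs-site.xml")) on Java 11 or later.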


Example 2


The pom.xml file includes Flink Demo task dependencies.
<properties>
<flink.version>1.16.3</flink.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>


Example 3


Flink SQL example code.
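package com.tencent.dlc.tciceberg.flink;

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSQLDemo {

    public static void main(String[] args) {
        // Create an execution environment and configure checkpointing (60-second interval)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(60000);
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
        // Keep the externalized checkpoint when the job is cancelled so it can be restored later
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Replace with the SQL statements to be executed, one statement per array element.
        // executeSql() runs a single statement per call, so the CREATE CATALOG,
        // CREATE TABLE, and INSERT INTO statements are submitted separately.
        String[] statements = {
                "CREATE CATALOG ...",
                "INSERT INTO ..."
        };
        for (String statement : statements) {
            tEnv.executeSql(statement);
        }
    }

}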
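Because executeSql() runs one statement per call, you may want to keep each job's SQL as a script and split it before submission. The splitter below is a hypothetical, simplified sketch. It splits on semicolons outside single-quoted literals and drops -- line comments, which covers the two scripts in this document. It is not a full SQL lexer: it does not handle /* */ comments or backquoted identifiers that contain semicolons.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a Flink SQL script into individual statements.
final class SqlScriptSplitter {

    // Splits on ';' outside single-quoted literals and drops "--" comments
    // that start outside literals (comments may themselves contain quotes).
    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuote = false;
        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            if (!inQuote && c == '-' && i + 1 < script.length() && script.charAt(i + 1) == '-') {
                // Skip to the end of the line; keep a newline as whitespace.
                while (i < script.length() && script.charAt(i) != '\n') {
                    i++;
                }
                current.append('\n');
                continue;
            }
            if (c == '\'') {
                inQuote = !inQuote; // an escaped '' toggles twice, so the state stays correct
            }
            if (c == ';' && !inQuote) {
                addIfNotBlank(statements, current);
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addIfNotBlank(statements, current);
        return statements;
    }

    private static void addIfNotBlank(List<String> statements, StringBuilder sb) {
        String s = sb.toString().trim();
        if (!s.isEmpty()) {
            statements.add(s);
        }
    }

    public static void main(String[] args) {
        String script = "CREATE CATALOG c WITH ('type'='mixed_iceberg'); -- catalog\n"
                + "INSERT INTO c.db.t SELECT * FROM src;";
        split(script).forEach(System.out::println);
    }
}
```

In FlinkSQLDemo, for example, you could call tEnv.executeSql(statement) for each element of SqlScriptSplitter.split(script). Trailing semicolons are removed, so each statement can be passed to executeSql() as is.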

