
Cloud Log Service

Consume Demo - Stream Processing

Last updated: 2026-05-11 10:42:11
This article describes how to consume Cloud Log Service (CLS) logs with custom logic using two stream processing options: Tencent Cloud Oceanus and a self-built Apache Flink cluster.

Tencent Cloud Oceanus

1. Open the Oceanus console, create a new dependency, and upload the flink-connector-cls-1.0.0.jar package.
2. Create a new SQL job and set the Flink version to Flink-1.16.
3. Configure the job parameters: select the JAR dependency uploaded in step 1 and confirm. Adjust the other job parameters as needed.
4. Edit the CREATE TABLE statement.
CREATE TABLE `cls_source_table` (
  `key1` STRING,
  `key2` STRING,
  `key3` STRING,
  `key4` STRING,
  `__TIMESTAMP__` BIGINT
) WITH (
  'connector' = 'cls',
  'region' = 'region, such as ap-guangzhou',
  'logset_id' = 'logset ID; only one logset is supported',
  'topic_ids' = 'log topic IDs; separate multiple topics with commas',
  'access_id' = 'your SecretId, obtained from Tencent Cloud CAM',
  'access_key' = 'your SecretKey, obtained from Tencent Cloud CAM',
  'consumer_group_name' = 'consumer group name, such as flink-connector-cls-5bXXXXb2',
  'offset_start_time' = 'begin',
  'internal' = 'true',
  'format' = 'json',
  'scan.parallelism' = '3'
);
Parameter description:
Table columns take effect only if their names match the log field names on the CLS side. __TIMESTAMP__ is the log timestamp and can be included as needed. Other metadata fields, such as __FILENAME__, __SOURCE__, and __TAG__, are not supported yet.
connector is mandatory and must be set to cls.
For more information, see Custom Consumer Parameters Description.
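When the statement is kept in a script file (for example on a self-built Flink cluster), hard-coding the SecretId and SecretKey is risky. The sketch below renders the same DDL from environment variables. All CLS_* and TENCENTCLOUD_* variable names are hypothetical choices, not names defined by the connector; the remaining option values are copied from the statement above.

```shell
#!/usr/bin/env bash
# Sketch: render the CLS source table DDL from environment variables so the
# SecretId/SecretKey never appear in a committed SQL file.
# CLS_* and TENCENTCLOUD_* are hypothetical variable names chosen for this example.
render_cls_ddl() {
  # Fail fast (exits the non-interactive shell) if a required value is missing.
  : "${CLS_REGION:?set CLS_REGION, e.g. ap-guangzhou}"
  : "${CLS_LOGSET_ID:?set CLS_LOGSET_ID}"
  : "${CLS_TOPIC_IDS:?set CLS_TOPIC_IDS (comma-separated)}"
  : "${CLS_CONSUMER_GROUP:?set CLS_CONSUMER_GROUP}"
  : "${TENCENTCLOUD_SECRET_ID:?set TENCENTCLOUD_SECRET_ID}"
  : "${TENCENTCLOUD_SECRET_KEY:?set TENCENTCLOUD_SECRET_KEY}"
  # Unquoted heredoc: ${...} is expanded; SQL backticks are escaped as \`.
  cat <<EOF
CREATE TABLE \`cls_source_table\` (
  \`key1\` STRING,
  \`key2\` STRING,
  \`key3\` STRING,
  \`key4\` STRING,
  \`__TIMESTAMP__\` BIGINT
) WITH (
  'connector' = 'cls',
  'region' = '${CLS_REGION}',
  'logset_id' = '${CLS_LOGSET_ID}',
  'topic_ids' = '${CLS_TOPIC_IDS}',
  'access_id' = '${TENCENTCLOUD_SECRET_ID}',
  'access_key' = '${TENCENTCLOUD_SECRET_KEY}',
  'consumer_group_name' = '${CLS_CONSUMER_GROUP}',
  'offset_start_time' = 'begin',
  'internal' = 'true',
  'format' = 'json',
  'scan.parallelism' = '3'
);
EOF
}
```

For example, render_cls_ddl > cls_source.sql writes the statement to a file that can be passed to the SQL Client. Values are inserted verbatim, so a value containing a single quote would have to be escaped as two single quotes for SQL.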
Note:
The connector supports job recovery from checkpoints. To use it, trigger a snapshot each time you stop the job.
After the job is restored from that snapshot, consumption continues from the recorded position, which guarantees Exactly Once semantics. If the job crashes before a snapshot is taken, it can only be restored from an earlier checkpoint, which may produce duplicate data; in that case only At Least Once semantics are guaranteed.
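On a self-built cluster, the snapshot taken when stopping a job is what Apache Flink calls a savepoint. The sketch below assumes a Flink 1.16 session cluster in /usr/local/flink and a local savepoint directory (both assumptions). It uses the standard flink stop --savepointPath command and the SQL Client settings execution.checkpointing.interval and execution.savepoint.path; the parser assumes the CLI reports success with a line starting "Savepoint completed. Path:".

```shell
#!/usr/bin/env bash
# Sketch for a self-built Flink 1.16 session cluster.
# FLINK_HOME and SAVEPOINT_DIR defaults are assumptions; adjust them to your setup.
FLINK_HOME="${FLINK_HOME:-/usr/local/flink}"
SAVEPOINT_DIR="${SAVEPOINT_DIR:-file:///tmp/flink-savepoints}"

# SQL Client statement that enables periodic checkpoints for jobs submitted
# afterwards; $1 is the interval (default 30s).
checkpoint_sql() {
  printf "SET 'execution.checkpointing.interval' = '%s';\n" "${1:-30s}"
}

# Read the output of `flink stop` on stdin and print the savepoint path.
parse_savepoint_path() {
  sed -n 's/^Savepoint completed\. Path: //p'
}

# Stop job $1 gracefully, taking a savepoint first; prints the savepoint path.
stop_with_savepoint() {
  "$FLINK_HOME/bin/flink" stop --savepointPath "$SAVEPOINT_DIR" "$1" | parse_savepoint_path
}

# SQL Client statement that makes the next submitted job restore from savepoint $1.
resume_sql() {
  printf "SET 'execution.savepoint.path' = '%s';\n" "$1"
}
```

A typical cycle is: stop_with_savepoint with the job ID shown in the Web UI, then, in the next SQL Client session, execute the statement printed by resume_sql before resubmitting the same INSERT statement.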

Self-built Flink

1. Install OpenJDK 8 (CentOS/RHEL systems).
1.1 To avoid version conflicts, check whether Java is already installed.
java -version
If it is not installed, run the following command to install OpenJDK 8.
sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
1.2 Verify the installation and confirm that the reported version is 1.8.
java -version
javac -version # Confirm javac is available; the -devel package provides the compiler and JDK tools such as jps (used in step 2.3)
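Steps 1.1 and 1.2 can be combined into an idempotent script. The sketch below parses the first line of java -version output (which Java prints on stderr) and installs OpenJDK 8 only when Java 8 is not the active version. The function names are hypothetical.

```shell
#!/usr/bin/env bash
# Print the Java major version from `java -version` output read on stdin.
java_major_version() {
  local ver
  # First line looks like: openjdk version "1.8.0_412" or openjdk version "11.0.22"
  ver=$(head -n 1 | sed -n 's/.*version "\([^"]*\)".*/\1/p')
  case "$ver" in
    1.*) echo "$ver" | cut -d. -f2 ;;  # legacy scheme: 1.8.0_x -> 8
    *)   echo "$ver" | cut -d. -f1 ;;  # newer scheme: 11.0.x -> 11
  esac
}

# Install OpenJDK 8 with yum unless Java 8 is already the active version.
ensure_java8() {
  if command -v java >/dev/null 2>&1 &&
     [ "$(java -version 2>&1 | java_major_version)" = "8" ]; then
    echo "Java 8 is already active"
  else
    sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
  fi
}
```

If another Java version is installed and remains the default after installing OpenJDK 8, sudo alternatives --config java lets you choose the active version on CentOS/RHEL.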
2. Download, install, and start a Flink cluster. This example uses Flink 1.16.3.
2.1 Download Flink and configure permissions.
# 1. Download Flink 1.16.3 (Scala 2.12 build)
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz

# 2. Extract it to the specified directory (recommended: /usr/local/)
sudo tar zxvf flink-1.16.3-bin-scala_2.12.tgz -C /usr/local/

# 3. Rename the directory (to simplify subsequent operations)
sudo mv /usr/local/flink-1.16.3 /usr/local/flink

# 4. Grant operation permissions to regular users (to avoid permission issues during subsequent startups)
sudo chown -R $USER:$USER /usr/local/flink
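To switch versions without editing every command, the download step can be parameterized. The sketch below rebuilds the archive URL used above from FLINK_VERSION and SCALA_VERSION (hypothetical variable names) and checks the tarball against the .sha512 file published next to it, assuming that file uses the standard "digest  filename" format that sha512sum -c expects.

```shell
#!/usr/bin/env bash
# Sketch: parameterized download with checksum verification.
# Defaults match the Flink 1.16.3 / Scala 2.12 example above.
FLINK_VERSION="${FLINK_VERSION:-1.16.3}"
SCALA_VERSION="${SCALA_VERSION:-2.12}"

flink_tarball() {
  echo "flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz"
}

flink_url() {
  echo "https://archive.apache.org/dist/flink/flink-${FLINK_VERSION}/$(flink_tarball)"
}

# Download the tarball and its .sha512 file, then verify the digest
# (assumes the .sha512 file is in sha512sum format).
download_and_verify() {
  wget "$(flink_url)" "$(flink_url).sha512" &&
    sha512sum -c "$(flink_tarball).sha512"
}
```

Run download_and_verify in place of the wget command, then continue with the tar, mv, and chown commands, substituting the version if you changed it.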
2.2 Start the Flink cluster.
# 1. Go to the Flink installation directory
cd /usr/local/flink

# 2. Start the cluster (single-node: 1 JobManager + 1 TaskManager)
./bin/start-cluster.sh
2.3 Verify the startup status (critical step).
Method 1: Run jps (included with the JDK) to list the Java processes and check that both the JobManager and TaskManager processes are running.
jps
Normal output should include:
- StandaloneSessionClusterEntrypoint (JobManager process)
- TaskManagerRunner (TaskManager process)
- Jps (the jps command itself)

Method 2: Open the Web UI to confirm that the cluster is reachable.
In a browser, go to http://<server IP address>:8081. If the Flink dashboard loads and shows 1 TaskManager, the cluster has started successfully.
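Both checks can also be scripted, for example at the end of a provisioning script. This sketch assumes curl is installed and that the Flink REST API answers on the default port 8081 of the local machine; it reads the taskmanagers field from the GET /overview response, the same summary the dashboard displays. The function names are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: scripted versions of Method 1 (jps) and Method 2 (REST /overview).

# Return 0 if the given jps output lists both Flink processes.
flink_processes_up() {
  printf '%s\n' "$1" | grep -q 'StandaloneSessionClusterEntrypoint' &&
    printf '%s\n' "$1" | grep -q 'TaskManagerRunner'
}

# Read the JSON of GET /overview on stdin and print the "taskmanagers" value.
taskmanager_count() {
  sed -n 's/.*"taskmanagers":\([0-9][0-9]*\).*/\1/p'
}

# Run both checks against the local cluster.
check_cluster() {
  if ! flink_processes_up "$(jps)"; then
    echo "JobManager or TaskManager process not found" >&2
    return 1
  fi
  count=$(curl -s "http://localhost:8081/overview" | taskmanager_count)
  echo "TaskManagers registered: ${count:-unknown}"
}
```

For the single-node setup above, check_cluster should report one registered TaskManager.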
3. Start the SQL Client and create the table from the Flink SQL CLI.
3.1 Go to the Flink installation directory (ensure the path is correct).
cd /usr/local/flink
3.2 Start the SQL Client (interactive command-line tool).
./bin/sql-client.sh
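After successful startup, the SQL Client welcome screen is displayed, followed by the Flink SQL> prompt. Enter the CREATE TABLE statement from the Tencent Cloud Oceanus section at this prompt.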
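The self-built cluster also needs access to flink-connector-cls-1.0.0.jar. One common Flink approach, assumed here, is to pass the JAR to the SQL Client with -j and run a prepared SQL file with -f instead of typing statements at the prompt; alternatively, copy the JAR into /usr/local/flink/lib/ before running start-cluster.sh. The JAR location and script name below are assumptions.

```shell
#!/usr/bin/env bash
# Sketch: run a SQL script non-interactively with the CLS connector JAR.
# FLINK_HOME and CLS_JAR defaults are assumptions; adjust them to your setup.
FLINK_HOME="${FLINK_HOME:-/usr/local/flink}"
CLS_JAR="${CLS_JAR:-$HOME/flink-connector-cls-1.0.0.jar}"

# Fail early with a clear message if a required file is missing.
require_file() {
  if [ ! -f "$1" ]; then
    echo "file not found: $1" >&2
    return 1
  fi
}

# -j adds the JAR to the SQL Client session; -f executes the statements in a
# script file instead of opening the interactive Flink SQL> prompt.
run_sql_script() {
  require_file "$CLS_JAR" || return 1
  require_file "$1" || return 1
  "$FLINK_HOME/bin/sql-client.sh" -j "$CLS_JAR" -f "$1"
}
```

For example, run_sql_script cls_job.sql, where cls_job.sql (a hypothetical file) contains the CREATE TABLE statement followed by an INSERT INTO statement that writes to your sink table.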

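4. Go to the Flink Web UI to view the job execution status: http://localhost:8081/#/overview. When accessing it from another machine, replace localhost with the server IP address.
5. When the job is no longer needed, stop the Flink cluster.
./bin/stop-cluster.sh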
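While the cluster is running, job status can also be checked from the command line through the REST endpoint GET /jobs/overview, which lists each job with a state field such as RUNNING or FAILED. This sketch assumes curl is available and the default port 8081; it counts jobs per state with plain text tools rather than a JSON parser, which is adequate for this flat response but is a simplification.

```shell
#!/usr/bin/env bash
# Sketch: count jobs by state from the Flink REST API.

# Read the JSON of GET /jobs/overview on stdin; print "<count> <STATE>" lines.
job_state_counts() {
  grep -o '"state":"[A-Z_]*"' |
    sed 's/"state":"\(.*\)"/\1/' |
    sort | uniq -c | awk '{print $1, $2}'
}

# $1 = JobManager host (default localhost).
show_jobs() {
  curl -s "http://${1:-localhost}:8081/jobs/overview" | job_state_counts
}
```

For example, show_jobs printing "1 RUNNING" indicates that the CLS consumption job is active.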
