Consumption Demo - Stream Processing
Last updated: 2025-12-03 18:30:49
This article describes how to consume CLS logs using the stream processing framework Flink and Tencent Cloud Oceanus.

Consuming CLS Logs with Tencent Cloud Oceanus

1. Create a SQL job in the Oceanus console.
2. Write SQL statements.
CREATE TABLE `nginx_source`
(
-- Fields in the log
`@metadata` STRING,
`@timestamp` TIMESTAMP,
`agent` STRING,
`ecs` STRING,
`host` STRING,
`input` STRING,
`log` STRING,
`message` STRING,
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- kafka partition
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
-- Topic name for Kafka protocol consumption, e.g. XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX; it can be copied from the console
'topic' = 'Your consumption topics',
-- Service address and port: port 9096 for the public network, port 9095 for the private network. This example uses private network consumption; fill in according to your actual situation
'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',
-- Replace with your consumer group name
'properties.group.id' = 'Consumer group ID',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
-- username is the logset ID, e.g. ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
-- password is a string of the form SecretId#SecretKey, e.g. AKID********************************#XXXXuXtymIXT0Lac. Be careful not to lose the #.
-- It is recommended to use sub-account keys. When the root account authorizes the sub-account, follow the principle of least privilege:
-- configure the action and resource in the sub-account's access policy to the minimum scope required for the operations.
-- Note that jaas.config must end with a semicolon; an error will be reported if it is missing.
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN'
);
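To run this as a complete SQL job, the source also needs a sink. Below is a minimal sketch using Flink's built-in print connector as a debugging sink; in production, replace it with your actual downstream table:

-- Debugging sink that writes each row to the TaskManager logs
CREATE TABLE `nginx_print_sink` (
`host` STRING,
`message` STRING
) WITH (
'connector' = 'print'
);

-- Route the consumed log records into the sink
INSERT INTO `nginx_print_sink`
SELECT `host`, `message` FROM `nginx_source`;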


Consuming CLS Logs with Flink

Enabling Kafka Consumption Protocol for Logs

See Operation Steps to enable the Kafka consumption protocol for your logs and obtain the service domain and topic for consumption.

Confirming flink-connector-kafka dependency

Make sure the Flink lib directory contains flink-connector-kafka; you can then register Kafka tables directly in SQL and use them. The dependency is as follows:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.14.4</version>
</dependency>

Registering a Flink table

CREATE TABLE `nginx_source`
(
-- Fields in the log
`@metadata` STRING,
`@timestamp` TIMESTAMP,
`agent` STRING,
`ecs` STRING,
`host` STRING,
`input` STRING,
`log` STRING,
`message` STRING,
-- kafka partition
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
-- Topic name for Kafka protocol consumption, e.g. XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX; it can be copied from the console
'topic' = 'Your consumption topics',
-- Service address and port: port 9096 for the public network, port 9095 for the private network. This example uses private network consumption; fill in according to your actual situation
'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',
-- Replace with your consumer group name
'properties.group.id' = 'Consumer group ID',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
-- username is the logset ID, e.g. ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
-- password is a string of the form SecretId#SecretKey, e.g. AKID********************************#XXXXuXtymIXT0Lac. Be careful not to lose the #.
-- It is recommended to use sub-account keys. When the root account authorizes the sub-account, follow the principle of least privilege:
-- configure the action and resource in the sub-account's access policy to the minimum scope required for the operations.
-- Note that jaas.config must end with a semicolon; an error will be reported if it is missing.
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN'
);
Note:
The login module class in the SASL authentication configuration depends on the Flink version:
Version 1.16 and earlier: org.apache.kafka.common.security.plain.PlainLoginModule
Versions later than 1.16: org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule
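For example, on Flink versions later than 1.16, the jaas.config line in the WITH clause above would use the shaded class, with everything else unchanged:

'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";',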

Querying

After the table is created successfully, you can query it, for example:
select count(*), host from nginx_source group by host;
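You can also run event-time analytics on the consumed logs. Below is a minimal sketch that counts requests per host per minute; it assumes you extend the table definition with a WATERMARK clause on the `ts` metadata column (the watermark bound and window size are illustrative):

-- Requires a watermark declaration in the table definition, e.g.:
--   WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
SELECT window_start, window_end, host, COUNT(*) AS request_count
FROM TABLE(
TUMBLE(TABLE nginx_source, DESCRIPTOR(`ts`), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end, host;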


