tencent cloud

日志服务

动态与公告
产品动态
公告
新手指引
产品简介
产品概述
产品优势
地域和访问域名
规格与限制
基本概念
购买指南
计费概述
产品定价
按量计费(后付费)
欠费说明
清理日志服务资源
成本优化
常见问题
快速入门
一分钟入门指南
入门指南
使用 Demo 日志快速体验 CLS
操作指南
资源管理
权限管理
日志采集
指标采集
日志存储
指标存储
检索分析(日志主题)
检索分析(指标主题)
仪表盘
数据处理
投递与消费
监控告警
云产品中心
DataSight 独立控制台
历史文档
实践教程
日志采集
检索分析
仪表盘
监控告警
投递和消费
成本优化
开发者指南
通过 iframe 内嵌 CLS(旧方案)
通过 Grafana 使用 CLS
API 文档
History
Introduction
API Category
Making API Requests
Topic Management APIs
Log Set Management APIs
Index APIs
Topic Partition APIs
Machine Group APIs
Collection Configuration APIs
Log APIs
Metric APIs
Alarm Policy APIs
Data Processing APIs
Kafka Protocol Consumption APIs
CKafka Shipping Task APIs
Kafka Data Subscription APIs
COS Shipping Task APIs
SCF Delivery Task APIs
Scheduled SQL Analysis APIs
COS Data Import Task APIs
Data Types
Error Codes
常见问题
健康监测问题解释
采集相关
检索分析相关
其他问题
服务等级协议
CLS 政策
隐私协议
数据处理和安全协议
联系我们
词汇表

消费 Demo-流处理

PDF
聚焦模式
字号
最后更新时间: 2025-12-03 18:30:49
本文介绍使用流处理计算框架 Flink、腾讯云 Oceanus 来消费日志。

腾讯云 Oceanus 消费 CLS 日志

1. 在 Oceanus 控制台 新建 SQL 作业。如下图所示:



2. 编写 SQL 语句。
CREATE TABLE `nginx_source`
( # 日志中字段
`@metadata` STRING,
`@timestamp` TIMESTAMP,
`agent` STRING,
`ecs` STRING,
`host` STRING,
`input` STRING,
`log` STRING,
`message` STRING,
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- kafka 分区
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
# cls kafka 协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
'topic' = '您的消费主题',
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',
# 请替换为您的消费组名称
'properties.group.id' = '您的消费组名称',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true' ,
# 用户名是日志集合ID,例如 ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac,注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可,注意 jaas.config 最后有;分号,不填写会报错。
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN'
);


Flink 消费 CLS 日志

开启日志的 Kafka 消费协议

参见 操作步骤 开启日志的 Kafka 消费协议,并获取消费的服务域名和 Topic。

确认 flink-connector-kafka 依赖

确保 flink lib 中有 flink-connector-kafka 后,直接在 sql 中注册 Kafka 表即可使用。依赖如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.14.4</version>
</dependency>

注册 flink 表

CREATE TABLE `nginx_source`
(
#日志中的字段
`@metadata` STRING,
`@timestamp` TIMESTAMP,
`agent` STRING,
`ecs` STRING,
`host` STRING,
`input` STRING,
`log` STRING,
`message` STRING,
# kafka 分区
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
# cls kafka 协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
'topic' = '您的消费主题',
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',
# 请替换为您的消费组名称
'properties.group.id' = '您的消费组名称',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true' ,
# 用户名是日志集合ID,例如 ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可,注意 jaas.config 最后有;分号,不填写会报错。
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN'
);
注意:
flink 版本 sasl 认证配置对应包:
1.16版本以下:org.apache.kafka.common.security.plain.PlainLoginModule。
1.16版本及以上:org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule。

查询使用

执行成功后,即可查询使用。
select count(*) , host from nginx_source group by host;



帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈