tencent cloud

日志服务

动态与公告
产品动态
公告
新手指引
产品简介
产品概述
产品优势
地域和访问域名
规格与限制
基本概念
购买指南
计费概述
产品定价
按量计费(后付费)
欠费说明
清理日志服务资源
成本优化
常见问题
快速入门
一分钟入门指南
入门指南
使用 Demo 日志快速体验 CLS
操作指南
资源管理
权限管理
日志采集
指标采集
日志存储
指标存储
检索分析(日志主题)
检索分析(指标主题)
仪表盘
数据处理
投递与消费
监控告警
云产品中心
DataSight 独立控制台
历史文档
实践教程
日志采集
检索分析
仪表盘
监控告警
投递和消费
成本优化
开发者指南
通过 iframe 内嵌 CLS(旧方案)
通过 Grafana 使用 CLS
API 文档
History
Introduction
API Category
Making API Requests
Topic Management APIs
Log Set Management APIs
Index APIs
Topic Partition APIs
Machine Group APIs
Collection Configuration APIs
Log APIs
Metric APIs
Alarm Policy APIs
Data Processing APIs
Kafka Protocol Consumption APIs
CKafka Shipping Task APIs
Kafka Data Subscription APIs
COS Shipping Task APIs
SCF Delivery Task APIs
Scheduled SQL Analysis APIs
COS Data Import Task APIs
Data Types
Error Codes
常见问题
健康监测问题解释
采集相关
检索分析相关
其他问题
服务等级协议
CLS 政策
隐私协议
数据处理和安全协议
联系我们
词汇表
文档日志服务实践教程投递和消费使用 Flink 消费 CLS 日志

使用 Flink 消费 CLS 日志

PDF
聚焦模式
字号
最后更新时间: 2026-01-07 16:23:32
本文详细描述了如何使用 Flink 实时消费 CLS 日志,使用 Flink-sql 分析 Nginx 日志数据,计算 Web 端的 PV/UV 值,并将结果数据实时写入到自建的数据库 MySQL 数据库。
文中使用的组件/应用及版本如下:
技术组件
版本
Nginx
1.22
CLS 日志服务
-
Java
openjdk version "1.8.0_232"
Scala
2.11.12
Flink sql
flink-1.14.5
MySQL
5.7

操作步骤

步骤1:安装腾讯云 Nginx 网关

1. 购买腾讯云主机 CVM,请参考 通过购买页创建实例
2. Nginx 安装,请参考LINUX安装nginx详细步骤。
3. 成功通过浏览器访问 nginx,并可以下图说明安装成功:



步骤2:采集 Nginx 日志到腾讯云 CLS 日志服务

2. CLS 日志服务采集终端 Loglistener的安装,Loglistener 类似于开源组件 Beats,用来采集日志数据的 Agent。
3. 日志主题开启索引后,可以正常查询到 Nginx 的日志数据,如下图所示:
4. 最后,在 CLS 控制台 开启 kafka 消费,使用 Kafka 协议消费功能,您可以将一个日志主题,当作一个 Kafka Topic 来消费。本文就是使用流计算框架 Flink,实时消费 Nginx 日志数据,将实时计算的结果写入到 MySQL。

步骤3:搭建 MySQL 数据库

参考文档:创建 MySQL 实例
1. 登录数据库:
mysql -h 172.16.1.1 -uroot
2. 新建需要使用的 database 和表,例子中的 database 名为 flink_nginx,表名为 mysql_dest。
create database if not exists flink_nginx;
create table if not exists mysql_dest(
ts timestamp,
pv bigint,
uv bigint
);
1. 部署 Flink 时,建议使用如下版本,否则可能会安装不成功。
2. 安装 Flink 1.14.15 ,并进入 SQL 界面,从 Apache Flink 官网 下载 Flink 二进制代码包并开始安装。
# 解压缩 Flink 二进制包
tar -xf flink-1.14.5-bin-scala_2.11.tgz
cd flink-1.14.5

# 下载 kafka 相关依赖
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.14.5/flink-connector-kafka_2.11-1.14.5.jar
mv flink-connector-kafka_2.11-1.14.5.jar lib
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.4.1/kafka-clients-2.4.1.jar
mv kafka-clients-2.4.1.jar lib

# 下载 MySQL 相关依赖
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.14.5/flink-connector-jdbc_2.11-1.14.5.jar
mv flink-connector-jdbc_2.11-1.14.5.jar lib
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.11/mysql-connector-java-8.0.11.jar
mv mysql-connector-java-8.0.11.jar lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-table-common/1.14.5/flink-table-common-1.14.5.jar
mv flink-table-common-1.14.5.jar lib

# 启动 Flink
bin/start-cluster.sh
bin/sql-client.sh
3. 当出现以下画面则说明安装成功。注意默认的网页端口是8081。






1. 在 SQL Client 界面中,执行如下 SQL:
-- 建数据源表消费 kafka 数据
CREATE TABLE `nginx_source`
(
`remote_user` STRING, -- 日志中字段,客户端名称
`time_local` STRING, -- 日志中字段,服务器本地时间
`body_bytes_sent` BIGINT, -- 日志中字段,发送给客户端的字节数
`http_x_forwarded_for` STRING, -- 日志中字段,当前端有代理服务器时,记录客户端真实 IP 地址的配
`remote_addr` STRING, -- 日志中字段,客户端 IP 地址
`protocol` STRING, -- 日志中字段,协议类型
`status` INT, -- 日志中字段,HTTP 请求状态码
`url` STRING, -- 日志中字段,url 地址
`http_referer` STRING, -- 日志中字段,访问来源的页面链接地址
`http_user_agent` STRING, -- 日志中字段,客户端浏览器信息
`method` STRING, -- 日志中字段,HTTP 请求方法
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- kafka分区
`ts` AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'YourTopic', -- cls kafka协议消费控制台给出的的主题名称,例如out-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX
'properties.bootstrap.servers' = 'kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096', -- cls kakfa协议消费控制台给出的服务地址,例子中是广州地域的外网消费地址,请按照您的实际情况填写
'properties.group.id' = 'kafka_flink', -- kafka 消费组名称
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true' ,
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="your username" password="your password";',--用户名是日志主题所属的日志集合ID,例如ca5cXXXX-dd2e-4ac0-af12-92d4b677d2c6,密码是用户的secretid#secrectkey组合的字符串,比AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可.
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN'

);

--- 建立目标表,写入mysql
CREATE TABLE `mysql_dest`
(
`ts` TIMESTAMP,
`pv` BIGINT,
`uv` BIGINT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://11.150.2.1:3306/flink_nginx?&serverTimezone=Asia/Shanghai', -- 注意这边的时区设置
'username'= 'username', -- mysql账号
'password'= 'password', -- mysql密码
'table-name' = 'mysql_dest' -- mysql表名
);

--- 查询 kafka 数据源表,计算后写入 mysql 目标表
INSERT INTO mysql_dest (ts,uv,pv)
SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) start_ts, COUNT(DISTINCT remote_addr) uv,count(*) pv
FROM nginx_source
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);
2. 在 Flink 的任务监控页,我们可以看到任务的监控数据:


3. 进入 MySql 数据库,即可看到计算 PV、UV 的结果数据实时写入:



帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈