tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

数据处理规则说明

PDF
聚焦模式
字号
最后更新时间: 2026-01-20 15:56:36

概览

在通过 CKafka 连接器处理数据流入流出任务时,通常需要对数据进行简易的清洗操作,如格式化原始数据、解析特定字段、数据格式转换等等。开发者往往需要自己搭建一套数据清洗的服务(ETL)。
Logstash 是一款免费且开放的服务器端数据处理管道,能够从多个数据源采集数据,转换数据,然后将数据发送到相应的“存储库”中。 logstash 拥有丰富的过滤器插件,这使得 logstash 成为了被广泛使用的一款功能强大的数据转换工具。
然而搭建、配置、维护自己的 logstash 服务会增大开发和运维的难度,为此 CKafka 提供了一套对标 logstash 的数据处理服务,开发者仅需通过控制台交互界面就可以新建自己的数据处理任务。数据处理服务允许用户编辑相应的数据处理规则,支持构建链式处理,同时可以预览数据处理的结果,帮助用户轻松高效的构建一套数据处理服务,满足数据清洗和转换的需求。




功能对标清单

Logstash
连接器数据处理服务
功能
Codec.json
Filter.grok
Filter.mutate.split
Filter.date
Filter.json
Filter.mutate.convert
Filter.mutate.gsub
Filter.mutate.strip
Filter.mutate.join
Filter.mutate.rename
Filter.mutate.update
Filter.mutate.replace
Filter.mutate.add_field
Filter.mutate.remove_field
Filter.mutate.copy
Filter.mutate.merge

TODO
Filter.mutate.uppercase

TODO
Filter.mutate.lowercase

TODO

操作方法介绍

数据解析

通过选择相应的数据解析模式 ,并一键单击即可预览:




日期格式处理

1. 输入带日期格式的原始数据,以下为一个示例。
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"beat": {
"hostname": "test-server",
"ip": "6.6.6.6",
"version": "5.6.9"
},
"input_type": "log",
"message": "{\\"userId\\":888,\\"userName\\":\\"testUser\\"}",
"offset": 3030131
}
2. 解析结果如下:

3. CKafka 连接器处理方式:
3.1 通过预设系统当前时间给某字段赋值

3.2 在数据处理模块通过 处理 value 功能来对日期数据进行处理。



处理模式选择转换时间格式,选择好时间格式、时区和日期格式,并确认

4. 单击测试,可以看到转换后的时间格式。





解析内部 JSON 结构

1. 输入带嵌套 JSON 格式的原始数据,以下为一个示例。
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"beat": {
"hostname": "test-server",
"ip": "6.6.6.6",
"version": "5.6.9"
},
"input_type": "log",
"message": "{\\"userId\\":888,\\"userName\\":\\"testUser\\"}",
"offset": 3030131
}
2. 解析结果如下:



3. 通过对该字段选择 MAP 操作来对其进行解析,从而把特定字段解析为 JSON 格式:




数据修改

输入原始数据,以下为一个示例。
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"beat": {
"hostname": "test-server",
"ip": "6.6.6.6",
"version": "5.6.9"
},
"input_type": "log",
"message": "{\\"userId\\":888,\\"userName\\":\\"testUser\\"}",
"offset": 3030131
}
解析结果如下:



连接器处理方式如下:
方式一:通过选择相应的处理 value 功能一键定义规则。



方式二:通过选择数据类型一键更改相应字段的数据格式。



更改前:



更改后:



方式三:通过 JSONPath 语法实现 join 的拼接功能。如使用 $.concat($.data.Response.SubnetSet[0].VpcId, \\"#\\", $.data.Response.SubnetSet[0].SubnetId, \\"#\\", $.data.Response.SubnetSet[0].CidrBlock) 语法拼接 Vpc 和子网的属性,并且通过 # 字符加以分割。关于 JSONPath 语法的详细介绍请参见 JsonPath 说明




字段修改

在通过 CKafka 连接器进行数据处理的过程中,可以使用多种方式对解析后的数据字段进行编辑修改,以获得您理想的数据。如:
在 KEY 栏可以对字段名称进行修改。
在 VALUE 栏可以选择复制某字段的值。
在下方单击添加可以新增字段。
在右侧单击

按钮可以删除字段。





实际案例演示

案例1:多级字段解析

输入 message:
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"beat": {
"hostname": "test-server",
"ip": "6.6.6.6",
"version": "5.6.9"
},
"input_type": "log",
"message": "{\\"userId\\":888,\\"userName\\":\\"testUser\\"}",
"offset": 3030131,
}
目标 message :
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"input_type": "log",
"hostname": "test-server",
"ip": "6.6.6.6",
"userId": 888,
"userName": "testUser"
}
连接器配置方法:
1.1 处理链 1 配置如下:



1.2 处理链 1 结果如下:
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"input_type": "log",
"message": "{\\"userId\\":888,\\"userName\\":\\"testUser\\"}",
"hostname": "test-server",
"ip": "6.6.6.6"
}
1.3 处理链 2 配置如下:



1.4 处理链 2 结果如下:
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"input_type": "log",
"hostname": "test-server",
"ip": "6.6.6.6",
"userId": 888,
"userName": "testUser"
}

案例2:非 JSON 数据解析

输入 message :
region=Shanghai$area=a1$server=6.6.6.6$user=testUser$timeStamp=2022-02-26T22:25:33.210Z
目标 message:
{
"region": "Shanghai",
"area": "a1",
"server": "6.6.6.6",
"user": "testUser",
"timeStamp": "2022-02-27 06:25:33",
"processTimeStamp": "2022-06-27 11:14:49"
}
连接器配置方法:
1.1 使用分隔符 $ 对原始 message 进行解析



1.2 初步解析结果:
{
"0": "region=Shanghai",
"1": "area=a1",
"2": "server=6.6.6.6",
"3": "user=testUser",
"4": "timeStamp=2022-02-26T22:25:33.210Z"
}
1.3 使用分隔符 = 对结果二次解析:



1.4 二次解析结果:
{
"0": "region=Shanghai",
"1": "area=a1",
"2": "server=6.6.6.6",
"3": "user=testUser",
"4": "timeStamp=2022-02-26T22:25:33.210Z",
"0.region": "Shanghai",
"1.area": "a1",
"2.server": "6.6.6.6",
"3.user": "testUser",
"4.timeStamp": "2022-02-26T22:25:33.210Z"
}
1.5 对字段进行编辑、删减,调整时间戳格式,并新增当前系统时间字段:





最终结果:
{
"region": "Shanghai",
"area": "a1",
"server": "6.6.6.6",
"user": "testUser",
"timeStamp": "2022-02-27 06:25:33",
"processTimeStamp": "2022-06-27 11:14:49"
}


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈