tencent cloud

流计算 Oceanus

动态与公告
产品动态
产品简介
产品概述
产品优势
应用场景
购买指南
计费概述
计费模式
退费说明
调整配置费用说明
快速入门
从零开始上手
创建独享集群
创建 SQL 作业
创建 JAR 作业
创建 ETL 作业
创建 Python 作业
操作指南
作业管理
作业开发
作业监控
作业日志
事件与诊断
元数据管理
快照管理
作业调优
依赖管理
集群管理
权限管理
SQL 开发指南
开发指南概述
术语和数据类型
DDL 数据定义语句
DML 数据操作语句
MySQL CDC 多 Source 复用
上下游开发指南
SET 控制语句
运算符和内置函数
标识符与保留字
Python 开发指南
ETL 开发指南
概述
ETL 作业术语表
上下游开发指南
常见问题
联系我们

模拟上下游 Datagen Logger Blackhole

PDF
聚焦模式
字号
最后更新时间: 2024-04-19 12:23:42

调试 Source 和 Sink 介绍

当需要检验作业是否可以正常运行、逻辑是否正确时,为了减少外部系统的部署开销,以及避免干扰因素,我们可以使用一些调试专用的 Connector。

版本说明

Flink 版本
说明
1.11
支持
1.13
支持
1.14
支持
1.16
支持

Datagen Source

Datagen 是 Flink 自带的随机数据生成器,它可以作为数据源直接引用。详细的使用方式可参考 Flink 官方文档
下面是 Datagen 数据源的一个示例,它生成的数据含有两个字段:第一个字段 id 是一个随机数,第二个字段 name 是一个随机字符串。

DDL 定义

CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- 每秒产生的数据条数
);

WITH 参数

参数
是否必选
默认参数
数据类型
描述
connector
必须
(none)
String
指定要使用的连接器,这里是 'datagen'。
rows-per-second
可选
10000
Long
每秒生成的行数,用以控制数据发出速率。
fields.#.kind
可选
random
String
指定 '#' 字段的生成器。可以是 'sequence' 或 'random'。
fields.#.min
可选
(Minimum value of type)
(Type of field)
随机生成器的最小值,适用于数字类型。
fields.#.max
可选
(Maximum value of type)
(Type of field)
随机生成器的最大值,适用于数字类型。
fields.#.length
可选
100
Integer
随机生成器生成字符的长度,适用于 char、varchar、string。
fields.#.start
可选
(none)
(Type of field)
序列生成器的起始值。
fields.#.end
可选
(none)
(Type of field)
序列生成器的结束值。

Logger Sink

Logger Sink 是腾讯云 Oceanus 提供的一个自定义 Logger 示例,它可以将最终的结果数据写入 TaskManager 的日志文件中,后续可以通过 Flink UI 或者控制台的日志面板查看这些日志的输出。
1. 使用 Logger Sink 前,需要先 下载 JAR 包如果您希望自定义输出逻辑,也可以自行修改并编译构建程序包
2. 将下载的 JAR 包上传到程序包,具体可参考 依赖管理
3. 在 SQL 作业中引用该程序包。

DDL 定义

CREATE TABLE logger_sink_table (
id INT,
name STRING
) WITH (
'connector' = 'logger',
'print-identifier' = 'DebugData'
);

WITH 参数

参数
是否必选
数据类型
描述
connector
必须
String
指定要使用的连接器,这里是 'logger'。
print-identifier
可选
String
日志打印的前缀信息。
all-changelog-mode
可选
Boolean
启用后,不会过滤 -U 数据,可用来模拟 ClickHouse Collapsing 模式的数据流。
records-per-second
可选
Integer
可指定每秒输出多少条数据,起到限流的作用。
mute-output
可选
Boolean
丢弃所有输出,只做条数统计(类似增强版的 Blackhole Sink)。

监控指标说明

Oceanus 为 Logger Sink 增加了很多实用的统计指标。单击 Flink UI 的运行图中的 Logger Sink 算子,即可搜索并查看指标:
numberOfInsertRecords:获取输出的 +I 消息数。
numberOfDeleteRecords:获取输出的 -D 消息数。
numberOfUpdateBeforeRecords:获取输出的 -U 消息数。
numberOfUpdateAfterRecords:获取输出的 +U 消息数。
Flink 内置了输出到 STDOUT(标准输出)的 Print Sink,但是由于打印的格式不符合 Oceanus 日志采集器的规则,目前不能很好地展示在界面上。我们建议使用上述 Logger Sink 来代替 Print Sink。

DDL 定义

CREATE TABLE `print_table` (
`id` INT,
`name` STRING
) WITH (
'connector' = 'print'
);

WITH 参数

参数
是否必选
默认值
数据类型
描述
connector
必选
(none)
String
指定要使用的连接器,此处应为 'print'。
print-identifier
可选
(none)
String
配置一个标识符作为输出数据的前缀。
standard-error
可选
false
Boolean
如果 format 需要打印为标准错误而不是标准输出,则为 True。


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈