tencent cloud

流计算 Oceanus

动态与公告
产品动态
产品简介
产品概述
产品优势
应用场景
购买指南
计费概述
计费模式
退费说明
调整配置费用说明
快速入门
从零开始上手
创建独享集群
创建 SQL 作业
创建 JAR 作业
创建 ETL 作业
创建 Python 作业
操作指南
作业管理
作业开发
作业监控
作业日志
事件与诊断
元数据管理
快照管理
作业调优
依赖管理
集群管理
权限管理
SQL 开发指南
开发指南概述
术语和数据类型
DDL 数据定义语句
DML 数据操作语句
MySQL CDC 多 Source 复用
上下游开发指南
SET 控制语句
运算符和内置函数
标识符与保留字
Python 开发指南
ETL 开发指南
概述
ETL 作业术语表
上下游开发指南
常见问题
联系我们

消息队列 Upsert Kafka

PDF
聚焦模式
字号
最后更新时间: 2023-11-08 11:20:52

介绍

Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。
作为 Source,Upsert Kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。
作为 Sink,Upsert Kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

版本说明

Flink 版本
说明
1.11
不支持
1.13
支持
1.14
支持
1.16
支持

DDL 定义

CREATE TABLE kafka_upsert_sink_table (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
-- 定义 Upsert Kafka 参数
'connector' = 'upsert-kafka', -- 选择 connector
'topic' = 'topic', -- 替换为您要消费的 Topic
'properties.bootstrap.servers' = '...', -- 替换为您的 Kafka 连接地址
'key.format' = 'json', -- 定义 key 数据格式
'value.format' = 'json' -- 定义value 数据格式
);
说明
Upsert Kafka 确保在 DDL 中定义主键。

WITH 参数

参数
是否必选
默认值
数据类型
描述
connector
必选
(none)
String
指定要使用的连接器,Upsert Kafka 连接器使用:'upsert-kafka'
topic
必选
(none)
String
用于读取和写入的 Kafka topic 名称。
properties.bootstrap.servers
必选
(none)
String
以逗号分隔的 Kafka brokers 列表。
properties.*
可选
(none)
String
该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档 中的参数名。
Flink 会自动移除选项名中的 "properties." 前缀,并将转换后的键名以及值传入 KafkaClient。例如,您可以通过 'properties.allow.auto.create.topics' = 'false 来禁止自动创建 topic。 但是,某些选项,例如 'key.deserializer''value.deserializer' 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。
key.format
必选
(none)
String
用于对 Kafka 消息中 key 部分序列化和反序列化的格式。key 字段由 PRIMARY KEY 语法指定。支持的格式包括 'csv''json''avro'
key.fields-prefix
optional
(none)
String
为'key.fields'的所有字段定义自定义前缀,以避免与 'value.fields' 字段名称冲突。默认情况下,前缀为空。如果定义了自定义前缀,则表 schema 和 'key.fields' 将使用前缀名称。构建'key.fields'格式的数据类型时候,将删除前缀并使用 key format 中非前缀名称。请注意,此选项要求 'value.fields-include'必须设置为 'EXCEPT_KEY'
value.format
必选
(none)
String
用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 'csv''json''avro'
value.fields-include
可选
'ALL'
String
控制哪些字段应该出现在 value 中。可取值:
ALL:消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。
EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。
sink.parallelism
可选
(none)
Integer
定义 upsert-kafka sink 算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。
sink.buffer-flush.max-rows
可选
0
Integer
缓存刷新前,最多能缓存多少条记录。当 sink 收到很多同 key 上的更新时,缓存将保留同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量,以及避免发送潜在的 tombstone 消息。 可以通过设置为 '0' 来禁用它。默认,该选项是未开启的。注意,如果要开启 sink 缓存,需要同时设置 'sink.buffer-flush.max-rows''sink.buffer-flush.interval' 两个选项为大于零的值。
sink.buffer-flush.interval
可选
0
Duration
缓存刷新的间隔时间,超过该时间后异步线程将刷新缓存数据。当 sink 收到很多同 key 上的更新时,缓存将保留同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量,以及避免发送潜在的 tombstone 消息。
可以通过设置为 '0' 来禁用它。默认,该选项是未开启的。注意,如果要开启 sink 缓存,需要同时设置 'sink.buffer-flush.max-rows''sink.buffer-flush.interval' 两个选项为大于零的值。

代码示例

CREATE TABLE `kafka_json_source_table` (
`id` INT,
`name` STRING
) WITH (
-- 定义 Kafka 参数
'connector' = 'kafka',
'topic' = 'Data-Input', -- 替换为您要消费的 Topic
'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种
'properties.bootstrap.servers' = '172.28.28.13:9092', -- 替换为您的 Kafka 连接地址
'properties.group.id' = 'testGroup', -- 必选参数, 一定要指定 Group ID

-- 定义数据格式 (JSON 格式)
'format' = 'json',
'json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错。
'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错。
);

CREATE TABLE kafka_upsert_sink_table (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
-- 定义 Upsert Kafka 参数
'connector' = 'upsert-kafka', -- 选择 connector
'topic' = 'topic', -- 替换为您要消费的 Topic
'properties.bootstrap.servers' = '...', -- 替换为您的 Kafka 连接地址
'key.format' = 'json', -- 定义 key 数据格式
'value.format' = 'json' -- 定义value 数据格式
);

-- 计算 pv、uv 并插入到 upsert-kafka sink
INSERT INTO kafka_upsert_sink_table
SELECT * FROM kafka_json_source_table;

SASL 认证授权

SASL/PLAIN 用户名密码认证授权

1. 参考 消息队列 CKafka - 配置 ACL 策略,设置 Topic 按用户名密码访问的 SASL_PLAINTEXT 认证方式。
2. 参考 消息队列 CKafka - 添加路由策略,选择 SASL_PLAINTEXT 接入方式,并以该接入方式下的网络地址访问 Topic。
3. 作业配置 with 参数。
CREATE TABLE `YourTable` (
...
) WITH (
...
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="ckafka-xxxxxxxx#YourUserName" password="YourPassword";',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
...
);
说明
username实例 ID + # + 刚配置的用户名password 是刚配置的用户密码。

SASL/GSSAPI Kerberos 认证授权

腾讯云 CKafka 暂时不支持 Kerberos 认证,您的自建 Kafka 如果开启了 Kerberos 认证,可参考如下步骤配置作业。
1. 获取您的自建 Kafka 集群的 Kerberos 配置文件,如果您基于腾讯云 EMR 集群自建,获取 krb5.conf、emr.keytab 文件,路径如下。
/etc/krb5.conf
/var/krb5kdc/emr.keytab
2. 对步骤1中获取的文件打 jar 包。
jar cvf kafka-xxx.jar krb5.conf emr.keytab
3. 校验 jar 的结构(可以通过 vim 命令查看 vim kafka-xxx.jar),jar 里面包含如下信息,请确保文件不缺失且结构正确。
META-INF/
META-INF/MANIFEST.MF
emr.keytab
krb5.conf
4. 程序包管理 页面上传 jar 包,并在作业参数配置里引用该程序包。
5. 获取 kerberos principal,用于作业 高级参数 配置。
klist -kt /var/krb5kdc/emr.keytab

# 输出如下所示,选取第一个即可:hadoop/172.28.28.51@EMR-OQPO48B9
KVNO Timestamp Principal
---- ------------------- ------------------------------------------------------
2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
6. 作业 with 参数配置。
CREATE TABLE `YourTable` (
...
) WITH (
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'GSSAPI',
'properties.sasl.kerberos.service.name' = 'hadoop',
...
);
说明:
参数 properties.sasl.kerberos.service.name 的值必须与您选取的 principal 匹配,如果您选择的为 hadoop/${IP}@EMR-OQPO48B9,那么取值为 hadoop。
7. 作业 高级参数 配置。
security.kerberos.login.principal: hadoop/172.28.2.13@EMR-4K3VR5FD
security.kerberos.login.keytab: emr.keytab
security.kerberos.login.conf: krb5.conf
security.kerberos.login.contexts: KafkaClient
fs.hdfs.hadoop.security.authentication: kerberos


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈