新功能发布记录
Broker 版本升级记录
公告
数据同步 | 实例级别数据同步 | Topic 级别数据同步 |
数据源 | 内网连接:腾讯云 CKafka 集群 公网连接:腾讯云 CKafka 集群/自建 Kafka 集群 跨网连接:自建 Kafka 集群/其他云厂商集群 | 腾讯云 CKafka 集群 |
数据目标 | 内网连接:腾讯云 CKafka 集群 公网连接:腾讯云 CKafka 集群/自建 Kafka 集群 | 腾讯云 CKafka 集群 |
操作步骤 | 1. 新建数据源连接 2. 新建数据目标连接 3. 新建数据同步任务 4. 配置数据源 5. 配置数据目标 6. 查看数据同步进度 | 1. 新建数据复制任务 2. 配置数据源 3. 处理数据 4. 配置数据目标 5. 查看数据同步进度 |
限制类型 | 限制项 | 限制数量 |
连接维度 | 单个 uin 的连接数量 | 150 |
任务维度 | 单个 uin 的任务数量 | 150 |
| 单个任务的并发数 | min(数据源端的Topic总分区数,20) |
数据类型 | 规则说明 |
同步元数据 | 分为初始化同步和定时同步两个环节。 初始化同步:任务启动时,会查看下游是否存在和上游对应的Topic,如果没有,则会在下游新建Topic(尽可能和上游的配置一样);如果下游本身就存在对应的Topic,则不会触发初始化同步。 定时同步:任务启动之后,会周期性(3分钟)地将上游的部分元数据配置同步到下游。 说明: 定时同步暂不支持同步副本数。 定时同步分区数时,分区数只能单向增加不能减少,如果下游实例分区已经大于上游,则不会同步分区数。 出于稳定性考虑,目标 Topic 的 retention.ms 和 retention.bytes 这两项元数据的值为-1时(注意:-1为 Kafka 内部定义,代表无限存储)才会同步,其他情况下这两项元数据均不会定时同步。 对于 Topic 级别的配置,出于稳定性考虑,默认情况下,新增的 Topic 只在任务的初始化状态下,元数据会完整同步一次,后续 Topic 配置发生变更则不再同步到下游 原因:用户在不知道存在数据同步任务的情况下,若修改了上游 Topic 配置(如缩短消息时间),此时下游同步该变更,会导致下游在消息未消费情况下,出现大量丢失数据的情况。 同步变更配置的解决方案:如果客户需要变更存量任务中的 Topic 并且同步配置,建议客户手动同步修改上下游的 Topic 配置,避免出现丢数据或稳定性问题。 |
同步消息数据 | 将上游 Kafka 中存储的消息数据同步到下游 Kafka 对应的 Topic 中,如果开启了同步到相同分区,则消息会固定同步到下游对应的相同分区中 |
同步消费位点 | 将上游 Kafka 中存储的消息数据同步到下游 Kafka 对应 Topic 中时,同时同步相关的消费组和消费组对该 Topic 所提交的 Offset 信息,注意该 Offset 是映射后的对应关系。 |
同步数据类型 | 参数 | 限制条件 | 默认值 |
元数据 | 分区数 | 1. 当需要同步元数据的时候,有两个参数不符合条件会无法同步: 目标 Topic 分区数大于源 Topic 分区数,分区数不能同步。 目标 Topic 与源 Topic 副本数不一致,副本数不能同步。 2. 源实例的 Topic 名称长度超过128位时, 目标创建的 Topic 会取前128位作为该 Topic 的名称。 | / |
| 副本数 | | / |
| retention.ms | | 604800000(7天) |
| cleanup.policy | | delete |
| min.insync.replicas | | 1 |
| unclean.leader.election.enable | | false |
| segment.ms | | 604800000 |
| retention.bytes | | 默认值取决于 Kafka 配置 |
| max.message.bytes | | 1048588 |
| 消费组 | 若目标实例关闭了自动创建 ConsumerGroup,则消费分组无法进行同步。 | / |
消息数据 | / | 支持消息数据同步到相同分区。 | / |
消费位点 | / | 1. 当需要同步消费位点的时候,如下情况可能导致位点对齐不准: 源端和目标端存在同名 Topic,目标端 Topic 存在其它的消息写入方。 源端和目标端存在同名 Topic,任务重新建立。由于每次任务建立时,数据都会将新建任务启动时读取的最新位置同步至下游,不同步历史数据,历史数据会在这种情况被丢弃。 2. 0.10.2.1 及以下版本实例不支持同步消费位点,若数据同步任务上下游中包含此版本,则不支持创建同步消费位点的任务。 | / |
Kafka 类型 | 说明 |
腾讯云 CKafka | 如果客户端和 CKafka 集群部署在同一个私有网络 VPC 内,则网络默认互通。此时可以直接在所属地域和 CKafka 实例的下拉框中选择提前创建好的 CKafka 实例。 |
公网连接 | 如果客户端和 Kafka 集群部署在不同网络环境,可以使用公网实现跨网络生产和消费。公网连接支持自建 Kafka 集群和腾讯云 CKafka 集群。使用公网连接时,建议配置安全策略保障数据传输安全。 Broker 地址:输入用户 Kafka 的 Broker 地址。如果有多个 Broker,仅需输入其中 1 个 Broker 的 IP 地址和端口,形式如 127.0.0.1:5664,连接器会打通所有 Broker 的网络。在数据同步期间,必须保证填入的 IP 可用。 ACL 配置:如果连接的源集群开启了 ACL,需要在此处配置相应的访问信息(ACL 用户名和密码)。ACL 配置仅支持静态配置用户(PLAIN 机制),暂不支持动态配置用户(SCRAM 机制)。 |
跨网连接 | 跨网连接可以实现将不同云厂商 Kafka 数据和元数据同步到腾讯云 CKafka,同时也支持将自建 Kafka 数据和元数据同步到腾讯云 CKafka。 VPC 网络:选择客户自建 Kafka 集群的 VPC ID 或者跨云打通的 VPC ID。 子网:选择客户自建 Kafka 集群的 VPC 子网或者跨云打通的 VPC 子网。 云联网 ID:跨云同步时通常需要经过云联网打通专线。 跨云资源 ID:通常为用户连接器上游的实例 ID,标识跨云同步链路中的唯一资源。新建连接时,将自动探测该资源 ID 下的节点信息,执行网络打通,并关联相关的路由规则。删除该连接时,该资源 ID 下自动打通的路由规则将会删除。 Broker 地址:输入用户 Kafka 的 Broker 地址。如果有多个 Broker,仅需输入其中 1 个 Broker 的 IP 地址和端口,形式如 127.x.x.1:5664,连接器会打通所有 Broker 的网络。在数据同步期间,必须保证填入的 IP 可用。 ACL 配置:如果连接的源集群开启了 ACL,需要在此处配置相应的访问信息(ACL 用户名和密码)。ACL 配置仅支持静态配置用户(PLAIN 机制),暂不支持动态配置用户(SCRAM 机制)。 |
参数 | 说明 |
任务名称 | 输入任务名称,用于区分不同的数据同步任务,任务名称需符合命名规则:只能包含字母、数字、下划线、“-”、“.”。 |
数据源类型 | 选择整个 Kafka 实例。 |
连接所属地域 | 在下拉框中选择提前配置好的数据源连接所在的地域。 |
Kafka 连接 | 在下拉框中选择提前配置好的数据源连接。 |
同步数据类型 | 只同步元数据:同步源实例内 Topic 和 Consumer Group 的元数据。 同步元数据和消息数据:同步源实例内 Topic 和 Consumer Group 的元数据,以及 Topic 内的消息数据。 同步元数据、消息数据和消费位点:同步源实例内 Topic 和 Consumer Group 的元数据、 Topic 内的消息数据、以及源实例消费组的消费位点。源实例消费组的消费位点更新会同步更新到目标实例的同名消费组。 |
起始位置 | 若选择了同步元数据和消息数据或者同步元数据、消息数据和消费位点,需要配置 Topic offset(偏移量),用于设置转储时对历史消息的处理策略。支持两种方式: 从最新位置开始消费:即最大偏移量,从当前最新的数据开始消费(跳过历史消息)。 从最开始位置开始消费:即最小偏移量,从最早的数据开始消费(同步全部历史消息)。 |
Topic 同步范围 | 若选择了同步元数据和消息数据或者同步元数据、消息数据和消费位点,需要配置同步数据的 Topic 范围。 同步元数据和消息数据:支持选择全部 Topic 或者指定部分 Topic。若选择部分 Topic,需通过正则表达式匹配 Topic,表达式验证通过后可以点击进入下一步。 注意: 当使用正则表达式同步部分 Topic 时,不会同步对应 Topic 的消费组。 同步元数据、消息数据和消费位点:只支持选择全部 Topic。 |



参数 | 说明 |
任务名称 | 输入任务名称,用于区分不同的数据同步任务,任务名称需符合命名规则:只能包含字母、数字、下划线、“-”、“.”。 |
数据源类型 | 选择 CKafka 实例内 Topic。 |
数据源所属地域 | 在下拉列表中,选择数据源实例所属的地域。 |
CKafka 实例 | 在下拉列表中,选择已准备好的数据源 CKafka 实例。 |
源 Topic | 在下拉列表中,选择已准备好的数据源 Topic。 若数据源实例设置了 ACL 策略,请确保具备选中的数据源 Topic 的读写权限。 |
起始位置 | 配置 Topic offset(偏移量),用于设置转储时对历史消息的处理策略。支持如下三种方式: 从最新位置开始消费:即最大偏移量,从当前最新的数据开始消费(跳过历史消息)。 从最开始位置开始消费:即最小偏移量,从最早的数据开始消费(处理全部历史消息)。 从时间点位置开始消费:从用户自定义的时间点开始消费。 |


解析模式 | 说明 |
JSON | 解析标准 JSON 格式的数据,支持嵌套字段,输出格式为键值对。 |
分隔符 | 按指定分隔符解析非结构化文本,分隔符支持 空格、制表符、,、;、|、:、自定义。 |
正则提取 | 说明: 当输入的正则表达式中含有类似 (?<name>expr) 或者 (?P<name>expr) 捕获组时,会将正则表达式视为模式串进行匹配,当消息成功匹配模式串时将解析捕获组内容;否则,会将整个输入的正则表达式视为捕获组,并提取消息中所有匹配的内容。 |
JSON 对象数组-单行输出 | 数组内每个对象的格式一致,解析时仅解析第一个对象,输出结果为单条的 JSON,是 map 类型。 |
JSON 对象数组-多行输出 | 数组内每个对象的格式一致,解析时仅解析第一个对象,输出结果为数组类型。 |

操作 | 说明 |
映射 | 可以选择已有的 KEY,最终输出的 VALUE 值由指定的 KEY 映射而来。 |
JSONPATH | |
系统预设-当前时间 | 可以选择系统预设的 VALUE ,目前支持 DATE(时间戳)。 |
自定义 | 可以输入自定义 VALUE。 |

输出行内容 | 说明 |
VALUE | 只输出上方测试结果中的 VALUE 值,用分隔符隔开。VALUE 间分隔符默认为“无”选项。 |
KEY&VALUE | 输出上方测试结果中的 KEY 和 VALUE,KEY 和 VALUE 间分隔符和 VALUE 间分隔符均不能为“无”。 |
处理方式 | 说明 |
丢弃 | 适合用于生产环境,任务运行失败时将会忽略当前失败消息。建议使用 "保留" 模式测试无误后,再将任务编辑成 "丢弃" 模式用于生产。 |
保留 | 适合用于测试环境,任务运行失败时将会终止任务不会重试,并且在事件中心中记录失败原因。 |
投递到死信队列 | 需指定死信队列 Topic,适合用于严格生产环境,任务运行失败时会将失败消息及元数据和失败原因投递到指定的 CKafka Topic 中。 |


文档反馈