tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

方案4:迁移未消费数据

PDF
聚焦模式
字号
最后更新时间: 2026-01-20 16:21:51

简介

客户在迁移上云过程中,需要把旧集群上未消费完的消息迁移到新集群对应的 topic 中时。可参考本教程进行操作,即可将旧集群的未消费数据同步到新集群中。

前提条件

1. 保证原集群所有消费/生产已停止。
2. 保证原集群待迁移的消息保留时间足够长,即在迁移过程中避免topic消息过期自动删除。
3. 同时迁移脚本是 Python 脚本,需要安装 Python2,且 python2 版本>2.7.1,推荐2.7.5。
4. 下载迁移工具 migrateToCkafkaTool。工具包目录如下,进入 migrateToCkafkaTool 目录下,修改 data-migrate.py 文件的配置后,执行 python data-migrate.py 即可。


工具原理

脚本会扫描老集群的所有 group 列表,并取出 group 订阅中的且仍有未消费消息的 topic 列表。脚本将取出未消费完 group 订阅 topic 的 group 提交位置topic 末端位置(如果一个 topic 被多个 group 订阅,那么 group 提交位置将取最小的那个)。然后将此区间位置的消息消费后再生产到新集群的对应 topic 分区中。

操作演示

1. 在目标集群新建对应的 Topic

假设原集群是:ckafka-47bd7goz, 目标新集群是:ckafka-kzamzogr。如下图所示:新集群已经建好了相同分区数的 topic。即 test1,test2,test3,test4。

原集群 ckafka-47bd7goz 有两个 group,test123-group 和 test34-group,它们分别订阅主题 test1,test2,test3 和 test3,test4。




2. 下载工具包

下载迁移工具后,打开脚本填入原集群和新集群地址配置后,checkFlag 设置为0,运行脚本先预检查一下将要迁移的 topic 和位置。


运行脚本后,将输出一些信息,当前目录会同时写入一份文本日志。




3. 查看输出信息

通过屏幕输出或者文本日志文件检查 Prepare to migrate 的信息,这是将要迁移的位点信息。

以 test3 为例,它同时被 test123-group 和 test34-group 订阅,检查原集群的订阅情况。






按照预定逻辑,一个 topic 被多个 group 订阅应该从提交最小的那个位置开始同步,即187800,检查输出信息与预期一致。


还有一种情况是原集群主题 test1 由于消息已经过期,但是 group 的提交位置在过期消息的区间,因此同步只会从 test1 还未过期的最早消息位置同步。



以 test1 的0分区为例,脚本会提示 test1 主题0分区的5226位置(topic 存活消息最小位置)已经超过 group 订阅的提交 offset 的3713位置(该位置的消息已过期),因此同步开始的位置设置到了5226。又由于5226同样也是该分区目前最大的 offset(该分区目前存活消息总数为0)代表无消息可迁移,因此输出 skip migrate...的文本信息,代表跳过迁移本分区的数据。



4. 开始迁移

经上步检查过输出的信息确认无误后,修改 checkFlag=1 开始迁移。



5. 检查迁移后数据是否数量一致

以 test3 为例,预期迁移 test123-group 未消费的76522条消息,已经全部成功写入新集群的test3主题中,迁移数据完成。



帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈