tencent cloud

文档反馈

方案4:迁移未消费数据

最后更新时间:2024-01-09 14:49:40

    简介

    客户在迁移上云过程中,需要把旧集群上未消费完的消息迁移到新集群对应的 topic 中时。可参考本教程进行操作,即可将旧集群的未消费数据同步到新集群中。

    前提条件

    1. 保证原集群所有消费/生产已停止。
    2. 保证原集群待迁移的消息保留时间足够长,即在迁移过程中避免topic消息过期自动删除。
    3. 同时迁移脚本是 python 脚本,需要安装 python2,且 python2 版本>2.7.1,推荐2.7.5
    4. 下载迁移工具 migrateToCkafkaTool。工具包目录如下,进入 migrateToCkafkaTool 目录下,修改 data-migrate.py 文件的配置后,执行 python data-migrate.py 即可。
    
    
    

    工具原理

    脚本会扫描老集群的所有 group 列表,并取出 group 订阅中的且仍有未消费消息的 topic 列表。脚本将取出未消费完 group 订阅 topic 的 group 提交位置topic 末端位置(如果一个 topic 被多个 group 订阅,那么 group 提交位置将取最小的那个)。然后将此区间位置的消息消费后再生产到新集群的对应 topic 分区中。

    操作演示

    1. 在目标集群新建对应的 Topic

    假设原集群是:ckafka-47bd7goz, 目标新集群是:ckafka-kzamzogr。如下图所示:新集群已经建好了相同分区数的 topic。即 test1、test2、test3、test4。
    原集群 ckafka-47bd7goz 有两个 group,test123-group 和 test34-group,它们分别订阅主题 test1、test2、test3 和 test3、test4。

    2. 下载工具包

    下载迁移工具后,打开脚本填入原集群和新集群地址配置后,checkFlag 设置为0,运行脚本先预检查一下将要迁移的 topic 和位置。
    
    
    
    运行脚本后,将输出一些信息,同时当前目录会同时写入一份文本日志。
    
    
    

    3. 查看输出信息

    通过屏幕输出或者文本日志文件检查 Prepare to migrate 的信息,这是将要迁移的位点信息。
    
    
    以 test3 为例,它同时被 test123-group 和 test34-group 订阅,检查原集群的订阅情况。
    按照预定逻辑,一个 topic 被多个 group 订阅应该从提交最小的那个位置开始同步,即187800,检查输出信息与预期一致。
    
    
    
    还有一种情况是原集群主题 test1 由于消息已经过期,但是 group 的提交位置在过期消息的区间,因此同步只会从 test1 还未过期的最早消息位置同步。
    以 test1 的0分区为例,脚本会提示 test1 主题0分区的5226位置(topic 最存活消息最小位置)已经超过 group 订阅的提交 offset 的3713位置(该位置的消息已过期),因此同步开始的位置设置到了5226。又由于5226同样也是该分区目前最大的 offset(该分区目前存活消息总数为0)代表无消息可迁移,因此输出 skip migrate...的文本信息,代表跳过迁移本分区的数据。
    
    
    

    4. 开始迁移

    经上步检查过输出的信息确认无误后,修改 checkFlag=1 开始迁移。
    
    
    

    5. 检查迁移后数据是否数量一致

    以 test3 为例,预期迁移 test123-group 未消费的76522条消息,已经全部成功写入新集群的test3主题中,迁移数据完成。
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持