tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

步骤2:迁移 Topic 上云

PDF
聚焦模式
字号
最后更新时间: 2026-01-20 16:21:51

操作场景

本文档为您介绍如何利用 CKafka 提供的迁移工具将自建 Kafka 集群的 Topic 迁移到 CKafka 的实例中。

前提条件

操作步骤

1. 下载 迁移工具 并解压到可以连通自建实例的 broker 和 zk 的机器上。
2. 在 ckafka-migrate.py 文件中填写配置参数。
说明:
请保证后面迁移操作所在的机器和 CKafka 以及自建 Kafka 集群的网络互通。
云API 的密钥对应的用户需要拥有 CKafka 的写权限,建议使用主账号的密钥对。
# your local broker ip:port list
# 自建实例的broker列表 ["broker1:port1","ip2:port2"]
bootstrapServers = ["$ip:$port"]

# your local zk ip:port list
# 自建实例的zk列表 ["zk1:port1","zk2:port2"],非必设配置,未设置则用bootstrapServers方式获取源集群信息
sourceZk = ["$ip:$port"]
# 如果zk有auth鉴权,需要设置该参数,格式为 [("digest", "$user:$password")]
zkAuthData = []
# 如果zk路径有前缀,需要设置该参数,例如 "/cluster1"
zkPathPrefix = ""

# ckafka instanceId
# 云上实例id "ckafka-xxx"
instanceId = "$yourinstanceId"
# topic regex,just migrate match topics
# topic名称正则表达式,非空则只迁移匹配到的topic
topicRegex = ""

# your secretId and secretKey
# 腾讯云账号的密钥对
secretId = "$yoursecretId"
secretKey = "$yoursecretKey"

# your cloud instance region
# 云上实例的地域 ckafka已开区地域码请参考文档地域与可用区https://www.tencentcloud.com/document/product/597/44597?from_cn_redirect=1

region = "ap-tokyo"

# if you make sure the migrate topic List,please modify checkFlag = 1
# 检查标记,设0只显示将要迁移的topic列表不做真正迁移,请先以0运行检查将要迁移的topic列表,确认无误后修改为1开始迁移
# 0:列出迁移topic列表后脚本终止
# 1:列出迁移topic列表并开始迁移
checkFlag = 0

# force transfer your cloud-topic config to migrate
# 如果为0转换本地topic到云上topic时,属性不一致不会迁移上云。如果为1,会强制转换topic属性和云上最近的值
# 例如云上topic的副本只支持1,2,3副本,如果本地的某topic副本数为5,则不会迁移上云。如果force设置为1,则会取和5最接近的3副本,在云上创建3副本topic。
# 0:本地和云上topic副本数或topic属性不兼容时,跳过不兼容的topic或者topic属性
# 1:本地和云上topic副本数或topic属性不兼容时,强制迁移至云上,云上topic的属性将根据自建topic的属性做出最接近的修正 (不会修改本地自建源kafka的任何数据)
force = 0
参数
说明
bootstrapServers
自建实例的broker列表,["ip1:port1","ip2:port2"]。
sourceZk
自建实例的zookeeper列表, ["zk1:port1","zk2:port2"],非必设配置,未设置则用bootstrapServers方式获取源集群信息
zkAuthData
如果zk有auth鉴权,需要设置该参数,格式为 [("digest", "$user:$password")]。
zkPathPrefix
如果zk路径有前缀,需要设置该参数,例如 "/cluster1"。
instanceId
您在 购买云上实例 中购买的 CKafka 实例的 ID,在控制台的实例列表页面复制。
secretId
腾讯云账号的密钥对-ID。
secretKey
腾讯云账号的密钥对-密码。
region
您在 购买云上实例 中选择的部署地域,脚本内注释附带各地域码。
checkFlag
检查标记,设置为0时只显示将要迁移的 Topic 列表并不开始迁移,设置为非0时开始迁移 Topic。
topicRegex
Topic 名称正则表达式,设置为空时迁移所有的 Topic,非空时则只迁移匹配到的 Topic。
force
是否强制迁移,如果为0转换本地 Topic 到云上 Topic 时,属性不一致不会迁移上云。如果为1,会强制转换 Topic 属性和云上规定最接近的值。
3. 将 ckafka-migrate.py 的 checkFlag 参数设为0,运行脚本 python ckafka-migrate.py,根据输出结果检查需要迁移的Topic列表。
说明:
如果缺少部分自建的 Topic,可能是自建Topic命名不符合规则或者Topic副本数、Topic属性数值与云上数值范围无法兼容。



4. 把 ckafka-migrate.py 的 checkFlag 参数设为1,运行脚本 python ckafka-migrate.py,开始迁移 Topic。


5. 登录 CKafka 控制台,在迁移上云页面查看任务列表,等待 Topic 迁移完毕。 任务列表如下:

迁移成功界面如下:




帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈