tencent cloud

云数据库 PostgreSQL

动态与公告
产品动态
产品简介
产品概述
产品特性
产品优势
应用场景
信息安全说明
地域和可用区
产品功能列表
大版本生命周期说明
MSSQL 兼容版
产品计费
计费概述
实例类型与规格
购买方式
退费说明
欠费说明
备份空间收费说明
快速入门
创建 PostgreSQL 实例
连接 PostgreSQL 实例
管理 PostgreSQL 实例
数据导入
通过 DTS 迁移数据
内核能力介绍
内核版本概述
内核版本更新动态
查看内核版本
自研内核功能介绍
数据库审计
审计服务说明
开通审计服务
查看审计日志
修改审计服务
审计性能说明
用户指南
实例管理
升级实例
CPU 弹性扩容
只读实例
账号管理
数据库管理
参数管理
日志管理及分析
备份与恢复
数据迁移
插件管理
网络管理
访问管理
数据安全
租户及资源隔离
安全组
监控与告警
标签
AI 实践
使用 tencentdb_ai 插件调用大模型
使用 tencentdb_ai 插件构建 AI 应用
结合 Supabase 快速构建基于云数据库 PostgreSQL 的后端服务
实践教程
跨库访问
如何在 PostgreSQL 中自动创建分区
基于 pg_roaringbitmap 实现超大规模标签查找
一条 SQL 实现查询附近的人
如何配置云数据库 PostgreSQL 作为 GitLab 外部数据源
通过 cos_fdw 插件支持分级存储能力
通过 pgpool 实现读写分离
通过 auto_explain 插件实现慢 SQL 分析
使用 pglogical 进行逻辑复制
使用 Debezium 采集 PostgreSQL 数据
在 CVM 本地搭建 PostgreSQL 异地灾备环境
只读实例与只读组实践教程
如何使用云函数定时操作数据库
表膨胀处理
性能白皮书
测试方法
测试结果
API 文档
History
Introduction
API Category
Making API Requests
Instance APIs
Read-only Replica APIs
Backup and Recovery APIs
Parameter Management APIs
Security Group APIs
Performance Optimization APIs
Account APIs
Specification APIs
Network APIs
Data Types
Error Codes
常见问题
相关协议
Service Level Agreement
Terms of Service
词汇表
联系我们
文档云数据库 PostgreSQL实践教程使用 Debezium 采集 PostgreSQL 数据

使用 Debezium 采集 PostgreSQL 数据

PDF
聚焦模式
字号
最后更新时间: 2026-02-03 11:07:38
业务应用场景中,常常需要实时捕获数据库的数据变更并将其同步至其他系统。该过程可通过 Debezium 实现, Debezium 用于监控数据库变化和捕捉数据变动事件,并以事件流的形式导出。
本文将说明如何使用 Debezium 采集云数据库 PostgreSQL 中的数据。

前提条件

准备处于同一 VPC 下的云数据库 PostgreSQL 实例和云服务器实例。

步骤1:部署环境

1.云服务器配置 java 环境

Debezium 属于 java 应用,需要在云服务器配置 java 环境,为其正常运行提供基础。
依次执行下方命令,下载 jdk18 安装包并解压。
#下载jdk8
[root@VM-10-18-tencentos ~]# wget --no-check-certificate --header "Cookie: oraclelicense=accept-securebackup-cookie" \\
https://download.oracle.com/java/18/archive/jdk-18.0.2_linux-x64_bin.tar.gz
#解压安装包
[root@VM-10-18-tencentos ~]# tar -zxvf jdk-18.0.2_linux-x64_bin.tar.gz -C /usr/local/
#重命名目录
[root@VM-10-18-tencentos ~]# sudo mv /usr/local/jdk-18.0.2 /usr/local/jdk18
执行下列命令,进入配置文件内容。
[root@VM-10-18-tencentos ~]# vim /etc/profile
按 i 键进入编辑模式,在文件末尾添加以下内容:
export JAVA_HOME=/usr/local/jdk18
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib
添加完毕后,按 esc 键退出编辑模式,再输入 :wq 保存修改并退出文件内容。
执行以下命令使配置立即生效。
[root@VM-10-18-tencentos ~]# source /etc/profile
可通过以下命令检查 java 是否配置成功。
[root@VM-10-18-tencentos ~]# java -version
java version "18.0.2" 2022-07-19
Java(TM) SE Runtime Environment (build 18.0.2+9-61)
Java HotSpot(TM) 64-Bit Server VM (build 18.0.2+9-61, mixed mode, sharing)
如果显示 java 版本信息,说明配置成功。

2.本地 Kafka 部署

您可选择手动在官网 Apache Kafka 下载自己需要的版本的二进制包( Binary download ),然后上传到 CVM 上。具体请参见 如何将本地文件拷贝到云服务器
也可直接执行以下命令下载,版本号可根据实际需要自行替换。
[root@VM-10-18-tencentos ~]# wget https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
将 kafka 安装包下载到 CVM 之后,依次执行以下命令完成安装。
#创建kafka的安装目录
[root@VM-10-18-tencentos ~]# mkdir -p /data/zookeeper
#解压kafka安装包
[root@VM-10-18-tencentos ~]# tar -zxvf kafka_2.13-3.7.2.tgz -C /data/
#重命名解压后的目录
[root@VM-10-18-tencentos ~]# cd /data/
[root@VM-10-18-tencentos data]# mv kafka_2.13-3.7.2 kafka_dev
执行下行命令,进入 Zookeeper 配置文件。
root@VM-10-18-tencentos data]# cd /data/kafka_dev/config
[root@VM-10-18-tencentos config]# vim /data/kafka_dev/config/zookeeper.properties
进入文件内后,按i键进入编辑模式,找到 dataDir ,将其修改为 /data/zookeeper ,确保 dataDir 指向正确的存储路径。
dataDir =/data/zookeeper
修改完成后,按 esc 键退出编辑模式,再直接输入 :wq 保存修改并退出文件内容。

3.修改 Kafka 配置文件

执行以下命令,创建 kafka 日志目录。
[root@VM-10-18-tencentos config]# mkdir -p /data/kafka_dev/logs/
执行以下命令,进入 kafka 配置文件。
[root@VM-10-18-tencentos config]# vim connect-distributed.properties
进入文件后,按 i 键进入编辑模式,修改以下内容。若云服务器与云数据库处于同一 VPC 下,建议填写云服务器的内网 IP 地址。
listeners=PLAINTEXT://部署kafka所在机器的ip:9092 #若该项所在行首有#号,需将#删去并修改
log.dirs=/data/kafka_dev/logs/connect.log
zookeeper.connect=部署kafka所在机器的ip:2181
执行以下命令,进入 kafka connect 的配置文件 connect-distributed.properties 。
[root@VM-10-18-tencentos config]# vim connect-distributed.properties
进入文件后,按 i 键进入编辑模式,修改以下内容。若云服务器与云数据库处于同一 VPC 下,建议填写云服务器的内网 IP 地址。
group.id=connect-cluster
bootstrap.servers=部署kafka所在机器的ip:9092
# 定义插件路径
plugin.path=/data/kafka_connect/plugins

4.启动 Zookeeper 和 Kafka

使用以下命令启动 zookeeper 。
[root@VM-10-18-tencentos config]# cd /data/kafka_dev
[root@VM-10-18-tencentos kafka_dev]# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &> zookeeper.log &
可输入以下命令确认 zookeeper 任务是否正常在后台运行。
[root@VM-10-18-tencentos kafka_dev]# jobs
若返回信息包含 zookeeper 和 running ,则正常运行。
再执行以下命令启动 kafka 。
[root@VM-10-18-tencentos kafka_dev]# nohup bin/kafka-server-start.sh config/server.properties &> kafka.log &
可输入以下命令确认 kafka 任务是否正常在后台运行。
[root@VM-10-18-tencentos kafka_dev]# jobs
若返回信息包含 kafka 和 running,则正常运行。

步骤2:在 PostgreSQL 中创建逻辑复制发布

逻辑发布( Publication )定义了哪些表的数据变更会被发布,Debezium 通过绑定到逻辑发布上的逻辑复制槽( Failover Slot )来捕获变更数据。对于逻辑复制槽的具体说明请参见 逻辑复制槽故障转移(Failover Slot)。因此,您需要创建 publication 和 failover slot ,才能实现数据的捕获和同步。

1.开启逻辑复制

进入控制台,找到需要采集数据的实例,在实例详情页面单击参数设置,将 wal_level 参数默认值修改为为 logical ,参数值修改后需要重启实例才能生效。

修改后,可登录实例,使用以下查询语句查看 wal_level 是否修改成功。
show wal_level;


2.创建 Publication (逻辑发布)

使用类型为 pg_tencentdb_superuser 的账号,登录需要发布的数据库控制台。执行以下命令创建 publication 。
CREATE PUBLICATION pg_demo_publication FOR ALL TABLES;
其中, pg_demo_publication 指该 publication 的名称,您可自行定义, FOR ALL TABLES 指将当前数据库中的全部表都进行发布。若您需要指定发布哪几张表,则可选择执行以下命令:
CREATE PUBLICATION pg_demo_publication FOR table_name1, table_name2;
您可执行以下命令查看刚刚创建的 publication ,确认要发布的表。
SELECT * FROM pg_publication_tables WHERE pubname = ‘pg_demo_publication’;
若希望确认有哪些操作会进行发布,可执行以下命令查看。
SELECT * FROM pg_publication WHERE pubname = ‘pg_demo_publication’;
pg_publication 表用于存储所有已创建的 publication 信息。其中, puballtables 列为 true ,则表示发布数据库中的所有表; pubinsert 列为 true ,表示发布表的 insert 操作,其他列相同。
执行以下命令,创建 tencentdb_failover_slot 插件。
CREATE EXTENSION tencentdb_failover_slot;
执行以下命令,创建 logical_failover_slot 。
SELECT pg_create_logical_failover_slot(‘failover_alot_name’,’pgoutput’);
创建完成后,可使用以下命令查看 Failover Slot 信息。
SELECT * FROM pg_failover_slots;

步骤3:开启 Debezium

1.安装 Debezium 插件

登录云服务器,依次执行以下命令下载 debezium-connector-postgresql 插件,并解压到指定路径。
[root@VM-10-18-tencentos ~]# mkdir -p /data/kafka_connect/plugins
[root@VM-10-18-tencentos ~]# wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.7.3.Final/debezium-connector-postgres-2.7.3.Final-plugin.tar.gz
[root@VM-10-18-tencentos ~]# tar -zxvf debezium-connector-postgres-2.7.3.Final-plugin.tar.gz -C /data/kafka_connect/plugins

2.启动 kafka connect

启动 kafka connect 前,请确保已经启动了 kafka 。启动及确认方法请参见 步骤1
执行以下命令启动 kafka connect 。
[root@VM-10-18-tencentos ~]# cd /data/kafka_dev
[root@VM-10-18-tencentos kafka_dev]# nohup bin/connect-distributed.sh config/connect-distributed.properties &> connect.log &

3.创建 debezium connector

在云服务器执行下面的命令,进行 debezium connector 的创建。需要自行根据实际情况填写的项已为您标出。
说明:
若云服务器与云数据库 PostgreSQL 处于同一 VPC 下,建议填写云服务器机器、云数据库的内网 IP 。
curl -XPOST "http://部署kafka所在云服务器的ip:8083/connectors/" \\
-H 'Content-Type: application/json' \\
-d '{
"name": "test_connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "PostgreSQL数据库所在机器的IP",
"database.port": "5432",
"database.user": "有发布权限的PostgreSQL用户名",
"database.password": "发布用户的密码",
"database.dbname": "创建publication的数据库",
"database.server.name": "pg_demo",
"slot.name": "pg_demo_failover_slot",
"topic.prefix": "pg_demo",
"publication.name": "pg_demo_publication",
"publication.autocreate.mode": "all_tables",
"plugin.name": "pgoutput"
}
}'
需要您自行填写及可自定义的参数说明如下:
参数
说明
name
连接器的名称,必须唯一。
database.hostname
云数据库的 IP 地址。建议您填写内网 IP。
database.user
用于连接云数据库的用户名。
该用户需要有足够权限完成发布。推荐使用类型为 pg_tecenten_superuser 的用户。
database.password
用户的密码。
database.dbname
创建 publication 的数据库名称。
slot.name
逻辑复制槽的名称。
请填写之前创建的逻辑复制槽名称。
publication.name
逻辑发布的名称。
请填写之前创建的逻辑发布名称。
您可登录控制台,使用以下命令查看 publication 。
SELECT * FROM pg_publication;
可使用以下命令查看 failover Slot 信息。
SELECT * FROM pg_failover_slots;
创建完毕后,您可通过在云服务器输入以下命令查看连接状态。
curl "http://部署kafka所在机器的ip:8083/connectors/pg_demo_connector/status"
返回的信息中包含 running 则为正常运行。

步骤4:测试数据变更

执行以下命令在 CVM 中登录云数据库。其中 -h 后填写为云数据库 IP。若云服务器与云数据库处于同一 VPC 下,建议填写为内网 IP。
[root@VM-10-18-tencentos kafka_dev]# su – postgres
[postgres@VM-10-18-tencentos ~]$ /usr/local/pgsql/bin/psql -h *.*.*.* -p 5432 -U dbadmin -d postgres
Password for user dbadmin:
psql (16.4, server 16.8)
Type "help" for help.
postgres=>
创建一张新表,插入数据进行测试。
postgres=> CREATE TABLE linktest (
id SERIAL PRIMARY KEY
);
CREATE TABLE
postgres=> insert into linktest values(1);
INSERT 0 1
观察 kafka 日志,若 kafka 正常,则连接建立成功。

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈