tencent cloud

流计算 Oceanus

动态与公告
产品动态
产品简介
产品概述
产品优势
应用场景
购买指南
计费概述
计费模式
退费说明
调整配置费用说明
快速入门
从零开始上手
创建独享集群
创建 SQL 作业
创建 JAR 作业
创建 ETL 作业
创建 Python 作业
操作指南
作业管理
作业开发
作业监控
作业日志
事件与诊断
元数据管理
快照管理
作业调优
依赖管理
集群管理
权限管理
SQL 开发指南
开发指南概述
术语和数据类型
DDL 数据定义语句
DML 数据操作语句
MySQL CDC 多 Source 复用
上下游开发指南
SET 控制语句
运算符和内置函数
标识符与保留字
Python 开发指南
ETL 开发指南
概述
ETL 作业术语表
上下游开发指南
常见问题
联系我们

数据库 PostgreSQL CDC

PDF
聚焦模式
字号
最后更新时间: 2023-11-08 14:52:23

介绍

Postgres 的 CDC 源表(即 Postgres 的流式源表)用于依次读取 PostgreSQL 数据库全量快照数据和变更数据,保证不多读也不少读一条数据。即使发生故障,也能采用 Exactly Once 方式处理。

版本说明

Flink 版本
说明
1.11
支持
1.13
支持
1.14
不支持
1.16
支持

使用范围

PostgreSQL CDC 只支持作为源表。支持的 PostgreSQL 版本为9.6及以上版本。

DDL 定义

CREATE TABLE postgres_cdc_source_table (
id INT,
name STRING,
PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
'connector' = 'postgres-cdc', -- 固定值 'postgres-cdc'
'hostname' = 'yourHostname', -- 数据库的 IP
'port' = '5432', -- 数据库的访问端口
'username' = 'yourUserName', -- 数据库访问的用户名(需要提供 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT权限)
'password' = 'yourPassWord', -- 数据库访问的密码
'database-name' = 'yourDatabaseName', -- 需要同步的数据库
'schema-name' = 'yourSchemaName', -- 需要同步的数据表所属schema (支持正则表达式)
'table-name' = 'yourTableName', -- 需要同步的数据表名 (支持正则表达式)
'debezium.slot.name' = 'customslotname' -- 定义一个唯一slot名称,可以包含小写字母、数字和下划线字符
);

WITH 参数

参数
说明
是否必填
备注
connector
源表类型
固定值为 postgres-cdc
hostname
Postgres 数据库的 IP 地址或者 Hostname
-
username
Postgres 数据库服务的用户名
有特定权限(包括 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT)的 Postgres 用户
password
Postgres 数据库服务的密码
-
database-name
Postgres 数据库名称
-
schema-name
Postgres Schema 名称
Schema 名称支持正则表达式以读取多个 Schema 的数据
table-name
Postgres 表名
表名支持正则表达式以读取多个表的数据
port
Postgres 数据库服务的端口号
默认值为5432
decoding.plugin.name
Postgres Logical Decoding 插件名称
根据 Postgres 服务上安装的插件确定。支持的插件列表如下:
decoderbufs(默认值)
wal2json
wal2json_rds
wal2json_streaming
wal2json_rds_streaming
pgoutput
debezium.*
Debezium 属性参数
从更细粒度控制 Debezium 客户端的行为。例如'debezium.slot.name' = 'xxxx',以避免出现 PSQLException: ERROR: replication slot "dl_test" is active for PID 19997 详情请参见 配置属性

类型映射

Postgres CDC 和 Flink 字段类型对应关系如下:
Postgres CDC 字段类型
Flink 字段类型
SMALLINT
SMALLINT
INT2
SMALLSERIAL
SERIAL2
INTEGER
INT
SERIAL
BIGINT
BIGINT
BIGSERIAL
REAL
FLOAT
FLOAT4
FLOAT8
DOUBLE
DOUBLE PRECISION
NUMERIC(p, s)
DECIMAL(p, s)
DECIMAL(p, s)
BOOLEAN
BOOLEAN
DATE
DATE
TIME [(p)] [WITHOUT TIMEZONE]
TIME [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
CHAR(n)
STRING
CHARACTER(n)
VARCHAR(n)
CHARACTER VARYING(n)
TEXT
BYTEA
BYTES

代码示例

CREATE TABLE postgres_cdc_source_table (
id INT,
name STRING,
PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
'connector' = 'postgres-cdc', -- 固定值 'postgres-cdc'
'hostname' = 'yourHostname', -- 数据库的 IP
'port' = '5432', -- 数据库的访问端口
'username' = 'yourUserName', -- 数据库访问的用户名(需要提供 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT权限)
'password' = 'yourPassWord', -- 数据库访问的密码
'database-name' = 'yourDatabaseName', -- 需要同步的数据库
'schema-name' = 'yourSchemaName', -- 需要同步的数据表所属schema (支持正则表达式)
'table-name' = 'yourTableName', -- 需要同步的数据表名 (支持正则表达式)
'debezium.slot.name' = 'customslotname' -- 定义一个唯一slot名称,可以包含小写字母、数字和下划线字符
);

CREATE TABLE `print_table` (
`id` INT,
`name` STRING
) WITH (
'connector' = 'print'
);
insert into print_table select * from postgres_cdc_source_table;

注意事项

用户权限

用来同步的用户至少具有 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT 权限。
CREATE ROLE debezium_user REPLICATION LOGIN;
GRANT USAGE ON SCHEMA schema_name TO debezium_user;
GRANT USAGE ON DATABASE schema_name TO debezium_user;
GRANT SELECT ON scheam_name.table_name, scheam_name.table_name TO debezium_user;


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈