tencent cloud

流计算 Oceanus

动态与公告
产品动态
产品简介
产品概述
产品优势
应用场景
购买指南
计费概述
计费模式
退费说明
调整配置费用说明
快速入门
从零开始上手
创建独享集群
创建 SQL 作业
创建 JAR 作业
创建 ETL 作业
创建 Python 作业
操作指南
作业管理
作业开发
作业监控
作业日志
事件与诊断
元数据管理
快照管理
作业调优
依赖管理
集群管理
权限管理
SQL 开发指南
开发指南概述
术语和数据类型
DDL 数据定义语句
DML 数据操作语句
MySQL CDC 多 Source 复用
上下游开发指南
SET 控制语句
运算符和内置函数
标识符与保留字
Python 开发指南
ETL 开发指南
概述
ETL 作业术语表
上下游开发指南
常见问题
联系我们

数据库 MongoDB CDC

PDF
聚焦模式
字号
最后更新时间: 2023-11-08 14:55:00

介绍

MongoDB 的 CDC 源表(即 MongoDB 的流式源表),Connector 会自动跟踪 MongoDB 副本集或分片集群,以获取数据库和集合中的文档更改。

版本说明

Flink 版本
说明
1.11
不支持
1.13
支持
1.14
支持
1.16
支持

使用范围

MongoDB CDC 只支持作为源表,MongoDB CDC 支持4.0、4.2、5.0版本,MongoDB 集群必须是副本集或者分片集群。

DDL 定义

-- register a MongoDB table 'products' in Flink SQL
CREATE TABLE mongo_cdc_source_table (
_id STRING, // must be declared
name STRING,
weight DECIMAL(10,3),
tags ARRAY<STRING>, -- array
price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database' = 'inventory',
'collection' = 'products'
);

WITH 参数

参数
说明
是否必填
备注
connector
源表类型
固定值为 mongodb-cdc
hosts
MongoDB 数据库的 IP 端口对
-
username
MongoDB 数据库服务的用户名
-
password
MongoDB 数据库服务的密码
-
database
MongoDB 数据库名称
-
collection
MongoDB Collection 名称
-
connection.options
MongoDB 的 连接选项。有多个时,使用&连接,例如 relicaSet=test&connectTimeoutMS=300000
-
errors.tolerance
是否忽略错误记录,接受 none 或者 all。如果设置为 all, 忽略错误记录
none
errors.log.enable
是否需要把错误操作打印到日志文件
默认值为 true
copy.existing
是否复制库中原有的数据,如果在复制期间对数据有更改,会在数据复制完成后应用更改
默认值为 true
copy.existing.pipeline
当复制原有数据的时候,可以通过这个参数设置筛选条件。例如[{"$match": {"closed": "false"}}],只会复制 closed 为 false 的 记录。用法参考 $match (aggregation)
-
copy.existing.max.threads
执行数据复制时要使用的线程数
默认值为 Processors Count
copy.existing.queue.size
复制数据时要使用的队列的最大大小
默认值为16000
poll.max.batch.size
每次拉取数据的最大数量。默认情况下,1.5秒的拉取间隔下,最多拉取1000条变更数据
默认值为1000
poll.await.time.ms
拉取数据的时间间隔。默认情况下,1.5秒的拉取间隔下,最多拉取1000条变更数据
默认值为1500
heartbeat.interval.ms
发送心跳消息时间间隔,以毫秒为单位。使用0禁用
默认值为0
说明
Note:当数据流变化慢的时候,建议把 heartbeat.interval.ms 设置为一个合适的值,心跳会推送 resumeToken,防止当 Flink job 从 checkpoint 或者 savepoint 恢复的时候,resumeToken 已经过期。

类型映射

MongoDB 字段类型
Flink 字段类型
-
TINYINT
-
SMALLINT
Int
INT
Long
BIGINT
-
FLOAT
Double
DOUBLE
Decimal128
DECIMAL(p, s)
Boolean
BOOLEAN
DateTimestamp
DATE
DateTimestamp
TIME
Date
TIMESTAMP(3)TIMESTAMP_LTZ(3)
Timestamp
TIMESTAMP(0)TIMESTAMP_LTZ(0)
String
ObjectId
UUID
Symbol
MD5
JavaScript
Regex
STRING
BinData
BYTES
Object
ROW
Array
ARRAY
DBPointer
ROW<$ref STRING, $id STRING>
Point : ROW<type STRING, coordinates ARRAY<DOUBLE>>
Line : ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>
...

代码示例

CREATE TABLE mongo_cdc_source_table (
_id STRING, // must be declared
name STRING,
weight DECIMAL(10,3),
tags ARRAY<STRING>, -- array
price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database' = 'inventory',
'collection' = 'products'
);
CREATE TABLE `print_table` (
`id` STRING,
`name` STRING,
`currency` STRING
) WITH (
'connector' = 'print'
);
insert into print_table select _id, name, price.currency from mongo_cdc_source_table;

注意事项

用户权限

MongoDB 的 User 必须有 changeStream 和 read 权限。
use admin;
db.createUser(
{
user: "flinkuser",
pwd: "flinkpw",
roles: [
{ role: "read", db: "admin" },
{ role: "readAnyDatabase", db: "admin" }
]
}
);

并行度

任务的并行度只支持为1。

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈