MongoDB CDC
Last updated: 2023-11-08 14:54:50

Overview

The MongoDB CDC source connector automatically tracks MongoDB replica sets or sharded clusters to capture changes in databases and collections.

Versions

| Flink Version | Description |
| --- | --- |
| 1.11 | Unsupported |
| 1.13 | Supported |
| 1.14 | Supported |
| 1.16 | Supported |

Use cases

The MongoDB CDC connector can be used only as a source. It supports MongoDB v4.0, v4.2, and v5.0, and the MongoDB cluster must be a replica set or a sharded cluster.

Defining a table in DDL

-- register a MongoDB table 'products' in Flink SQL
CREATE TABLE mongo_cdc_source_table (
  _id STRING, -- must be declared
  name STRING,
  weight DECIMAL(10,3),
  tags ARRAY<STRING>, -- array
  price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
  suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'products'
);

WITH parameters

| Option | Description | Required | Remarks |
| --- | --- | --- | --- |
| connector | The connector to use. | Yes | The value must be 'mongodb-cdc'. |
| hosts | Comma-separated list of host:port pairs of the MongoDB servers. | Yes | - |
| username | Name of the database user to use when connecting to MongoDB. | Yes | - |
| password | Password of the database user to use when connecting to MongoDB. | Yes | - |
| database | The name of the MongoDB database to watch for changes. | Yes | - |
| collection | The name of the collection in the MongoDB database to watch for changes. | Yes | - |
| connection.options | The ampersand-separated Connection String Options of MongoDB, such as replicaSet=test&connectTimeoutMS=300000. | No | - |
| errors.tolerance | Whether to ignore error records. Valid values: none and all. If it is set to all, all error records will be ignored. | No | Default value: none. |
| errors.log.enable | Whether to print errors in logs. | No | Default value: true. |
| copy.existing | Whether to copy the existing data in the database. Changes made to the data during the copy are applied after the copy is completed. | No | Default value: true. |
| copy.existing.pipeline | A filter applied when copying the existing data. For example, if you set it to [{"$match": {"closed": "false"}}], only records whose value of closed is false will be copied. For how to use this option, see $match (aggregation). | No | - |
| copy.existing.max.threads | The number of threads to use when copying data. | No | Default value: the number of available processors. |
| copy.existing.queue.size | The maximum size of the queue to use when copying data. | No | Default value: 16000. |
| poll.max.batch.size | The maximum number of change stream documents to include in a single batch when polling for new data. By default, with a check interval of 1.5s, up to 1,000 documents can be included each time. | No | Default value: 1000. |
| poll.await.time.ms | The amount of time in milliseconds to wait before checking for new results on the change stream. By default, with a check interval of 1.5s, up to 1,000 documents can be included each time. | No | Default value: 1500. |
| heartbeat.interval.ms | The length of time in milliseconds between sending heartbeat messages. Set it to 0 to disable the feature. | No | Default value: 0. |
Note
If data streams change slowly, we recommend you set heartbeat.interval.ms to an appropriate value. A resumeToken is included in a heartbeat message to avoid the use of an expired resumeToken when a Flink job resumes from checkpoint or savepoint.
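
As a sketch of how the optional parameters combine, the following DDL adds a few of them to a source table. The table name mongo_cdc_source_tuned, the replica set name rs0, and the chosen values (including the 5000 ms heartbeat) are illustrative assumptions, not recommendations.

```sql
-- Hypothetical table; option names come from the WITH parameters above,
-- the values are assumptions chosen only for illustration.
CREATE TABLE mongo_cdc_source_tuned (
  _id STRING, -- must be declared
  name STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'products',
  -- ampersand-separated MongoDB connection string options
  'connection.options' = 'replicaSet=rs0&connectTimeoutMS=300000',
  -- copy existing documents first, restricted by a $match filter
  'copy.existing' = 'true',
  'copy.existing.pipeline' = '[{"$match": {"closed": "false"}}]',
  -- polling defaults, written out explicitly
  'poll.max.batch.size' = '1000',
  'poll.await.time.ms' = '1500',
  -- non-zero heartbeat so the resumeToken keeps advancing on a slow stream
  'heartbeat.interval.ms' = '5000'
);
```

Options that are left out keep the default values listed in the table.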

Data type mapping

| MongoDB Type | Flink Type |
| --- | --- |
| - | TINYINT |
| - | SMALLINT |
| Int | INT |
| Long | BIGINT |
| - | FLOAT |
| Double | DOUBLE |
| Decimal128 | DECIMAL(p, s) |
| Boolean | BOOLEAN |
| Date, Timestamp | DATE |
| Date, Timestamp | TIME |
| Date | TIMESTAMP(3), TIMESTAMP_LTZ(3) |
| Timestamp | TIMESTAMP(0), TIMESTAMP_LTZ(0) |
| String, ObjectId, UUID, Symbol, MD5, JavaScript, Regex | STRING |
| BinData | BYTES |
| Object | ROW |
| Array | ARRAY |
| DBPointer | ROW<$ref STRING, $id STRING> |
| GeoJSON | Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>; Line: ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>>; ... |
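
To illustrate the mapping, here is a minimal sketch of a source table that declares one column for several of the MongoDB types listed above. The orders collection and all of its field names are hypothetical and exist only for this example.

```sql
-- Hypothetical collection 'orders'; each comment names the MongoDB type
-- that the column is assumed to hold in the source documents.
CREATE TABLE mongo_cdc_orders (
  _id STRING,                               -- ObjectId
  quantity INT,                             -- Int
  total_cents BIGINT,                       -- Long
  discount DOUBLE,                          -- Double
  amount DECIMAL(10,2),                     -- Decimal128
  paid BOOLEAN,                             -- Boolean
  created_at TIMESTAMP_LTZ(3),              -- Date
  receipt BYTES,                            -- BinData
  customer ROW<name STRING, email STRING>,  -- Object (embedded document)
  item_ids ARRAY<STRING>,                   -- Array
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'orders'
);
```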

Example

-- source: the 'products' collection in MongoDB
CREATE TABLE mongo_cdc_source_table (
  _id STRING, -- must be declared
  name STRING,
  weight DECIMAL(10,3),
  tags ARRAY<STRING>, -- array
  price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
  suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'products'
);

-- sink: print each change record to the task log
CREATE TABLE `print_table` (
  `id` STRING,
  `name` STRING,
  `currency` STRING
) WITH (
  'connector' = 'print'
);

-- read a nested field of the embedded document with dot notation
INSERT INTO print_table SELECT _id, name, price.currency FROM mongo_cdc_source_table;

Notes

User permissions

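The MongoDB user that the connector connects as must have the changeStream and read privileges. For example, the following creates such a user in the mongo shell:
use admin;
db.createUser(
  {
    user: "flinkuser",
    pwd: "flinkpw",
    roles: [
      { role: "read", db: "admin" }, // the built-in read role includes the changeStream privilege
      { role: "readAnyDatabase", db: "admin" } // needed to read the existing data
    ]
  }
);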

Parallelism

The task parallelism must be 1.
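
One way to pin the parallelism, sketched here under the assumption that the job is submitted through an environment that accepts SET statements (Flink SQL client syntax from Flink 1.13 onward), is the parallelism.default setting. On a managed platform the parallelism may instead be configured in the job settings.

```sql
-- Assumption: the environment accepts Flink SQL client SET statements.
-- Run this before the INSERT statement that reads from the MongoDB CDC source.
SET 'parallelism.default' = '1';
```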