Stream Compute Service

MongoDB

Last updated: 2023-11-08 14:51:17

Overview

The Flink connector mongodb writes data from Flink to MongoDB in batches. It currently supports only append-only (tuple) streams.

Versions

Flink Version    Description
1.11             Unsupported
1.13             Supported
1.14             Unsupported
1.16             Supported

Defining a table in DDL

CREATE TABLE mongodb (
    user_id INT,
    item_id INT,
    category_id INT,
    behavior VARCHAR
) WITH (
    'connector' = 'mongodb',   -- Must be 'mongodb'.
    'database' = 'test',       -- The name of the database.
    'collection' = 'table1',   -- The name of the collection.
    'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- The MongoDB connection string.
    'batchSize' = '1024'       -- The number of records written per batch.
);

Limits

The Flink connector mongodb can be used only as a sink (result table), not as a source. A TencentDB for MongoDB table can serve as the result table.

WITH parameters

Option                 Description                                 Required  Remarks
connector              The result table type.                      Yes       Must be mongodb.
database               The name of the database.                   Yes       -
collection             The name of the collection.                 Yes       -
uri                    The MongoDB connection string.              Yes       -
batchSize              The number of records written per batch.    No        Default value: 1024.
maxConnectionIdleTime  The connection idle timeout period.         No        Default value: 60000 (ms).
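
As a minimal sketch of how the optional parameters fit into a table definition, the DDL below sets both batchSize and maxConnectionIdleTime explicitly instead of relying on their defaults. The table name mongodb_tuned and the chosen values are assumptions for illustration only; the option names are the ones listed above, and the URI placeholders must be replaced with real values.

```sql
CREATE TABLE mongodb_tuned (           -- Hypothetical table name, for illustration.
    user_id INT,
    item_id INT,
    category_id INT,
    behavior VARCHAR
) WITH (
    'connector' = 'mongodb',
    'database' = 'test',
    'collection' = 'table1',
    'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin',
    'batchSize' = '512',               -- Illustrative value; the default is 1024.
    'maxConnectionIdleTime' = '30000'  -- Illustrative value in ms; the default is 60000.
);
```

Whether a smaller batch size or a shorter idle time helps depends on the workload, so these values are a starting point to experiment with rather than a recommendation.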

Example

CREATE TABLE random_source (
    user_id INT,
    item_id INT,
    category_id INT,
    behavior VARCHAR
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',             -- The number of records generated per second.
    'fields.user_id.kind' = 'sequence',    -- A bounded sequence; the output stops automatically after the sequence ends.
    'fields.user_id.start' = '1',          -- The start value of the sequence.
    'fields.user_id.end' = '10000',        -- The end value of the sequence.
    'fields.item_id.kind' = 'random',      -- A random number within the range below.
    'fields.item_id.min' = '1',            -- The minimum random number.
    'fields.item_id.max' = '1000',         -- The maximum random number.
    'fields.category_id.kind' = 'random',  -- A random number within the range below.
    'fields.category_id.min' = '1',        -- The minimum random number.
    'fields.category_id.max' = '1000',     -- The maximum random number.
    'fields.behavior.length' = '5'         -- The random string length.
);

CREATE TABLE mongodb (
    user_id INT,
    item_id INT,
    category_id INT,
    behavior VARCHAR
) WITH (
    'connector' = 'mongodb',   -- Must be 'mongodb'.
    'database' = 'test',       -- The name of the database.
    'collection' = 'table1',   -- The name of the collection.
    'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- The MongoDB connection string.
    'batchSize' = '1024'       -- The number of records written per batch.
);

INSERT INTO mongodb SELECT * FROM random_source;
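
Because the connector accepts only append streams, the shape of the query feeding the sink matters. The sketch below reuses the random_source and mongodb tables from this example to contrast a query that stays append-only with one that does not. The filter condition and the mongodb_counts table are hypothetical and exist only for illustration.

```sql
-- Append-only: each source row yields at most one inserted row,
-- so this query is compatible with the sink.
INSERT INTO mongodb
SELECT user_id, item_id, category_id, behavior
FROM random_source
WHERE category_id <= 500;

-- Updating (not expected to work with this sink): a non-windowed GROUP BY
-- revises earlier results as new rows arrive, producing an update stream.
-- INSERT INTO mongodb_counts        -- Hypothetical table, for illustration.
-- SELECT category_id, COUNT(*) AS cnt
-- FROM random_source
-- GROUP BY category_id;
```

The second statement is left commented out so that the block can be pasted as-is without submitting an unsupported job.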

Notes

Upsert

When used as a sink, the Flink connector mongodb does not support upsert streams; only append-only streams can be written.

User permissions

The MongoDB user specified in the connection string must have permission to write data to the target database.
