tencent cloud

流计算 Oceanus

动态与公告
产品动态
产品简介
产品概述
产品优势
应用场景
购买指南
计费概述
计费模式
退费说明
调整配置费用说明
快速入门
从零开始上手
创建独享集群
创建 SQL 作业
创建 JAR 作业
创建 ETL 作业
创建 Python 作业
操作指南
作业管理
作业开发
作业监控
作业日志
事件与诊断
元数据管理
快照管理
作业调优
依赖管理
集群管理
权限管理
SQL 开发指南
开发指南概述
术语和数据类型
DDL 数据定义语句
DML 数据操作语句
MySQL CDC 多 Source 复用
上下游开发指南
SET 控制语句
运算符和内置函数
标识符与保留字
Python 开发指南
ETL 开发指南
概述
ETL 作业术语表
上下游开发指南
常见问题
联系我们

数据库 MongoDB

PDF
聚焦模式
字号
最后更新时间: 2023-11-08 14:51:30

介绍

Flink connector mongodb 目前支持通过 Flink 将数据批量写入到 mongodb 中,目前仅支持 append 流。

版本说明

Flink 版本
说明
1.11
不支持
1.13
支持
1.14
不支持
1.16
支持

DDL定义

CREATE TABLE mongodb (
user_id INT,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
'connector' = 'mongodb', -- 固定值 'mongodb'
'database' = 'test', --数据库名
'collection' = 'table1',--数据集合
'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- MongoDB连接串
'batchSize' = '1024' -- 每次批量写入的条数
);

使用范围

Flink connector mongodb 目前仅支持 mongodb sink。支持将腾讯云数据库 MongoDB 作为结果表使用。

WITH参数

参数
说明
是否必填
备注
connector
结果表类型
固定值 mongodb
database
数据库名称
-
collection
数据集合
-
uri
MongoDB 连接串
-
batchSize
每次批量写入的条数
默认1024
maxConnectionIdleTime
连接超时时长
默认值为60000,单位为毫秒

代码示例

CREATE TABLE random_source (
user_id INT,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100', -- 每秒产生的数据条数
'fields.user_id.kind' = 'sequence', -- 有界序列(结束后自动停止输出)
'fields.user_id.start' = '1', -- 序列的起始值
'fields.user_id.end' = '10000', -- 序列的终止值
'fields.item_id.kind' = 'random', -- 无界的随机数
'fields.item_id.min' = '1', -- 随机数的最小值
'fields.item_id.max' = '1000', -- 随机数的最大值
'fields.category_id.kind' = 'random', -- 无界的随机数
'fields.category_id.min' = '1', -- 随机数的最小值
'fields.category_id.max' = '1000', -- 随机数的最大值
'fields.behavior.length' = '5' -- 随机字符串的长度
);

CREATE TABLE mongodb (
user_id INT,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
'connector' = 'mongodb', -- 固定值 'mongodb'
'database' = 'test', --数据库名
'collection' = 'table1',--数据集合
'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- MongoDB连接串
'batchSize' = '1024' -- 每次批量写入的条数
);

insert into mongodb select * from random_source;

注意事项

Upsert

MongoDB sink 暂不支持 upsert。

用户权限

MongoDB 的 User 必须拥有 database 的写权限。

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈