产品动态
产品公告
CREATE TABLE `random_source` (f_sequence INT,f_random INT,f_random_str VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='10', -- 每秒产生的数据条数'fields.f_sequence.kind'='random', -- 随机数'fields.f_sequence.min'='1', -- 随机数的最小值'fields.f_sequence.max'='10', -- 随机数的最大值'fields.f_random.kind'='random', -- 随机数'fields.f_random.min'='1', -- 随机数的最小值'fields.f_random.max'='100', -- 随机数的最大值'fields.f_random_str.length'='10' -- 随机字符串的长度);
datagen,请根据实际业务需求选择相应数据源。-- 请将<存储桶名称>和<文件夹名称>替换成您实际的存储桶名称和文件夹名称CREATE TABLE `cos_sink` (f_sequence INT,f_random INT,f_random_str VARCHAR) PARTITIONED BY (f_sequence) WITH ('connector' = 'filesystem','path'='cosn://<存储桶名称>/<文件夹名称>/', --- 数据写入的目录路径'format' = 'json', --- 数据写入的格式'sink.rolling-policy.file-size' = '128MB', --- 文件最大的大小'sink.rolling-policy.rollover-interval' = '30 min', --- 文件最大写入时间'sink.partition-commit.delay' = '1 s', --- 分区提交延迟'sink.partition-commit.policy.kind' = 'success-file' --- 分区提交方式);
INSERT INTO `cos_sink`SELECT * FROM `random_source`;
flink-connector-cos,在高级参数中对 COS 的地址进行如下配置:fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosNfs.cosn.impl: org.apache.hadoop.fs.CosFileSystemfs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProviderfs.cosn.bucket.region: <COS 所在地域>fs.cosn.userinfo.appid: <COS 所属用户的 appid>
<COS 所在地域>替换为您实际的 COS 地域,例如:ap-guangzhou。<COS 所属用户的 appid>替换为您实际的 APPID,具体请进入 账号中心 查看。文档反馈