CREATE TABLE `random_source` (f_sequence INT,f_random INT,f_random_str VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='10', -- 초당 생성된 데이터 행 수'fields.f_sequence.kind'='random', -- 랜덤 숫자'fields.f_sequence.min'='1', -- 최소 순차 번호'fields.f_sequence.max'='10', -- 최대 순차 번호'fields.f_random.kind'='random', -- 랜덤 숫자'fields.f_random.min'='1', -- 최소 랜덤 숫자'fields.f_random.max'='100', -- 최대 랜덤 숫자'fields.f_random_str.length'='10' -- 랜덤 문자열 길이);
-- <bucket name> 및 <folder name>을 실제 버킷 및 폴더 이름으로 바꿉니다.CREATE TABLE `cos_sink` (f_sequence INT,f_random INT,f_random_str VARCHAR) PARTITIONED BY (f_sequence) WITH ('connector' = 'filesystem','path'='cosn://<bucket name>/<folder name>/', --- 데이터가 기록될 디렉터리 경로'format' = 'json', --- 기록된 데이터의 형식'sink.rolling-policy.file-size' = '128MB', --- 최대 파일 크기'sink.rolling-policy.rollover-interval' = '30 min', --- 최대 파일 쓰기 시간'sink.partition-commit.delay' = '1 s', --- 파티션 커밋 지연'sink.partition-commit.policy.kind' = 'success-file' --- 파티션 커밋 방법);
INSERT INTO `cos_sink`SELECT * FROM `random_source`;
flink-connector-cos를 선택하고 고급 매개변수에서 COS URL을 다음과 같이 구성합니다.fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosNfs.cosn.impl: org.apache.hadoop.fs.CosFileSystemfs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProviderfs.cosn.bucket.region: <COS 리전>fs.cosn.userinfo.appid: <COS 사용자 appid>
<COS region>을 ap-guangzhou와 같은 실제 COS 리전으로 바꿉니다.<COS user appid>를 실제 APPID로 바꿉니다. 이는 계정 센터에서 볼 수 있습니다.피드백