tencent cloud

流计算 Oceanus

动态与公告
产品动态
产品简介
产品概述
产品优势
应用场景
购买指南
计费概述
计费模式
退费说明
调整配置费用说明
快速入门
从零开始上手
创建独享集群
创建 SQL 作业
创建 JAR 作业
创建 ETL 作业
创建 Python 作业
操作指南
作业管理
作业开发
作业监控
作业日志
事件与诊断
元数据管理
快照管理
作业调优
依赖管理
集群管理
权限管理
SQL 开发指南
开发指南概述
术语和数据类型
DDL 数据定义语句
DML 数据操作语句
MySQL CDC 多 Source 复用
上下游开发指南
SET 控制语句
运算符和内置函数
标识符与保留字
Python 开发指南
ETL 开发指南
概述
ETL 作业术语表
上下游开发指南
常见问题
联系我们

数据库 Redis

PDF
聚焦模式
字号
最后更新时间: 2023-11-08 14:54:39

介绍

Redis Connector 提供了对 Redis 写入和维表的支持。

版本说明

Flink 版本
说明
1.11
支持(写入,维表)
1.13
支持(写入,维表)
1.14
支持(写入、维表)
1.16
支持(写入、维表)

使用范围

可以作为维表的使用。
可以作为 Tuple、Upsert 数据流的目的表。

DDL 定义

set 命令(字符串键)

-- 第1列为 key,第2列为 value。Redis 命令为 set key value
CREATE TABLE `redis_set_sink_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'set', -- set 命令
'nodes' = '<host>:<port>', -- redis 连接地址
'password' = '<password>',
'database' = '<database>'
);

lpush 命令(列表键)

-- 第1列为 key,第2列为 value。Redis 命令为 lpush key value
CREATE TABLE `redis_lpush_sink_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'lpush', -- lpush 命令
'nodes' = '<host>:<port>', -- redis 连接地址
'password' = '<password>',
'database' = '<database>'
);


sadd 命令(集合键)

-- 第1列为 key,第2列为 value。Redis 命令为 sadd key value
CREATE TABLE `redis_sadd_sink_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'sadd', -- sadd 命令
'nodes' = '<host>:<port>', -- redis 连接地址
'password' = '<password>',
'database' = '<database>'
);


hset 命令(哈希键)

-- 第1列为 hash_key,第2列为 hash_value。Redis 命令为 hset key hash_key hash_value。
CREATE TABLE `redis_hset_sink_table` (
`hash_key` STRING,
`hash_value` STRING
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'hset', -- hset 命令
'nodes' = '<host>:<port>', -- redis 连接地址
'password' = '<password>',
'database' = '<database>',
'additional-key' = '<key>' -- 哈希键
);


HSET_WITH_KEY 命令(哈希键)

-- 第1列为 key, 第2列为 hash_key,第3列为 hash_value。Redis 命令为 hset key hash_key hash_value。
CREATE TABLE `redis_hset_sink_table` (
`key` STRING,
`hash_key` STRING,
`hash_value` STRING
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'hset', -- hset 命令
'nodes' = '<host>:<port>', -- redis 连接地址
'password' = '<password>',
'database' = '<database>'
);


hmset 命令

-- 第1列为key,第2列为hash_key,第3列为hash_key对应的hash_value。Redis插入数据的命令为hmset key hash_key hash_value
CREATE TABLE `redis_hmset_sink_table` (
`key` STRING,
`fieldKey` STRING,
`fieldValue` STRING
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'hmset', -- hmset 命令
'nodes' = '<host>:<port>', -- redis 连接地址
'password' = '<password>'
);

zadd 命令(有序集合键)

-- 第1列为 hash_key,第2列为 hash_value。zadd key score value。
CREATE TABLE `redis_zadd_sink_table` (
`value` STRING,
`score` DOUBLE
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'zadd', -- zadd 命令
'nodes' = '<host>:<port>', -- redis 连接地址
'password' = '<password>',
'database' = '<database>',
'additional-key' = '<key>' -- 有序集合键
);


ZADD_WITH_KEY 命令(有序集合键)

-- 第1列为 key, 第2列为 value,第3列为 score。zadd key score value。
CREATE TABLE `redis_zadd_sink_table` (
`key` STRING,
`value` STRING,
`score` DOUBLE
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'zadd', -- zadd 命令
'nodes' = '<host>:<port>', -- redis 连接地址
'password' = '<password>',
'database' = '<database>',
);

get 命令(维表)

CREATE TABLE `redis_dimension_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis',
'command' = 'get', -- get命令
'nodes' = '<host>:<port>', -- redis连接地址,集群模式多个节点使用'',''分隔。
'lookup.cache.max-rows' = '500', -- 查询缓存中最多缓存的数据条数。
'lookup.cache.ttl' = '10 min', -- 每条记录的最长缓存时间。
'lookup.max-retries' = '5', -- 查询失败,最多重试次数。
-- 'password' = '<password>', -- 可选参数,密码
-- 'database' = '<database>', -- 可选参数,数据库:默认0
-- 'redis-mode' = 'standalone' -- 可选参数,redis 部署模式,默认standalone。(可选:standalone 单机模式,cluster 集群模式)
);

WITH 参数

参数值
必填
默认值
描述
connector
-
固定值为 redis
command
-
操作命令。取值与对应的键类型如下:
set:字符串键
lpush:类别键
sadd:集合键
hset:哈希键
hset_with_key:哈希键
zadd:有序集合键
zadd_with_key:有序集合键
nodes
-
redis server 连接地址,示例:127.0.0.1:6379。集群架构下多个节点使用','分隔
password
redis 密码,默认值为空,不进行权限验证
database
0
要操作的数据库的 DB number,默认值0
redis-mode
standalone
redis 部署模式
standalone:标准架构,单机
cluster:集群架构,分布式
ignore-delete
false
是否忽略 Retraction 消息
additional-ttl
-
过期时间,单位:秒。示例:60,设置过期是时间为60秒。只有 set 命令支持设置过期时间
additional-key
-
-
用于指定 hset 和 zadd 的 key。执行 hset 和 zadd 命令时必须设置
lookup.cache.max-rows
-
查询缓存中最多缓存的数据条数
lookup.cache.ttl
10s
每条记录的最长缓存时间
lookup.max-retries
1
查询失败最多重试次数
lookup.cache.caching-missing-key
true
是否缓存空结果

代码示例

CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- 每秒产生的数据条数
);

CREATE TABLE `redis_set_sink_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'set', -- set 命令
'nodes' = '127.0.0.1:8121', -- redis 连接地址
'password' = '<password>',
'database' = '0'
);
insert into redis_set_sink_table select cast(id as string), name from datagen_source_table;

注意事项

1. 自建 Redis 的集群模式不支持多数据库,集群模式下 database 参数无效。
2. 腾讯云数据库 Redis 的集群架构支持多数据库,可以使用 standalone 模式指定写入云数据库 Redis 集群架构中的非0数据库,例如:
CREATE TABLE `redis_set_sink_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis', -- 输出到Redis
'command' = 'set', -- set命令
'nodes' = '<host>:<port>', -- redis连接地址
'password' = '<password>',
'redis-mode' = 'standalone',
'database' = '1'
);


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈