tencent cloud

流计算 Oceanus

动态与公告
产品动态
产品简介
产品概述
产品优势
应用场景
购买指南
计费概述
计费模式
退费说明
调整配置费用说明
快速入门
从零开始上手
创建独享集群
创建 SQL 作业
创建 JAR 作业
创建 ETL 作业
创建 Python 作业
操作指南
作业管理
作业开发
作业监控
作业日志
事件与诊断
元数据管理
快照管理
作业调优
依赖管理
集群管理
权限管理
SQL 开发指南
开发指南概述
术语和数据类型
DDL 数据定义语句
DML 数据操作语句
MySQL CDC 多 Source 复用
上下游开发指南
SET 控制语句
运算符和内置函数
标识符与保留字
Python 开发指南
ETL 开发指南
概述
ETL 作业术语表
上下游开发指南
常见问题
联系我们

数据库 HBase

PDF
聚焦模式
字号
最后更新时间: 2024-08-22 10:44:12

介绍

HBase Connector 提供了对 HBase 集群的读写支持。Oceanus 已经提供了内置的flink-connector-hbase Connector 组件。

版本说明

Flink 版本
说明
1.11
支持 hbase 版本为:1.4.x
1.13
支持 hbase 版本为:1.4.x、2.2.x、2.3.x
1.14
支持 hbase 版本为:1.4.x、2.2.x
1.16
支持 hbase 版本为: 1.4.x、2.2.x

使用范围

可以作为源表,维表,以及Tuple、Upsert 数据流的目的表。

DDL 定义

CREATE TABLE hbase_table (
rowkey INT,
cf ROW < school_name STRING >,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4', -- Flink 1.13 支持 hbase-2.2
'table-name' = 'hbase_sink_table', -- Hbase 表名
'zookeeper.quorum' = 'ip:port,ip:port,ip:port' -- Hbase 的 zookeeper 地址
);

WITH 参数

参数
说明
是否必填
备注
connector
表类型
hbase-1.4 或者 hbase-2.2
如果您用了 hbase 2.3.x 版本,那么,connector 参数值需要替换为 hbase-2.2
table-name
HBase 表名
-
zookeeper.quorum
HBase 的 zookeeper 地址
-
zookeeper.znode.parent
HBase 在 zookeeper 中的根目录
-
null-string-literal
HBase 字段类型为字符串时,如果 Flink 字段数据为 null,则将该字段赋值为 null-string-literal,并写入 HBase
默认为 null
sink.buffer-flush.max-size
写入 HBase 前,内存中缓存的数据量(字节)大小。调大该值有利于提高 HBase 写入性能,但会增加写入延迟和内存使用。仅作为 Sink 时使用
默认值为2MB,支持字节单位 B、KB、MB 和 GB,不区分大小写。设置为0表示不进行缓存
sink.buffer-flush.max-rows
写入 HBase 前,内存中缓存的数据条数。调大该值有利于提高 HBase 写入性能,但会增加写入延迟和内存使用。仅作为 Sink 时使用
默认值为1000,设置为0表示不进行缓存
sink.buffer-flush.interval
将缓存数据周期性写入到 HBase 的间隔,可以控制写入 HBase 的延迟。仅作为 Sink 时使用
默认值为1秒,支持时间单位 ms、s、min、h 和 d。设置为0表示关闭定期写入

类型映射

HBase 将所有的数据存为字节数组。读写操作时需要将数据进行序列化和反序列化。Flink 与 HBase 的数据转换关系如下:
Flink 字段类型
HBase 转换
CHAR / VARCHAR / STRING
byte[] toBytes(String s) String toString(byte[] b)
BOOLEAN
byte[] toBytes(boolean b)boolean toBoolean(byte[] b)
BINARY / VARBINARY
byte[]
DECIMAL
byte[] toBytes(BigDecimal v)BigDecimal toBigDecimal(byte[] b)
TINYINT
new byte[] { val } bytes[0]
SMALLINT
byte[] toBytes(short val)short toShort(byte[] bytes)
INT
byte[] toBytes(int val)int toInt(byte[] bytes)
BIGINT
byte[] toBytes(long val)long toLong(byte[] bytes)
FLOAT
byte[] toBytes(float val)float toFloat(byte[] bytes)
DOUBLE
byte[] toBytes(double val)double toDouble(byte[] bytes)
DATE
将日期转换成自1970.01.01以来的天数,用 int 表示,并通过 byte[] toBytes(int val) 转换成字节数组
TIME
将时间转换成自00:00:00以来的毫秒数,用 int 表示,并通过 byte[] toBytes(int val) 转换成字节数组
TIMESTAMP
将时间戳转换成自1970-01-01 00:00:00以来的毫秒数,用 long 表示,并通过 byte[] toBytes(long val) 转换成字节数组
ARRAY
不支持
MAP / MULTISET
不支持
ROW
不支持

代码示例

包含 HBase 维表的实时计算作业代码,示例如下:
CREATE TABLE datagen_source_table (
id INT,
name STRING,
`proc_time` AS PROCTIME()
) with (
'connector'='datagen',
'rows-per-second'='1'
);

CREATE TABLE hbase_table (
rowkey INT,
cf ROW < school_name STRING >,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4', -- Flink 1.13 支持 hbase-2.2
'table-name' = 'hbase_sink_table', -- Hbase 表名
'zookeeper.quorum' = 'ip:port,ip:port,ip:port' -- Hbase 的 zookeeper 地址
);

CREATE TABLE blackhole_sink(
id INT,
name STRING
) with (
'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT id, cf.school_name as name FROM datagen_source_table src
JOIN hbase_table FOR SYSTEM_TIME AS OF src.`proc_time` as h ON src.id = h.rowkey;

注意事项

HBase Connector 一般会使用 DDL 语句中定义的主键,以 upsert 模式工作,与外部系统交换变更日志信息。因此,必须在 HBase 的 rowkey 字段上定义主键(必须声明 rowkey 字段)。如果未声明 PRIMARY KEY 子句,则 HBase 连接器默认将 rowkey 作为主键。

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈