tencent cloud

流计算 Oceanus

动态与公告
产品动态
产品简介
产品概述
产品优势
应用场景
购买指南
计费概述
计费模式
退费说明
调整配置费用说明
快速入门
从零开始上手
创建独享集群
创建 SQL 作业
创建 JAR 作业
创建 ETL 作业
创建 Python 作业
操作指南
作业管理
作业开发
作业监控
作业日志
事件与诊断
元数据管理
快照管理
作业调优
依赖管理
集群管理
权限管理
SQL 开发指南
开发指南概述
术语和数据类型
DDL 数据定义语句
DML 数据操作语句
MySQL CDC 多 Source 复用
上下游开发指南
SET 控制语句
运算符和内置函数
标识符与保留字
Python 开发指南
ETL 开发指南
概述
ETL 作业术语表
上下游开发指南
常见问题
联系我们

数据仓库 Hive

PDF
聚焦模式
字号
最后更新时间: 2023-11-08 14:50:56

介绍

Hive Connector 支持数据流的目的表,但只支持 append only,不支持 Upsert 数据流。数据格式支持包括 Text、SequenceFile、ORC 和 Parquet 等。

版本说明

Flink 版本
说明
1.11
支持 hive 版本 1.1.0、2.3.2、2.3.5、3.1.1
配置项 'connector.type' = 'hive'
1.13
支持 hive 版本 1.0.0 ~ 1.2.2、2.0.0 ~ 2.2.0、2.3.0 ~ 2.3.6、3.0.0 ~ 3.1.2
配置项 'connector' = 'hive'
1.14
不支持
1.16
支持 hive 版本 2.0.0 ~ 2.2.0、2.3.0 ~ 2.3.6、3.0.0 ~ 3.1.2
配置项 'connector' = 'hive'

DDL 定义

用作数据目的(Sink)

CREATE TABLE hive_table (
`id` INT,
`name` STRING,
`dt` STRING,
`hr` STRING
) PARTITIONED BY (dt, hr)
with (
'connector' = 'hive', -- Flink 1.13 请使用 'connector' = 'hive'
'hive-version' = '3.1.1',
'hive-database' = 'testdb',
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='metastore,success-file'
);

作业配置

在 Hive 数据库中建 Hive 表。
# 在 Hive 的 testdb 数据库创建 hive_table 数据表
USE testdb;
CREATE TABLE `hive_table` (
`id` int,
`name` string)
PARTITIONED BY (`dt` string, `hr` string)
STORED AS ORC;
对 Hive 表的 HDFS 路径开启写权限。
方式一:可登录 EMR Hive 集群节点(具体可参见 Hive 基础操作),对目的库 testdb 库的 hive_table 表执行 chmod 操作。
hdfs dfs -chmod 777 /usr/hive/warehouse/testdb.db/hive_table
方式二:在作业管理 > 作业参数中添加以下高级参数,可以 hadoop 用户角色获取 HDFS 路径权限。
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
说明
Flink SQL 中使用 Hive 表 testdb.hive_table,这里 CREATE TABLE 的表名对应 Hive 库的表名(Flink 1.13支持通过hive-table参数配置覆盖该值),库名通过 hive-database 参数指定。

WITH 参数

参数值
必填
默认值
描述
connector.type
Flink-1.11支持,填 'hive' 选择使用 hive connector。
connector
Flink-1.13支持,填 'hive' 选择使用 hive connector。
hive-version
EMR 创建的 Hive 集群对应的版本。
hive-database
数据要写入的 Hive database。
hive-table
Flink-1.13支持,填写后该值会作为Hive库的对应表名
sink.partition-commit.trigger
process-time
分区关闭策略。可选值包括:
process-time:当分区创建超过一定时间之后将这个分区关闭,分区创建时间为分区创建时的物理时间。
partition-time:当分区创建超过一定时间之后将这个分区关闭,分区创建时间从分区中抽取出来。partition-time 依赖于 watermark 生成,需要配合 wartermark 才能支持自动分区发现。当 watermark 时间超过了 从分区抽取的时间delay 参数配置时间 之和后会提交分区。
sink.partition-commit.delay
0s
分区关闭延迟。当分区在创建超过一定时间之后将被关闭。
sink.partition-commit.policy.kind
用于提交分区的策略。可选值可以组合使用,可选值包括:
success-file:当分区关闭时将在分区对应的目录下生成一个 _success 的文件。
metastore:向 Hive Metastore 更新分区信息。
custom:用户实现的自定义分区提交策略。
partition.time-extractor.timestamp-pattern
分区时间戳的抽取格式。需要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段做占位符替换。默认支持第一个字段为 yyyy-mm-dd hh:mm:ss。
如果时间戳应该从单个分区字段 'dt' 提取,可以配置 '$dt'。
如果时间戳应该从多个分区字段中提取,例如 'year'、'month'、'day' 和 'hour',可以配置 '$year-$month-$day $hour:00:00'。
如果时间戳应该从两个分区字段 'dt' 和 'hour' 提取,可以配置 '$dt $hour:00:00'。
sink.partition-commit.policy.class
分区提交类,配合 sink.partition-commit.policy.kind = 'custom' 使用,类必须实现 PartitionCommitPolicy。
partition.time-extractor.kind
default
分区时间抽取方式。这个配置仅当 sink.partition-commit.trigger 配置为 partition-time 时生效。如果用户有自定义的分区时间抽取方法,配置为 custom。
partition.time-extractor.class
分区时间抽取类,这个类必须实现 PartitionTimeExtractor 接口。

代码示例

CREATE TABLE datagen_source_table (
id INT,
name STRING,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10'
);

CREATE TABLE hive_table (
`id` INT,
`name` STRING,
`dt` STRING,
`hr` STRING
) PARTITIONED BY (dt, hr)
with (
'connector' = 'hive', -- Flink 1.13 请使用 'connector' = 'hive'
'hive-version' = '3.1.1',
'hive-database' = 'testdb',
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='metastore,success-file'
);

-- streaming sql, insert into hive table
INSERT INTO hive_table
SELECT id, name, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM datagen_source_table;

Hive 配置

获取 Hive 连接配置 jar 包

Flink SQL 任务写 Hive 时需要使用包含 Hive 及 HDFS 配置信息的 jar 包来连接到 Hive 集群。具体获取连接配置 jar 及其使用的步骤如下:
1. ssh 登录到对应 Hive 集群节点。
2. 获取 hive-site.xml 和 hdfs-site.xml,EMR 集群中的配置文件在如下位置。
/usr/local/service/hive/conf/hive-site.xml
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
3. 修改 hive-site.xml 文件
在hive-site增加如下配置,ip的值取配置文件里 hive.server2.thrift.bind.host 的 value
<property>
<name>hive.metastore.uris</name>
<value>thrift://ip:7004</value>
</property>
4. 获取 hivemetastore-site.xmlhiveserver2-site.xml,点击文件名下载。
5. 对获取到的配置文件 打 jar 包。
jar -cvf hive-xxx.jar hive-site.xml hdfs-site.xml hivemetastore-site.xml hiveserver2-site.xml
6. 校验 jar 的结构(可以通过 vi 命令查看 vi hive-xxx.jar),jar 里面包含如下信息,请确保文件不缺失且结构正确。
META-INF/
META-INF/MANIFEST.MF
hive-site.xml
hdfs-site.xml
hivemetastore-site.xml
hiveserver2-site.xml

在任务中使用配置 jar

引用程序包中选择 Hive 连接配置 jar 包(该 jar 包为在 获取 Hive 连接配置 jar 包 中得到的 hive-xxx.jar,必须在依赖管理上传后才使用)。

Kerberos 认证授权

1. 登录集群 Master 节点,获取 krb5.conf、emr.keytab、core-site.xml、hdfs-site.xml、hive-site.xml 文件,路径如下。
/etc/krb5.conf
/var/krb5kdc/emr.keytab
/usr/local/service/hadoop/etc/hadoop/core-site.xml
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
/usr/local/service/hive/conf/hive-site.xml
2. 修改 hive-site.xml 文件。在 hive-site.xml 中增加如下配置,IP 的值取配置文件中 hive.server2.thrift.bind.host 的 value。
<property>
<name>hive.metastore.uris</name>
<value>thrift://ip:7004</value>
</property>
3. 获取 hivemetastore-site.xmlhiveserver2-site.xml,点击文件名下载。
4. 对获取的配置文件打 jar 包。
jar cvf hive-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml hive-site.xml hivemetastore-site.xml hiveserver2-site.xml
5. 校验 jar 的结构(可以通过 vim 命令查看 vim hive-xxx.jar),jar 里面包含如下信息,请确保文件不缺失且结构正确。
META-INF/
META-INF/MANIFEST.MF
emr.keytab
krb5.conf
hdfs-site.xml
core-site.xml
hive-site.xml
hivemetastore-site.xml
hiveserver2-site.xml
6. 程序包管理 页面上传 jar 包,并在作业参数配置里引用该程序包。
7. 获取 kerberos principal,用于作业 高级参数 配置。
klist -kt /var/krb5kdc/emr.keytab

# 输出如下所示,选取第一个即可:hadoop/172.28.28.51@EMR-OQPO48B9
KVNO Timestamp Principal
---- ------------------- ------------------------------------------------------
2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
8. 作业 高级参数 配置。
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
security.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9
security.kerberos.login.keytab: emr.keytab
security.kerberos.login.conf: ${krb5.conf.fileName}
注意:
历史 Oceanus 集群可能不支持该功能,您可通过 在线客服 联系我们升级集群管控服务,以支持 Kerberos 访问。

注意事项

如果 Flink 作业正常运行,日志中没有报错,但是客户端查不到这个 Hive 表,可以使用如下命令对 Hive 表进行修复(需要将 hive_table_xxx 替换为要修复的表名)。
msck repair table hive_table_xxx;


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈