tencent cloud

弹性 MapReduce

动态与公告
产品动态
产品公告
安全公告
产品简介
产品概述
产品优势
产品架构
产品功能
应用场景
约束与限制
技术支持范围
产品发行版
购买指南
EMR on CVM 计费说明
EMR on TKE 计费说明
EMR Serverless HBase 计费说明
快速入门
EMR on CVM 快速入门
EMR on TKE 快速入门
EMR on CVM 操作指南
规划集群
管理权限
配置集群
管理集群
管理服务
监控告警
智能管家
EMR on TKE 操作指南
EMR on TKE 简介
配置集群
管理集群
管理服务
监控运维
应用分析
EMR Serverless HBase 操作指南
EMR Serverless HBase 产品简介
配额与限制
规划实例
管理实例
监控告警
开发指南
EMR 开发指南
Hadoop开发指南
Spark 开发指南
HBASE开发指南
Phoenix on Hbase 开发指南
Hive 开发指南
Presto开发指南
Sqoop 开发指南
Hue 开发指南
Oozie 开发指南
Flume 开发指南
Kerberos 开发指南
Knox 开发指南
Alluxio 开发指南
Kylin 开发指南
Livy 开发指南
Kyuubi 开发指南
Zeppelin 开发指南
Hudi 开发指南
Superset 开发指南
Impala 开发指南
Druid 开发指南
Tensorflow 开发指南
Kudu 开发指南
Ranger 开发指南
Kafka 开发指南
Iceberg 开发指南
StarRocks 开发指南
Flink 开发指南
JupyterLab 开发指南
MLflow 开发指南
实践教程
EMR on CVM 运维实践
数据迁移实践
自定义伸缩实践教程
API 文档
History
Introduction
API Category
Cluster Resource Management APIs
Cluster Services APIs
User Management APIs
Data Inquiry APIs
Scaling APIs
Configuration APIs
Other APIs
Serverless HBase APIs
YARN Resource Scheduling APIs
Making API Requests
Data Types
Error Codes
常见问题
EMR on CVM常见问题
服务等级协议
联系我们

自定义函数 UDF

PDF
聚焦模式
字号
最后更新时间: 2024-10-30 11:37:10
本文为您介绍自定义函数 ( UDF ) 及开发和使用流程。

UDF 分类

UDF 分类
描述
UDF(User Defined Scalar Function)
自定义标量函数,通常称为 UDF 。其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。
UDTF(User Defined Table-valued Function)
自定义表值函数,用来解决一次函数调用输出多行数据场景的,也是唯一一个可以返回多个字段的自定义函数。
UDAF(User Defined Aggregation Function)
自定义聚合函数,其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值,可以与 SQL 中的 Group By 语句联合使用。
更多说明可参考社区文档:UDFUDAFUDTF

开发 UDF

使用 IDE,创建 Maven 工程。工程基本信息如下,您可以自定义 groupId 和 artifactId :
<groupId>org.example</groupId>
<artifactId>hive-udf</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
添加 pom 依赖:
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.3</version>
<exclusions>
<exclusion>
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
创建一个类,类名您可以自定义,本文以 nvl 为例:
方式 1:继承 UDF 重写 evaluate 方法:
package org.example;

import org.apache.hadoop.hive.ql.exec.UDF;

public class nvl extends UDF {
public String evaluate(final String s) {
if (s == null) { return null; }
return s + ":HelloWorld";
}
}
方式 2(推荐,适用于传参情况复杂的场景):继承 GenericUDF 重写 initialize、evaluate、getDisplayString:
package org.example;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

@Description(name = "nvl",
value = "nvl(value, default_value) - Returns default value if value is null else returns value",
extended = "Example: SELECT nvl(null, default_value);")
public class MyUDF extends GenericUDF {

private GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
private ObjectInspector[] argumentOIs;

/**
* 根据函数的入参类型确定出参类型
*/
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
argumentOIs = arguments;
if(arguments.length != 2) {
throw new UDFArgumentException("The operator 'NVL' accepts 2 arguments.");
}
returnOIResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);
if(!(returnOIResolver.update(arguments[0]) && returnOIResolver.update(arguments[1]))) {
throw new UDFArgumentTypeException(2, "The 1st and 2nd args of function NLV should have the same type, "
+ "but they are different: \\""+arguments[0].getTypeName()+"\\" and \\"" + arguments[1].getTypeName() + "\\"");
}
return returnOIResolver.get();
}

/**
* 计算结果,最后结果的数据类型会根据initialize方法的返回值类型确定函数的返回值类型
*/
public Object evaluate(DeferredObject[] arguments) throws HiveException {
Object retVal = returnOIResolver.convertIfNecessary(arguments[0].get(), argumentOIs[0]);
if(retVal == null) {
retVal = returnOIResolver.convertIfNecessary(arguments[1].get(), argumentOIs[1]);
}
return retVal;
}

/**
* 获取要在explain中显示的字符串
*/
public String getDisplayString(String[] children) {
StringBuilder builder = new StringBuilder();
builder.append("if ");
builder.append(children[0]);
builder.append(" is null ");
builder.append("returns ");
builder.append(children[1]);
return builder.toString();
}
}
以方式 2 为例,将自定义的代码打成 jar 包。在 pom.xml 所在目录,执行如下命令制作 jar 包。
mvn clean package -DskipTests
target 目录下会出现 hive-udf-1.0-SNAPSHOT.jar 的 jar 包,即代表完成了 UDF 开发工作。

使用UDF

将生产的 jar 上传至 EMR 集群 Master 节点:
scp ./target/hive-udf-1.0-SNAPSHOT.jar root@${master_public_ip}:/usr/local/service/hive
切换 hadoop 用户并执行以下命令将 jar 上传到 HDFS 中:
su hadoop
hadoop fs -put ./hive-udf-1.0-SNAPSHOT.jar /
查看上传到 HDFS 中的 jar:
hadoop fs -ls /
Found 5 items
drwxr-xr-x - hadoop supergroup 0 2023-08-22 09:20 /data
drwxrwx--- - hadoop supergroup 0 2023-08-22 09:20 /emr
-rw-r--r-- 2 hadoop supergroup 3235 2023-08-22 15:39 /hive-udf-1.0-SNAPSHOT.jar
drwx-wx-wx - hadoop supergroup 0 2023-08-22 09:20 /tmp
drwxr-xr-x - hadoop supergroup 0 2023-08-22 09:20 /user
连接 Hive:
hive
执行以下命令,应用生成的 JAR 包创建函数。
hive> create function nvl as "org.example.MyUDF" using jar "hdfs:///hive-udf-1.0-SNAPSHOT.jar";
说明:
1. nvl 是 UDF 函数的名称。
2. org.example.MyUDF 是项目中创建的类全名。
3. hdfs:///user/hive/warehouse/hiveudf-1.0-SNAPSHOT.jar 为上传 jar 包到 HDFS 的路径。
出现以下信息时,表示创建成功:
Added [/data/emr/hive/tmp/1b0f12a6-3406-4700-8227-37dec721297b_resources/hive-udf-1.0-SNAPSHOT.jar] to class path
Added resources: [hdfs:///hive-udf-1.0-SNAPSHOT.jar]
OK
Time taken: 1.549 seconds
您也可以通过命令 SHOW FUNCTIONS LIKE '*nvl*',验证函数是否创建成功。
执行以下命令,使用 UDF 函数。该函数与内置函数使用方式一样,直接使用函数名称即可访问:
hive> select nvl("tur", "def");
OK
tur
Time taken: 0.344 seconds, Fetched: 1 row(s)
hive> select nvl(null, "def");
OK
def
Time taken: 0.471 seconds, Fetched: 1 row(s)


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈