一、自定义函数简介
1.1、函数类型
UDF:用户定义函数
UDF操作作用于单个数据行,并且产生一个数据行作为输出。大多数函数都属于这一类(比如数学函数和字符串函数)
UDAF:用户定义聚集函数
UDAF 接受多个输入数据行,并产生一个输出数据行。像COUNT和MAX这样的函数就是聚集函数。
UDTF:用户定义表生成函数
UDTF 操作作用于单个数据行,并且产生多个数据行。比如explode。
二、自定义UDF
定义一个udf,实现根据输入的日期,输出一个时段, 2:00-5:00凌晨,5:00-12:00为上午,12:00-14:00为中午,14:00-17:00为下午,17:00-19:00为傍晚,19:00-23:00为晚上,23:00-2:00为深夜
-
继承UDF
-
定义evaluate方法
测试数据:
2019-12-12 10:38:26
2019-12-12 17:00:00
2016-01-22 19:23:22
2018-04-02 03:12:00
2.1、步骤:创建临时函数或永久函数
1、创建临时函数
1.上传自定义udf的jar到Linux (将自定义函数打包成datetime.jar并长传到/opt下)
2.在Hive CLI执行:
add jar /opt/dateudf.jar;
3.在Hive CLI执行:
create temporary function datetotime as 'demo.DateUDF';
(注意:创建临时函数只有退出hive就不能再使用)
2、创建永久函数
1.把自定义函数的jar上传到hdfs中
2.创建永久函数:
create function datetotime as 'demo.DateUDF' using jar 'hdfs://master:8020/user/root/dateudf.jar’;
2.2、删除自定义函数
删除函数:
drop [temporary] function [if exists] [dbname.]函数名;
例:删除永久函数(临时的就没必要了,因为一退出就没了)
三、自定义UDAF
求平均数
1.需继承UDAF;
2.内部静态类需继承UDAFEvaluator抽象类,重写方法
init()
,
iterate()
,
terminatePartial()
,
merge()
,
terminate()
。
init() 初始化 一般负责初始化内部字段,通常初始化用来存放最终结果的变量
iterate() 每次都会对一个新的值进行聚合计算时都调用该方法,一般会根据计算结果更新用来存放最终结果的变量,如果计算正确或者输入值合法就返回true
terminatePartial() 这个方法直译过来是"终止部分",部分聚合结果的时候调用该方法 必须返回一个封装了聚合计算当前状态的对象,类似于 MapReduce的combiner
merge() 接受来自 terminatePartial的返回结果,进行合并,hive合并两部分聚合的时候回调用这个方法
terminate() 终止方法 返回最终聚合函数结果
测试数据:
1,700,1200
2,1201,1400
3,1401,2000
4,2001,3000
5,3001,9999
自定义函数:
package demo;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
public class UDAFAverage extends UDAF{
public static class AvgStat{
private int mcount;
private double msum;
}
public static class AvgEvaluator implements UDAFAverage{
AvgStat avgStat;
public AvgEvaluator() {
super();
avgStat=new AvgStat();
init();
}
@Override
public void init() {
avgStat.mcount=0;
avgStat.msum=0;
}
/**
* 介绍原始数据并进行内部轮转
* @return
*/
public boolean iterate(Double o) {
if(o!=null) {
avgStat.mcount++;
avgStat.msum+=o;
}
return true;
}
/**
* 接收iterate遍历结束后的输出结果进行初次聚集,类似Combiner
* @return
*/
public AvgStat terminatePartial() {
if(avgStat.mcount==0)
return null;
else return avgStat;
}
public boolean merge(AvgStat otheravgState) {
if(otheravgState!=null) {
avgStat.mcount+=otheravgState.mcount;
avgStat.msum+=otheravgState.msum;
}
return true;
}
/**
* terminate返回最终的聚集函数结果 * * @return
*/
public Double terminate() {
if(avgStat.mcount==0)
return null;
else return avgStat.msum/avgStat.mcount;
}
}
}
接下来就是创建临时函数或永久函数了。。。
四、自定义UDTF
定义一个UDTF,可以将“商品1:价格1,商品2:价格2”格式的一列数据解析成商品,价格两个字段
1.继承GenericUDTF,实现initialize, process, close三个方法
2.其中initialize方法主要是判断输入类型并确定返回的字段类型。
3.process方法对udft函数输入的每一行进行操作,通过调用forward方法返回一行或多行数据。
4.close方法在process调用结束后调用,用于进行其它一些额外操作,只执行一次。
测试数据:
shop1:20,shop2:30
shop3:40,shop4:30,shop5:10
自定义函数:
import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
public class UDTFExplode extends GenericUDTF{
/**
* 进行输入类型判断,定义输出字段和类型
* 辅助类objectInspector帮助使用者访问需要序列化或者反序列化的对象
*/
@Override
public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
if(argOIs.length!=1) {
throw new UDFArgumentLengthException("UDTFExplode takes only one argument");
}
if(argOIs[0].getCategory()!=ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentException("UDTFExplode takes string as a parameter");
}
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("shop");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("volume");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);
}
@Override
public void close() throws HiveException {
// TODO Auto-generated method stub
}
/**
* 处理输入的数据:商品1:销量1,商品2:销量2....
* 返回多行,每行新增两个字段shop,volumn
*/
@Override
public void process(Object[] arg0) throws HiveException {
String[] input = arg0[0].toString().split(",");
for(String shop_vol:input) {
String[] input_split = shop_vol.split(":");
forward(input_split);
}
}
}
接下来就是创建临时函数或永久函数了。。。
4.1、UDTF使用方法
1、直接在select中使用
(udtfexplode:自定义函数)
select udtfexplode(datetime) as (shop,volume) from test;
2、和lateral view一起使用,执行过程相当于单独执行了两次抽取,然后合并到一个表里。
select datetime,t.shop,t.volume from test lateral view udtfexplode(datetime) t as shop,volume;