学习笔记Hive(七)—— 自定义函数

  • Post author:
  • Post category:其他




一、自定义函数简介



1.1、函数类型


UDF:用户定义函数


UDF操作作用于单个数据行,并且产生一个数据行作为输出。大多数函数都属于这一类(比如数学函数和字符串函数)


UDAF:用户定义聚集函数


UDAF 接受多个输入数据行,并产生一个输出数据行。像COUNT和MAX这样的函数就是聚集函数。


UDTF:用户定义表生成函数


UDTF 操作作用于单个数据行,并且产生多个数据行。比如explode。




二、自定义UDF

定义一个udf,实现根据输入的日期,输出一个时段, 2:00-5:00凌晨,5:00-12:00为上午,12:00-14:00为中午,14:00-17:00为下午,17:00-19:00为傍晚,19:00-23:00为晚上,23:00-2:00为深夜


  • 继承UDF

  • 定义evaluate方法


    在这里插入图片描述

    测试数据:
2019-12-12 10:38:26
2019-12-12 17:00:00
2016-01-22 19:23:22
2018-04-02 03:12:00



2.1、步骤:创建临时函数或永久函数


1、创建临时函数


1.上传自定义udf的jar到Linux (将自定义函数打包成datetime.jar并长传到/opt下)

2.在Hive CLI执行:

add jar /opt/dateudf.jar;


3.在Hive CLI执行:

create temporary function datetotime as 'demo.DateUDF';


(注意:创建临时函数只有退出hive就不能再使用)

在这里插入图片描述

在这里插入图片描述


2、创建永久函数


1.把自定义函数的jar上传到hdfs中

2.创建永久函数:

create function datetotime as 'demo.DateUDF' using jar 'hdfs://master:8020/user/root/dateudf.jar’;


在这里插入图片描述

在这里插入图片描述

在这里插入图片描述




2.2、删除自定义函数

删除函数:

drop [temporary] function [if exists] [dbname.]函数名;

例:删除永久函数(临时的就没必要了,因为一退出就没了)

在这里插入图片描述




三、自定义UDAF


求平均数


1.需继承UDAF;

2.内部静态类需继承UDAFEvaluator抽象类,重写方法

init()



iterate()



terminatePartial()



merge()



terminate()

init() 初始化 一般负责初始化内部字段,通常初始化用来存放最终结果的变量
iterate() 每次都会对一个新的值进行聚合计算时都调用该方法,一般会根据计算结果更新用来存放最终结果的变量,如果计算正确或者输入值合法就返回true
terminatePartial() 这个方法直译过来是"终止部分",部分聚合结果的时候调用该方法 必须返回一个封装了聚合计算当前状态的对象,类似于 MapReduce的combiner 
merge() 接受来自 terminatePartial的返回结果,进行合并,hive合并两部分聚合的时候回调用这个方法
terminate() 终止方法 返回最终聚合函数结果

在这里插入图片描述

测试数据:

1,700,1200
2,1201,1400
3,1401,2000
4,2001,3000
5,3001,9999

自定义函数:

package demo;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class UDAFAverage extends UDAF{
	public static class AvgStat{
		private int mcount;
		private double msum;
	}
public static class AvgEvaluator implements UDAFAverage{
	AvgStat avgStat;
	public AvgEvaluator() {
		super();
		avgStat=new AvgStat();
		init();
}
	@Override
	public void init() {
		avgStat.mcount=0;
		avgStat.msum=0;
	
	}
	/**
	 * 介绍原始数据并进行内部轮转
	 * @return
	 */
	public boolean iterate(Double o) {
		if(o!=null) {
			avgStat.mcount++;
			avgStat.msum+=o;
		}
		return true;
	}
	/**
	 * 接收iterate遍历结束后的输出结果进行初次聚集,类似Combiner
	 * @return
	 */
	public AvgStat terminatePartial() {
		if(avgStat.mcount==0)
		return null;
		else return avgStat;

	}
	public boolean merge(AvgStat otheravgState) {
		if(otheravgState!=null) {
		avgStat.mcount+=otheravgState.mcount;
		avgStat.msum+=otheravgState.msum;
		}
		return true;

	}
	/**
	         * terminate返回最终的聚集函数结果 * * @return
	         */
        public Double terminate() {
	        if(avgStat.mcount==0)
	        	return null;
	        else return avgStat.msum/avgStat.mcount;     
	        }
	}
}

接下来就是创建临时函数或永久函数了。。。




四、自定义UDTF

定义一个UDTF,可以将“商品1:价格1,商品2:价格2”格式的一列数据解析成商品,价格两个字段

1.继承GenericUDTF,实现initialize, process, close三个方法

2.其中initialize方法主要是判断输入类型并确定返回的字段类型。

3.process方法对udft函数输入的每一行进行操作,通过调用forward方法返回一行或多行数据。

4.close方法在process调用结束后调用,用于进行其它一些额外操作,只执行一次。

测试数据:

shop1:20,shop2:30
shop3:40,shop4:30,shop5:10

自定义函数:

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class UDTFExplode extends GenericUDTF{

/**
 * 进行输入类型判断,定义输出字段和类型
 * 辅助类objectInspector帮助使用者访问需要序列化或者反序列化的对象
 */
@Override
	public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
		if(argOIs.length!=1) {
			throw new UDFArgumentLengthException("UDTFExplode takes only one argument");
		}
		if(argOIs[0].getCategory()!=ObjectInspector.Category.PRIMITIVE) {
			throw new UDFArgumentException("UDTFExplode takes string as a parameter");
		}
		ArrayList<String> fieldNames = new ArrayList<String>();  
	    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
	    fieldNames.add("shop");
	    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
	    fieldNames.add("volume");
	    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
	    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);
	}

@Override
	public void close() throws HiveException {
	// TODO Auto-generated method stub
	
	}
/**
 * 处理输入的数据:商品1:销量1,商品2:销量2....
 * 返回多行,每行新增两个字段shop,volumn
 */
@Override
	public void process(Object[] arg0) throws HiveException {
		String[] input = arg0[0].toString().split(",");
		for(String shop_vol:input) {
				String[] input_split = shop_vol.split(":");
				forward(input_split);
		}
	
	}
}

接下来就是创建临时函数或永久函数了。。。




4.1、UDTF使用方法


1、直接在select中使用


(udtfexplode:自定义函数)

select udtfexplode(datetime) as (shop,volume) from test;


2、和lateral view一起使用,执行过程相当于单独执行了两次抽取,然后合并到一个表里。

select datetime,t.shop,t.volume from test lateral view udtfexplode(datetime) t as shop,volume;

在这里插入图片描述



版权声明:本文为qq_46485161原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。