Creating a Hive UDAF Function




1. Create a new Java class in the previous Maven project

package com.chinasofti.hive.udf;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

import java.util.ArrayList;
import java.util.List;

/**
 * Implements the functionality of MySQL's GROUP_CONCAT function
 * (the list-aggregation function in Oracle).
 */
public class TestUDAF extends AbstractGenericUDAFResolver {

    // LOG object used to write warnings and errors to Hive's log.
    static final Log LOG = LogFactory.getLog(TestUDAF.class.getName());

    public TestUDAF(){
    }

    // Override getEvaluator: based on the parameter types passed in from the SQL, it returns the correct evaluator.
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {

        if (parameters.length != 1) {
            throw new UDFArgumentTypeException(parameters.length - 1, "Exactly one argument is expected.");
        }
        // Check that the argument is a primitive type supported by Hive
        if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                    "Only primitive type arguments are accepted but "
                            + parameters[0].getTypeName() + " was passed as parameter 1.");
        }
        // The evaluator's parent class is GenericUDAFEvaluator, the base class for Hive
        // generic user-defined aggregate functions (GenericUDAF); it is initialized in init() below.
        return new GenericUDAFMkColistEvaluator();
    }

    public static class GenericUDAFMkColistEvaluator extends GenericUDAFEvaluator {

        private PrimitiveObjectInspector inputOI;
        private StandardListObjectInspector loi;
        private StandardListObjectInspector internalMergeOI;

        // Hive calls this method to initialize an instance of the UDAF evaluator class
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);

            if (m == Mode.PARTIAL1) {
                // map side: the input is the raw primitive value, the output is a partial list
                inputOI = (PrimitiveObjectInspector) parameters[0];
                return ObjectInspectorFactory.getStandardListObjectInspector(
                        (PrimitiveObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(inputOI)
                );
            } else {
                if (!(parameters[0] instanceof StandardListObjectInspector)) {
                    // COMPLETE mode (no map-side aggregation): the input is still the raw value
                    inputOI = (PrimitiveObjectInspector) ObjectInspectorUtils.
                            getStandardObjectInspector(parameters[0]);
                    return (StandardListObjectInspector) ObjectInspectorFactory.getStandardListObjectInspector(inputOI);
                } else {
                    // PARTIAL2/FINAL mode: the input is the partial list produced by terminatePartial()
                    internalMergeOI = (StandardListObjectInspector) parameters[0];
                    inputOI = (PrimitiveObjectInspector) internalMergeOI.getListElementObjectInspector();
                    loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);
                    return loi;
                }
            }
        }

        // Aggregation buffer; the same object is reused across the rows of a group
        static class MKArrayAggregationBuffer implements AggregationBuffer {
            List<Object> container;
        }

        // Helper that performs the per-row accumulation: copy the value and append it to the buffer
        private void putIntoList(Object p, MKArrayAggregationBuffer myagg) {
            Object pCopy = ObjectInspectorUtils.copyToStandardObject(p, this.inputOI);
            myagg.container.add(pCopy);
        }

        // Returns the GenericUDAFEvaluator.AggregationBuffer object that stores the temporary aggregation result
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            MKArrayAggregationBuffer ret = new MKArrayAggregationBuffer();
            reset(ret);
            return ret;
        }

        // Resets the aggregation; useful when the same buffer is reused for a new group
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((MKArrayAggregationBuffer) agg).container = new ArrayList<Object>();
        }

        // Iterates over the raw input row represented by parameters and accumulates it into agg
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            assert (parameters.length == 1);
            Object p = parameters[0];

            if (p != null) {
                MKArrayAggregationBuffer myagg = (MKArrayAggregationBuffer) agg;
                putIntoList(p, myagg);
            }
        }

        // Returns the partial aggregation result held in agg in a persistable form. Persistable means
        // the return value can only be Java primitives, arrays, primitive wrappers, Hadoop Writables,
        // Lists, or Maps; never use a custom class here, even one that implements java.io.Serializable.
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            MKArrayAggregationBuffer myagg = (MKArrayAggregationBuffer) agg;
            ArrayList<Object> ret = new ArrayList<Object>(myagg.container.size());
            ret.addAll(myagg.container);
            return ret;
        }

        // Merges the partial aggregation result represented by partial into agg
        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            MKArrayAggregationBuffer myagg = (MKArrayAggregationBuffer) agg;
            ArrayList<Object> partialResult = (ArrayList<Object>) internalMergeOI.getList(partial);
            for (Object i : partialResult) {
                putIntoList(i, myagg);
            }
        }

        // Returns the final aggregation result represented by agg
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            MKArrayAggregationBuffer myagg = (MKArrayAggregationBuffer) agg;
            ArrayList<Object> ret = new ArrayList<Object>(myagg.container.size());
            ret.addAll(myagg.container);
            return ret;
        }
    }

}



2. Package the finished program into a jar

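Since this is a Maven project, packaging can also be done from the command line instead of the IDE (standard Maven; the resulting jar lands in the target/ directory):

mvn clean package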



3. Upload the jar to the Linux system




4. Connect to Hive

Here we connect directly on node1 rather than going through node-3.

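This step is just launching a Hive client on node1; a minimal sketch, assuming the hive binary is on the PATH there:

hive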



5. Add the jar into Hive with the following command

add JAR /export/data/hivedata/example-hiveudf-3.jar;




6. Create a temporary function associated with the jar that was just added

create temporary function collect as 'com.chinasofti.hive.udf.TestUDAF';

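To confirm the registration, standard HiveQL can describe the new function (collect is the name chosen above):

describe function collect;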



7. Test it
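
A test query would look something like the following, assuming a hypothetical table student(classno string, name string); the new collect function gathers the names in each class into an array, much like the built-in collect_list:

select classno, collect(name) from student group by classno;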

Ugh, I can't stand it, this thing has to run a whole MapReduce job…

Suddenly, enabling local mode seems like a good thing after all 0.0
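
For reference, local mode can be turned on per session with the standard Hive setting, so small test queries run locally instead of being submitted to the cluster:

set hive.exec.mode.local.auto=true;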





