Writing and Using UDTFs in Hive



1. UDTF: Introduction and Implementation

1.1. Introduction

In Hive, a UDTF can turn one input row into a single row with multiple columns, or into multiple rows with multiple columns, and it is used quite often. This article uses a concrete example to walk through how to write a UDTF, how to use it, and how it works. Before reading on, please first read:

Writing UDFs
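
As a quick illustration of the one-row-to-many-rows behavior, Hive's built-in UDTF explode already shows it (a minimal sketch; exact CLI timing output omitted):

hive> select explode(array('a','b','c'));
OK
a
b
c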

Test data:

drop table if exists test;
create table test
(
  ind int,
  col string,
  col1 string
) ;
insert into test values (1,'a,b,c','1,2');
insert into test values (2,'j,k',null);
insert into test values (3,null,null) ;

For the first row, the expected output is:

ind  key  value
1    a    1
1    b    2
1    c    NULL

Every other row should be handled the same way; when an input value is NULL, nothing is output for it.
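
Applying that rule to the remaining rows (row 2 pairs j and k with NULL because col1 is NULL; row 3 produces nothing because col is NULL), the full expected output is:

ind  key  value
1    a    1
1    b    2
1    c    NULL
2    j    NULL
2    k    NULL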

1.2. Writing the UDTF

To write a UDTF (User-Defined Table-Generating Function), you need to extend the GenericUDTF class, part of which is shown below:

/**
 * A Generic User-defined Table Generating Function (UDTF)
 *
 * Generates a variable number of output rows for a single input row. Useful for
 * explode(array)...
 */
public abstract class GenericUDTF {

  public StructObjectInspector initialize(StructObjectInspector argOIs)
      throws UDFArgumentException {
    List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
    ObjectInspector[] udtfInputOIs = new ObjectInspector[inputFields.size()];
    for (int i = 0; i < inputFields.size(); i++) {
      udtfInputOIs[i] = inputFields.get(i).getFieldObjectInspector();
    }
    return initialize(udtfInputOIs);
  }

  /**
   * Give a set of arguments for the UDTF to process.
   *
   * @param args
   *          object array of arguments
   */
  public abstract void process(Object[] args) throws HiveException;

  /**
   * Called to notify the UDTF that there are no more rows to process.
   * Clean up code or additional forward() calls can be made here.
   */
  public abstract void close() throws HiveException;
}

A subclass of GenericUDTF must implement the methods above. The initialize method plays the same role as in a UDF: it checks the input types and declares the names and types of the output columns. The process method is called once for each input row passed to the UDTF and returns one or more rows by calling forward. The close method is called exactly once, after all calls to process have completed, and is the place for any additional work such as final forward calls.

package com.practice.hive.udtf;

import java.util.List;

import com.google.common.collect.Lists;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * @author liufeifei
 * @date 2018/06/20
 */
public class ArrToMapUDTF extends GenericUDTF {

    // Reused output buffer; one forward() call emits one output row
    private String[] obj = new String[2];

    /**
     * Declares the two output columns, both of type string.
     *
     * @param argOIs
     * @return
     * @throws UDFArgumentException
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        List<String> colName = Lists.newLinkedList();
        colName.add("key");
        colName.add("value");
        List<ObjectInspector> resType = Lists.newLinkedList();
        resType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        resType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        // Return the column names and the column types
        return ObjectInspectorFactory.getStandardStructObjectInspector(colName, resType);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // A NULL first argument produces no output rows at all
        if (args[0] == null) {
            return;
        }
        String arg1 = args[0].toString();

        String[] arr1 = arg1.split(",");
        String[] arr2 = null;
        if (args[1] != null) {
            arr2 = args[1].toString().split(",");
        }

        // Emit one row per element of the first array, paired with the
        // matching element of the second array when one exists
        for (int i = 0; i < arr1.length; i++) {
            obj[0] = arr1[i];
            if (arr2 != null && arr2.length > i) {
                obj[1] = arr2[i];
            } else {
                obj[1] = null;
            }
            forward(obj);
        }
    }

    @Override
    public void close() throws HiveException {
    }
}
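
One design note: the output array obj is allocated once and reused across forward calls rather than re-created per row; as far as I can tell, Hive's own built-in UDTFs such as explode use the same pattern, since each forwarded row is consumed downstream before process continues.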


2. Using the UDTF

The sessions below show the execution results.

2.1. Method 1

-- Original data
hive> select * from test;
OK
1   a,b,c   1,2
2   j,k NULL
3   NULL    NULL
Time taken: 0.051 seconds, Fetched: 3 row(s)

-- Register the jar and run the function
hive> add jar /Users/liufeifei/hive/jar/hive.jar;
Added [/Users/liufeifei/hive/jar/hive.jar] to class path
Added resources: [/Users/liufeifei/hive/jar/hive.jar]
hive> create temporary function get_map as 'com.practice.hive.udtf.ArrToMapUDTF';
OK
Time taken: 0.005 seconds
hive> select get_map(col,col1) from test;
OK
a   1
b   2
c   NULL
j   NULL
k   NULL
Time taken: 1.008 seconds, Fetched: 5 row(s)

2.2. Method 2

The above is the basic way to call the get_map function; its limitation is that no other columns can be referenced in the same SELECT. Combining it with the lateral view keyword gives the desired result.

hive> select t.ind,t.col,t.col1,t1.key,t1.value
    >   from test t
    > lateral view get_map(col,col1) t1 as key,value;
OK
1   a,b,c   1,2 a   1
1   a,b,c   1,2 b   2
1   a,b,c   1,2 c   NULL
2   j,k NULL    j   NULL
2   j,k NULL    k   NULL
Time taken: 0.045 seconds, Fetched: 5 row(s)

3. Execution Internals

For the lateral view query above: two tables are involved, t (test) and t1. A lateral view is essentially a join of the two tables, and it proceeds as follows (my own understanding):

1. From table t, select the ind, col, and col1 columns.

2. Apply the UDTF to the col and col1 columns of t; call the resulting data set t1 and name its columns key and value.

3. Join t with t1 to produce the result set.

4. If the UDTF produces no rows for an input row but that row from the left table should still be kept, use the OUTER keyword, as in the sketch below.
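
A minimal sketch of OUTER with the tables above; per the documentation quoted below, row 3 of test, for which get_map emits nothing, is kept with NULLs in the UDTF columns:

select t.ind, t1.key, t1.value
  from test t
lateral view outer get_map(col, col1) t1 as key, value;
-- without OUTER, ind = 3 vanishes from the output;
-- with OUTER it is kept as:  3  NULL  NULL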

Quoting the official documentation:

Excerpt 1:

Lateral view is used in conjunction with user-defined table generating functions such as explode(). As mentioned in Built-in Table-Generating Functions, a UDTF generates zero or more output rows for each input row. A lateral view first applies the UDTF to each row of base table and then joins resulting output rows to the input rows to form a virtual table having the supplied table alias.

Excerpt 2:

The user can specify the optional OUTER keyword to generate rows even when a LATERAL VIEW usually would not generate a row. This happens when the UDTF used does not generate any rows, which happens easily with explode when the column to explode is empty. In this case the source row would never appear in the results. OUTER can be used to prevent that, and rows will be generated with NULL values in the columns coming from the UDTF.


Documentation link

The execution plan looks like this:

STAGE PLANS:
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        TableScan
          alias: t
          Statistics: Num rows: 3 Data size: 26 Basic stats: COMPLETE Column stats: NONE
          Lateral View Forward
            Statistics: Num rows: 3 Data size: 26 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: ind (type: int), col (type: string), col1 (type: string)
              outputColumnNames: ind, col, col1
              Statistics: Num rows: 3 Data size: 26 Basic stats: COMPLETE Column stats: NONE
              Lateral View Join Operator
                outputColumnNames: _col0, _col1, _col2, _col6, _col7
                Statistics: Num rows: 6 Data size: 52 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  expressions: _col0 (type: int), _col1 (type: string), _col2 (type: string), _col6 (type: string), _col7 (type: string)
                  outputColumnNames: _col0, _col1, _col2, _col3, _col4
                  Statistics: Num rows: 6 Data size: 52 Basic stats: COMPLETE Column stats: NONE
                  ListSink
            Select Operator
              expressions: col (type: string), col1 (type: string)
              outputColumnNames: _col0, _col1
              Statistics: Num rows: 3 Data size: 26 Basic stats: COMPLETE Column stats: NONE
              UDTF Operator
                Statistics: Num rows: 3 Data size: 26 Basic stats: COMPLETE Column stats: NONE
                function name: com.ffl.study.hive.udtf.ArrToMapUDTF@6b7a0f18
                Lateral View Join Operator
                  outputColumnNames: _col0, _col1, _col2, _col6, _col7
                  Statistics: Num rows: 6 Data size: 52 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    expressions: _col0 (type: int), _col1 (type: string), _col2 (type: string), _col6 (type: string), _col7 (type: string)
                    outputColumnNames: _col0, _col1, _col2, _col3, _col4
                    Statistics: Num rows: 6 Data size: 52 Basic stats: COMPLETE Column stats: NONE
                    ListSink

Opening the source of LateralViewJoinOperator and LateralViewForwardOperator, you can find the following comments:

LateralViewJoinOperator
/**
 * The lateral view join operator is used for FROM src LATERAL VIEW udtf()...
 * This operator was implemented with the following operator DAG in mind.
 *
 * For a query such as
 *
 * SELECT pageid, adid.* FROM example_table LATERAL VIEW explode(adid_list) AS
 * adid
 *
 * The top of the operator DAG will look similar to
 *
 *            [Table Scan]
 *                |
 *       [Lateral View Forward]
 *              /   \
 *   [Select](*)    [Select](adid_list)
 *            |      |
 *            |     [UDTF] (explode)
 *            \     /
 *      [Lateral View Join]
 *               |
 *               |
 *      [Select] (pageid, adid.*)
 *               |
 *              ....
 *
 * Rows from the table scan operator are first to a lateral view forward
 * operator that just forwards the row and marks the start of a LV. The
 * select operator on the left picks all the columns while the select operator
 * on the right picks only the columns needed by the UDTF.
 *
 * The output of select in the left branch and output of the UDTF in the right
 * branch are then sent to the lateral view join (LVJ). In most cases, the UDTF
 * will generate > 1 row for every row received from the TS, while the left
 * select operator will generate only one. For each row output from the TS, the
 * LVJ outputs all possible rows that can be created by joining the row from the
 * left select and one of the rows output from the UDTF.
 *
 * Additional lateral views can be supported by adding a similar DAG after the
 * previous LVJ operator.   // i.e. further joins can be chained on afterwards
 */
LateralViewForwardOperator   // used for predicate push-down detection
/**
 * LateralViewForwardOperator. This operator sits at the head of the operator
 * DAG for a lateral view. This does nothing, but it aids the predicate push
 * down during traversal to identify when a lateral view occurs.
 *
 */

As you can see, a lateral view behaves much like a join. Lateral views also support predicate pushdown; I'll dig deeper into that part of the source when the need arises ^~^
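
A sketch of where that matters (my reading of the LateralViewForwardOperator comment; the exact behavior is an assumption based on it): a predicate that references only base-table columns can be evaluated before the UDTF runs, so rows are dropped before being exploded:

select t.ind, t1.key, t1.value
  from test t
lateral view get_map(col, col1) t1 as key, value
 where t.ind = 1;   -- filter on a base-table column, eligible for push-down past the lateral view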

Note: testing against a large table on another platform, Method 1 launched only a map task, while Method 2 launched both map and reduce tasks. On my local machine the data set is too small for an MR job to be launched, so the effect cannot be observed there.


Code link



Copyright notice: this is an original article by u012485099, released under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.