Hive QL用户自定义函数

Hive虽然具有丰富的内置函数,但这些内置函数并满足不了所有的业务需求。

在Hive中,除了Hive自带的内置函数之外,用户还可以根据需要自行定义函数。用户自定义函数(UDF)是一个允许用户扩展HQL的强大的功能。

Hive支持以下三种自定义函数:

  • UDF:User-Defined-Function,用户自定义函数,一对一的输入输出。(最常用的)
  • UDTF:User-Defined Table-Generating Functions,用户自定义表生成函数。一对多的输入输出,比如 lateral view explore()。
  • UDAF:User-Defined Aggregation Function,用户自定义聚合函数,多进一出,比如 count/max/min。

创建自定义函数步骤

用户可以使用Java编写自己的UDF,一旦将用户自定义函数加入到用户会话中(交互式的或者通过脚本执行的),它们就将和内置的函数一样使用。

创建用户自定义函数的步骤如下:

  • 编写自定义函数代码;
  • 编译部署;
  • 在hive中注册自定义函数;
  • 在查询中使用自定义函数;
  • 销毁自定义函数;

1. 用户自定义函数 UDF

UDF函数作用于单个数据行,且产生一个数据行作为输出。

定义UDF函数要注意下面几点:

  • 继承org.apache.hadoop.hive.ql.exec.UDF;
  • 重写evaluate(),这个方法不是由接口定义的,因为它可接受的参数的个数,数据类型都是不确定的。Hive会检查UDF,看能否找到和函数调用相匹配的evaluate()方法。

例如,下面我们定义一个字符串大小写转换的UDF:

public class ToLowerOrUpperCase extends UDF {
    public String evaluate(String val, int i) {
        if (StringUtils.isBlank(val)) {
            return "";
        }
        else if (i == 0) {
            return val.toUpperCase();
        }
        else {
            return val.toLowerCase();
        }
    }
}

1)将编写的自定义函数源码编译并打包,例如udf.jar。

2)在hive中,将udf.jar包加载到classpath:

hive> add jar /hivedata/udf.jar;

3)将自定义函数注册为HIVE函数,例如取名为my_case:

hive> create temporary function my_case as 'hive_udf.ToLowerOrUpperCase';

4)在Hive QL查询中就可以应用自定义函数my_case:

hive> select my_case('abac',0);

2. 用户自定义表函数 UDTF

UDTF函数作用于单个数据行,且产生多个数据行作为输出,是一对多的输入输出。

定义UDTF函数要注意下面几点:

  • 继承继承org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
  • 重写initlizer()、process()、evaluate()方法。

例如,下面我们定义一个explode(Map(k,v))转换的UDTF(自定义表函数):

public class MyMapUDTF extends GenericUDTF{

     @Override
     public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
         if (args.length != 1) {
             throw new UDFArgumentLengthException("请传入要解析的列值。");
         }

         ArrayList<String> fieldNameList = new ArrayList<String>();
         ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
         fieldNameList.add("k");     // 字段"k"
         fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
         fieldNameList.add("v");     // 字段"v"
         fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

         return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNameList,fieldOIs);
     }

     @Override
     public void process(Object[] args) throws HiveException {
         String input = args[0].toString();
         String[] kv = input.split(";");
         for(int i=0; i<kv.length; i++) {
             try {
                 String[] result = kv[i].split(":");
                 forward(result);
             } catch (Exception e) {
                 continue;
             }
         }
     }
 }

1)将编写的自定义函数源码编译并打包,例如udtf.jar。

2)在hive中,将udtf.jar包加载到classpath:

hive> add jar /hivedata/udtf.jar;

3)将自定义函数注册为HIVE函数,例如取名为my_map:

hive> create temporary function my_map as 'hive_udf.MyMapUDTF';

4)在Hive QL查询中就可以应用自定义函数my_map:

hive> select my_map("name:张三;age:23;gender:男");

3. 用户自定义聚合函数 UDAF

用户自定义聚合函数UDAF 接受多个输入数据行,并产生一个输出数据行。它实现多对一的输入输出,类似count、sum、max等聚合函数。

定义UDAF函数必须继承自org.apache.hadoop.hive.ql.exec.UDAF,并且包含一个实现了org.apache.hadoop.hive.ql.exec.UDAFEvaluator接口的的静态嵌套类。该嵌套类必须实现下面这5个方法:

方法 说明
init() 该方法负责初始化计算函数并重设它的内部状态 。
iterate() 每次对一个新值进行聚合计算时会调用该方法。
terminatePartial() Hive需要部分聚合结果时会调用该方法。
merge() Hive需要将部分聚合结果和另外部分聚合结果合并时会调用该方法。
terminate() 调用该方法返回最终聚合结果。

例如,下面我们定义一个计算最大值的UDAF(自定义聚合函数):

public class MyMaxUDAF extends UDAF {
    public static class MaxIntUDAFEvaluator implements UDAFEvaluator {
        private IntWritable result;

        public void init() {
            result = null;
        }

        public boolean iterate(IntWritable value) {
            if (value == null) {
                return true;
            }
            if (result == null) {
                result = new IntWritable( value.get() );
            } else {
                result.set( Math.max( result.get(), value.get() ) );
            }
            return true;
        }

        public IntWritable terminatePartial() {
            return result;
        }

        public boolean merge(IntWritable other) {
            return iterate( other );
        }

        public IntWritable terminate() {
            return result;
        }
    }
}

1)将编写的自定义函数源码编译并打包,例如udaf.jar。

2)在hive中,将udaf.jar包加载到classpath:

hive> add jar /hivedata/udaf.jar;

3)将自定义函数注册为HIVE函数,例如取名为my_max:

hive> create temporary function my_max as 'hive_udf.MyMaxUDAF';

4)在Hive QL查询中就可以应用自定义函数my_max:

hive> select my_max(age) from employees;

删除自定义函数

如果不再需要自定义函数了,可使用如下语法删除:

drop [temporary] function [if exists] [dbname.]function_name;

例如,要删除注册的自定义函数my_max,可以执行如下Hive QL命令:

hive> drop temporary function if exists my_max;

《Flink原理深入与编程实战》