最近学习中遇到一个编写HIVE UDAF函数的问题,最后没弄明白怎么写的,后来看到别人写的UDAF函数后深受启发,所以在这里记录一下UDAF的编写过程。
UDAF简介
UDAF是Hive中用户自定义的聚合函数,Hive内置的UDAF函数有sum()和count(),不过这在实际开发中根本不够用,用户有时候希望能够自定义一些聚合函数来注册使用,Hive也考虑到了这个问题,因此提供了GenericUDAF供用户继承和改写,主要有两个抽象类:
1 | org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver |
抽象类简介
Hive最终也是将SQL转化成MapReduce程序执行的,只不过Hive隐藏了转化过程,只提供SQL接口给用户,在编写自定义聚合函数时就需要用户熟悉MapReduce的具体过程,并控制不同阶段程序的执行逻辑。UDAF函数先读取数据,也就是mapper过程,汇聚后(combine),最后将数据聚合(Reduce)阶段处理,然后将最终结果输出。
AbstractGenericUDAFResolver
这个抽象类的作用主要是指定sql传入的参数要交给那个Evaluator进行处理,需要覆盖实现public GenericUDAFEvaluator getEvaluator(TypeInfo[] info)
这个方法。
GenericUDAFEvaluator
UDAF的主要逻辑在GenericUDAFEvaluator
这里,其中关键是理解Evaluator是怎么处理数据的,这就牵扯到了objectInspector和Model类。
objectInspector是用于解耦数据使用与数据格式的,使得数据流在输入输入端能够切换成不同的格式,不同的处理阶段使用不同的数据格式。
Model类是GenericUDAFEvaluator中的一个内部类,定义了不同的处理阶段:
1 | public static enum Mode { |
上面的四个过程其实跟MapReduce的数据处理过程息息相关,从数据到达map阶段开始,就是进入了PARTIAL1阶段,这个时候会调用自己实现的接口iterate()
方法和terminatePartial()
方法,iterate()
方法是对每条数据进行相应处理,terminatePartial()方法是返回map和combine处理完成后的中间结果,我们在PARTIAL2阶段也会调用terminatePartial()
方法。这个阶段用户可以根子需要将输入数据转换成想要的类型输出。
每个map处理完成后根据需要可能会进入combine阶段,也就是PATTIAL2阶段,这个阶段中主要是进行数据到合并,也就是将部分聚合数据合并,组成一个更大的部分聚合数据,然后交给reduce阶段。这个阶段调用的用户自己实现的函数是merge()
和terminatePartial()
。这个阶段输入什么类型的数据合并之后就输出什么类型的数据。
map和combine都完成后,就进入到了reduce阶段,也就是FINAL阶段,这个阶段主要是对部分聚合数据进行全聚合处理,然后根据业务需要,把想要的数据提取出来,以要想的格式返回出来就可以了,这个阶段调用的函数是merge()
和terminate()
方法。这个阶段的输入数据就是PARTIAL2阶段的输出数据,用户可以根据业务需要指定输出类型。
还有一种情况是不需要reduce的操作,比如最终想要的数据类型跟输入的数据类型一致,这时只需要进行map操作就可以了,这时就只调用iterate()
和terminate()
就可以了。
GenericUDAFEvaluator方法
上面说到的iterate()
, terminatePartial()
, merge()
, terminate()
这些方法都是在GenericUDAFEvaluator中定义的抽象方法,另外还有两个抽象方法也要实现:getNewAggregationBuffer
和reset
,分别用于创建保存数据聚合结果的实例和重置聚合结果。
1 | // 确定各个阶段输入输出参数的数据格式ObjectInspectors |
引用一下其他人的图,下面是Model中各个阶段调用函数的情况:
这是各个阶段中参数的类型及传递情况,理解好数据是怎么流动的很关键,基本上知道怎么流动的就知道怎么处理了:
UDAF需求
这里的需求是编写一个Hive自定义聚合函数,用来计算用户数据中当前时刻(time_tag)前30分钟内用户的行为(active_name)。
数据库中数据格式
Hive中的数据存放在一个叫做bigdata.weblog的表中,表格式如下:
1 | hive> desc weblog; |
里面的数据内容大概是这样的:
1 | hive> select * from weblog limit 5; |
我们只需要根据time_tag,将取出的active_name根据最后一条time_tag的大小往前推30分钟,将30分钟内这个区间段内的active_name放到一个列表中返回即可。
代码编写
1 | package com.bigdata.etl.udaf; |
代码编写过程中遇到小坑,是自己没有认真思考造成的,我在最初传入参数时是先传入active_name后传入time_tag的,而且是以active_name作为terminate()
中Map的key来处理的数据,结果就是最终返回的active_name只有三种类型:pageview
, order
, pay
。这是必然的,因为在merge()
中往myagg.container中put数据时,active_name重复的话会覆盖之前相同的key下的value,所以不管怎样最终只有三个数据输出啦,(⊙﹏⊙)b。
接下来启动hive,将jar包添加进去,创建临时函数,使用临时函数获取数据:
1 | add jar /mnt/home/1015146591/jars/etl-1.0-jar-with-dependencies.jar; |
最终结果:
1 | hive> create temporary function collect_list as 'com.bigdata.etl.udaf.UDAFCollectAction'; |
总结
这是UDAF的典型使用方法,了解UDAF开发的关键是要理解MapReduce的数据处理过程以及hive的GenericUDAFEvaluator
这个抽象类是怎么定义数据处理流程的,其中数据的流动是怎么在具体的方法中传递的是最关键的点,了解了Model中各个阶段与MapReduce的对应关系以及函数调用关系,就可以根据不同的也无需要编写任何你想要的UDAF函数了。