用户定义函数
你要写的查询有时无法轻松(或根本不能)使用Hive提供的内置函数来表示。通过 编写“用户定义函数”(user-defined function, UDF),有Hive,插入用户写的处理 代码并在查询中调用它们,变得更简单。
UDF必须用化抑语言编写。Hive本身也是用Java写的。对于其他编程语言,可以 考虑使用SELECT TRANSFORM查询,有了它,可以让数据流式通过用户定义脚本。
Hive中有三种UDF:(普通)UDF、UDAF(用户定义聚集函数,user-defined aggregate function)以及UDTF(用户定义表生成函数,user-defined table-generating function)。 它们接受输入和产生输出的数据行在数量不同。
和其他两种类型相比,表生成函数的知名度较低。所以让我们来看一个示例。考虑 这样一个表,它只有一列X,包含的是字符串数组。回头看看表的定义和填充方式 是很有启发的:
CREATE TABLE arrays (x ARRAY) ROW FORMAT DELIMITEDFIELDS TERMINATED BY '\001'COLLECTION ITEMS TERMINATED BY '\002';
注意,ROW FORMAT子句指定数组中的项用Control-B字符分隔。我们要加载的示例文件内容如下,为了显示方便,用B^表示Control-B字符:
a^Bbc^Bd^Be
在运行LOAD DATA以命令以后,下面的查询可以确认数据已正确加载:
hive > SELECT *FROM arrays; [”a”,"b"][”c”,”d”,”e”]
接下来,我们可以使用explode UDTF对表进行变换。这个函数为数组中的每一项输出一行。因此,在这里,输出的列y的数据类型为STRING。其结果是,表 被“平面化”(flattened)成五行:
hive > SELECT explode(x) AS y FROM arrays;abcde
带UDTF的SELECT语句在使用时有一些限制(例如不能检索额外的列表达式),使 实际使用时这些语句的用处并不大。为此,Hive支持LATERAL VIEW查询,后者 更强大^这里不介绍LATERAL VIEW查询。相关详情可访问http://wiki.apache.org/ hadoop/Hive/LcmguageMarmal/LatendVieWo
编写UDF
为了演示编写和使用UDF的过程,我们将编写一个简单的剪除字符串尾字符的 UDF。Hive已经有一个内置的名为trim的函数,所以我们把自己的函数称为 strip。Strip Java类的代码如例12-2所示。
例12-2.剪除字符串尾字符的UDF
package com.hadoopbook.hive; import org.apache.commons.lang.StringUtils; importorg.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.io.Text;public class Strip extends UDF { private Text result = new Text();public Text evaluate(Text str) { if (str == null) { return null;}result.set(StringUtils.strip(str.toString())); return result;public Text evaluate(Text str, String stripChars) { if (str == null) { return null;}result.set(StringUtils.strip(str.toString(), stripChars)); return result;}}
一个UDF必须满足下面两个条件。
一个 UDF 必须是 org.apache.hadoop.hive.ql.exec.UDF 的子类。
一个UDF必须至少实现evaluate()方法。
evaluate()方法不是由接口定义的,因为它可接受的参数的个数、它们的数据类 型及其返回值的数据类型都是不确定的。Hive会检查UDF,看能否找到和函数调 用相匹配的evaluate()方法。
这个Strip类有两个evaluate()方法。第一个方法去除输入的前导和结束的空白 字符;而第二个方法则去除字符串尾出现在指定字符集合中的任何字符。实际的字 符处理工作交由Apache Commons项目里的StringUtils类来完成。所以代码中 唯一值得一提的是对Hadoop Writable库中Text的使用。Hive实际支持在UDF中 使用Java的基本类型(以及其他一些像java.util.List和java.util.Map这样 的类型)。所以,下面这样的函数签名(signature)的效果是一样的:
public String evaluate(String str)
但是,通过使用Text,我们可以利用对象重用的优势,增效节支。因此一般推荐 使用这种方法。
为了在Hive中使用UDF,我们需要把编译后的Java 类打包(package)成一个 JAR 文件(可以用本书所附代码输入ant hive来完成),并在出代中注册这个文件:
ADD JAR /path/to/hive-examples.jar;
我们还需要为Java的类名起一个别名:
CREATE TEMPORARY FUNCTION strip AS 'com.hadoopbook.hive.Strip';
这里的TEMPORARY关键字强调了这样的事实:UDF只是为这个Hive会话过程定 义的(它们并没有在metastore中持久化存储)。事实上,这意味着你需要在每个脚 本或会话的最开始添加JAR文件并定义函数。
现在UDF可以像内置函数一样使用:
hive> SELECT strip('bee') FROM dummy; beehive> SELECT strip('banana ', 'ab') FROM dummy;nan
注意,UDF名不是大小写敏感的:
hive> SELECT STRIP('bee') FROM dummy; bee
编写UDAF
聚集函数比普通的UDF难写得多,因为值是在块内进行聚集的(这些块可能分布在 很多Map或Reduce任务中),从而实现时要能够把部分的聚集值组合成最终结 果。实现此功能的代码最好用示例来进行解释。所以,让我们来看一个简单的UDAF的实现,它用于计算一组整数的最大值(例12-3)。
例12-3.计算一组整数中最大值的UDAF
package com.hadoopbook.hive; import org.apache.hadoop.hive.ql.exec.UDAF;import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;import org.apache.hadoop.io.IntWritable;public class Maximum extends UDAF {public static class MaximumIntUDAFEvaluator implements UDAFEvaluator {private IntWritable result;public void init() { result = null;public boolean iterate(IntWritable value) { if (value == null) { return true;}if (result == null) { result = new IntWritable(value.get());} else {result.set(Math.max(result.get(), value.get()));}return true;}public IntWritable terminatePartial() { return result;}public boolean merge(IntWritable other) { return iterate(other);}public IntWritable terminate { return result;}}}
这个类的结构和UDF的稍有不同,UDAF必须是org.apache.hadoop.hive.ql.exec.UDAF(注意UDAF中的"A")的子类,且包含一个或多个嵌套的、实现 了 org.apache.hadoop.hive.ql.exe的静态类。在这个示例中,只有一个嵌套 类,MaximumlntUDAFEvaluator。但是我们也可以添加更多的计算函数(如 MaximumLongUDAFEvaluator 和MaximumFloatUDAFEval)来提供计算长整型、 浮点型等类型数最大值的UDAF的重载。
一个计算函数必须实现下面这5个方法(处理流程如图12-4所示)。
init()
init()方法负责初始化计算函数并重设它的内部状态。在MaximumlntUDAFEvaluator 中,我们把存放最终结果的IntWritable对象设为null。我们使用null来 表示目前还没有对任何值进行聚集计算,这和对空集NULL计算最大值应有的 结果是一致的。
iterate()
每次对一个新值进行聚集计算时都会调用iterate()方法。计算函数要根据 聚集计算的结果更新其内部状态。iterate()接受的参数和Hive中被调用函 数的参数是对应的。在这个示例中,只有一个参数。方法首先检查参数值是否 为null,如果是,则将其忽略。否则,result变量实例就被设为这个参数的 整数值(如果这是方法第一次接受输入),或设为当前值和参数值中的较大值 (如果已经接受过一些值)。如果输人值合法,我们就让方法返回ture。
terminatePartial()
Hive需要部分聚集结果时会调用terminatePartial()方法。这个方法必须 返回一个封装了聚集计算当前状态的对象。在这里,使用IntWritable对已 知的最大值或在没有值时的空值进行封装即可。
merge()
在Hive决定要合并一个部分聚集值和另一个部分聚集值时会调用merge()方法。该方法接受一个对象作为输入。这个对象的类型必须和terminatePartial() 方法的返回类型一致。在这个示例里,merge()方法可以直接使用iterate() 方法,因为部分结果的聚集和原始值的聚集的表达方法是相同的。但一般情况 下不能这样做(我们后面会看到更普遍的示例),这个方法实现的逻辑会合并计 算函数和部分聚集的状态。
图12-4.包含UDAF部分结果的数据流
terminate
Hive需要最终聚集结果时会调用terminate()方法。计算函数需要把状态作 为一个值返回。在这里,我们返回实例变量result。 现在让我们来执行这个新写的函数:
hive> CREATE TEMPORARY FUNCTION maximum AS' com.hadoopbook.hive.Maximum'; hive> SELECT maximum(temperature) FROM records;110
一个更复杂的UDAF
前面的示例有一个特别的现象:部分聚集结果可以使用和最终结果相同的类型 (IntWritable)来表示。对于更复杂的聚集函数,情况并非如此。考虑一个计算一 组double类型值均值的UDAF,就可以看出这一点。从数学角度来看,要把两个 部分的均值合并成最终的均值是不可能的。作为替 代,我们可以用一对数——目前已经处理过的double值的和以及目前已经处理过 的数的个数一来表示部分聚集结果。
这个思路在UDAF中的实现如例12-4所示。注意,部分聚集结果用一个嵌套的静 态类struct实现,类名是PartialResult,由于我们使用了 Hive能够处理的字段 类型(Java原子数据类型),所以Hive足够“聪明”,能够自己对这个类进行序列 化和反序列化。
在这个示例中,merge()方法和iterate()方法不同。它把“部分和”(partialsum) 和“部分计数值”(partial count)分别进行加法合并。此外,terminatePartial() 的返回类型为PartialResult这个类型当然不会给调用函数的用户看到terminate()的返回类型则是最终用户可以看到的DoubleWritable。
例12-4.计算一组double值均值的UDAF
package com.hadoopbook.hive;import ong.apache.hadoop.hive.ql.exec.UDAF;import ong.apache.hadoop.hive.ql.exec.UDAFEvaluaton;import org.apache.hadoop.hive.serde2.io.DoubleWritable;public class Mean extends UDAF {public static class MeanDoubleUDAFEvaluaton implements UDAFEvaluator { public static class PartialResult { double sum; long count;}private PartialResult partial;public void init() { partial = null;public boolean iterate(DoubleWritablG value) { if (value == null) { return true;}if (partial == null) {partial = new PartialResult();}partial.sum += value.get(); partial.count++; return true;public PartialResult terminatePartial() { return partial;public boolean merge(PartialResult other) { if (other == null) { return true;}if (partial == null) {partial = new PartialResult();}partial.sum +- other.sum; partial.count += other.count; return true;public DoubleWritable terminate() { if (partial == null) { return null;}return new DoubleWritable(partial.sum / partial.count);}}}