Hadoop教程

用户自定义函数

Pig设计者认识到以插件形式提供使用用户定制代码的能力是一个关键问题,但这 也是数据处理中最琐碎的工作。因此,他们的设计简化了用户自定义函数的定义 简单。

过滤UDF

我们通过写一个过滤不满足要求的气温质量读数的天气记录来演示如何编写过滤函 数。我们的基本想法是修改下面这行代码:

filtGred_rGCords = FILTER records BY temperature != 9999 AND(quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);

修改后的代码如下:

filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);

这样有两个好处:这样写可以使Pig脚本更精简;而且这样还封装了处理逻辑,以 便轻松重用于其他脚本。如果只是编写一个即时查询,我们可能不需要麻烦地写一个UDF。只有需要不断进行相同的处理时,才需要如此写可重用的UDF。

UDF用Java编写。所有的过滤函数都是FilterFunc的子类,FilterFunc本身 是EvalFunc的子类。我们后面会对EvalFunc进行更详细的介绍。在这里,我们 只需要知道EvalFunc基本上就像下面的类一样:

public abstract class EvalFunc
 
   {public abstract T exec(Tuple input) throws IOException;}
 

EvalFunc只有一个抽象方法exec()。它的输入是一个元组,输出则只有一个 值,(参数化)类型为T。输入元组的字段包含传递给函数的表达式一在这个例子 里,它是一个整数。对于FilterFunc, T是Boolean类型的,对于那些不应该被 过滤掉的元组,该方法应该返回ture。

对于例子中的质量过滤器,我们要写一个IsGoodQuality类,扩展FilterFunc 并实现exec()。见例11-1。Tuple类本质上是一个与某个类型关联的对象列表。 在这里,我们只关心第一个字段(因为函数只有一个参数)。我们用Tuple的get() 方法,根据序号来获取这个字段。该字段的类型是整型。因此,如果它非空,我们 就对它进行类型转换,检查它是否表示气温读数是正确的,并根据检查结果返回相 应的值:true或false。

例11-1.这个FiterFunc UDF删除包含不符合质量要求的气温读数记录

package com.hadoopbook.pig;import java.io.IOException; import java.util.ArrayList; import java.util.List;import org.apache.pig.FilterFunc;import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple;import org.apache.pig.impl.logicalLayer.FrontendException; public class IsGoodQuality extends FilterFunc {@Overridepublic Boolean exec(Tuple tuple) throws IOException { if (tuple == null || tuple.size() == 0) { return false;}try {Object object = tuple.get(0); if (object == null) {return false;}int i = (Integer) object;return i== 0 || i == 1 || i == 4 || i == 5 || i == 9; } catch (ExecException e) { throw new IOException(e);}}}

为了使用新函数,我们首先进行编译,并把它打包到一个JAR文件(在本书所附的 示例代码中,我们的做法是输入ant pig)。然后,我们通过RIGISTER操作指定 文件的本地路径(不带引号),告诉Pig这个JAR文件的信息:

grunt> REGISTER pig-examples.jar;

最后,我们就可以调用这个函数:

grunt> filtered_records = FILTER records BY temperature != 9999 AND >> com.hadoopbook.pig.IsGoodQuality(quality);

Pig把函数名作为Java类名,并试图用该类名来加载类以完成函数调用。(这也就 是为什么函数名是大小写敏感的,因为Java类名是大小写敏感的。)在搜索类的时 候,Pig使用包含已注册JAR文件的类加载器。运行于分布式模式时,Pig会确保 将JAR文件传输到集群。

对于本例中的UDF,Pig使用com.hadoopbook,pig.IsGoodQuality名称进行查 找,能在我们注册的JAR文件中找到它。

内置函数的解析也使用同样的方式处理。内置函数和UDF的处理有两个区别:Pig 会搜索一组内置包。因此,对于内置函数的调用并不一定要提供完整的名称。例 如,函数MAX实际上是由包org.apache.pig.builtin中的类MAX实现.的。这也 是Pig捜索的内置包,所以我们在Pig程序中可以使用MAX而不需要用 org.apache.pig.builtin.MAX。

不能在Pig中注册自己的包,但是我们可以使用DEFINE操作为函数定义别名,以 缩短函数名:

grunt> DEFINE isGood com.hadoopbook.pig.IsGoodQuality();grunt> filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);

需要在一个脚本里多次使用一个函数时,为函数定义别名是一个好办法。如果要向 UDF的实现类传递参数,必须定义别名。

只有在质量字段的类型定义为int时,前面定义的过滤器才能正常工作。但如果 没有类型信息,这个UDF就不能正常处理。因为此时该字段的类型是默认类型 bytearray,表示为 DataByteArray 类。因为 DataByteArray 不是整型,因此 类型转换失败。

修正这一问题最直接的办法是在exec()方法中把该字段转换成整型。但更好的办 法是告诉Pig该函数所期望的各个字段的类型。EvalFunc为此提供了getArgToFuncMapping()方法。我们可以重载这个方法来告诉Pig第一个字段应 该是整型。

@Overridepublic List<FuncSpec> getArgToFuncMapping() throws FrontendException { List<FuncSpec> funcSpecs = new ArrayList<FuncSpec> (); 	funcSpecs.add(new FuncSpec(this.getClass().getName(),new Schema(new Schema.FieldSchema(null,DataType.INTE6ER))));return funcSpecs;}

这个方法返回一个FuncSpec对象,后者对应于传递给exec()方法的那个元组的 每个字段。在这个例子里,只有一个字段。我们构造一个匿名FieldSchema(因为 Pig在进行类型转换时忽略其名称,因此其名称以null传递)。该类型使用Pig的 DataType类的常量INTEGER进行指定。

使用这个修改后的函数,Pig将尝试把传递给函数的参数转换成整型。如果无法转 换这个字段,则把这个字段传递为null。如果字段为null,exec()方法返回的 结果总是false。在这个应用中,因为我们想要在过滤掉其质量字段包含无法理解 的记录,因此这样做很合适。

以下是使用这个新函数的最终程序:

--max_temp_filter_udf.pig REGISTER pig-examples.jar;DEFINE isGood com.hadoopbook.pig.IsGoodQuality(); records = LOAD finput/ncdc/micro-tab/sample.txt,AS (year:chararray, temperature:int, quality:int); filtered_records = FILTER records BY temperature != 9999 AND isGood(quality); grouped_records = GROUP filtered_records BY year; max_temp = FOREACH grouped_records GENERATE group, MAX(filtered_records.temperature);DUMP max_temp;

计算UDF

写计算函数比写过滤函数的步骤要稍微多一些(见例11-2)。让我们考虑写一个 UDF,它类似于java.lang.String中trim()方法,从chararray值中去掉开 头和结尾的空白符。我们将在本章稍后用到这个函数。

例11-2.这个EvalFunc UDF从chararray值中去除开头和结尾的空白符 public

public class Trim extends EvalFunc
 
   {@Overridepublic String exec(Tuple input) throws IOException { if (input == null || input.size() == 0) { return null;}try {Object object = input.get(0); if (object == null) { return null;}return ((String) object).trim();} catch (ExecException e) { throw new IOException(e);}}@Overridepublic List<FuncSpec> getArgToFuncMapping() throws FrontendException { List
  
    funcList = new ArrayList<FuncSpec>(); funcList.add(new FuncSpec(this.getClass().getName(), new Schema( new Schema.FieldSchema(null, DataType.CHARARRAY))));return funcList;}}
  
 

要写一个计算函数,需要参数化返回值的类型,扩展EvalFunc类(在Trim UDF 中,该类型为String)和IsGoodQuality UDF中的方法一样,exec()和方法 都很直观。 在写计算函数的时候,需要考虑输出模式。在下面的语句中,B的模式由函数^子 决定:

B = FOREACH A GENERATE udf($0)

如果udf创建了有标量字段的元组,那么Pig可以通过“反射”(reflection)确定B 的模式。对于复杂数据类型,例如包(bag)、元组或映射,Pig需要更多的信息。此 时需要实现outputSchema()方法将输出模式的相关信息告诉Pig。

Trim UDF返回一个字符串。Pig把返回值翻译为chararray类型。从以下会话可 以看出:

grunt> DUMP A;(pomegranate)(banana )(apple)(lychee ) grunt> DESCRIBE A;A: {fruit: chararray}grunt> B = FOREACH A GENERATE com.hadoopbook.pig.Trim(fruit); grunt> DUMP B;(pomegranate)(banana)(apple)(lychee)grunt> DESCRIBE B;B: {chararray}

A包含的chararray字段中有开头和结尾的空白符。我们将Trim函数应用于A 的第一个字段(名为fruit),从而创建B。B的字段被正确推断为chararray。


加载UDF

我们将演示一个定制的加载函数,该函数可以过指定纯文本的列的范围定义字段, 和Unix的cut命令非常类似。它的使用方式如下:

grunt> records = LOAD 'input/ncdc/micro/sample.txt'>> USING com.hadoopbook.pig.CutLoadFunc('16-19,88-92,93-93,)>> AS (year:int, temperature:int, quality:int); grunt> DUMP records;(1950,0,1)(1950,22,1)(1950,-11,1)(1949jlll,l)(1949,78,l)

传递给CutLoadFunc的字符串是对列的说明:每一个由逗号分隔的范围定义一个 字段。字段的名称和数据类型通过AS子句进行定义。让我们来看例11-3给出的 CutLoadFunc 的实现:

例11-3.该加载函数以列范围作为字段加载元组

public class CutLoadFunc extends LoadFunc {						private static final Log LOG : LogFactory.getLog(CutLoadFunc.class); private final List
 
   ranges;private final TupleFactory tupleFactory = TupleFactory.getInstance(); private RecordReader reader;public CutLoadFunc(String cutPattern) { ranges = Range.parse(cutPattern);}@Overridepublic void setLocation(String location, Dob job)throws IOException {FileInputFormat.setInputPaths(job, location);}@Overridepublic InputFormat getInputFormat() { return new TextInputFormat();}@Overridepublic void prepareToRead(RecordReader reader, PigSplit split) { this.reader = reader;}@Overridepublic Tuple getNext() throws IOException { try {if (1reader.nextKeyValue()) { return null;}Text value = (Text) reader.getCurrentValue();String line = value.toString();Tuple tuple = tupleFactory.newTuple(nanges.size()); for (int i = 0; i < ranges.size(); i++) {Range range = ranges.get(i); if (range.getEnd() > line.length()) { LOG.warn(String.format(” Range end (%s) is longer than line length (%s)w , range.getEnd(), line.length())); continue;}tuple.set(i, new DataByteArray(range.getSubstring(line)));}return tuple;} catch (InterruptedException e) { throw new ExecException(e);}}}
 

和Hadoop类似,Pig的数据加载先于mapper的运行,所以保证数据可以被分割成 能被各个mapper独立处理的部分非常重要。 从Pig 0.7.0开始(即本书所使用的版本),加载和存储函数接口已经进行了大幅修 改,以便与 Hadoop 的 InputFormat 和 OutputFormat 类基本一致。

针对Pig以前版本的函数需要重写(重写指南可从http:"wiki.apache.org/pig/ LoadStoreMigrationGuide 获得)。LoadFunc 一般使用底层已有的 lnputFormat 来 创建记录,而LoadFunc自身则提供把返回记录变为Pig元组的程序逻辑。

CutLoadFunc类使用说明了每个字段列范围的字符串作为参数进行构造。解析该 字符串并创建内部Range对象列表以封装这些范围的程序逻辑包含在Range类 中。这里没有列出这些代码(可在本书所附的示例代码中找到)。

Pig调用LoadFunc的setLocation()把输入位置传输给加载器。因为 CutLoadFunc使用TextlnputFormat把输入切分成行,因此我们只用 FileInputFormat的一个静态方法传递设置输入路径的位置信息。

然后,和在MapReduce中一样,Pig调用getInputFormat()方法为每一个分片新 建一个 RecordReader。Pig 把每个 RecordReader 传递给 CutLoadFunc 的prepareToRead方法以便通过引用来进行传递,这样,我们就可以在getNext() 方法中用它遍历记录。

Pig运行时环境会反复调用getNext(),然后’加载函数从reader中读取元组直到 reader读到分片中的最后一条记录。此时,加载函数返回空值null以报告已经没 有可读的元组。

负责把输入文件的行转换为Tuple对象,这是getNext()的任务。它利用Pig用 于创建Tuple实例的类TupleFactory来完成这一工作。newTuple()方法新建一 个包含指定字段数的元组。字段数就是Range类的个数,而这些字段使用Range 对象所确定的输入行中的子串填充。

我们还需要考虑输入行比设定范围短的情况。一种选择是抛出异常并停止进一步的 处理。如果你的应用不准备在碰到不完整或损坏的记录时继续工作,这样处理当然 没问题。在很多情况下,一种更好的选择是返回一个有null字段的元组,然后让Pig脚本根据情况来处理不完整的数据。我们这里采取的是后一种方法:当range 超出行尾时,通过终止for循环把元组随后的字段都设成默认值null。

使用模式

现在让我们来考虑加载的字段数据类型。如果用户指定模式,那么字段就需要转换 成相应的数据类型。但在Pig中,这是在加载后进行的。因此,加载器应该始终用 类型DataByteArray来构造包含bytearray字段的元组。当然,我们也可以让加载器函数来完成类型转换。这时需要重载getLoadCaster()返回包含一组类型转换方法的定制LoadCaster接口实现:

public interface LoadCaster {public Integer bytesToInteger(byte[] b) throws IOException; public Long bytesToLong(byte[] b) throws IOException; public Float bytesToFloat(byte[] b) throws IOException; public Double bytesToDouble(byte[] b) throws IOException;public String bytesToCharArray(byte[] b) throws IOException; public Map<String, Object> bytesToMap(byte[] b) throws IOException; public Tuple bytesToTuple(byte[] b) throws IOException; public DataBag bytesToBag(byte[] b) throws IOException;}

CutLoadFunc并没有覆盖getLoadCaster()。因为默认的getLoadCaster()实 现返回了 Utf8StorageConverter,后者提供UTF-8编码数据到Pig数据类型的 标准转换。

在有些情况下,加载函数本身可以确定模式。例如,如果我们在加载XML或 JSON这样的自描述数据,则可以为Pig创建一个模式来处理这些数据。此外,加 载函数可以使用其他方法来确定模式,例如使用外部文件,或通过传递模式信息给 构造函数。为了满足这些需要,加载函数应该(在实现LoadFunc接口之外)实现 LoadMetadata接口,向Pig运行时环境提供模式。但是请注意,如果用户通过 LOAD的AS子句定义模式,那么它的优先级将髙于LoadMetadata接口定义的模式。

加载函数还可以实现LoadPushDown接口,了解查询需要哪些列。因为此时加载 器可以只加载查询需要的列,因此这可能有助于按列存储的优化。在示例中, CutLoadFunc需要读取元组的整行,所以只加载部分列不容易实现,鉴于此,我 们在这里不使用这种优化技术。

关注微信获取最新动态