数据处理操作
加载和存储数据
在本章中,我们已经看过Pig如何从外部存储加载数据来进行处理。与之相似,存 储处理结果也是非常直观的。下面的例子使用PigStorage将元组存储为以用分号 分隔的纯文本值:
grunt> STORE A INTO'out' USING PigStorage(':' );grunt> cat outJoe:cherry:2Ali:apple:3Joe:banana:2Eve:apple:7
过滤数据
如果你已经把数据加载到关系中,那么下一步往往是对这些数据进行过滤,移除你 不感兴趣的数据。通过在整个数据处理流水线的早期对数据进行过滤,可以使系统 数据处理数据总蛩最小化,从而提升处理性能。
FOREACH...GENERATE
我们已经介绍了如何使用带有简单表达式和UDF的FILTER操作从一个关系中移 除行。FOREACH...GENERATE操作用于逐个处理一个关系中的行。它可用于移除 字段或创建新的字段。在这个示例里,我们既要删除字段,也要创建字段:
grunt> DUMP A;(Joe,cherry,2)(Ali,apple,3)(Joe,banana,2)(Eve,apple,7)grunt> B = FOREACH A GENERATE $Q, $2+1, 'Constant'; grunt> DUMP B;(Joe,3,Constant)(Ali,4,Constant)(Doe,3,Constant)(Eve,8,Constant)
在这里,我们已经创建了一个有三个字段的新关系B。它的第一个字段是A的第 —个字段($0)的投影。B的第二个字段是A的第三个字段($2)加1。B的第三个字 段是一个常摄字段(即B中毎一行在第三个字段的取值都相同),其类型为 chararray,取值为 Constant。
FOREACH...GENERATE操作可以使用嵌套形式以支持更复杂的处理„在如下示例 中,我们计算天气数据集的多个统计值:
--year_stats.pig REGISTER pig-examples.jar;DEFINE isGood com.hadoopbook.pig.IsGoodQuality(); records = LOAD iinput/ncdc/all/19(1,2,3,4,5)0*'USING com.hadoopbook.pig.CutLoadFunc(,5-10,ll-15,16-19,88-92,93-93')AS (usaf:chararray, wban:chararray, year:int, temperature:int, quality:int);grouped_records = GROUP records BY year PARALLEL 30;year_stats = FOREACH grouped_records { uniq_stations = DISTINCT records.usaf; good_records = FILTER records BY isGood(quality);GENERATE FLATTEN(group), COUNT(uniq_stations) AS station_count,COUNT(good_records) AS good_record count, COUNT(records) AS record_count;}DUMP year_stats;
通过使用我们前面开发的cut UDF,我们从输入数据集加载多个字段到records 关系中。接下来,我们根据年份对records进行分组。请注意,我们使用关键字 PARALLEL来设置要使用多少个reducer。这在使用集群进行处理时非常重要。然 后,我们使用嵌套的FOREACH...GENERATE操作对每个组分别进行处理。第一重嵌 套语句使用DISTINCT操作为每一个气象观测站的USAF标识创建一个关系。第 二层嵌套语句使用FILTER操作和一个UDF为包含“好”读数记录创建一个关 系。最后一层嵌套语句是GENERATE语句(嵌套FOREACH...GENERATE语句必 须以GENERATE语句作为最后一层嵌套语句)。该语句使用分组后的记录和嵌套语 句块创建的关系生成了需要的汇总字段。
在若干年数据上运行以上程序,我们得到如下结果:
(1920,8L,8595L,8595L)(1950,1988L,8635452L,8641353L)(1930,121L,89245L,89262L)(1910,7L,7650L,7650L)(1940,732L,1052333L,1052976L)
这些字段分别表示年份、不同气象观测站的个数、好的读数的总数、总的读数。从 中我们可以看到气象观测站个数和读数个数是如何随着时间变化而增长的。
STREAM
STREAM操作让你可以用外部程序或脚本对关系中的数据进行变换。这一操作的 命名对应于Hadoop的Streaming,后者为MapReduce的提供类似能力。
STREAM可以使用内置命令作为参数。下面的例子使用Unix cut命令从八中每个元组抽取第二个字段。注意,命令及其参数要用反向撇号引用:
grunt> C = STREAM A THROUGH 'cut -f 2'; grunt> DUMP C;(cherry) (apple)(banana)(apple)
STREAM操作使用PigStorage来序列化/反序列化关系,输出为程序的标准输出流 或从标准输入流读入。人中的元组变换成由制表符分隔的行,然后传递给脚本。脚 本的输出结果被逐行读入,并根据制表符来划分以创建新的元组,并输出到关系 C。也可以使用DEFINE命令,通过实现PigToStream和StreamToPig两者都包 含在org.apache.pig包中)来提供定制的序列化和反序列化程序。
在编写定制的处理脚本时,Pig流式处理是最有用的。以下的Python脚本用于过滤 气温记录:
#!/usr/bin/env python import re import sysfor line in sys.stdin:(year, temp, q) = line.stnip() .split() if (temp != "9999" and ne.match("[01459]", q)): print "%s\t%s" % (yean, temp)
要使用这一脚本,需要把脚本传输到集群上。这可以通过DEFINE子句来完成。 该子句还为STREAM命令创建了一个别名。然后,便可以像下面的Pig脚本那样 在STREAM语句中使用该别名:
--max_temp_filten_stneam.pigDEFINE is_good_quality 'is_good_quality.py'SHIP (1chll/snc/main/python/is_good_quality.py'); records = LOAD 'input/ncdc/micno-tab/sample.txt'AS (yean:chananray, tempenatune:int, quality:int); filtened_neconds = STREAM records THROUGH is_good_quality AS (yean:chararray, tempenatune:int); gnouped_neconds = GROUP filtened_neconds BY year; max_temp = FOREACH gnouped_neconds GENERATE group, MAX(filtened_neconds.tenipenatune);DUMP max_temp;
分组与连接数据
在MapReduce中要对数据集进行连接操作需要程序员写不少程序。因为只有非规范化的大规模数据集才最适宜使用Pig(或 MapRedUCe)这样的工 具进行分析,因此连接在Pig中的使用频率远小于在SQL中的使用频率。
JOIN
让我们来看一个“内连接”(inner join)的示例。考虑有如下关系A和B:
grunt> DUHP A;(2,Tie)(4,Coat)(3,Hat)(1,Scanf) grunt> DUMP B;(Joe,2)(Hank,4)(Ali,0)(Eve,B)(Hank,2)
我们可以在两个关系的数值型(标识符)属性上对它们进行连接操作:
grunt> C = JOIN A BY $0, BBY $1; grunt> DUMP C;(2,Tie,Joe,2)(2,Tie,Hank,2)(3,Hat,Eve,3)(4,Coat,Hank,4)
这是一个典型的“内连接”(inner join)操作:两个关系元组的毎次匹配都和结果中 的一行相对应。(这其实是一个等值连接(equijoin),即连接谓词(join predicate)为相 等。)结果中的字段由所有输入关系的所有字段组成。
如果要进行连接的关系太大,不能全部放在内存中,则应该使用通用的连接操作。 如果有一个关系小到能够全部放在内存中,则可以使用一种特殊的连接操作,叫 “分段复制连接”(fragment replicate join),它把小的输入关系发送到所有 mapper,并在map端使用内存查找表对(分段的)较大的关系进行连接。要使用特殊 的语法让Pig使用分段复制连接:
gnunt> C = ]OIN A BY $0, B BY $1 USING ” replicated” ;
这里,第一个关系必须是大的关系,后一个则是相对较小的关系(能够全部存放在 内存中)。
Pig也支持通过使用类似于SQL的语法进行外连接。例如:
grunt> C = JOIN A BY $0 LEFT OUTER, B BY $1; gnunt> DUMP C;(l,Scarf,,)(2,Tie,Joe,2)(2,Tie,Hank,2)(3,Hat,Eve,3)(4,Coat,Hank,4)
COGROUP
JOIN结果的结构总是“平面”的,即一组元组。COGROUP语句和JOIN类似,但 是不同点在于,它会创建一组嵌套的输出元组集合。如果你希望利用如下语句中输 出结果那样的结构,那么COGROUP将会有用:
grunt> D = COGROUP A BY $0, B BY $1; grunt> DUMP D;(0,{},{(Ali,0)})(1,{(1,Scarf)},{})(2,{(2,Tie)h{(Joe,2),(Hank,2)})(3,{(3,Hat)},{(Eve,3)})(4,{(4,Coat)},{(Hank,4)})
COGROUP为每个不同的分组键值生成一个元组。每个元组的第一个字段就是那个 键值。其他字段是各个关系中匹配该键值的元组所组成的“包”(bag)。第一个包 包含关系A中有该键值的匹配元组。同样,第二个包包含关系B中有该键值的 匹配元组。
如果某个键值在一个关系中没有匹配的元组,那么对应于这个关系的包就为空。在 前面的示例中,因为没有人购买围巾(ID为1),所以对应元组的第二个包就为空。 这是一个外连接的例子。COGROUP的默认类型是外连接。可以使用关键词 OUTER来显式指明使用外连接,COGROUP产生的结果和前一个语句相同:
D = COGROUP A BY $0 OUTER, B BY $1 OUTER;
也可以使用关键词INNERk COGROUP使用内连接的语义,剔除包含空包的行。 INNER关键词是针对关系进行使用的,因此如下语句只是去除关系A不匹配的行 (在这个示例中就是去掉未知商品0对应的行):
grunt> E = COGROUP A BY $0 INNER, B BY $1;grunt> DUMP E;(1,{(1,Scarf)},{})(2,{(2,Tie)},{(Joe,2),(Hank,2)})(3,{(3jHat)},{(Eve,3)})(4,{(4,Coat)},{(Hank,4)})
我们可以把这个结构平面化,从人找出买了每一项商品的人。
grunt> F = FOREACH E GENERATE FLATTEN(A), B.$0; grunt> DUMP F;(1,Scarf,{})(2,Tie,{(Joe),(Hank)})(3,Hat,{(Eve)})(4,Coat,{(Hank)})
把COGROUP, INNER和FLATTEN(消除嵌套)组合起来使用相当于实现了(内)连接:
grunt> G = COGROUP A BY $0 INNER, B BY $1 INNER; grunt> H = FOREACH G GENERATE FLATTEN($1), FLATTEN($2); grunt> DUMP H;(2,Tie,Joe,2)(2jTie,Hank,2)(3,Hat,Eve,3)(4,Coat,Hank,4)
这和JOINABY$0,BBY$1的结果是一样的。
如果要连接的键由多个字段组成,则可以在JOIN或COGROUP语句的BY子句中 把它们都列出来。这时要保证每个BY子句中的字段个数相同。下面是如何在Pig 中进行连接的另一个示例。该脚本计算输入的时间段内每个观测站报告的最高 气温:
--max_temp_station_name.pig REGISTER pig-examples.jar;DEFINE isGood com.hadoopbook.pig.IsGoodQuality();stations = LOAD Jinput/ncdc/metadata/stations-fixed-width.txt'USING com.hadoopbook.pig.CutLoadFunc('1-6,8-12,14-42')AS (usaf:chararray, wban:chararray, name:chararray);trimmed_stations = FOREACH stations GENERATE usaf, wban, com.hadoopbook.pig.Trim(name);records = LOAD'input/ncdc/all/191*'USING com.hadoopbook.pig.CutLoadFunc(J5-10,ll-15,88-92,93-93J)AS (usaf:chararray, wban:chararray, temperature:intj quality:int);filtered_records = FILTER records BY temperature != 9999 AND isGood(quality); grouped_records = GROUP filtered_records BY (usaf, wban) PARALLEL 30; max_temp = FOREACH grouped_records GENERATE FLATTEN(group), MAX(filtered_records.temperature); max_temp_named = JOIN max_temp BY (usaf, wban) trimmed_stations BY (usaf, wban) PARALLEL 30;max_temp_result = FOREACH max_temp_named GENERATE $0,$1,$5,$2;STORE max_temp_result INTO Jmax_temp_by_station';
我们使用先前开发的cut UDF来加载包括气象观测站ID(USAF和WBAN标识)、 名称的关系以及包含所有气象记录且以观测站ID为键的关系。我们在根据气象观 测站进行连接之前,先根据观测站ID对气象记录进行分组和过滤,并计算最高气 温的聚集值。最后,在进行连接之后,我们把所需要的字段——即USAF、WBAN、 观测站名称和最高气温——投影到最终结果。
下面是20世纪头10年的结果:
228020 99999 SORTAVALA 322 029110 99999 VAASA AIRPORT 300 040650 99999 GRIMSEY 378
因为观测站的元数据较少,所以这个査询可以通过使用“分段复制连接”(fragment,replicate join)来进一步提升运行效率。
CROSS
Pig Latin包含“叉乘”(cross-product,也称为“笛卡儿积” [Cartesian product])的操作。这一操作把一个关系中的毎个元组和第二个中的所有元组进行连接(如果有更多的关系,那么这个操作就进一步把结果逐一和这些关系的每一个元组进行连接)。这个操作的输出结果的大小是输入关系的大小的乘积。输出结果可能会非常大:
grunt> I = CROSS A, B;grunt> DUMP I;(2,Tie,Joe,2)(2,Tie,Hank,4)(2,Tie,Ali,0)(2,Tie,Eve,3)(2,Tie,Hank,2)(4,Coat,Joe,2)(4,Coat,Hank,4);4,Coat,Ali,0)(4,Coat,Eve,3)(4,Coat,Hank,2)(3,Hat,Joe,2)(3,Hat,Hank,4)(3,Hat,Ali,0)(3,Hat,Eve,3)(3,Hat,Hank,2)(1,Scarf,Joe,2)(1,Scarf,Hank,4)(1,Scarf,Ali,0)(1,Scarf,Eve,3)(1,Scarf,Hank,2)
在处理大规模数据集时,应该尽量避免会产生平方(或更差)级中间结果的操作。只 有在极少数情况下,才需要对整个输入数据集计算叉乘。
例如,一开始,用户可能觉得必须生成文档集合中所有文档的两两配对组合才能计 算文档两两之间的相似度。但是,随着对数据和应用的深入了解,他会发现大多数 文档配对的相似度为零(即它们之间没有关系)。于是,我们就能找到一种更好的算 法来计算相似度。
在此,解决这一问题的主要思路是把计算聚焦于用于计算相似度的实体,如文档中 的关键词06«1^,让它们成为算法的核心。事实上,我们还要删去对区分文档没有 帮助的词,即禁用词,进一步缩减问题的搜索空间。使用这一技术,分 析近一百万个(106)文档大约会产生约十亿个(109)中间结果文档配对。°而如果用朴 素的方法(即生成输入集合的叉乘)或不消除停用词,会产生一万亿个(102)个文档 配对。
GROUP
COGROUP用于把两个或多个关系中的数据放到一起,而GROUP语句则对一个关 系中的数据进行分组。GROUP不仅支持对键值进行分组(即把键值相同的元组放到 一起),你还可以使用表达式或用户自定义函数作为分组键。例如,有如下关系A:
grunt> DUMP A;(Joe,cherry)(Ali,apple)(Joe^banana)(Eve,apple)
我们根据这个关系的第二个字段的字符个数进行分组:
grunt> B =: GROUP A BY SIZE($1); grunt> DUMP B;(5L,{(Ali,apple),(Eve,apple)})(6L,{(Doe,cherry),(Doe,banana)})
GROUP会创建一个关系,它的第一个字段是分组字段,其别名指定group。第二 个字段是包含与原关系(在本示例中就是A)模式相同的被分组字段的包。
有两种特殊的分组操作:ALL和ANY。ALL把一个关系中的所有元组放入一个 包。这和使用某个常量函数作为分组函数所获得的结果一样:
grunt> C = GROUP A ALL; grunt> DUMP C;(all,{(Doe,cherry),(Ali,apple),(Doe,banana),(Eve,apple)})
关键词ANY用于对关系中的元组随机分组。它对于取样非常有用。
对数据进行排序
Pig中的关系是无序的。考虑如下关系A:
grunt> DUMP A;(2.3)(1,2)(2.4)
Pig按什么顺序来处理这个关系中的行是不一定的。特别是在使用DUMP或 STORE检索A中的内容时,Pig可能以任何顺序输出结果。如果想设置输出的顺 序,可以使用ORDER操作按照某个或某几个字段对关系中的数据进行排序。默认 的排序方式是对具有相同类型的字段值使用自然序进行排序(natural ordering),而 不同类型字段值之间的排序则是任意的、确定的(例如,一个元组总是小于一 个包)。
如下示例对A中元组根据第一个字段的升序和第二个字段的降序进行排序:
grunt> B = ORDER A BY $0, $1 DESC; grunt> DUMP B;(1,2)(2,4)(2.3)
对排序后关系的后续处理并不保证能够维持已排好的顺序。例如:
grunt> C = FOREACH B GENERATE *;
即使关系C和关系B有相同的内容,关系C用DUMP或STORE仍然可能产生以 任意顺序排列的输出结果。正是由于这样,通常只在获取结果前一步才使用ORDER操作。
LIMIT语句对于限制结果的大小以快速获得一个关系样本,非常有用。而“取原 型化”(prototyping)操作(即 ILLUSTRATE命令)则更适用于根据数据产生有代表性 的样本。LIMIT语句可紧跟ORDER语句使用以来获得排在最前面的n个元组。通 常,LIMIT会随意选择一个关系中的n个元组。但是,当它紧跟ORDER语句使用 时,ORDER产生的次序会继续保持(这和其他操作不保持输入关系数据顺序的规则 不同,是一个例外):
grunt> D = LIMIT B 2; grunt> DUHP D;(1,2)(2,4)
如果所给的限制值远远大于关系中所有元组个数的总数,则返回所有元组(LIMIT操作没有作用)。
使用LIMIT能够提升系统的性能。因为Pig会在处理流水线中尽早使用限制操 作,以最小化需要处理的数据总量。因此,如果不需要所有输出数据,就应该用LIMIT操作。
组合和切分数据
有时,你把几个关系组合在一起。为此可以使用UNION语句。例如:
grunt> DUHP A;(2,3)(1,2)(2,4)grunt> DUHP B;(z,x,8)(w,y,l)grunt> C = UNION A, B; grunt> DUMP C;(z,x,8)(w,y,l)(2,3)(1,2)(2,4)
C是关系A和B的“并”(union)。因为关系本身是无序的,因此C中元组的顺序 是不确定的。另外,如示例中那样,我们可以对两个模式不同或字段个数不同的关 系进行并操作。Pig会试图合并正在执行UNIDN操作的两个关系的模式。在这个 例子中,两个模式是不兼容的,因此C没有模式:
grunt> DESCRIBE A;A: {f0: intjfl: int} grunt> DESCRIBE B;B: {f0: chararray,fl: chararray,f2: int} grunt> DESCRIBE C;Schema for C unknown.
如果输出关系没有模式,脚本就需要能够处理字段个数和数据类型都不同的元组。