压缩
文件压缩有两大好处:可以减少存储文件所需要的磁盘空间;可以加速数据在网络 和磁盘上的传输。需要处理大量数据时,这两大好处是相当重要的,所以需要仔细 考虑在Hadoop中如何使用压缩。
有很多种不同的压缩格式、工具和算法,它们各有千秋。表4-1列出了与Hadoop结合使用的常见压缩方法。
表4-1.压缩格式总结
压缩格式 | 工具 | 算法 | 文件扩展名 | 是否包含多个文件 | 是否可切分 |
---|---|---|---|---|---|
DEFLATE * | N/A | DEFLATE | deflate | 否 | 否 |
Gzip | gzip | DEFLATE | gz | 否 | 否 |
bzip2 | bzip2 | bzip2 | bz2 | 否 | 是 |
LZ0 | Lzop | LZ0 | lzo | 否 | 否 |
*DEFLATE是一个标准压缩算法,该算法的标准实现是zlib。没有可用于生成DEFLATE文件的常用命令行工具,因为通常都用gzip格式。注意,gzip文件格式只是在 DEFLATE格式上增加了文件头和一个文件尾。.deflate文件扩展名是Hadoop约定的
所有压缩算法都需要权衡空间/时间:压缩和解压缩速度更快,其代价通常是只能节省少量的空间。表4-1列出的所有压缩工具都提供9个不同的选项来控制压缩时 必须考虑的权衡:选项-1为优化压缩速度,-9为优化压缩空间。例如,下述命令通过最快的压缩方法创建一个名为file.gz的压缩文件:
gzip -1 file
不同压缩工具有不同的压缩特性。gzip是一个通用的压缩工具,在空间/时间性能 的权衡中,居于其他两个压缩方法之间。bzip2比gzip更髙效,但压缩速度更慢一点。bzip2的解压速度比压缩速度快,但与其他压缩格式相比,仍然要慢一些。另一方面,LZO优化压缩速度,其速度比gzip(或其他压缩/解压缩工具)更快,但压缩效率稍逊一筹。
表4-1中的“是否可切分”这一列,表示该压缩算法是否支持切分(splitable)也就是说,是否可以搜索数据流的任意位置并进一步往下读取数据。可切分压缩格式尤其适合MapReduce。
codec
codec实现了一种压缩-解压缩算法。在Hadoop中,一个对CompressionCodec接口的实现代表一个codec。所以,例如,GzipCodec包装了gzip的压缩和解压缩算法。表4-2列举了Hadoop实现的codec。
表 4-2. Hadoop 的压缩 codec
压缩格式 | HadoopCompressionCodec |
---|---|
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
LZO代码库拥有GPL许可,因而可能没有包含在Apache的发行版本中,因此, Hadoop 的 codec 需要单独从 http://code.google.com/p/hadoop-gpl-compression下载,或从http://github.com/kevinweil/hadoop-lzo下载,该代码库包含有修正的软件错误及其他一些工具。LzopCodec与lzop工具兼容,LzopCodec本质上是 LZO格式的但包含额外的文件头,因此这通常就是你想要的。也有针对纯LZO格式的LzoCodec,并使用.lzo_deflate作为文件扩展名(类似于DEFLATE,但纯gzip并不包含文件头)。
通过CompressionCodec对数据流进行压缩和解压缩
CompressionCodec包含两个函数,可以轻松用于压缩和解压缩数据。如果要对写入输出数据流的数据进行压缩,可用createOutputStream(OutputStream out)法在底层的数据流中对需要以压缩格式写入在此之前尚未压缩的数据新建一个 CompressionOutputStream对象。相反,对输入数据流中读取的数据进行解压缩的时候,则调用 createInputStream(InputStream in)获取 CompressionInputStream, 可通过该方法从底层数据流读取解压缩后的数据。
CompressionOutputStream 对象和 CompressionInputStream 对象,类似于 java.util.zip.DeflaterOutputStream 和 java.util.zip.DeflaterInputStreain,只不过前两者能够重置其底层的压缩或解压缩方法,对于某些将部分数据流(section of data stream)压缩为单独数据块(block)的应用——例如SequenceFile,这个能力是非常重要的。
例4-1显示了如何利用API来压缩从标准输入中读取的数据并将其写到标准输出。
例4-1.该程序压缩从标准输入读取的数据,然后将其写到标准输出
public class StreamCompressor { public static void main(String[] args) throws Exception { String codecClassname = args[0]; Class codecClass = Class.forName(codecClassname); Configuration conf = new Configuration(); CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); CompressionOutputStream out = codec.createOutputStream(System.out); IOUtils.copyBytes(System.inj out, 4096, false); out.finish(); } }
这个应用希望将CompressionCodec实现的完全合格名称作为第一个命令行参数。我们使用ReflectionUtils来构建一个新的codec实例,然后在System.out上 包裹一个压缩方法。由此,我们可以对IOUtils对象调用copyBytes()方法,从而将输入的数据复制到输出,输出由CompressionOutputStream对象压缩。最后,我们对CompressionOutputStream对象调用finish()方法,要求压缩方法完成到压缩数据流的写操作,但不关闭这个数据流。我们可以用下面这行命令做一个测试,通过GzipCodec的StreamCompressor对象对字符串“Text”进行压缩,然后使用gunzip从标准输入中对它进行读取并解压缩操作:
% echo "Text" | hadoop StreamCompressor org.apache.hadoop.io. compress.GzipCodec \ 丨 gunzip text
通过 CompressionCodecFactory 推断 CompressionCodec
在读取一个压缩文件时,通常可以通过文件扩展名推断需要使用哪个codec。如果文件以.gz结尾,则可以用GzipCodec来读取,如此等等。表4-1为毎一种压缩格式列举了文件扩展名。
通过使用其geiCodec()方法,CompressionCodecFactory提供了一种方法可以将文件扩展名映射到一个CompressionCodec,该方法取文件的Path对象作为参数。例4-2所示的应用便使用这个特性来对文件进行解压缩。
例4-2.该应用使用由文件扩展名推断而来的codec来对文件进行解压缩
public class FileDecompressor { public static void main(String[] args) throws Exception { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path inputPath = new Path(uri); CompressionCodecFactory factory = new CompressionCodecFactory(conf); CompressionCodec codec = factory.getCodec(inputPath); if (codec == null) { System.err.println("No codec found for " + uri); System.exit(l); } String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension()); InputStream in = null; OutputStream out = null; try { in = codec•createInputStream(fs.open(inputPath)); out = fs.create(new Path(outputUri)); IOUtils. copyBytes(iri, out, conf); } finally { IOUtils.closeStream(in) IOUtils.closeStream(out); } } }
一旦找到对应的codec,便去除文件扩展名形成输出文件名,这是通过 CompressionCodecFactory对象的静态方法removeSuffix()来实现的。按照这种方法,一个名为file.gz的文件可以通过下面的程序压缩为名为file的文件:
% hadoop FileDecompressor file.gz
CompressionCodecFactory 会从 io.compression.codecs 属性定义的一个列表中找到 codec。默认情况下,该列表列出了Hadoop提供的所有codec,所以只有在你拥有一个希望注册的定制codec(例如外部管理的LZO codec)时才需对其进行修改。每个codec都知道自己默认的文件扩展名,因此CompressionCodecFactory可通过捜索注册的codec找到匹配指定文件扩展名的codec(如果有的话)。
表4-3.压缩codec的属性
属性名称 | 类型 | 默认值 | 描述 |
---|---|---|---|
io.compression. codecs | 句点分隔的类名 | •org.apache.hadoop.io. compress.DefaultCodec | 压缩/解压缩的 CompressionCodec |
•org.apache.hadoop.io. compress.GzipCodec | 类列表 | ||
•org.apache.hadoop.io. compress.Bzip2Codec |
原生类库
为了性能,最好使用“原生”(native)类库来实现压缩和解压缩。例如,在一个测试中,使用原生gzip类库可以减少大约一半的解压缩时间和大约10%的压缩时间 (与内置的java实现相比)。表4-4给出了每种压缩格式的Java实现和原生类库实现。并非所有格式都有原生实现(例如,bzip2),有些则只有原生类库实现(例如, LZO)。
压缩格式 | Java实现 | 原生实现 |
---|---|---|
DEFLATE | 是 | 是 |
gzip | 是 | 是 |
bzip2 | 是 | 否 |
LZO | 否 | 是 |
Hadoop本身包含有为 32 位和 64 位Linux构建的压缩代码库(位于lib/native目录)。对于其他平台,需要根据 Hadoop wiki (http://wiki.apache.org/hadoop/NativeHadoop) 的指令根据需要来编译代码库。
可以通过Java系统的java.library.path属性指定原生代码库。bin文件夹中的 hadoop脚本可以帮你设置该属性,但如果不用这个脚本,则需要在应用中手动设置该属性。
默认情况下,Hadoop会根据自身运行的平台搜索原生代码库,如果找到相应的代码库就会自动加载。这意味着,你无需为了使用原生代码库而修改任何设置。但是,在某些情况下,例如调试一个压缩相关问题时,可能需要禁用原生代码库。将属性hadoop.native.lib的值设置成false即可,这可确保使用内置的Java代码库(如果有的话)。
CodecPool如果使用的是原生代码库并且需要在应用中执行大量压缩和解压缩操作,可以考虑使用CodecPool,它允许你反复使用压缩和解压缩,以分摊创建这些对象所涉及的开销。
例4-3中的代码显示了 API函数,不过在这个程序中,它只新建了一个Compressor,并 不需要使用压缩/解压缩池。
例4-3.该程序使用压缩池对读取自标准输入的数据进行压缩,然后将其写到标准输出
public class PooledStreamCompressor { public static void main(String[] args) throws Exception { String codecClassname = args[0]; Class codecClass = Class.forName(codecClassname); Configuration conf = new Configuration(); CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); Compressor compressor = null; try { compressor = CodecPool.getCompressor(codec); CompressionOutputStream out = codec.createOutputStream(System.out, compressor); IOUtils.copyBytes(System.in^ out, 4096, false); out.finish(); } finally { CodecPool• returnCompressor(compressor); } } }
在codec的重载方法 createOutputStream()中,对于指定的 CompressionCodec,我们从池中获取一个Compressor实例。通过使用finally数据块我们,在不同的数据流之间来回复制数据,即使出现IOException异常,也可以确保compressor可以返回池中。
压缩和输入分片
在考虑如何压缩将由MapReduce处理的数据时,理解这些压缩格式是否支持切分 (splitting)是非常重要的。以一个存储在HDFS文件系统中且压缩前大小为1 GB的 文件为例。如果HDFS的块大小设置为64MB,那么该文件将被存储在16个块 中,把这个文件作为输入数据的MapReduCe作业,将创建16个数据块,其中每个 数据块作为一个map任务的输入。
现在,经过gzip压缩后,文件大小为1 GB。与以前一样,HDFS将这个文件保存 为16个数据块。但是,将每个数据块单独作为一个输入分片是无法实现工作的, 因为无法实现从gzip压缩数据流的任意位置读取数据,所以让map任务独立于其他任务进行数据读取是行不通的。gzip格式使用DEFLATE算法来存储压缩后的数据,而DEFLATE算法将数据存储在一系列连续的压缩块中。问题在于从每个块的起始位置进行读取与从数据流的任意位置开始读取时一致并接着往后读取下一个数 据块,因此需要与整个数据流进行同步。由于上述原因,gzip并不支持文件切分。
在这种情况下,MapReduce会做正确的事情,不会去尝试切分gzip压缩文件,因为它知道输入是gzip压缩文件(通过文件扩展名看出)且gzip不支持切分。这是可行的,但牺牲了数据的本地性:一个map任务处理16个HDFS块,而其中大多数 块并没有存储在执行该map任务的节点。而且,map任务数越少,作业的粒度就较大,因而运行的时间可能会更长。
前面假设的例子中,如果文件是通过LZO压缩的,我们会面临相同的问题,因为这个压缩格式也不支持数据读取和数据流同步。但是,bzip2文件提供不同数据块之间的同步标识(pi的48位近似值),因而它是支持切分的。表4-1列出了每个压缩格式是否支持切分。
应该使用哪种压缩格式? 使用哪种压缩格式与具体应用相关。是希望应用运行速度最快,还是更关注尽 可能降低数据存储开销?通常情况下,需要为应用尝试不同的策略,并且为应 用构建一套测试基准,从而找到最理想的压缩格式。 对于巨大的、没有存储边界的文件,如日志文件,可以考虑如下选项。
对大文件来说,不应该使用不支持切分整个文件的压缩格式,否则将失去数据 的本地特性,进而造成MapReduce应用效率低下。 |
在MapReduce中使用压缩
在“通过 CompressionCodecFactory 推断 CompressionCodec” 小节中, 已经指出一点:如果输入文件是压缩的,那么在根据文件扩展名推断出相应的 codec后,MapReduce会在读取文件时自动解压缩文件。
要想对MapReduce作业的输出进行压缩操作,应在作业配置过程中,将 mapred.output.compress 属性设为 true 和 mapred.output.compression.codec属性设置为打算使用的压缩codec的类名,如例4-4所示。
public class MaxTGmpenatuneWithCompression { public static void main(String[] args) throws IOException { if(angs.length != 2) { System.Gnn.pnintln("Usage: MaxTempGnatuneWithCompnession <input path>" +"<output path>"); System.exit(-1); } JobConf conf = new DobConf(MaxTemperatureWithCompression.class); conf.setDobName("Max tempenatune with output compression"); FileInputFonmat.addInputPath(conf, new Path(angs[0])); FileOutputFonmat.setOutputPath(conf, new Path(angs[l])); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWnitable.class); conf.setBoolean("mapned.output.compress", true); conf.setClass("mapned.output.compression.codec", GzipCodec.class,CompressionCodec.class); conf.setMappenClass(MaxTempenatuneMappen.class); conf.setCombinenClass(MaxTempenatuneReducen.class); conf.setReducenClass(MaxTempenatuneReducen.class); JobClient•nunDob(conf); } }
我们按照如下指令对压缩后的输入运行程序(输出数据不必使用相同的压缩格式进行压缩,尽管本例中不是这样):
% hadoop MaxTempenatureWithCompression input/ncdc/sample.txt.gz output
最终输出的每个部分都是经过压缩的。在这里,只有一部分结果:
% gunzip -c output/part-00000.gz 1949 111 1950 722
如果为输出生成烦序文件(sequence file),可以设置mapred.output. compression.type属性来控制要使用哪种压缩格式。默认值是RECORD,即针对毎条纪录进行压缩。如果将其改为BLOCK,将针对一组纪录进行压缩,这是推荐的压缩策略,因为它的压缩效率更高。
对map任务输出进行压缩
尽管MapReduce应用读写的是未经压缩的数据,但如果对map阶段的中间输入进行压缩,也可以获得不少好处。由于map任务的输出需要写到磁盘并通过网络传输到reducer节点,所以如果使用LZO这样的快速压缩方式,是可以获得性能提升的,因为需要传输的数据减少了。启用map任务输出压缩和设置压缩格式的配置属性如表4-5所示。
属性名称 | 类型 | 默认值 | 描述 |
---|---|---|---|
mapred.compress.map.output | boolean | false | 对map任务输出进行压缩 |
mapred.map.output. compression.codec | Class | •org.apache.hadoop.io. compress.DefaultCodec | 类列表 |
下面是在作业中启用map任务输出gzip压缩格式的代码:
conf.setCompressMapOutput(true); conf.setMapOutputCompressorClass(GzipCodec.class);