Hadoop教程

示例

虽然HDFS和MapReduce是用于对大数据集进行批处理的强大工具,但对于读或 写单独的记录,效率却很低。在这个示例中,我们将看到如何用HBase来填补它 们之间的鸿沟。

前面几章描述的气象数据集包含过去100多年上万个气象站的观测数据。这个数据 集还在继续增长,它的大小几乎是无限的。在这个示例中,我们将构建一个web界面来让用户査看不同观测站的数据。这些数据按照时间顺序分页显示。为此,让 我们假设数据集非常大,观测数据达到上亿条记录,且气温更新数据到达的速度很 快一比如从全球观测站收到超过每秒10万次更新。不仅如此,我们还假设这个 Web应用必须能够满足及时(most up-to-date)观测数据显示,即在收到数据后大约1 秒就能显示结果。

对数据集的第一个要求使我们排除了使用RDBMS实例,而可能选择HBase。对于查询延时的第二个要求排除了直接使用HDFS。MapRedUCe作业可以用于建立索引 以支持对观测数据进行随机访问,但HDFS和MapReduce并不揎长在有更新到达 时维护索引。

模式

在我们的示例中,有两个表。

Stations

这个表包含观测站数据。行的键是stationid。这个表还有一个列族info它 能作为键/值字典支持对观测站信息的查找。字典的键就是列名info:name, info.location以及info:description。表是静态的。在这里,列族info 的设计类似于RDBMS中表的设计。

Observations

这个表存放气温观测数据。行的键是stationid和逆序时间戳构成的组合 键。这个表有一个列族data,它包含一列airtemp,其值为观测到的气温值。

我们对模式的选择取决于我们希望以多高的效率来读取HBase。行和列以字典序升 序保存。虽然有二级索引和正则表达式匹配工具,但这些工具的性能仍然较低。所 以,清楚地认识到自己为最有效地存储和读取数据而希望以多髙的效率来查询数 据,非常关键,

在stations表中,显然选择stationid作为键,因为我们总是根据特定站点的 id来访问观测站的信息。但obserbations表使用的是一个组合键(把观测的时间 戳加在键之后)。这样,同一个观测站的观测数据就会被分组放到一起,使用逆序 时间戳(Long.MAX_VALUE-epoch)的二进制存储,系统把每个观测站观测数据中最 新的数据存储在最前面。

在外壳环境中,可以用以下方法来定义表:

hbase(main):036:0>	create ’stations', {NAME => 'info', VERSIONS => 1}0 row(s) in 0.1304	secondshbase(main):037:0>	create 'observations', {NAME => 'data', VERSIONS => 1}0 row(s) in 0.1332	seconds

在两个表中,我们都只对表单元格的最新版本感兴趣,所以VERSIONS设为1。这个参数的默认值是3。


加载数据

观测站的数量相对较少,所以我们可以使用任何一种接口来插入这些观测站的静态 数据。 但是,假设我们要加载数十亿条观测数据。这种数据导入是一个极为复杂的过程, 是一个需要长时间运行的数据库操作。MapReduce和HBase的分布式模型让我们 可以充分利用集群。通过把原始输入数据复制到HDFS,接着运行MapReduce作 业,我们就能读到输入数据并将其写入HDFS怍业,它将观测数据从前几章所用的输入文件导入 HBase。

例13-3.从HDFS向HBase表导入气温数据的MapReduce应用

public class HBaseTemperatuneImporter extends Configured implements Tool {// Inner-class for mapstatic class HBaseTemperatureMapper<K, V> extends MapReduceBase implements Mapper<LongWritabie,	 Text<K, V> { private NcdcRecordParser parser = new NcdcRecordParser(); private HTable table;public void map(LongWritable key. Text value,OutputCollector<K, V> output, Reporter reporter) throws IOException {parser.parse(value.toString()); if (parser.isValidTemperature()) {byte[] rowKey = RowKeyConverter.makeObservationRowKey(parser.getStationId(); parser.getObservationDate().getTime());Put p = new Put(rowKey);p.add(HBaseTemperatureCli.DATA_COLUMNFAMILY,HBaseTemperatureCli.AIRTEMP_QUALIFIER,Bytes.toBytes(parser.getAirTemperature())); table.put(p);public void configure(JobConf jc) { super.configure(jc);// Create the HBase table client once up-front and keep it around // rather than create on each map invocation.try {this.table = new HTable(new HBaseConfiguration(jc), ” observations” ); } catch (IOException e) { throw new RuntimeException(w Failed HTable construction” , e);@Overridepublic void close() throws IOException { super.close(); table.close();public int run(String[] args) throws IOException {if (args.length != 1) {System.err.println(w Usage: HBaseTemperatureImporter <input>” ); return -1;JobConf jc = new JobConf(getConf(), getClass()); FileInputFormat.addInputPath(jc, new Path(args[0])); jc.setMapperClass(HBaseTemperatureMapper.class); jc.setNumReduceTasks(0);jc.setOutputFormat(NullOutputFormat.class); JobClient.runJob(jc); return 0;public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new HBaseConfiguration(), new HBaseTemperatureImporter(), args);System.exit(exitCode);}}

HBaseTemperatureImporter 有一个名为 HbaseTemperatureMapper 的内部类, 它类似于第5章的MaxTemperatureMaper类。外部类实现了 Tool,并对调用 HBaseTemperatureMapper 内部类进行设置。HBaseTemperatureMapper 和 MaxTemperatureMapper的输入相同,所进行的解析方法一都使用第5章所介绍的NcdcRecordParser来进行 也相同。解析时会检查输入是否为有效的气温。但是不同于在MaxTemperatureMapper中仅把有效气温加输出集合中,这个类把 有效的气温值添加到HBase的observations表的data:airtemp列。(我们使用了 从HBase的TemperatureCli类中导出的data和airtemp静态定义。后面对此 会有介绍。)在configure()方法中,我们对observations表新建了一个 HTable实例。在后面调用map和HBase进行交互时会用到它。最后,在HTable 实例中调用close把所有尚未清空的写缓存中的数据刷入磁盘。

行的键是在RowKeyConverter上的makeObservationRowKey()方法中,用观测站ID和观测时间创建的:

public class RowKeyConverter {private static final int STATION_ID_LENGTH = 12;/***@return A row key whose format is: <station_id> <reverse_order_epoch>*/public static byte[] makeObservationRowKey(String stationId, longobservationTime){ byte[]row=newbyte[STATION_ID_LENGTH+Bytes.SIZEOF_LONG]; Bytes.putBytes(row,0,Bytes.toBytes(stationId),0,STATION_ID_LENGTH); longreverseOrderEpoch=Long.MAX_VALUE-observationTime; Bytes.putLong(row,STATION_ID_LENGTH,reverseOrderEpoch); returnrow;}}

观测站ID其实是一个定长字符串,所以在转换时利用了这个事实。 makeOvservationRowKey()中使用的 Byte类来自HBase工具包。它包含字节数 组和普通的 Java和 Hadoop数据类型间的转换方法。在 makeObsenvationRowKey()中,Bytes.putLong()方法用来填充键的字节数组。 Bytes.SIZEOF_LONG常量用来确定数据行的键的数组大小和其中元素的位置。

我们可以用下面的命令来运行程序:

% hbase HBaseTemperatureImporter input/ncdc/all

对优化的一些说明

要特别当心导入导致锁定表的情况,这时所有的客户端都对同一个表的区域 (在单个节点上)进行操作,然后再对下一个区域进行操作,依次进行。这时加 载操作并没有均匀地分布在所有区域上。这通常是由“排序后输入”(sorted input)和切分的原理共同造成的。如果在插入数据前,针对行的键按数据排列 的次序进行随机处理,可能有助于减少这种情况。在我们的示例中,基于当前 stationid值的分布情况和TextlnputFormat分割数据的方式,上传操作应 该足以保证足够的分布式特性。

每个任务只获得一个HTable实例。实例化HTable是有代价的,所以如果为 每个插入操作实例化一个HTable,会对性能造成负面影响,因此,我们会在configure()步骤设置HTable。

默认情况下,每个HTable.put(put)在进行插入操作时事实上不使用任何缓 存。可以使用HTable.setAutoFlush(false),接着设置写缓存的大小,以 此禁用HTable的自动刷入特性。插入的数据占满写缓存之后,缓存才会被刷 入存储。但要记住,必须在每个任务的最后手工调用HTable.flushCommits() 或 HTable.close(),后者会调用 HTable.flushCommits(),以确保缓存中 最后没有剩下未被刷入的更新。这可以在mapper重载的close()中完成。

HBase 包含 TableInputFormat 和 TableOutputFormat。它们可用于帮助把 Hbase作为源或目标的MapReduce(见例13-2)。也可以像在第5章那样使用 MaxTemperatureMapper,增加一个 reducer 任务来接收 MaxTemperatureMapper 的输出并通过TableOutputFormat把结果导入HBase。


Web査询

为了实现一个Web应用,我们将直接使用HBase的Java API。在这里,我们将深 刻体会到选择模式和存储格式的重要性。 最简单的查询就是获取静态的观测站信息。这一类查询在传统数据库中也很简单, 但HBase提供了额外的控制功能和灵活性。我们把info列族作为键/值字典(列名 作为键,列值作为值),代码如下所示:

public Map
 
   getStationInfo(HTable table, String stationId) throws IOException {Get get = new Get(Bytes.toBytes(stationId)); get.addColumn(INFO_COLUMNFAMILY);Result nes = table.get(get); if (nes == null) { return null;}Map
  
    resultMap = new HashMap<String, Stning>(); resultMap.put("name” , getValue(res, INFO_COLUMNFAMILY, NAME_QUALIFIER)); resultMap.put(”location”, getValue(res, INFO_COLUMNFAMILY, LOCATfON_QUALIFIER)); resultMap.put("description", getValue(nes, INFO_COLUMNFAMILY, DESCRIPTION_QUALIFIER)); ~return resultMap;}private static String getValue(Result res, byte [] cf, byte [] qualifier) { byte [] value = res.getValue(cf, qualifier); return value == null? "": Bytes.toStning(value);}
  
 

在这个示例中,getStationInfo()接收一个HTable实例和一个观测站ID。为了 获取观测站的信息,我们使用HTable.get()来传递一个设置为获取已定义列族INFO_COLUMNFAMILY中所有内容的Get实例。

get()的结果返回给Result。它包含数据行,我们可以通过操作需要的列单元格 来取得单元格的值。getStationInfo()方法把Result Map转换为更便于使用的 由String类型的键和值构成的Map。

我们已经看出为什么在使用HBase时需要工具函数了。在HBase上,为了处理底 层的交互,我们已经开发出越来越多的抽象。但是,理解它们的工作机理以及各个 存储选项之间的差异,非常重要。

和关系型数据库相比,HBase的一个强项是不需要我们预先设定列。所以在将来, 如果每个观测站在这三个必有的属性以外还有几百个可选的属性,我们便可以直接 插入这些属性而不需要修改模式。当然,应用中读和写的代码是需要修改的。在示 例中,我们可以把显式获取各个值的代码改为循环遍历Result来获取每个值。

我们将在web应用中使用HBase扫描器(scanner)来检索观测数据。

例13-4.检索HBase表中某范围内气象站观测数据行的方法

public NavigableMap<Long,Integer> getStationObservations(HTable table,String stationId, long maxStamp, int maxCount) throws IOException { byte[] startRow = RowKeyConverter.makeObservationRowKey(stationId,, maxStamp); NavigableMap<Long, Integer> resultMap = new TreeMap<Long, Integer>();Scan scan = new Scan(startRow);scan.addColumn(DATA_COLUMNFAMILY, AIRTEMP_QUALIFIER);ResultScanner scanner = table.getScanner(scan);Result res = null; int count = 0; try {while ((res = scanner.next()) != null && count++ < maxCount) { byte[] row = res.getRow();byte[] value = res.getValue(DATA_COLUMNFAMILY, AIRTEMP_QUALIFIER);Long Stamp = Long.MAX_VALUEBytes.toLong(row, row.length -Bytes.SIZEOF—LONG, Bytes.SIZEOF_LONG); Integer temp = Bytes.toInt(value); resultMap.put(stamp, temp);}} finally {scanner.close();}return resultMap;/***Return the last ten observations.*/public NavigableMap
 
   getStationObservations(HTable table, 	String stationId) throws IOException { 	return getStationObservations(table, stationid, Long.MAX_VALUE, 10);
 

getStationObservations()方法接受观测站ID、由maxStamp定义的范围以及 最大数据行数(maxCount)作为参数。注意,返回的NavigableMap实际上是按降 序排列的。如果想以升序读它,需要使用NavigableMap.descendingMap()。

在前面的示例中,把数据存成Long.MAX_VALUE-stamp,其优势还不是特别明 显。如果要根据“偏移量”(offset)和“限制范围”(limit)来获取最新观测数据,这 种存储方式的优势就特别明显。而这种查询在Web应用中是屡见不鲜的。如果观 测数据直接用实际的时间戳来存放,我们就只能根据偏移量和限制范围髙效地获取 最老的观测数据。要获取最新的数据意味着要拿到所有的数据,直到最后才能获得 结果。从RDBMS转向HBase的一个主要原因就是HBase允许这种“提早过滤”(early-out)的情况。

关注微信获取最新动态