Hadoop教程

使用导入的数据

一旦数据导入HDFS,就可以供定制的MapReduce程序使用。导入的文本格式数 据可以供Hadoop Streaming中的脚本或以TextlnputFormat为默认格式运行的 MapReduce作业使用。

为了使用导入记录的个别字段,必须对字段分隔符(以及转义/包围字符)进行解析, 抽出字段的值并转换为相应的数据类型。例如,在文本文件中,“sprocket”部件 的id被表示成字符串“1” ,但必须被解析为Java的Integer或int类型的变 量。Sqoop生成的表类能够自动完成这个过程,使你可以将精力集中在真正要运行 的MacReduce作业上。每个自动生成的类都有几个名为parse()的重载方法,这 些方法可以对表示为Text、CharSequence、char[]或其他常见类型的数据进行 操作。名为MaxWidgetId的MapReduce应用(在示例代码中)可以找到具有最大ID的 部件。

这个类可以和Widget.java 一起编译成一个JAR文件。在编译时需要将 Hadoop(Aadoop-core-version.jar)和Sqoop(sqoop-version.jar)的位置加入类路径。然 后,类文件就可以合并成一个JAR文件,并且像这样运行:

% jar cvvf widgets.jar *.class% HADOOP_CLASSPATH=/usr/lib/sqoop/sqoop-version*jar hadoop jar \>widgets.jar MaxWidgetId -libjars /usr/iib/sqoop/sqoop-version.jar

MaxWidgetId.run()方法在运行时,以及map任务在集群上运行时(通过 -libjars参数),这条命令确保了 Sqoop位于本地的类路径中(通过 $HADOOP_CLASSPATH)。运行之后,HDFS的maxwidgets路径中便有一个名为part-r-00000的文件,其内容 如下:

3,gadget,99.99,1983-08-13,13,Our flagship product

注意,在这个MapReduce示例程序中,一个Widget对象从mapper发送到 reducer,这个自动生成的Widget实现了Hadoop提供的Writable接口,该接 口允许通过Hadoop的序列化机制来发送对象,以及写入到SequenceFile文件或从 SequenceFile文件读出对象。

这个MaxWidgetID的例子是建立在新的MapReduce API之上的。虽然某些高级功 能(例如使用大对象数据)只有在新的API中使用起来才更方便,但无论新旧API, 都可以用来构建依赖于Sqoop生成代码的MapReduce应用。

导入的数据与Hive

对于很多类型的分析任务来说,使用类似于Hive的系统来处理关 系操作有利于加快分析任务的开发。特别是对于那些来自于关系数据源的数据,使 用Hive是非常有帮助的。Hive和Sqoop共同构成了一个强大的服务于分析任务的 工具链。

假设在我们的系统中有另外一组数据记录,来自一个基于Web的部件采购系统。 这个系统返回的记录文件中包含部件山、数量、送货地址和订单日期。

下面是此类记录的例子:

1,15,120 Any St.,Los Angeles,CA,90210,2010-08-013,4,120 Any St.,Los Angeles,CA,90210,2010-08-01 2,5,400 Some Pl.,Cupertino,CA,95014,2010-07-30 2,7,88 Mile Rd.,Manhattan,NY,10005,2010-07-18

通过使用Hadoop来分析这组采购记录,我们可以洞察我们的销售业务。这些数据 与来自关系数据源(widgets 表)的数据相结合,可以使我们做得更好。在这个例子 中,我们将计算哪个邮政编码区域的销售业绩最好,便可以让我们的销售团队更加 关注于该区域。为了做到这一点,我们同时需要来自销售记录和widgets表的 数据。

上述销售记录数据保存在一个名为sales.log的本地文件中。首先,让我们将销售数据载入Hive:

hive> CREATE TABLE sales(widget_id INT, qty INT,    >street STRING, city STRING, state STRING,    >zip INT, sale_date STRING)    >ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';0KTime taken: 5.248 secondshive> LOAD DATA LOCAL INPATH "sales.log" INTO TABLE sales;Copying data from file:/home/sales.log Loading data to table sales OKTime taken: 0.188 seconds

Sqoop能够根据一个关系数据源中的表来生成一个Hive表。既然我们已经将 widgets表的数据导入到HDFS,那么我们就直接生成相应Hive表的定义,然后 加载保存在HDFS中的数据:

% sqoop create-hive-table --connect jdbc:mysql://localhost/hadoopguide \--table widgets --fields-terminated-by ','10/06/23 18:05:34 INFO hive.HiveImport: 0K10/06/23 18:05:34 INFO hive.HiveImport: Time taken: 3.22 seconds 10/06/23 18:05:35 INFO hive.HiveImpont: Hive impont complete.% hivehive> LOAD DATA INPATH "widgets" INTO TABLE widgets;Loading data to table widgets OKTime taken: 3.265 seconds

在为一个特定的已导入数据集创建相应的Hive表定义时,我们需要指定该数据集 所使用的分隔符。否则,Sqoop将允许Hive使用它自己的默认分隔符(与Sqoop的 默认分隔符不同)。

如果想直接从数据库将数据导入到Hive,可以将上述的三个步骤(将数据导入 HDFS,创建Hive表,将HDFS中的数据导入Hive)缩短为一个步骤。在进行导入 时,Sqoop可以生成Hive表的定义,然后直接将数据导入Hive表。如果我们还没 有执行过导入操作,就可以使用下面这条命令,根据MySQL中的数据直接创建 Hive 中的 widgets 表:

% sqoop import --connect jdbc:mysql://localhost/hadoopguide \>--table widgets -m 1 --hive-import

无论选择哪一种数据导入的方式,现在我们都可以使用widgets数据集和sales 数据集来计算最赚钱的邮政编码地区。让我们来做这件事,并且把查询的结果保存 在另外一张表中,以便将来使用:

hive> CREATE TABLE zip_profits (sales_vol DOUBLE, zip INT);0Khive> INSERT OVERWRITE TABLE zip_profits  >SELECT SUM(w.price * s.qty) AS sales_vol, s.zip FROM SALES s >JOIN widgets w ON (s.widget_id = w.id) GROUP BY s.zip;...3 loaded to zip_profitsOKhive>SELECT * FROM zip_profits ORDER BY sales_vol DESC;...OK403.71 90210 28.0   10005 20.0   95014

关注微信获取最新动态