Hadoop教程

深入了解数据库导入

如前所述,Sqoop是通过一个MapReduce作业从数据库中导入一个表,这个作业 从表中抽取一行行记录,然后将记录写入HDFS。MapReduce是如何读取记录的? 本节将解释Sqoop的底层工作机理。

图15-1粗略演示了Sqoop是如何与源数据库及Hadoop进行交互的。像Hadoop一样,Sqoop是用Java语言编写的。Java提供了一个称为JDBC(Java Database Connectivity)的API,应用程序可以使用这个API来访问存储在RDBMS中的数据 以及检查数据的性质和类型。大多数数据库厂商都提供JDBC驱动程序,其中实现 了JDBC API并包含用于连接其数据库服务器的必要代码。

图15-1. Sqoop的导入过程

在导入开始之前,Sqoop使用JDBC来检查将要导入的表。它检索出表中所有的列 以及列的SQL数据类型。这些SQL类型(VARCHAR、INTEGER等)被映射到Java数 据类型(String、Integer等),在MapReduce应用中将使用这些对应的Java类型 来保存字段的值。Sqoop的代码生成器使用这些信息来创建对应表的类,用于保存从表中抽取的记录。

例如,之前提到的Widget类包含下列方法,这些方法用于从抽取的记录中检索所有的列:

public Integer get_id();public String get_widget_name();public java.math.BigDecimal get_price();public java.sql.Date get_design_date();public Integer get_version();public String get_design_comment():

不过,对于导入来说,更关键的是DBWritable接口的序列化方法,这些方法能使 Widget类和JDBC进行交互:

public void readFields(ResultSet —dbResults) throws SQLException; public void write(PreparedStatement —dbStmt) throws SQLException;

Sqoop启动的MapReduce作业用到一个InputFormat,它可以通过JDBC从一个 数据库表中读取部分内容。Hadoop提供的DataDrivenDBInputFormat能够为几 个map任务对査询结果进行划分。

使用一个简单的查询通常就可以读取一张表的内容,例如:

SELECT coLl,coL2,coL3,... FROM tabLeName

但是,为了获得更好的导入性能,人们经常将这样的査询划分到多个节点上执行。 查询是根据一个“划分列”(splitting column)来进行划分的。根据表的元数据, Sqoop会选择一个合适的列作为划分列(通常是表的主键)。主键列中的最小值和最 大值会被读出,与目标任务数一起用来确定每个map任务要执行的查询。

例如,假设widgets表中有100 000条记录,其id列的值为0~99 999。在导入 这张表时,Sqoop会判断出id是表的主键列。启动MapReduce作业时,用来执行 导入的 DataDrivenDBhputFormat 便会发出一条类似于 SELECT MIN(id), MAX(id) FROM widgets的查询语句。检索出的数据将用于对整个数据集进行划 分。假设我们指定并行运行5个map任务(使用-m 5),这样便可以确定毎个map 任务要执行的查询分别为:SELECT id, widget_name, ... FROM widgets WHERE id >= 0 AND id <20000, SELECT id, widget_name, ... FROM widgets WHERE id >= 20000 AND id <40000, ...,以此类推。

划分列的选择是影响并行执行效率的重要因素。如果id列的值不是均匀分布的(也 许在id值50 000到75 000的范围内没有记录),那么有一部分map任务可能只有 很少或没有工作要做,而其他任务则有很多工作要做。在运行一个导入作业时,用 户可以指定一个列作为划分列,从而调整作业的划分使其符合数据的真实分布。如 果使用-m 1让一个任务执行导入作业,就不再需要这个划分过程。

在生成反序列化代码和配置lnputFormat之后,Sqoop将作业发送到MapReduce 集群。map任务执行查询并且将ResultSet中的数据反序列化到生成类的实例, 这些数据要么直接保存在SequenceFile文件中,要么在写到HDFS之前被转换成分 隔的文本。

导入控制

Sqoop不需要每次都导入整张表。例如,可以指定仅导入表的部分列。用户也可以 在查询中加入WHERE子句,以此来限定需要导入的记录。例如,如果上个月已经 将id为0~99 999的记录导入,但本月供应商的产品目录中增加了 1000种新部 件,那么导入时在查询中加入子句WHERE id >= 100000,就可以实现只导入所 有新增的记录。用户提供的WHERE子句会在任务分解之前执行,并且被下推至每 个任务所执行的查询中。


导入和一致性

在向HDFS导入数据时,重要的是要确保访问的是数据源的一致性快照。从一个 数据库中并行读取数据的Map任务分别运行在不同的进程中。因此,它们不能共 享一个数据库事务。保证一致性的最好方法就是在导入时不允许运行任何进程对表 中现有数据进行更新。

直接模式导入

Sqoop的架构允许它在多种可用的导入方法中进行选择。多数数据库都使用上述基 于DataDrivenDBInputFormat的方法。一些数据库提供了能够快速抽取数据的 特定工具。例如,MySQL的mysqldump能够以大于JDBC的吞吐率从表中读取数 据。在Sqoop的文档中将这种使用外部工具的方法称为“直接模式”(direct mode)。由于直接模式并不像1080方法那样通用,所以必须由用户明确地启动(通 过一direct参数)。(例如,MySQL的直接模式不能处理大对象数据——类型为CLOB或BLOB的列,Sqoop需要使用JDBC专用的API将这些列载入HDFS。)

对于那些提供了此类特定工具的数据库,Sqoop使用这些工具能够得到很好的效 果。采用直接模式从MySQL中导入数据通常比基于JDBC的导入更加髙效(就map 任务和所需时间而言)。Sqoop仍然并行启动多个map任务,接着这些任务将分别 创建mysqldump程序的实例并且读取它们的运行结果,其效果类似于Maatkit工具集中mk-parallel-dump的分布式实现。

即使是用直接模式来访问数据库的内容,元数据的查询仍然是通过JDBC来实 现的。

关注微信获取最新动态