Falcon提供了周期性的将源数据从外部数据库(数据库,drop boxes etc)导入到Hadoop上以及将讲过Hadoop运算的数据导出到外部的数据库。
在至今所有的版本中,Falcon仅仅支持关系型数据库(Mysql,Oracle等)讲过JDBC作为外部的数据源。将来的版本可能增加支持其他类型的数据库。
要将数据导入和数据导出需要满足以下的条件:
- Sqoop 1.4.6+
- Oozie 4.2.0+
- 合适的数据库连接器
Note
Falcon通过Sqoop进行数据导入和数据导出操作,Sqoop将需要合适的数据库驱动来连接相应的数据库。必须确保数据库的jar包被复制到Oozie中的Sqoop的share lib中:
For example, in order to import and export with MySQL, please make sure the latest MySQL connector
*mysql-connector-java-5.1.31.jar+* is copied into oozie's Sqoop share lib
/user/oozie/share/lib/{lib-dir}/sqoop/mysql-connector-java-5.1.31.jar+
where {lib-dir} value varies in oozie deployments.
实体定义和设置
数据源实体实现了数据库连接和数据库凭据验证的高度抽象。数据源实体支持了数据库特定凭据验证的读和写的接口,如果读和写的接口不能使用使用的话,它会调用默认的凭据进行验证。通常情况下,数据源实体将被系统管理员定义。
接下来的例子定义了一个Mysql的数据源的实体,这个导入操作将使用Mysql的读接口(URI:jdbc:mysql://dbhost/test)用户名:import_usr,密码:sqoop。导出操作将使用带有(URI:jdbc:mysql://dbhost/test)的写接口采用用户名:export_usr和密码指定在HDFS的 “/user/ambari-qa/password-store/password_write_user”.这个文件中。
当读接口和写接口的凭据不能使用时,将使用默认的凭据The default credential specifies the password using password alias feature available via hadoop credential functionality. 用户可以创建一个秘钥使用如下命令:
hadoop credential -create <alias> -provider <provider-path>
where is a string and is a HDFS jceks file,在运行期间 ,the specified alias will be used to look up the password stored encrypted in the jceks hdfs file specified under the providerPath element.
读和写的接口可以使管理员分开读和写的工作量:
File: mysql-database.xml
<?xml version="1.0" encoding="UTF-8"?>
<datasource colo="west-coast" description="MySQL database on west coast" type="mysql" name="mysql-db" xmlns="uri:falcon:datasource:0.1">
<tags>owner=foobar@ambari.apache.org, consumer=phoe@ambari.apache.org</tags>
<interfaces>
<!-- ***** read interface ***** -->
<interface type="readonly" endpoint="jdbc:mysql://dbhost/test">
<credential type="password-text">
<userName>import_usr</userName>
<passwordText>sqoop</passwordFile>
</credential>
</interface>
<!-- ***** write interface ***** -->
<interface type="write" endpoint="jdbc:mysql://dbhost/test">
<credential type="password-file">
<userName>export_usr</userName>
<passwordFile>/user/ambari-qa/password-store/password_write_user</passwordFile>
</credential>
</interface>
<!-- *** default credential *** -->
<credential type="password-alias">
<userName>sqoop2_user</userName>
<passwordAlias>
<alias>sqoop.password.alias</alias>
<providerPath>hdfs://namenode:8020/user/ambari-qa/sqoop_password.jceks</providerPath>
</passwordAlias>
</credential>
</interfaces>
<driver>
<clazz>com.mysql.jdbc.Driver</clazz>
<jar>/user/oozie/share/lib/lib_20150721010816/sqoop/mysql-connector-java-5.1.31</jar>
</driver>
</datasource>
Feed实体可以使用户在指定去保持数据生命周期和复制数据的同时定义数据导入和数据导出的策略,导入和导出策略将引用已经定义的数据库实体去连接和指定特定的数据表。接下来的例子定义了一个具有导入和导出策略的feed实体,导入和导出都将引用“mysql-db”实体,导入操作将使用读接口和导出操作将使用写接口,一个Feed实例将每一个小时被创建一次和由于保留策略的设置这个feed策略将被删除在90天之后。
File: customer_email_feed.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
A feed representing Hourly customer email data retained for 90 days
-->
<feed description="Raw customer email feed" name="customer_feed" xmlns="uri:falcon:feed:0.1">
<tags>externalSystem=USWestEmailServers,classification=secure</tags>
<groups>DataImportPipeline</groups>
<frequency>hours(1)</frequency>
<late-arrival cut-off="hours(4)"/>
<clusters>
<cluster name="primaryCluster" type="source">
<validity start="2015-12-15T00:00Z" end="2016-03-31T00:00Z"/>
<retention limit="days(90)" action="delete"/>
<import>
<source name="mysql-db" tableName="simple">
<extract type="full">
<mergepolicy>snapshot</mergepolicy>
</extract>
<fields>
<includes>
<field>id</field>
<field>name</field>
</includes>
</fields>
</source>
<arguments>
<argument name="--split-by" value="id"/>
<argument name="--num-mappers" value="2"/>
</arguments>
</import>
<export>
<target name="mysql-db" tableName="simple_export">
<load type="insert"/>
<fields>
<includes>
<field>id</field>
<field>name</field>
</includes>
</fields>
</target>
<arguments>
<argument name="--update-key" value="id"/>
</arguments>
</export>
</cluster>
</clusters>
<locations>
<location type="data" path="/user/ambari-qa/falcon/demo/primary/importfeed/${YEAR}-${MONTH}-${DAY}-${HOUR}-${MINUTE}"/>
<location type="stats" path="/none"/>
<location type="meta" path="/none"/>
</locations>
<ACL owner="ambari-qa" group="users" permission="0755"/>
<schema location="/none" provider="none"/>
</feed>
导入策略将使用datasource实体指定特定连接特定的数据源,表名应该在存在引用的数据库中,导入过程可以指定导入特定的字段并且可以指定是每次都是全部一次性倒入或者增量导入,
The merge policy specifies how to organize (snapshot or append, i.e time series partiitons) the data on hadoop.有效的组合是:
[full,snapshot] :数据每次被全部抽取并且被保存在Feed指定的位置。
[incremental, append] :data is extracted incrementally using the key specified in the deltacolumn and added as a partition to the feed instance location.
接下来的例子定义了[incremental, append]的策略:
<import>
<source name="mysql-db" tableName="simple">
<extract type="incremental">
<deltacolumn>modified_time</deltacolumn>
<mergepolicy>append</mergepolicy>
</extract>
<fields>
<includes>
<field>id</field>
<field>name</field>
</includes>
</fields>
</source>
<arguments>
<argument name="--split-by" value="id"/>
<argument name="--num-mappers" value="2"/>
</arguments>
</import>
Filed标签将一定哪一个列将被导入,默认情况下,所有的列将被导入,Includes标签定义了其中需要导入的列的字段,Excludes正好与Includes相反,
arguments标签将可以调用在Sqoop中任何额外的参数。
导出策略和导入策略同理,使用Datasource去连接数据库,加载特定列的数据对数据表进行插入或者更新数据,Fields操作的选项和Import具有同样的策略。表名应该存在于数据库中。
操作步骤:
## submit the mysql-db datasource defined in the file mysql_datasource.xml
falcon entity -submit -type datasource -file mysql_datasource.xml
## submit the customer_feed specified in the customer_email_feed.xml
falcon entity -submit -type feed -file customer_email_feed.xml
## schedule the customer_feed
falcon entity -schedule -type feed -name customer_feed