关于MapReduce and Distributed Cache
一、创建Hadoop Job
在前面的系列一里面我们已经知道如何配置hadoop了,在完成配置工作之后,我们如何提交job,并运行这些job将是接下来我们讲述的
使用SHDP创建job是相当简单的
<hdp:job id="mr-job" 指定jod id
input-path="/input/" output-path="/ouput/" 指定任务input和output地址
mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper" 指定mapper
reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"/> 指定reduce
和我们用spring 定义bean操作的写法很相似。上面的代码可以看到没有指定hadoop configuration,在默认情况下,则会使用默认的约定命名“hadoopConfiguration”,关于key和value的类型将根据mapper和reduce类型自动进行匹配。上面的属性配置型是可以重写的:configuration-ref、key、value
<hdp:job id="mr-job"
input-path="/input/" output-path="/ouput/"
mapper="mapper class" reducer="reducer class"
jar-by-class="class used for jar detection"
properties-location="classpath:special-job.properties">
electric=sea
</hdp:job>
(1)、创建hadoop steaming job
具体代码如下:
<hdp:streaming id="streaming"
input-path="/input/" output-path="/ouput/"
mapper="${path.cat}" reducer="${path.wc}"/>
hadoop streamimg job(简称流)这个也是hadoop比较流行的特征。创建map/reduce的job通过可执行文件或者脚本,类似使用cat和wc命令,很容易的从命令行启动stream job,是需要配置很多参数,而SHDP恰恰简化了这些工作。
命令行配置:
<hdp:streaming id="streaming-env"
input-path="/input/" output-path="/ouput/"
mapper="${path.cat}" reducer="${path.wc}">
<hdp:cmd-env>
EXAMPLE_DIR=/home/example/dictionaries/
...
</hdp:cmd-env>
</hdp:streaming>
二、运行hadoop job
当job在创建和配置后需要提交到到hadoop cluster、
单个job
<hdp:job-runner id="myjob-runner" pre-action="cleanup-script" post-action="export-results" job-ref="myjob" run-at-startup="true"/>
<hdp:job id="myjob" input-path="/input/" output-path="/output/"
mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
reducer="org.apache.hadoop.examples.WordCount.IntSumReducer" />
多个job
<hdp:job-runner id="myjobs-runner" pre-action="cleanup-script" job-ref="myjob1, myjob2" run-at-startup="true"/>
<hdp:job id="myjob1" ... />
<hdp:streaming id="myjob2" ... />
默认情况下 run-at-startup为false,那么一个job不能执行的则可以通过手动设置或者设置run-at-startup设置为true。在没一个job run before 和 after 都将触发pre 和 post行为。
注:当hadoop job提交或者执行处于锁定状态,JobRunner使用JDK的Excutor开启一个job;默认实现是SyncTaskExcutor开启thread执行job来模仿hadoop命令行行为。由于hadoop的操作是非常耗时的,可能会导致程序冻结,不能完成正常的操作甚至会导致程序挂掉等等类似的情况,因而在应用正式的生成环境,要多多检查对应的策略是否实现比较好。
job runner允许在关机的时候 正在运行的job能够被cancelled 和 killed ,这些应用只适合wait-for-completion为true,使用另外一个不同的executor替换默认的;若是自定义行为的话,可以通过设定kill-job-at-shutdown为false或重新实现executor-ref的实现
三、hadoop job tasklet
针对hadoop的批处理环境,SHDP提供专用的tasklet 执行批处理
<hdp:job-tasklet id="hadoop-tasklet" job-ref="mr-job" wait-for-completion="true" />
一般默认情况下,tasklet要等job complete之后才能执行。一旦wait-for-complete 为false,则不会等到job完成任务之后,才将job提交hadoop cluster
四、hadoop tool
指定class
<hdp:tool-runner id="someTool" tool-class="org.foo.SomeTool" run-at-startup="true">
<hdp:arg value="data/in.txt"/>
<hdp:arg value="data/out.txt"/>
property=value
</hdp:tool-runner>
也可以使用jar
<hdp:tool-runner ... jar="myTool.jar">
</hdp:tool-runner>
使用jar属性也就是意味着jar被用作实例并启动工具,同时它所依赖的jar也会被加载,也就是意味着jar不再作为classpath必须的部分
嵌套tool
<hdp:tool-runner id="someTool" run-at-startup="true">
<hdp:tool>
<bean class="org.foo.AnotherTool" p:input="data/in.txt" p:output="data/out.txt"/>
</hdp:tool>
</hdp:tool-runner>
注意:tool-runner默认不执行tool,只有当它被应用的时候;但是可以通过run-at-startup属性进行修改
(1)、使用tool-runner替换shell
shell实现:
hadoop jar job1.jar -files fullpath:props.properties -Dconfig=config.properties ...
hadoop jar job2.jar arg1 arg2...
...
hadoop jar job10.jar ...
tool-runner
<hdp:tool-runner id="job1" tool-class="job1.Tool" jar="job1.jar" files="fullpath:props.properties" properties-location="config.properties"/>
<hdp:tool-runner id="job2" jar="job2.jar">
<hdp:arg value="arg1"/>
<hdp:arg value="arg2"/>
</hdp:tool-runner>
<hdp:tool-runner id="job3" jar="job3.jar"/>
五、hadoop Distributed Cache 分布式cache
<hdp:cache create-symlink="true">
<hdp:classpath value="/cp/some-library.jar#library.jar" />
<hdp:cache value="/cache/some-archive.tgz#main-archive" />
<hdp:cache value="/cache/some-resource.res" />
<hdp:local value="some-file.txt" />
</hdp:cache>
DistributedCache对应的格式 absolute-path#link-name;若是link-name没有指定,则会自动推断资源文件名字