Hadoop Streaming 是Hadoop提供的一个 MapReduce 编程工具,它允许用户使用任何可执行文件、脚本语言或其他编程语言来实现 Mapper 和 Reducer 作业。Hadoop Streaming 使用了 Unix 的标准输入输出作为 Hadoop 和其他编程语言的开发接口,因此在其他的编程语言所写的程序中,只需要将标准输入作为程序的输入,将标准输出作为程序的输出就可以了。
Hadoop Streaming是Hadoop的一个工具, 它帮助用户创建和运行一类特殊的map/reduce作业, 这些特殊的map/reduce作业是由一些可执行文件或脚本文件充当mapper或者reducer,我们来分析下Map/Reduce框架和Streaming mapper/reducer之间是如何工作的:
Mapper: 我们使用一个可执行文件用于mapper
时候, 每一个mapper
任务会把这个可执行文件作为一个单独的进程启动。 mapper
任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入, 同时,mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为mapper的输出。 默认情况下,一行中第一个tab之前的部分作为key,之后的<不包括tab>作为value。 如果没有tab,整行作为key值,value值为null;
Reducer: 我们使用一个可执行文件用于reducer
,同样的是每个reducer
任务会把这个可执行文件作为一个单独的进程启动。 reducer
任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入,同时,reducer收集可执行文件进程标准输出的内容,并把每一行内容转化成key/value对,作为reducer的输出。 默认情况下,一行中第一个tab之前的部分作为key,之后的<不包括tab>作为value。
// mapper.py
#!/usr/bin/env python
# 切分word
import sys
def read_input(file):
for line in file:
yield line.split()
def main(separator='\t'):
# input comes from STDIN (standard input)
data = read_input(sys.stdin)
for words in data:
for word in words:
print '%s%s%d' % (word, separator, 1)
if __name__ == "__main__":
main()
# mapper.sh
#! /bin/bash
while read LINE; do
for word in $LINE
do
echo "$word 1"
done
done
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \ -- 输入文件目录
-output myOutputDir \ -- 输出文件目录
-mapper mapper.py \ -- mapper脚本
-reducer /bin/wc \ -- reducer脚本
-file myPythonScript.py \ -- 脚本文件
-file myDictionary.txt -- 其他文件
-D stream.reduce.output.field.separator=SEP
可以设置K-V分割符号,默认是\t
;stream.num.map.output.key.fields
,stream.num.reducer.output.key.fields
;指定mapper和reducer输出的key的列范围,例如,值为2表示第1、2列一起作为key,默认是第一个分隔符前面的部分;mapred.text.key.partitioner.options
用来保证指定key后,相同的key被分配到同一个reducer上,设置值为-ki,j
,表示从第i到第j列作为分区的key。-files
参数可以上传文件,比方python文件等;
-archives
参数后的文件会被自动解压,并且在默认情况下会通过一个命名为<文件名>的软链接symlink指向解压后的文件夹。例如:-archives libs.tgz
会将解压后的文件夹用链接#libs.tgz
来表示。
MRJob
是用来写能在hadoop运行的python程序的最简便方法。其最突出的特点就是在MRJob
的帮助下,无需安装hadoop或部署任何集群,我们可以在本地机器上运行代码来进行测试,同时,MRJob
还可以轻松运行于Amazon Elastic MapReduce上。MRJob
虽然是快速编写map缩减工作的好工具,用起来比较方便,但是Python
通常比Java
慢,MRJob
比大多数python框架要慢,所以具有运行时成本。
首先要安装mrjob,使用pip安装。
pip install mrjob
MRJob
定义了如下函数,我们可以重写这些函数来进行我们的map-reduce任务。
基本函数介绍:
mapper(key, value)
:是对输入的每一行数据进行处理,key可能为NULL;combiner(key, values)
: 是map端聚合的处理函数;reducer(key, values)
: reduce端处理函数。初始化函数介绍:
mapper_init()
:在map任务处理第一条数据前进行的初始化操作;combiner_init()
:在combine任务处理第一条数据前进行的初始化操作;reducer_init()
:在reduce任务处理第一条数据前进行的初始化操作。后处理函数介绍:
mapper_final()
:是在map任务到达最后一行处理数据后出发的操作,可以清理;combiner_final()
:是在combine任务到达最后一行处理数据后出发的操作,可以清理;reducer_final()
:是在reduce任务到达最后一行处理数据后出发的操作,可以清理。另外还有一个比较实用的函数,可以用来计数,最后输出:
increment_counter(group, counter, amount=1)
中group最后日志打印出现的组,counter表示计数器名字,amout表示计数,一般可以设置为1,表示加1,这样子在运行结束时候,我们就可以看到我们统计的信息;我们来实现一个简单的统计单词的程序,mapper
对于每行数据,进行三方面处理,获取整个行的字符长度,单词个数,以及行数,reducer
则实现了不同key的计数统计。
#coding=utf-8
from mrjob.job import MRJob
class MRFirstJob(MRJob):
def mapper(self, _, line):
yield "chars", len(line)
yield "words", len(line.split())
yield "lines", 1
def reducer(self, key, values):
yield key, sum(values)
if __name__ == '__main__':
MRFirstJob().run()
本地测试,可以通过python mr_job.py input.txt
来执行,如下所示:
## input.txt
name test
xy dong asdt
## python mr_job.py test.txt
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /var/folders/v2/t9zqx6hj68vbyt22n5tsch0m0000gn/T/mr_job.20201209.033142.803535
Running step 1 of 1...
job output is in /var/folders/v2/t9zqx6hj68vbyt22n5tsch0m0000gn/T/mr_job.20201209.033142.803535/output
Streaming final output from /var/folders/v2/t9zqx6hj68vbyt22n5tsch0m0000gn/T/mr_job.20201209.033142.803535/output...
"words" 5
"lines" 2
"chars" 21
Removing temp directory /var/folders/v2/t9zqx6hj68vbyt22n5tsch0m0000gn/T/mr_job.20201209.033142.803535...
集群提交我们后面进行讲解。
MRJob.steps()
给我们了可以进行多步任务的入口,我们需要覆盖steps方法,并在step中返回一个由mapper, combiner, reducer等组成的list,我们下面实现一个查找输入文件中的最高频词汇将被输出。
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
class MRMostUsedWord(MRJob):
def steps(self):
return[
MRStep(mapper = self.mapper_get_words,
combiner = self.combiner_count_words,
reducer = self.reducer_count_words),
MRStep(reducer = self.reducer_find_max_word)
]
def mapper_get_words(self, _, line):
for word in line.split():
yield (word.lower(), 1)
def combiner_count_words(self, word, counts):
yield (word, sum(counts))
def reducer_count_words(self, word, counts):
yield None, (sum(counts), word)
def reducer_find_max_word(self, _, word_count_pairs):
yield max(word_count_pairs)
if __name__ == '__main__':
MRMostUsedWord.run()
本地测试,可以通过python mr_steps_job.py test.txt
来执行,如下所示:
## test.txt
name test
name li
hah test
## python mr_steps_job.py test.txt
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /var/folders/v2/t9zqx6hj68vbyt22n5tsch0m0000gn/T/mr_steps_job.20201209.070918.071257
Running step 1 of 2...
Running step 2 of 2...
job output is in /var/folders/v2/t9zqx6hj68vbyt22n5tsch0m0000gn/T/mr_steps_job.20201209.070918.071257/output
Streaming final output from /var/folders/v2/t9zqx6hj68vbyt22n5tsch0m0000gn/T/mr_steps_job.20201209.070918.071257/output...
2 "test"
Removing temp directory /var/folders/v2/t9zqx6hj68vbyt22n5tsch0m0000gn/T/mr_steps_job.20201209.070918.071257...
我们可以通过如下方式去提交到hadoop集群进行工作,填充里面的hadoop-bin
<hadoop位置,可能没设置环境变量,绝对路径指定>,hadoop_streaming_jar
<hadoop streaming支持的jar包位置>,files
是要上传的文件,input_path
是输入的多个文件路径,output_path
是我们输出的目录。
python mr_job.py \
--params 20201202 \
-r hadoop --no-output \
--hadoop-bin {hadoop_bin_path} \
--hadoop-arg -libjars \
--jobconf mapred.job.queue.name=xx \
--jobconf mapred.min.split.size=7680000000 \
--jobconf mapred.output.compress=true \
--jobconf mapred.job.name=xxx \
--jobconf mapred.reduce.tasks=1000\
--jobconf mapreduce.reduce.java.opts=-Xmx12000m \
--hadoop-streaming-jar {hadoop_streaming_jar} \
{files} \
{input_path} \
-o {output_path}