当前位置: 首页 > 工具软件 > mrjob > 使用案例 >

Hadoop深入浅出Hadoop Streaming&MRJob

鲜于宏义
2023-12-01

Hadoop Streaming 是Hadoop提供的一个 MapReduce 编程工具,它允许用户使用任何可执行文件、脚本语言或其他编程语言来实现 Mapper 和 Reducer 作业。Hadoop Streaming 使用了 Unix 的标准输入输出作为 Hadoop 和其他编程语言的开发接口,因此在其他的编程语言所写的程序中,只需要将标准输入作为程序的输入,将标准输出作为程序的输出就可以了。

原理分析

Hadoop Streaming是Hadoop的一个工具, 它帮助用户创建和运行一类特殊的map/reduce作业, 这些特殊的map/reduce作业是由一些可执行文件或脚本文件充当mapper或者reducer,我们来分析下Map/Reduce框架和Streaming mapper/reducer之间是如何工作的:

  1. Mapper: 我们使用一个可执行文件用于mapper时候, 每一个mapper任务会把这个可执行文件作为一个单独的进程启动。 mapper任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入, 同时,mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为mapper的输出。 默认情况下,一行中第一个tab之前的部分作为key,之后的<不包括tab>作为value。 如果没有tab,整行作为key值,value值为null;

  2. Reducer: 我们使用一个可执行文件用于reducer,同样的是每个reducer任务会把这个可执行文件作为一个单独的进程启动。 reducer任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入,同时,reducer收集可执行文件进程标准输出的内容,并把每一行内容转化成key/value对,作为reducer的输出。 默认情况下,一行中第一个tab之前的部分作为key,之后的<不包括tab>作为value。

简单的例子

python切分例子

// mapper.py
#!/usr/bin/env python
# 切分word
import sys

def read_input(file):
    for line in file:
        yield line.split()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        for word in words:
            print '%s%s%d' % (word, separator, 1)

if __name__ == "__main__":
    main()

shell脚本切分例子

# mapper.sh
#! /bin/bash

while read LINE; do
  for word in $LINE
  do
    echo "$word 1"
  done
done

打包提交

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \  -- 输入文件目录
    -output myOutputDir \  -- 输出文件目录
    -mapper mapper.py \  -- mapper脚本
    -reducer /bin/wc \  -- reducer脚本
    -file myPythonScript.py \  -- 脚本文件
    -file myDictionary.txt  -- 其他文件

常见参数

K-V的一些参数

  1. -D stream.reduce.output.field.separator=SEP可以设置K-V分割符号,默认是\t
  2. stream.num.map.output.key.fields,stream.num.reducer.output.key.fields;指定mapper和reducer输出的key的列范围,例如,值为2表示第1、2列一起作为key,默认是第一个分隔符前面的部分;
  3. mapred.text.key.partitioner.options用来保证指定key后,相同的key被分配到同一个reducer上,设置值为-ki,j,表示从第i到第j列作为分区的key。

文件上传的一些参数

  1. -files参数可以上传文件,比方python文件等;

  2. -archives参数后的文件会被自动解压,并且在默认情况下会通过一个命名为<文件名>的软链接symlink指向解压后的文件夹。例如:-archives libs.tgz会将解压后的文件夹用链接#libs.tgz来表示。

MRJob

MRJob是用来写能在hadoop运行的python程序的最简便方法。其最突出的特点就是在MRJob的帮助下,无需安装hadoop或部署任何集群,我们可以在本地机器上运行代码来进行测试,同时,MRJob还可以轻松运行于Amazon Elastic MapReduce上。MRJob虽然是快速编写map缩减工作的好工具,用起来比较方便,但是Python通常比Java慢,MRJob比大多数python框架要慢,所以具有运行时成本。

安装

首先要安装mrjob,使用pip安装。

pip install mrjob

介绍

MRJob定义了如下函数,我们可以重写这些函数来进行我们的map-reduce任务。

基本函数介绍:

  1. mapper(key, value):是对输入的每一行数据进行处理,key可能为NULL;
  2. combiner(key, values): 是map端聚合的处理函数;
  3. reducer(key, values): reduce端处理函数。

初始化函数介绍:

  1. mapper_init():在map任务处理第一条数据前进行的初始化操作;
  2. combiner_init():在combine任务处理第一条数据前进行的初始化操作;
  3. reducer_init():在reduce任务处理第一条数据前进行的初始化操作。

后处理函数介绍:

  1. mapper_final():是在map任务到达最后一行处理数据后出发的操作,可以清理;
  2. combiner_final():是在combine任务到达最后一行处理数据后出发的操作,可以清理;
  3. reducer_final():是在reduce任务到达最后一行处理数据后出发的操作,可以清理。

另外还有一个比较实用的函数,可以用来计数,最后输出:

  1. increment_counter(group, counter, amount=1)中group最后日志打印出现的组,counter表示计数器名字,amout表示计数,一般可以设置为1,表示加1,这样子在运行结束时候,我们就可以看到我们统计的信息;

简单程序

我们来实现一个简单的统计单词的程序,mapper对于每行数据,进行三方面处理,获取整个行的字符长度,单词个数,以及行数,reducer则实现了不同key的计数统计。

#coding=utf-8

from mrjob.job import MRJob

class MRFirstJob(MRJob):
    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    MRFirstJob().run()

本地测试,可以通过python mr_job.py input.txt来执行,如下所示:

## input.txt
name test
xy dong asdt

## python mr_job.py test.txt
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /var/folders/v2/t9zqx6hj68vbyt22n5tsch0m0000gn/T/mr_job.20201209.033142.803535
Running step 1 of 1...
job output is in /var/folders/v2/t9zqx6hj68vbyt22n5tsch0m0000gn/T/mr_job.20201209.033142.803535/output
Streaming final output from /var/folders/v2/t9zqx6hj68vbyt22n5tsch0m0000gn/T/mr_job.20201209.033142.803535/output...
"words"	5
"lines"	2
"chars"	21
Removing temp directory /var/folders/v2/t9zqx6hj68vbyt22n5tsch0m0000gn/T/mr_job.20201209.033142.803535...

集群提交我们后面进行讲解。

多步程序

MRJob.steps()给我们了可以进行多步任务的入口,我们需要覆盖steps方法,并在step中返回一个由mapper, combiner, reducer等组成的list,我们下面实现一个查找输入文件中的最高频词汇将被输出。

from mrjob.job import MRJob
from mrjob.step import MRStep
import re
 
class MRMostUsedWord(MRJob):
    def steps(self):
        return[
            MRStep(mapper = self.mapper_get_words,
                   combiner = self.combiner_count_words,
                   reducer = self.reducer_count_words),
            MRStep(reducer = self.reducer_find_max_word)
        ]

    def mapper_get_words(self, _, line):
        for word in line.split():
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts): 
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts): 
        yield None, (sum(counts), word)

    def reducer_find_max_word(self, _, word_count_pairs):
        yield max(word_count_pairs)
 
if __name__ == '__main__':
    MRMostUsedWord.run()

本地测试,可以通过python mr_steps_job.py test.txt来执行,如下所示:

## test.txt
name test
name li
hah test

## python mr_steps_job.py test.txt
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /var/folders/v2/t9zqx6hj68vbyt22n5tsch0m0000gn/T/mr_steps_job.20201209.070918.071257
Running step 1 of 2...
Running step 2 of 2...
job output is in /var/folders/v2/t9zqx6hj68vbyt22n5tsch0m0000gn/T/mr_steps_job.20201209.070918.071257/output
Streaming final output from /var/folders/v2/t9zqx6hj68vbyt22n5tsch0m0000gn/T/mr_steps_job.20201209.070918.071257/output...
2	"test"
Removing temp directory /var/folders/v2/t9zqx6hj68vbyt22n5tsch0m0000gn/T/mr_steps_job.20201209.070918.071257...

hadoop集群提交运行

我们可以通过如下方式去提交到hadoop集群进行工作,填充里面的hadoop-bin<hadoop位置,可能没设置环境变量,绝对路径指定>,hadoop_streaming_jar<hadoop streaming支持的jar包位置>,files是要上传的文件,input_path是输入的多个文件路径,output_path是我们输出的目录。

python mr_job.py \ 
    --params 20201202 \
		-r hadoop --no-output \
		--hadoop-bin {hadoop_bin_path} \
		--hadoop-arg -libjars \
		--jobconf mapred.job.queue.name=xx \
		--jobconf mapred.min.split.size=7680000000 \
		--jobconf mapred.output.compress=true \
		--jobconf mapred.job.name=xxx \
		--jobconf mapred.reduce.tasks=1000\ 
		--jobconf mapreduce.reduce.java.opts=-Xmx12000m \
	  --hadoop-streaming-jar {hadoop_streaming_jar} \
	  {files}  \
	  {input_path}  \ 
	  -o {output_path}

参考

  1. https://zhuanlan.zhihu.com/p/34903460
  2. https://hadoop.apache.org/docs/r1.0.4/cn/streaming.html
  3. https://developer.aliyun.com/article/25543
  4. https://github.com/Yelp/mrjob/tree/master/mrjob
  5. https://mrjob.readthedocs.io/en/latest/guides/writing-mrjobs.html#defining-steps
 类似资料: