1, 安装mrjob
pip install mrjob
pip的安装看上一篇文章。
2,代码测试
mrjob安装完之后,就可以直接用了。如果hadoop已经配置好,不需要额外的配置东西(HADOOP_HOME这个环境变量要配置好),基于mrjob的程序就可以直接在hadoop平台上运行了。
mrjob提供了几种代码运行的方式,1)本地测试,就是直接在本地运行代码 2)在本地模拟hadoop的运行 3)在hadoop集群上运行 等等。下面先看一下本地运行的情况。
来自官网的一段代码:
from mrjob.job import MRJob class MRWordCounter(MRJob): def mapper(self, key, line): for word in line.split(): yield word, 1 def reducer(self, word, occurrences): yield word, sum(occurrences) if __name__ == '__main__': MRWordCounter.run()
本地运行:python MRWrodCounter.py -r inline <input> output
这会把结果输出到Output里面。
发现还有一种用法:python MRWrodCounter.py -r inline input1 这样可以直接打印到屏幕,此时可以设置多个输入如:python MRWordCounter.py -r inline input1 input2 input3。
应用python MRWordCounter.py -r inline input1 input2 input3 > out 命令可以将处理多个文件的结果输出到out里面。
本地模拟hadoop运行:python MRWordCounter -r local <input> output
这个会把结果输出到output里面,这个output必须写。
hadoop集群上运行:python MRWordCounter -r hadoop <input> output
3,mrjob的用法
mrjob的用法在它的官方文档里面写的很全面,这里写到的知识其中最基本的部分。
首先,分析上面的代码。
一个map-reduce任务最简单的写法就是覆盖MRJob的mapper,combiner,reducer函数。默认的配置下,输入到mapper的key是个None. mapper产生(word,1),这个具体是通过JSON在各个任务中传输。所以,你的python要支持JSON。再看,combiner和reducer,这个两个的key没问题,需要注意的是value部分,根据官方文档上描述,这个value是个iterator of the numbers,所以sum这个函数用在这里是很合理的。最终reduce的输出还是key,value对,中间用tab键分隔。
mrjob还提供了定义多个步骤的功能,覆盖steps()函数即可完成,下面代码展示了这一过程:
from mrjob.job import MRJob class MRDoubleWordFreqCount(MRJob): """Word frequency count job with an extra step to double all the values""" def get_words(self, _, line): for word in line.split(): yield word.lower(), 1 def sum_words(self, word, counts): yield word, sum(counts) def double_counts(self, word, counts): yield word, counts * 2 def steps(self): return [self.mr(mapper=self.get_words, combiner=self.sum_words, reducer=self.sum_words), self.mr(mapper=self.double_counts)] if __name__=='__main__': MRDoubleWordFreqCount.run()
这个mrjob用起来很方便的,可以很容易的测试代码,也可以很快捷的进行开发。