mrjob is a Python interface for MapReduce programming that makes it easy to build single-step or multi-step map-reduce jobs. Here we use mrjob to complete the simplest possible task: word-frequency counting.
The text we want to count is as follows:
a b c
a b
d d
Save it to a file named text:
[hadoop@localhost workspace]$ cat text
a b c
a b
d d
The corresponding Python program is as follows:
[hadoop@localhost workspace]$ cat test3.py
#!/usr/bin/python
from mrjob.job import MRJob

class COUNT(MRJob):
    # mapper is called once per input line; for plain text input the key
    # (the ignored _ parameter) is None and line holds the raw line.
    def mapper(self, _, line):
        words = line.split()
        for word in words:
            yield word, 1

    # reducer is called once per distinct word; values iterates over all
    # the 1s emitted for that word, so summing them gives its frequency.
    def reducer(self, word, values):
        yield word, sum(values)

if __name__ == "__main__":
    COUNT.run()
In the script above, mrjob feeds each line of the input file to the first mapper. Since only one mapper function is defined in this script, every line of the input data is passed to the mapper through its line parameter.
In addition, the #!/usr/bin/python line at the top specifies the default interpreter for the script; it can be omitted if you name the interpreter explicitly when running the script.
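As an aside, mrjob also lets you define a combiner, which pre-aggregates the mapper's output on the map side before it is shuffled to the reducers. Here is a minimal sketch of the same word-count job with a combiner added (the combiner method is part of mrjob's standard API; everything else mirrors test3.py above):

#!/usr/bin/python
from mrjob.job import MRJob

class COUNT(MRJob):
    def mapper(self, _, line):
        for word in line.split():
            yield word, 1

    # Runs on the map side: collapses the 1s each mapper emits for a word
    # into a partial sum, reducing the data sent across the shuffle.
    def combiner(self, word, counts):
        yield word, sum(counts)

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == "__main__":
    COUNT.run()

For a tiny input like ours this makes no visible difference, but on large inputs it can cut shuffle traffic substantially.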
First, make the .py file executable so the run does not fail due to insufficient permissions:
[hadoop@localhost workspace]$ chmod +x ./test3.py
Then run the script with the following command to perform the MapReduce word count. We first test locally, without using Hadoop:
[hadoop@localhost workspace]$ ./test3.py -r local text -o out
No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/test3.hadoop.20210424.053521.990352
Running step 1 of 1...
job output is in out
Removing temp directory /tmp/test3.hadoop.20210424.053521.990352...
[hadoop@localhost workspace]$ cat out/*
"a" 2
"b" 2
"c" 1
"d" 2
As you can see, the program ran successfully.
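Note that the keys in the output are quoted: mrjob serializes output keys and values as JSON by default, separated by a tab. If you want to consume the results from Python, here is a minimal sketch (assuming the default JSON output protocol and that out/ contains only the part files written by the run above):

import json
import os

counts = {}
for name in os.listdir("out"):
    with open(os.path.join("out", name)) as f:
        for line in f:
            # Each line is: <JSON key> <tab> <JSON value>
            key, value = line.rstrip("\n").split("\t", 1)
            counts[json.loads(key)] = json.loads(value)

print(counts)  # expected: {'a': 2, 'b': 2, 'c': 1, 'd': 2}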
In the command ./test3.py -r local text -o out:
./test3.py is our script file;
-r local tells mrjob to run in local mode;
text is the name of our input file, here the text file in the current directory;
-o out specifies the directory where the results are written, here a newly created out directory under the current directory. (Make sure the directory passed to -o does not exist before the run: it is created automatically by the job, and creating it in advance can cause a conflict and make the run fail.)
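The same flags can also be supplied programmatically instead of on the command line. A minimal sketch following the pattern in mrjob's documentation (the argument list mirrors the command above; cat_output() and parse_output() exist in recent mrjob versions, older releases used stream_output() instead):

from test3 import COUNT

# Equivalent to: ./test3.py -r local text -o out
job = COUNT(args=["-r", "local", "text", "-o", "out"])
with job.make_runner() as runner:
    runner.run()
    # Decode the job's JSON output back into Python objects.
    for word, count in job.parse_output(runner.cat_output()):
        print(word, count)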
If you did not add the #!/usr/bin/python line at the top of your .py file, you can run your program with the following command instead (i.e., prefix the command with python or python3):
[hadoop@localhost workspace]$ python ./test3.py -r local text -o out
No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/test3.hadoop.20210424.053521.990352
Running step 1 of 1...
job output is in out
Removing temp directory /tmp/test3.hadoop.20210424.053521.990352...
[hadoop@localhost workspace]$ cat out/*
"a" 2
"b" 2
"c" 1
"d" 2
Run the program on Hadoop with the following command:
[hadoop@localhost workspace]$ ./test3.py -r hadoop text -o output
No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /usr/local/hadoop/bin...
Found hadoop binary: /usr/local/hadoop/bin/hadoop
Using Hadoop version 2.6.0
Looking for Hadoop streaming jar in /usr/local/hadoop...
Found Hadoop streaming jar: /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar
Creating temp directory /tmp/test3.hadoop.20210424.055450.699698
uploading working dir files to hdfs:///user/hadoop/tmp/mrjob/test3.hadoop.20210424.055450.699698/files/wd...
Copying other local files to hdfs:///user/hadoop/tmp/mrjob/test3.hadoop.20210424.055450.699698/files/
Running step 1 of 1...
packageJobJar: [/tmp/hadoop-unjar6276312191919821230/] [] /tmp/streamjob825727840311838804.jar tmpDir=null
Connecting to ResourceManager at /0.0.0.0:8032
Connecting to ResourceManager at /0.0.0.0:8032
Total input paths to process : 1
number of splits:2
Submitting tokens for job: job_1619235102568_0005
Submitted application application_1619235102568_0005
The url to track the job: http://localhost:8088/proxy/application_1619235102568_0005/
Running job: job_1619235102568_0005
Job job_1619235102568_0005 running in uber mode : false
map 0% reduce 0%
map 67% reduce 0%
map 100% reduce 0%
map 100% reduce 100%
Job job_1619235102568_0005 completed successfully
Output directory: hdfs:///user/hadoop/output
Counters: 49
File Input Format Counters
Bytes Read=21
File Output Format Counters
Bytes Written=24
File System Counters
FILE: Number of bytes read=62
FILE: Number of bytes written=328809
FILE: Number of large read operations=0
FILE: Number of read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=305
HDFS: Number of bytes written=24
HDFS: Number of large read operations=0
HDFS: Number of read operations=9
HDFS: Number of write operations=2
Job Counters
Data-local map tasks=2
Launched map tasks=2
Launched reduce tasks=1
Total megabyte-seconds taken by all map tasks=139382784
Total megabyte-seconds taken by all reduce tasks=23851008
Total time spent by all map tasks (ms)=136116
Total time spent by all maps in occupied slots (ms)=136116
Total time spent by all reduce tasks (ms)=23292
Total time spent by all reduces in occupied slots (ms)=23292
Total vcore-seconds taken by all map tasks=136116
Total vcore-seconds taken by all reduce tasks=23292
Map-Reduce Framework
CPU time spent (ms)=5380
Combine input records=0
Combine output records=0
Failed Shuffles=0
GC time elapsed (ms)=4791
Input split bytes=284
Map input records=3
Map output bytes=42
Map output materialized bytes=68
Map output records=7
Merged Map outputs=2
Physical memory (bytes) snapshot=430739456
Reduce input groups=4
Reduce input records=7
Reduce output records=4
Reduce shuffle bytes=68
Shuffled Maps =2
Spilled Records=14
Total committed heap usage (bytes)=256843776
Virtual memory (bytes) snapshot=3072700416
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
job output is in hdfs:///user/hadoop/output
Removing HDFS temp directory hdfs:///user/hadoop/tmp/mrjob/test3.hadoop.20210424.055450.699698...
Removing temp directory /tmp/test3.hadoop.20210424.055450.699698...
[hadoop@localhost workspace]$ hadoop dfs -cat output/*
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
"a" 2
"b" 2
"c" 1
"d" 2
As you can see, the job ran successfully: mrjob automatically uploaded the text file from the local current directory to a temporary directory on the Hadoop cluster, saved the output under /user/hadoop/output on HDFS, and finally deleted the temporary files produced along the way.
We can also use a file already stored on HDFS directly as input, as follows.
(Note: delete the previous output directory before running, otherwise the job will fail; the command is hadoop dfs -rm -R output.)
[hadoop@localhost workspace]$ ./test3.py -r hadoop hdfs:///user/hadoop/text -o output
No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /usr/local/hadoop/bin...
Found hadoop binary: /usr/local/hadoop/bin/hadoop
Using Hadoop version 2.6.0
Looking for Hadoop streaming jar in /usr/local/hadoop...
Found Hadoop streaming jar: /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar
Creating temp directory /tmp/test3.hadoop.20210424.060646.551956
uploading working dir files to hdfs:///user/hadoop/tmp/mrjob/test3.hadoop.20210424.060646.551956/files/wd...
Copying other local files to hdfs:///user/hadoop/tmp/mrjob/test3.hadoop.20210424.060646.551956/files/
Running step 1 of 1...
packageJobJar: [/tmp/hadoop-unjar2796801622331185171/] [] /tmp/streamjob6895096939037159635.jar tmpDir=null
Connecting to ResourceManager at /0.0.0.0:8032
Connecting to ResourceManager at /0.0.0.0:8032
Total input paths to process : 1
number of splits:2
Submitting tokens for job: job_1619235102568_0006
Submitted application application_1619235102568_0006
The url to track the job: http://localhost:8088/proxy/application_1619235102568_0006/
Running job: job_1619235102568_0006
Job job_1619235102568_0006 running in uber mode : false
map 0% reduce 0%
map 33% reduce 0%
map 100% reduce 0%
map 100% reduce 100%
Job job_1619235102568_0006 completed successfully
Output directory: hdfs:///user/hadoop/output
Counters: 49
File Input Format Counters
Bytes Read=21
File Output Format Counters
Bytes Written=24
File System Counters
FILE: Number of bytes read=62
FILE: Number of bytes written=328656
FILE: Number of large read operations=0
FILE: Number of read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=201
HDFS: Number of bytes written=24
HDFS: Number of large read operations=0
HDFS: Number of read operations=9
HDFS: Number of write operations=2
Job Counters
Data-local map tasks=2
Launched map tasks=2
Launched reduce tasks=1
Total megabyte-seconds taken by all map tasks=98526208
Total megabyte-seconds taken by all reduce tasks=19869696
Total time spent by all map tasks (ms)=96217
Total time spent by all maps in occupied slots (ms)=96217
Total time spent by all reduce tasks (ms)=19404
Total time spent by all reduces in occupied slots (ms)=19404
Total vcore-seconds taken by all map tasks=96217
Total vcore-seconds taken by all reduce tasks=19404
Map-Reduce Framework
CPU time spent (ms)=4590
Combine input records=0
Combine output records=0
Failed Shuffles=0
GC time elapsed (ms)=880
Input split bytes=180
Map input records=3
Map output bytes=42
Map output materialized bytes=68
Map output records=7
Merged Map outputs=2
Physical memory (bytes) snapshot=489639936
Reduce input groups=4
Reduce input records=7
Reduce output records=4
Reduce shuffle bytes=68
Shuffled Maps =2
Spilled Records=14
Total committed heap usage (bytes)=256843776
Virtual memory (bytes) snapshot=3076481024
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
job output is in hdfs:///user/hadoop/output
Removing HDFS temp directory hdfs:///user/hadoop/tmp/mrjob/test3.hadoop.20210424.060646.551956...
Removing temp directory /tmp/test3.hadoop.20210424.060646.551956...
[hadoop@localhost workspace]$ hadoop dfs -cat output/*
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
"a" 2
"b" 2
"c" 1
"d" 2
Here, hdfs:///user/hadoop/text in the command refers to the text file stored under the /user/hadoop/ directory on HDFS. You can copy the text file we used earlier from the current directory to /user/hadoop/ on HDFS with the command hadoop dfs -put ./text text.
Also note that if the user you are logged in as is not hadoop, the upload location changes: it automatically becomes /user/{your username}, so make sure the corresponding directory already exists on HDFS before uploading.
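Finally, as mentioned at the start, mrjob can also chain multiple map-reduce steps in one job. Here is a minimal sketch adapted from the pattern in mrjob's documentation, in which a second step finds the most frequent word from the counts produced by the first (MRStep is part of mrjob's standard API):

#!/usr/bin/python
from mrjob.job import MRJob
from mrjob.step import MRStep

class MostCommonWord(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word),
        ]

    def mapper_get_words(self, _, line):
        for word in line.split():
            yield word, 1

    def reducer_count_words(self, word, counts):
        # Emit everything under a single None key so that the next step's
        # reducer receives all (count, word) pairs together.
        yield None, (sum(counts), word)

    def reducer_find_max_word(self, _, count_word_pairs):
        # Tuples compare element-wise, so max() picks the highest count.
        yield max(count_word_pairs)

if __name__ == "__main__":
    MostCommonWord.run()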
These are the basic operations of mrjob. If you repost this article, please credit the source. Thank you.