
[MRJob] Using Python 3 for Hadoop MapReduce on CentOS

郎永福
2023-12-01

MRJob

mrjob is a Python interface for MapReduce programming that makes it easy to build single-step or multi-step map-reduce jobs. Here we use mrjob to implement the simplest possible task: counting word frequencies.

1. Data and script

The text whose word frequencies we want to count is as follows:

a b c
a b
d d

Save it to a file named text:

[hadoop@localhost workspace]$ cat text
a b c
a b
d d

The corresponding Python program is as follows:

[hadoop@localhost workspace]$ cat test3.py
#!/usr/bin/python

from mrjob.job import MRJob

class COUNT(MRJob):
    def mapper(self, _, line):
        # mrjob calls this once for every line of the input file
        words = line.split()
        for word in words:
            yield word, 1

    def reducer(self, word, counts):
        # counts is an iterator over all the 1s emitted for this word
        yield word, sum(counts)


if __name__ == "__main__":
    COUNT.run()

In the script above, mrjob feeds each line of the input file to the first mapper. Since only one mapper function is defined in this script, every line of the data ends up in the mapper's line argument (the key passed to the first mapper is None, which is why it is written as _).
Also, the #!/usr/bin/python line at the top declares the default interpreter for the script; it can be omitted if you specify the interpreter explicitly when running the script.
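
mrjob also supports a combiner, which pre-aggregates each mapper's output locally before it is shuffled to the reducers. As a minimal sketch (the class name below is made up for illustration), the same word count with a combiner added would look like this:

#!/usr/bin/python

from mrjob.job import MRJob

class COUNT_WITH_COMBINER(MRJob):
    def mapper(self, _, line):
        for word in line.split():
            yield word, 1

    def combiner(self, word, counts):
        # runs on each mapper's local output before the shuffle
        yield word, sum(counts)

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == "__main__":
    COUNT_WITH_COMBINER.run()

For this tiny input the combiner makes no real difference, but on large inputs it can noticeably reduce the amount of data sent across the network.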

2. Make the .py file executable

First, change the permissions of the .py file so the run does not fail due to insufficient permissions:

[hadoop@localhost workspace]$ chmod +x ./test3.py 

3. Local test

Next, run the script to perform the MapReduce word count. We first test locally, without using Hadoop:

[hadoop@localhost workspace]$ ./test3.py -r local text -o out
No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/test3.hadoop.20210424.053521.990352
Running step 1 of 1...
job output is in out
Removing temp directory /tmp/test3.hadoop.20210424.053521.990352...
[hadoop@localhost workspace]$ cat out/*
"a"	2
"b"	2
"c"	1
"d"	2

The program runs successfully.
In the command ./test3.py -r local text -o out:
./test3.py is our script file,
-r local tells mrjob to run in local mode,
text is the name of the input file, here the text file in the current directory,
-o out specifies where to write the results, here a newly created out directory under the current directory. (Make sure the directory given to -o does not exist before the run; mrjob creates it itself, and a directory that already exists can cause a conflict and make the job fail.)
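
Besides the command line, the job can also be driven from another Python script. The sketch below assumes a reasonably recent mrjob release (older releases expose runner.stream_output() instead of cat_output()); the file name run_count.py is hypothetical:

# run_count.py - run the word count from Python instead of the shell
from test3 import COUNT

if __name__ == "__main__":
    job = COUNT(args=['-r', 'local', 'text'])
    with job.make_runner() as runner:
        runner.run()
        # parse_output() decodes the raw output back into (word, count) pairs
        for word, count in job.parse_output(runner.cat_output()):
            print(word, count)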

If you did not add the #!/usr/bin/python line at the top of your .py file, you can run the program with the following command instead (i.e. prefix the command with python or python3):

[hadoop@localhost workspace]$ python ./test3.py -r local text -o out
No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/test3.hadoop.20210424.053521.990352
Running step 1 of 1...
job output is in out
Removing temp directory /tmp/test3.hadoop.20210424.053521.990352...
[hadoop@localhost workspace]$ cat out/*
"a"	2
"b"	2
"c"	1
"d"	2

4. Running on Hadoop

Run the program on Hadoop with the following command:

[hadoop@localhost workspace]$ ./test3.py -r hadoop text -o output
No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /usr/local/hadoop/bin...
Found hadoop binary: /usr/local/hadoop/bin/hadoop
Using Hadoop version 2.6.0
Looking for Hadoop streaming jar in /usr/local/hadoop...
Found Hadoop streaming jar: /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar
Creating temp directory /tmp/test3.hadoop.20210424.055450.699698
uploading working dir files to hdfs:///user/hadoop/tmp/mrjob/test3.hadoop.20210424.055450.699698/files/wd...
Copying other local files to hdfs:///user/hadoop/tmp/mrjob/test3.hadoop.20210424.055450.699698/files/
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar6276312191919821230/] [] /tmp/streamjob825727840311838804.jar tmpDir=null
  Connecting to ResourceManager at /0.0.0.0:8032
  Connecting to ResourceManager at /0.0.0.0:8032
  Total input paths to process : 1
  number of splits:2
  Submitting tokens for job: job_1619235102568_0005
  Submitted application application_1619235102568_0005
  The url to track the job: http://localhost:8088/proxy/application_1619235102568_0005/
  Running job: job_1619235102568_0005
  Job job_1619235102568_0005 running in uber mode : false
   map 0% reduce 0%
   map 67% reduce 0%
   map 100% reduce 0%
   map 100% reduce 100%
  Job job_1619235102568_0005 completed successfully
  Output directory: hdfs:///user/hadoop/output
Counters: 49
	File Input Format Counters 
		Bytes Read=21
	File Output Format Counters 
		Bytes Written=24
	File System Counters
		FILE: Number of bytes read=62
		FILE: Number of bytes written=328809
		FILE: Number of large read operations=0
		FILE: Number of read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=305
		HDFS: Number of bytes written=24
		HDFS: Number of large read operations=0
		HDFS: Number of read operations=9
		HDFS: Number of write operations=2
	Job Counters 
		Data-local map tasks=2
		Launched map tasks=2
		Launched reduce tasks=1
		Total megabyte-seconds taken by all map tasks=139382784
		Total megabyte-seconds taken by all reduce tasks=23851008
		Total time spent by all map tasks (ms)=136116
		Total time spent by all maps in occupied slots (ms)=136116
		Total time spent by all reduce tasks (ms)=23292
		Total time spent by all reduces in occupied slots (ms)=23292
		Total vcore-seconds taken by all map tasks=136116
		Total vcore-seconds taken by all reduce tasks=23292
	Map-Reduce Framework
		CPU time spent (ms)=5380
		Combine input records=0
		Combine output records=0
		Failed Shuffles=0
		GC time elapsed (ms)=4791
		Input split bytes=284
		Map input records=3
		Map output bytes=42
		Map output materialized bytes=68
		Map output records=7
		Merged Map outputs=2
		Physical memory (bytes) snapshot=430739456
		Reduce input groups=4
		Reduce input records=7
		Reduce output records=4
		Reduce shuffle bytes=68
		Shuffled Maps =2
		Spilled Records=14
		Total committed heap usage (bytes)=256843776
		Virtual memory (bytes) snapshot=3072700416
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
job output is in hdfs:///user/hadoop/output
Removing HDFS temp directory hdfs:///user/hadoop/tmp/mrjob/test3.hadoop.20210424.055450.699698...
Removing temp directory /tmp/test3.hadoop.20210424.055450.699698...
[hadoop@localhost workspace]$ hadoop dfs -cat output/*
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

"a"	2
"b"	2
"c"	1
"d"	2

The job runs successfully: mrjob automatically uploads the text file from the local current directory to a temporary directory on HDFS, saves the output under /user/hadoop/output on HDFS, and finally deletes the temporary files produced along the way.
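
The log above also shows "No configs found; falling back on auto-configuration" and "Looking for hadoop binary in /usr/local/hadoop/bin...". If auto-detection does not find your installation, you can point mrjob at it with a config file such as ~/.mrjob.conf. The snippet below is only a sketch using the paths from this article; the option names (hadoop_bin, hadoop_streaming_jar, python_bin) are documented mrjob runner options, and the paths should be adjusted to your own setup:

runners:
  hadoop:
    hadoop_bin: /usr/local/hadoop/bin/hadoop
    hadoop_streaming_jar: /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar
    python_bin: python3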

We can also use a file that is already stored on HDFS as the input, as follows.
(Note: delete the previous output directory before running, otherwise the job will fail; the command is hadoop dfs -rm -R output, or the non-deprecated form hdfs dfs -rm -r output.)

[hadoop@localhost workspace]$ ./test3.py -r hadoop hdfs:///user/hadoop/text -o output
No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /usr/local/hadoop/bin...
Found hadoop binary: /usr/local/hadoop/bin/hadoop
Using Hadoop version 2.6.0
Looking for Hadoop streaming jar in /usr/local/hadoop...
Found Hadoop streaming jar: /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar
Creating temp directory /tmp/test3.hadoop.20210424.060646.551956
uploading working dir files to hdfs:///user/hadoop/tmp/mrjob/test3.hadoop.20210424.060646.551956/files/wd...
Copying other local files to hdfs:///user/hadoop/tmp/mrjob/test3.hadoop.20210424.060646.551956/files/
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar2796801622331185171/] [] /tmp/streamjob6895096939037159635.jar tmpDir=null
  Connecting to ResourceManager at /0.0.0.0:8032
  Connecting to ResourceManager at /0.0.0.0:8032
  Total input paths to process : 1
  number of splits:2
  Submitting tokens for job: job_1619235102568_0006
  Submitted application application_1619235102568_0006
  The url to track the job: http://localhost:8088/proxy/application_1619235102568_0006/
  Running job: job_1619235102568_0006
  Job job_1619235102568_0006 running in uber mode : false
   map 0% reduce 0%
   map 33% reduce 0%
   map 100% reduce 0%
   map 100% reduce 100%
  Job job_1619235102568_0006 completed successfully
  Output directory: hdfs:///user/hadoop/output
Counters: 49
	File Input Format Counters 
		Bytes Read=21
	File Output Format Counters 
		Bytes Written=24
	File System Counters
		FILE: Number of bytes read=62
		FILE: Number of bytes written=328656
		FILE: Number of large read operations=0
		FILE: Number of read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=201
		HDFS: Number of bytes written=24
		HDFS: Number of large read operations=0
		HDFS: Number of read operations=9
		HDFS: Number of write operations=2
	Job Counters 
		Data-local map tasks=2
		Launched map tasks=2
		Launched reduce tasks=1
		Total megabyte-seconds taken by all map tasks=98526208
		Total megabyte-seconds taken by all reduce tasks=19869696
		Total time spent by all map tasks (ms)=96217
		Total time spent by all maps in occupied slots (ms)=96217
		Total time spent by all reduce tasks (ms)=19404
		Total time spent by all reduces in occupied slots (ms)=19404
		Total vcore-seconds taken by all map tasks=96217
		Total vcore-seconds taken by all reduce tasks=19404
	Map-Reduce Framework
		CPU time spent (ms)=4590
		Combine input records=0
		Combine output records=0
		Failed Shuffles=0
		GC time elapsed (ms)=880
		Input split bytes=180
		Map input records=3
		Map output bytes=42
		Map output materialized bytes=68
		Map output records=7
		Merged Map outputs=2
		Physical memory (bytes) snapshot=489639936
		Reduce input groups=4
		Reduce input records=7
		Reduce output records=4
		Reduce shuffle bytes=68
		Shuffled Maps =2
		Spilled Records=14
		Total committed heap usage (bytes)=256843776
		Virtual memory (bytes) snapshot=3076481024
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
job output is in hdfs:///user/hadoop/output
Removing HDFS temp directory hdfs:///user/hadoop/tmp/mrjob/test3.hadoop.20210424.060646.551956...
Removing temp directory /tmp/test3.hadoop.20210424.060646.551956...
[hadoop@localhost workspace]$ hadoop dfs -cat output/*
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

"a"	2
"b"	2
"c"	1
"d"	2

Here, hdfs:///user/hadoop/text in the command refers to the text file stored under the /user/hadoop/ directory on HDFS. You can copy the text file we used earlier from the current local directory to /user/hadoop/ on HDFS with the command hadoop dfs -put ./text text.
Also note that if the user you are logged in as is not hadoop, the upload location changes accordingly to /user/{your username}, so make sure the corresponding directory already exists on HDFS before uploading.
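
Finally, the introduction mentioned that mrjob can also chain several map-reduce steps inside a single job. As a minimal sketch (the class, method, and file names below are invented for illustration and are not part of the setup above), here is a two-step job that first counts words and then picks the most frequent one, using mrjob's MRStep:

#!/usr/bin/python

from mrjob.job import MRJob
from mrjob.step import MRStep

class MOSTCOMMON(MRJob):
    def steps(self):
        # step 1: count each word; step 2: keep the word with the largest count
        return [
            MRStep(mapper=self.mapper_get_words, reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max),
        ]

    def mapper_get_words(self, _, line):
        for word in line.split():
            yield word, 1

    def reducer_count_words(self, word, counts):
        # re-key everything under None so the next reducer sees all pairs together
        yield None, (sum(counts), word)

    def reducer_find_max(self, _, count_word_pairs):
        yield max(count_word_pairs)

if __name__ == "__main__":
    MOSTCOMMON.run()

It runs the same way as test3.py, for example ./mostcommon.py -r local text -o out2 (file and directory names here are hypothetical).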

That covers the basic usage of mrjob. If you repost this article, please credit the source. Thank you.
