key1 val1
key1 val2
key1 val3
key1 val4
key2 val1
key2 val3
key2 val5
key3 val5
key4 val4
key1_key2 1 # obtained from val1
key1_key2 1 # obtained from val3
key1_key4 1 # obtained from val4
key2_key3 1 # obtained from val5
val_dic = dict()
def print_dic(dic):
for val, key_array in dic.iteritems():
key_pair= ""
i=0
j=1
for i in range(len(key_array)-1):
for j in range(i+1,len(key_array)):
key_pair = key_array[i]+"_"+key_array[j]
print "{0}\t{1}".format(key_pair,"1")
for line in sys.stdin:
key, val = line.strip().split("\t")
if (not val in val_dic):
val_dic[val]=[]
val_dic[val].append(key)
print_dic(val_dic)
减速器正在计算所有相同的值:
current_pair = None
current_count = 0
for line in sys.stdin:
key_pair, count = line.strip().split("\t")
count = int(count)
if current_pair == key_pair:
current_count += count
else:
print "{0}\t{1}".format(current_pair,str(current_count))
current_pair = key_pair
current_count = count
print "{0}\t{1}".format(current_pair,str(current_count))
然而,当我在hadoop上运行一个更大的数据集时,似乎丢失了一半的结果。当我在本地机器上使用cat input mapper.py sort reducer.py>out-local测试它时,如果输入合理地很小,它工作得很好,但是在更大的数据集上(例如1M个条目),本地输出文件的条目几乎是在Hadoop上运行mapreduce作业的两倍。代码有错误吗?还是我漏掉了什么?任何帮助都是非常感谢的。
映射器生成它所看到的给定值的所有键的成对组合。
map-reduce的模型是映射器以令人尴尬的并行方式处理输入的每个记录,并发出键值对。它将记录映射到键值对。实际上,一个典型的原生(Java)映射器一次只能“看到”一条记录,因此永远不能像流映射器那样操作。
在streaming api中,您可以稍微“欺骗”一点,并一次性处理整个输入拆分--对于给您的文件的整个块,您可以处理该块中的所有输入记录,因此除了映射单个键值对之外,还可以执行一些其他操作。但一般来说,您无法访问整个输入;输入被分成几个部分,映射器获取每个部分。如果一个分裂包含了整个输入,那么在映射阶段就没有任何并行性,也就没有理由使用hadoop了。
$ cat input1 input2 | ./map.py | sort | ./reduce.py
None 0
key1_key2 2
key1_key4 1
key2_key3 1
$ cat input1 | ./map.py > output1
$ cat input2 | ./map.py > output2
$ cat output1 output2 | sort | ./reduce.py
None 0
key2_key3 1
您将需要重构内容,以便映射只需发出(值,键)对,然后还原器将所有键集合在一起以获得给定的值,然后生成所有带有计数的键对。然后另一个map-reduce步骤将不得不进行计数。
因此,您将有一个map1.py和一个reduce1.py:
#!/usr/bin/env python
# map1.py
import sys
for line in sys.stdin:
line = line.strip()
key, val = line.strip().split("\t")
print val, "\t", key
#!/usr/bin/env python
# reduce1.py
import sys
def emit_keypairs(keylist):
for i in range(len(keylist)-1):
for j in range(i+1,len(keylist)):
key_pair = keylist[i]+"_"+keylist[j]
print "{0}\t{1}".format(key_pair,"1")
current_word = None
current_keylist = []
for line in sys.stdin:
line = line.strip()
word, key = line.split('\t', 1)
if current_word == word:
current_keylist.append(key)
else:
if current_word:
emit_keypairs(current_keylist)
current_word = word
current_keylist = [key]
# do not forget to output the last word if needed!
if current_word == word:
emit_keypairs(current_keylist)
运行这些,然后基本上只对输出运行一个wordcount。这对于拆分输入文件是健壮的:
$ cat input1 | ./map1.py > map1
$ cat input2 | ./map1.py > map2
$ cat map1 map2 | sort | ./reduce1.py
key1_key2 1
key1_key2 1
key1_key4 1
key2_key3 1
我正在尝试使用OMP运行矩阵乘法程序。我在串行和并行版本中得到了不同的输出。我正在尝试使用一个3*3矩阵进行测试。 我的并行代码是: 对于串行版本,我刚刚注释了该行: 我的并行版本的输出是: 起始矩阵 具有 12 个线程的多个示例初始化矩阵...线程 0 起始矩阵乘...线程 8 起始矩阵乘法...线程 6 起始矩阵乘...线程 9 起始矩阵乘法...线程 5 起始矩阵乘法...线程 1 起始矩阵
问题内容: 我使用运行时exec()方法在Java中创建一个子进程。但是,由于子流程是一个交互式程序,因此我需要在需要时向其提供输入。另外,我需要显示子流程的输出。如何以最简单的方式做到这一点? 我正在使用StreamGobbler通过process.getInputStream()显示程序输出。但是,我不知道如何识别程序何时等待输入以及何时使用proc.getOutputStream提供输入。我
本小节将会介绍基本输入输出的 Java 标准类,通过本小节的学习,你将了解到什么是输入和输入,什么是流;输入输出流的应用场景,File类的使用,什么是文件,Java 提供的输入输出流相关 API 等内容。 1. 什么是输入和输出(I / O) 1.1 基本概念 输入/输出这个概念,对于计算机相关专业的同学并不陌生,在计算中,输入/输出(Input / Output,缩写为 I / O)是信息处理系
问题内容: 我正在写一个FTP下载器。代码的一部分是这样的: 我正在调用函数进程来处理回调: 输出是这样的: 但我希望它打印此行,下次重新打印/刷新它,因此它只会显示一次,并且我会看到下载进度。 怎么做到呢? 问题答案: 这是Python 3.x的代码: 该关键字是什么做的工作在这里- 在默认情况下,在一个换行符(结束)字符,但可以使用不同的字符串替换。在这种情况下,用回车符结束该行,而是将光标返
我试图通过在Local上连接两个数据流来运行Flink上的基本连接。源流的数据类型是相同的(Tuple4(String,String,Long,Long))。在多次运行下面提到的函数后,我随机收到了两个不同的输出(存储在下面的变量CollectTuple2Sink中,下面提到了相同的调试日志)。我尝试保持并行度1和最大并行度1,但问题仍然存在。 源函数和其他定义都来自本教程。还从Flink官方文件
我有一个棘手的问题要解决。我使用系统。方法调用之前和之后的currentTimeMillis(),因为我必须测量这两条语句之间经过的时间。 我担心的是每次运行程序都会得到不同的结果。 我明白了(这很完美): 几秒钟后,我再次运行程序,我得到(这是错误的): 我说这个输出是错误的,因为每辆车的等待时间不应该少于100毫秒。 什么实际影响基于currentTimeMillis函数的时间计算? 为什么我