当前位置: 首页 > 知识库问答 >
问题:

Hadoop MapReduce流输出不同于本地运行MapReduce的输出

江天宇
2023-03-14
key1        val1
key1        val2
key1        val3
key1        val4
key2        val1
key2        val3
key2        val5
key3        val5
key4        val4
key1_key2   1   # obtained from val1
key1_key2   1   # obtained from val3
key1_key4   1   # obtained from val4
key2_key3   1   # obtained from val5
val_dic = dict()
def print_dic(dic):
    for val, key_array in dic.iteritems():
        key_pair= ""
        i=0
        j=1
        for i in range(len(key_array)-1):
            for j in range(i+1,len(key_array)):
                key_pair = key_array[i]+"_"+key_array[j]
                print "{0}\t{1}".format(key_pair,"1")
for line in sys.stdin:  
    key, val = line.strip().split("\t")
    if (not val in val_dic):
        val_dic[val]=[]
    val_dic[val].append(key) 
print_dic(val_dic)

减速器正在计算所有相同的值:

   current_pair = None
    current_count = 0 
    for line in sys.stdin:
    key_pair, count = line.strip().split("\t")
    count = int(count)
        if current_pair == key_pair:
            current_count += count
        else:
            print "{0}\t{1}".format(current_pair,str(current_count))
            current_pair = key_pair
            current_count = count
    print "{0}\t{1}".format(current_pair,str(current_count))

然而,当我在hadoop上运行一个更大的数据集时,似乎丢失了一半的结果。当我在本地机器上使用cat input mapper.py sort reducer.py>out-local测试它时,如果输入合理地很小,它工作得很好,但是在更大的数据集上(例如1M个条目),本地输出文件的条目几乎是在Hadoop上运行mapreduce作业的两倍。代码有错误吗?还是我漏掉了什么?任何帮助都是非常感谢的。

共有1个答案

谷梁宁
2023-03-14

映射器生成它所看到的给定值的所有键的成对组合。

map-reduce的模型是映射器以令人尴尬的并行方式处理输入的每个记录,并发出键值对。它将记录映射到键值对。实际上,一个典型的原生(Java)映射器一次只能“看到”一条记录,因此永远不能像流映射器那样操作。

在streaming api中,您可以稍微“欺骗”一点,并一次性处理整个输入拆分--对于给您的文件的整个块,您可以处理该块中的所有输入记录,因此除了映射单个键值对之外,还可以执行一些其他操作。但一般来说,您无法访问整个输入;输入被分成几个部分,映射器获取每个部分。如果一个分裂包含了整个输入,那么在映射阶段就没有任何并行性,也就没有理由使用hadoop了。

$ cat input1 input2 | ./map.py | sort | ./reduce.py 
None    0
key1_key2   2
key1_key4   1
key2_key3   1
$ cat input1 | ./map.py > output1
$ cat input2 | ./map.py > output2
$ cat output1 output2 | sort | ./reduce.py 
None    0
key2_key3   1

您将需要重构内容,以便映射只需发出(值,键)对,然后还原器将所有键集合在一起以获得给定的值,然后生成所有带有计数的键对。然后另一个map-reduce步骤将不得不进行计数。

因此,您将有一个map1.py和一个reduce1.py:

#!/usr/bin/env python 
# map1.py

import sys

for line in sys.stdin:  
    line = line.strip()
    key, val = line.strip().split("\t")
    print val, "\t", key

#!/usr/bin/env python
# reduce1.py

import sys

def emit_keypairs(keylist):
    for i in range(len(keylist)-1):
        for j in range(i+1,len(keylist)):
            key_pair = keylist[i]+"_"+keylist[j]
            print "{0}\t{1}".format(key_pair,"1")

current_word = None
current_keylist = []

for line in sys.stdin:
    line = line.strip()
    word, key = line.split('\t', 1)

    if current_word == word:
        current_keylist.append(key)
    else:
        if current_word:
            emit_keypairs(current_keylist)
        current_word = word
        current_keylist = [key]

# do not forget to output the last word if needed!
if current_word == word:
    emit_keypairs(current_keylist)

运行这些,然后基本上只对输出运行一个wordcount。这对于拆分输入文件是健壮的:

$ cat input1 | ./map1.py > map1
$ cat input2 | ./map1.py > map2
$ cat map1 map2 | sort | ./reduce1.py 

key1_key2   1
key1_key2   1
key1_key4   1
key2_key3   1
 类似资料:
  • 我正在尝试使用OMP运行矩阵乘法程序。我在串行和并行版本中得到了不同的输出。我正在尝试使用一个3*3矩阵进行测试。 我的并行代码是: 对于串行版本,我刚刚注释了该行: 我的并行版本的输出是: 起始矩阵 具有 12 个线程的多个示例初始化矩阵...线程 0 起始矩阵乘...线程 8 起始矩阵乘法...线程 6 起始矩阵乘...线程 9 起始矩阵乘法...线程 5 起始矩阵乘法...线程 1 起始矩阵

  • 本小节将会介绍基本输入输出的 Java 标准类,通过本小节的学习,你将了解到什么是输入和输入,什么是流;输入输出流的应用场景,File类的使用,什么是文件,Java 提供的输入输出流相关 API 等内容。 1. 什么是输入和输出(I / O) 1.1 基本概念 输入/输出这个概念,对于计算机相关专业的同学并不陌生,在计算中,输入/输出(Input / Output,缩写为 I / O)是信息处理系

  • 问题内容: 我使用运行时exec()方法在Java中创建一个子进程。但是,由于子流程是一个交互式程序,因此我需要在需要时向其提供输入。另外,我需要显示子流程的输出。如何以最简单的方式做到这一点? 我正在使用StreamGobbler通过process.getInputStream()显示程序输出。但是,我不知道如何识别程序何时等待输入以及何时使用proc.getOutputStream提供输入。我

  • 问题内容: 我正在写一个FTP下载器。代码的一部分是这样的: 我正在调用函数进程来处理回调: 输出是这样的: 但我希望它打印此行,下次重新打印/刷新它,因此它只会显示一次,并且我会看到下载进度。 怎么做到呢? 问题答案: 这是Python 3.x的代码: 该关键字是什么做的工作在这里- 在默认情况下,在一个换行符(结束)字符,但可以使用不同的字符串替换。在这种情况下,用回车符结束该行,而是将光标返

  • 我有一个棘手的问题要解决。我使用系统。方法调用之前和之后的currentTimeMillis(),因为我必须测量这两条语句之间经过的时间。 我担心的是每次运行程序都会得到不同的结果。 我明白了(这很完美): 几秒钟后,我再次运行程序,我得到(这是错误的): 我说这个输出是错误的,因为每辆车的等待时间不应该少于100毫秒。 什么实际影响基于currentTimeMillis函数的时间计算? 为什么我

  • 我试图通过在Local上连接两个数据流来运行Flink上的基本连接。源流的数据类型是相同的(Tuple4(String,String,Long,Long))。在多次运行下面提到的函数后,我随机收到了两个不同的输出(存储在下面的变量CollectTuple2Sink中,下面提到了相同的调试日志)。我尝试保持并行度1和最大并行度1,但问题仍然存在。 源函数和其他定义都来自本教程。还从Flink官方文件