当前位置: 首页 > 知识库问答 >
问题:

通过Python在Hadoop流中使用文件

陆英毅
2023-03-14

我对Hadoop和MapReduce完全陌生,正在努力完成它。我正在尝试用python开发一个mapduce应用程序,在这个应用程序中,我使用来自2. CSV文件的数据。我只是在mapper中读取这两个文件,然后将文件中的键值对打印到sys.stdout

当我在一台机器上使用它时,该程序运行正常,但是使用Hadoop流式处理时,我遇到了错误。我认为我在Hadoop上的映射器中读取文件时犯了一些错误。请帮助我编写代码,并告诉我如何在Hadoop流中使用文件处理。mapper.py 代码如下。(您可以从注释中理解代码):

#!/usr/bin/env python
import sys
from numpy import genfromtxt

def read_input(inVal):
    for line in inVal:
        # split the line into words
        yield line.strip()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    labels=[]
    data=[]    
    incoming = read_input(sys.stdin)
    for vals in incoming:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited;
        if len(vals) > 10:
            data.append(vals)
        else:
            labels.append(vals)

    for i in range(0,len(labels)):
        print "%s%s%s\n" % (labels[i], separator, data[i])


if __name__ == "__main__":
    main()

有 60000 条记录从两个.csv文件输入到此映射器,如下所示(在单台计算机上,而不是在 hadoop 群集上):

cat mnist_train_labels.csv mnist_train_data.csv | ./mapper.py

共有2个答案

吴西岭
2023-03-14

你没有发布你的错误。在流式传输中,您需要传递-file参数或-input,以便文件可以随流式传输作业一起上传,或者知道在hdfs上的何处可以找到它。

楚泳
2023-03-14

在寻找解决方案3天后,我终于解决了这个问题。

问题出在新版本的Hadoop上(在我的例子中是2.2.0)。当从文件中读取值时,映射器代码在某个时候给出了非零的退出代码(可能是因为它一次读取了一个巨大的值列表(784))。Hadoop 2.2.0中有一个设置,它告诉Hadoop系统给出一个一般错误(代码1的子进程失败)。默认情况下,此设置设置为True。我只需要将此属性的值设置为False,它使我的代码运行没有任何错误。

设置为:流非零、退出失败。只需在流式传输时将其设置为 false 即可。因此,流式处理命令将有点像:

**hadoop jar ... -D stream.non.zero.exit.is.failure=false ...**

希望它能帮助某人,并节省3天... ;)

 类似资料:
  • 我在stackoverflow上看到过一种使用zip文件存储引用的python模块来执行hadoop流作业的技术。 在执行作业的映射阶段,我遇到了一些错误。我相当确定它与zip'd模块加载有关。 为了调试脚本,我使用命令行管道通过sys.stdin/sys.stdout运行我的数据集,进入我的映射器和缩减器,如下所示: 输入数据文件的头。txt|./映射器。py |排序-k1,1|./reduce

  • 当我学习mapreduce时,其中一个关键组件是组合器。这是映射器和还原器之间的一个步骤,基本上在映射阶段结束时运行还原器,以减少映射器输出的数据行数。随着我需要处理的数据的大小增加(以万亿字节的规模),减少步骤变得非常慢。我和我的一个朋友谈过,他说这也是他的经历,他没有使用组合器,而是使用哈希函数来划分他的reduce键,这减少了reduce步骤中每个键的值的数量。我试过了,效果很好。有没有其他

  • 我想实现一个SSIS作业,该作业能够下载位于远程Hadoop集群上的大型CSV文件。当然,在Hadoop系统上只有一个常规FTP服务器不会公开HDFS文件,因为它使用本地文件系统。 我想知道是否有一个FTP服务器实现位于HDFS之上。我更喜欢这种方法,而不是必须将文件从HDFS复制到本地FS,然后让FTP服务器提供服务,因为我需要分配更多的存储空间。

  • 我不确定“乱码”是我的问题的正确词。我的问题是这样的。我使用哈道普流-0.20.2-cdh3u6.jar和蟒蛇来编写映射还原。该命令如下所示: 概率,rule_dict,判决是我本地存储中的目录,其中的文件包含中文单词。当我使用python读取这些文件时,我会出现错误的“乱码”。文件的一小部分: 从这些文件中读取的内容是 有办法解决我的问题吗? 我已经将相同的python脚本和相同的目录上传到映射

  • 问题:我正在尝试创建一个云数据流管道,该管道使用Python SDK从Google云存储读取Avro文件,进行一些处理并在Google云存储上写回Avro文件。在查看ApacheBeam网站上提供的一些示例后,我尝试运行以下代码。我使用了和函数。我试图实现的是读取一个Avro文件并使用Dataflow写入同一个Avro文件,但它给了我以下警告,并且没有输出Avro文件。 警告/错误: 代码: 编辑