当前位置: 首页 > 知识库问答 >
问题:

Hadoop 1个输入文件= 1个输出文件,仅映射

堵毅然
2023-03-14

我是Hadoop的新手,但这是我上个月的一个学习项目。

为了使这一点足够模糊,以便对其他人有用,让我先抛出基本目标……假设:

    < li >显然,您有一个大型数据集,包含数百万个基本ASCII文本文件。 < ul > < li >每个文件都是一个“记录”
  • e. g. /user/hduser/data/customer1/YYYY-MM-DD, /user/hduser/data/customer2/YYYY-MM-DD
    < li >例如/user/HD user/out/customer 1/YYYY-MM-DD、/user/HD user/out/customer 2/YYYY-MM-DD

我看过多个线程:

    < li >多输出路径java hadoop mapreduce < li >新api中的MultipleTextOutputFormat替代项 < li >在Hadoop mapreduce中分离输出文件 < li >推测性任务执行-尝试解决-m-part####问题

还有更多..我也一直在读汤姆·怀特的Hadoop书。我一直急切地想学这个。我经常在新API和旧API之间切换,这增加了学习的困惑。

许多人都指出了多个输出(或旧的API版本),但我似乎无法产生我想要的输出 - 例如,多个输出似乎不接受“/”来创建indic() 中的目录结构

需要采取哪些步骤来创建具有所需输出结构的文件?目前我有一个WholeFileInputFormat类,以及相关的RecordReader,它具有(NullWritable K,ByteWritable V)对(如果需要可以更改)

我的地图设置:

public class MapClass extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    private Text filenameKey;
    private MultipleOutputs<NullWritable, Text> mos;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        InputSplit split = context.getInputSplit();
        Path path = ((FileSplit) split).getPath();
        filenameKey = new Text(path.toString().substring(38)); // bad hackjob, until i figure out a better way.. removes hdfs://master:port/user/hduser/path/
        mos = new MultipleOutputs(context);
    }
}

还有一个cleanup()函数调用mos。close(),而map()函数目前是未知函数(这里我需要帮助)

这些信息足够给新手指出答案的方向了吗?我接下来的想法是在每个map()任务中创建一个MultipleOutputs()对象,每个对象都有一个新的baseoutput字符串,但是我不确定这样做是否有效,或者是否是正确的操作。

建议将不胜感激,程序中的任何东西都可以在这一点上改变,除了输入——我只是在尝试学习框架——但我希望尽可能接近这个结果(稍后我可能会考虑将记录合并为更大的文件,但每个记录已经是20MB,我希望在我无法在记事本中阅读之前确保它工作正常

编辑:这个问题可以通过修改/扩展文本输出html" target="_blank">格式.class来解决吗?似乎它可能有一些可行的方法,但我不确定我需要覆盖哪些方法......

共有1个答案

白学
2023-03-14

如果关闭推理执行,则没有什么可以阻止您在映射器中手动创建输出文件夹结构/文件,并将记录写入它们(忽略输出上下文/收集器)

例如,扩展代码段(setup 方法),您可以执行如下操作(这基本上是多个输出正在执行的操作,但假设推理执行已关闭以避免两个映射任务尝试写入同一输出文件时的文件冲突):

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MultiOutputsMapper extends
        Mapper<LongWritable, Text, NullWritable, NullWritable> {
    protected String filenameKey;
    private RecordWriter<Text, Text> writer;
    private Text outputValue;
    private Text outputKey;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // operate on the input record
        // ...

        // write to output file using writer rather than context
        writer.write(outputKey, outputValue);
    }

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        InputSplit split = context.getInputSplit();
        Path path = ((FileSplit) split).getPath();

        // extract parent folder and filename
        filenameKey = path.getParent().getName() + "/" + path.getName();

        // base output folder
        final Path baseOutputPath = FileOutputFormat.getOutputPath(context);
        // output file name
        final Path outputFilePath = new Path(baseOutputPath, filenameKey);

        // We need to override the getDefaultWorkFile path to stop the file being created in the _temporary/taskid folder
        TextOutputFormat<Text, Text> tof = new TextOutputFormat<Text, Text>() {
            @Override
            public Path getDefaultWorkFile(TaskAttemptContext context,
                    String extension) throws IOException {
                return outputFilePath;
            }
        };

        // create a record writer that will write to the desired output subfolder
        writer = tof.getRecordWriter(context);
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        writer.close(context);
    }
}

需要考虑的几点:

  • 客户x/yyyy-MM-dd路径文件还是文件文件夹(如果文件夹为文件,那么您需要相应地修改 - 此实现假设每个日期有一个文件,文件名为yyyy-MM-dd)
  • 您可能希望查看惰性输出格式,以防止创建空的输出映射文件
 类似资料:
  • 问题内容: 我是Hadoop的新手,正在尝试弄清楚它是如何工作的。至于练习,我应该实现类似于WordCount- Example的东西。任务是读入多个文件,执行WordCount并为每个输入文件编写一个输出文件。Hadoop使用组合器,将map- part的输出改编为reducer的输入,然后写入一个输出文件(我猜每个正在运行的实例)。我想知道是否可以为每个输入文件写入一个输出文件(因此保留inp

  • 目录表 文件 使用文件 储存器 储存与取储存 概括 在很多时候,你会想要让你的程序与用户(可能是你自己)交互。你会从用户那里得到输入,然后打印一些结果。我们可以分别使用raw_input和print语句来完成这些功能。对于输出,你也可以使用多种多样的str(字符串)类。例如,你能够使用rjust方法来得到一个按一定宽度右对齐的字符串。利用help(str)获得更多详情。 另一个常用的输入/输出类型

  • 文件 std::fs::File 本身实现了 Read 和 Write trait,所以文件的输入输出非常简单,只要得到一个 File 类型实例就可以调用读写接口进行文件输入与输出操作了。而要得到 File 就得让操作系统打开(open)或新建(create)一个文件。还是拿例子来说明 use std::io; use std::io::prelude::*; use std::fs::File;

  • 所以我正在和Yii2一起工作,对它来说是相当新的。我正在使用Kartik文件上传,并试图将代码转换为多个文件。但它只保存了第一个文件。 我已经删除了验证,因为这也是失败的,但一旦我知道所有其他的都在工作,我会重新添加。 型号: 控制器: 查看:

  • 我正在学习Hadoop,并尝试执行我的Mapduce程序。所有Map任务和Reduce er任务都完成得很好,但Reducer将Mapper Output写入Output文件。这意味着根本没有调用Reduce函数。我的示例输入如下所示 预期输出如下所示 以下是我的计划。 这里问了同样的问题,我在reduce函数中使用了Iterable值作为该线程中建议的答案。但这并不能解决问题。我不能在那里发表评

  • create 静态方法以只写模式(write-only mode)打开一个文件。若文件已经存在,则旧内容将被销毁。否则,将创建一个新文件。 static LOREM_IPSUM: &'static str = "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut