当前位置: 首页 > 知识库问答 >
问题:

映射还原 - 化简器在一行中发出输出

倪举
2023-03-14

我有一个简单的MapReduce作业,它应该从文本文件中读取字典,然后逐行处理另一个大文件并计算逆文档矩阵。输出应该如下所示:

word-id1  docX:tfX docY:tfY
word-id2  docX:tfX docY:tfY etc...

但是,减速器的输出只在一个huuuge行中发出。我不明白为什么它应该为每个word-id(这是减速器的关键)发出新行。

映射器生成正确的输出(一对<code>单词id的值在单独的行中)。我在没有减速器的情况下进行了测试。reducer应该只为每个键在一行中附加与相同键对应的值。

你能看一下我的代码吗(特别是关于减速器和作业的配置),告诉我为什么减速器只发出一条巨大的线,而不是对应指定键的多条线?我花了很多小时调试这个,无法绕开它。

public class Indexer extends Configured implements Tool {

    /*
     * Vocabulary: key = term, value = index
     */
    private static Map<String, Integer> vocab = new HashMap<String, Integer>();

    public static void main(String[] arguments) throws Exception {
        System.exit(ToolRunner.run(new Indexer(), arguments));
    }

    public static class Comparator extends WritableComparator {
        protected Comparator() {
            super(Text.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // Here we use exploit the implementation of compareTo(...) in
            // Text.class.
            return -a.compareTo(b);
        }
    }

    public static class IndexerMapper extends
            Mapper<Object, Text, IntWritable, Text> {
        private Text result = new Text();

        // load vocab from distributed cache
        public void setup(Context context) throws IOException {
            Configuration conf = context.getConfiguration();
            FileSystem fs = FileSystem.get(conf);
            URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
            Path getPath = new Path(cacheFiles[0].getPath());

            BufferedReader bf = new BufferedReader(new InputStreamReader(
                    fs.open(getPath)));
            String line = null;
            while ((line = bf.readLine()) != null) {
                StringTokenizer st = new StringTokenizer(line, " \t");

                int index = Integer.parseInt(st.nextToken()); // first token is the line number - term id
                String word = st.nextToken(); // second element is the term

                // save vocab
                vocab.put(word, index);

            }
        }

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            // init TF map
            Map<String, Integer> mapTF = new HashMap<String, Integer>();

            // parse input string
            StringTokenizer st = new StringTokenizer(value.toString(), " \t");

            // first element is doc index
            int index = Integer.parseInt(st.nextToken());

            // count term frequencies
            String word;
            while (st.hasMoreTokens()) {
                word = st.nextToken();

                // check if word is in the vocabulary
                if (vocab.containsKey(word)) {
                    if (mapTF.containsKey(word)) {
                        int count = mapTF.get(word);
                        mapTF.put(word, count + 1);
                    } else {
                        mapTF.put(word, 1);
                    }
                }
            }

            // compute TF-IDF
            int wordIndex;
            for (String term : mapTF.keySet()) {
                int tf = mapTF.get(term);

                if (vocab.containsKey(term)) {
                    wordIndex = vocab.get(term);

                    context.write(new IntWritable(wordIndex), new Text(index + ":" + tf));
                }

            }               
        }
    }

    public static class IndexerReducer extends Reducer<IntWritable, Text, IntWritable, Text>
    {
        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {

            StringBuilder sb = new StringBuilder(16000);

            for (Text value : values)
            {
                sb.append(value.toString() + " ");
            }


            context.write(key, new Text(sb.toString()));
        }
    }

    /**
     * This is where the MapReduce job is configured and being launched.
     */
    @Override
    public int run(String[] arguments) throws Exception {
        ArgumentParser parser = new ArgumentParser("TextPreprocessor");

        parser.addArgument("input", true, true, "specify input directory");
        parser.addArgument("output", true, true, "specify output directory");

        parser.parseAndCheck(arguments);

        Path inputPath = new Path(parser.getString("input"));
        Path outputDir = new Path(parser.getString("output"));

        // Create configuration.
        Configuration conf = getConf();

        // add distributed file with vocabulary
        DistributedCache
                .addCacheFile(new URI("/user/myslima3/vocab.txt"), conf);

        // Create job.
        Job job = new Job(conf, "WordCount");
        job.setJarByClass(IndexerMapper.class);

        // Setup MapReduce.
        job.setMapperClass(IndexerMapper.class);
        //job.setCombinerClass(IndexerReducer.class);
        job.setReducerClass(IndexerReducer.class);

        // Sort the output words in reversed order.
        job.setSortComparatorClass(Comparator.class);


        job.setNumReduceTasks(1);

        // Specify (key, value).
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Input.
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output.
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileSystem hdfs = FileSystem.get(conf);

        // Delete output directory (if exists).
        if (hdfs.exists(outputDir))
            hdfs.delete(outputDir, true);

        // Execute the job.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

共有1个答案

何禄
2023-03-14

尝试这些来调试您的问题 -

  • 将还原数设置为0,然后查看映射器输出
  • 尝试使用默认比较器,在比较器中也需要强制转换对象,否则它们不会产生正确的结果
 类似资料:
  • 我遇到了一个非常非常奇怪的问题。还原器确实工作,但如果我检查输出文件,我只能找到映射器的输出。当我尝试调试时,在将映射器的输出值类型从Longwritable更改为Text之后,我发现了与单词计数示例相同的问题 这是结果。 然后我在输出文件中发现了奇怪的结果。这个问题发生在我将map的输出值类型和reducer的输入键类型更改为Text之后,无论我是否更改了reduce输出值的类型。我还被迫更改j

  • 有人能给一个Hbase的mapduce链接吗?我的要求是在hdfs文件上运行mapduce,并将减速器输出存储到hbase表中。Mapper输入将是hdfs文件,输出将是Text、IntWritable键值对。减速器输出将是put对象,即添加减速器Iterable IntWritable值并存储在hbase表中。

  • 目前我有一个DAO在做这样的事情: 而映射器则执行如下操作:(简化为该问题所需的内容) 作为JsonB字段存储在Aurora中。我读过一些关于注释的文章,但由于jdbi文档不清楚,所以我不确定这是否正确。 我映射结果的方式是“标准”方式吗?还是有更好/更有效的方式? 谢谢

  • 我正在学习Hadoop,并尝试执行我的Mapduce程序。所有Map任务和Reduce er任务都完成得很好,但Reducer将Mapper Output写入Output文件。这意味着根本没有调用Reduce函数。我的示例输入如下所示 预期输出如下所示 以下是我的计划。 这里问了同样的问题,我在reduce函数中使用了Iterable值作为该线程中建议的答案。但这并不能解决问题。我不能在那里发表评

  • 本文向大家介绍MyBatis输入映射和输出映射实例详解,包括了MyBatis输入映射和输出映射实例详解的使用技巧和注意事项,需要的朋友参考一下 什么是 MyBatis ? MyBatis 是支持定制化 SQL、存储过程以及高级映射的优秀的持久层框架。MyBatis 避免了几乎所有的 JDBC 代码和手动设置参数以及获取结果集。MyBatis 可以对配置和原生Map使用简单的 XML 或注解,将接口

  • 输入: 文件01.txt 文件02.txt 这意味着这两个测试都将有两条路径要处理。我打印了一些日志信息,试图理解Map/Reduce的过程。 看看从开始刷新映射输出到完成溢出0之间的关系:wordcount程序在最后一个reduce之前还有两个reduce任务,而secondsort程序只做一次reduce就完成了。由于这些程序非常“小”,我认为io.sort.mb/io.sort.refact