我正在做一些类似于标准MapReduce示例的事情——字数统计,但是有所改变,我只希望得到前N个结果。
假设我在HDFS有一个非常大的文本数据集。有大量的例子展示了如何构建一个Hadoop MapReduce作业,为你提供文本中每个单词的字数。例如,如果我的语料库是:
“这是对测试数据的检验,也是检验这一点的好方法”
来自标准 MapReduce 字数统计作业的结果集为:
测试:3、a:2、this:2、is:1等。
但是,如果我只想获得在我的整个数据集中使用的前3个单词呢?
我仍然可以运行完全相同的标准MapReduce单词计数作业,然后在它准备好并输出每个单词的计数后,只获取前3个结果,但这似乎有点低效,因为在洗牌阶段需要移动大量数据。
我想的是,如果这个样本足够大,并且数据在HDFS中是随机且分布良好的,那么每个映射器不需要将其所有字数发送到Reducers,而只需要将一些顶部数据发送到Reducers。所以如果一个映射器有这样的:
a: 8234,人:5422,人:4352,…更多的词…,rareword:1,古怪词:1,等等。
然后,我想做的只是将每个映射器的前100个左右的单词发送到Reducer阶段——因为当所有的事情都结束时,“稀有单词”突然出现在前3名的可能性很小。这似乎可以节省带宽和Reducer处理时间。
这可以在组合器阶段完成吗?这种在洗牌阶段之前的优化通常会完成吗?
引用托马斯的话
要获得Top N,您只需将本地HashMultiset中的Top N写入输出收集器,并以正常方式在减少端聚合结果。这也为您节省了大量网络带宽,唯一的缺点是您需要在清理方法中对字数元组进行排序。
如果您只在本地HashMultiset中写入前N个元素,则可能会丢失元素的计数,如果从本地Hash多重集合传递,该元素可能会成为总的前10个元素之一。
例如,将以下格式视为三个映射,即MapName: elementName,elemenntcount:
地图 A : Ele1,4 : Ele2,5 : Ele3,5 : Ele4,2
地图 B : Ele1,1 : Ele5,7 : Ele6, 3 : Ele7,6
地图C:Ele5,4:Ele8,3:Ele1,1:Ele9,3
现在,如果我们考虑每个映射器的前3个,我们将错过元素“Ele1”,其总数应该是6,但由于我们正在计算每个映射器的前3个,我们看到“Ele1”的总计数为4。
我希望这是有道理的。请让我知道你对此的看法。
这是一个非常好的问题,因为您已经发现了Hadoop的单词计数示例的低效性。
优化问题的技巧如下:
在本地地图阶段执行基于HashMap
的分组,您也可以使用组合器。这看起来像这样,我正在使用Guava的HashMultiSet
,它促进了一个很好的计数机制。
public static class WordFrequencyMapper extends
Mapper<LongWritable, Text, Text, LongWritable> {
private final HashMultiset<String> wordCountSet = HashMultiset.create();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] tokens = value.toString().split("\\s+");
for (String token : tokens) {
wordCountSet.add(token);
}
}
在清理阶段,您会发出结果:
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
Text key = new Text();
LongWritable value = new LongWritable();
for (Entry<String> entry : wordCountSet.entrySet()) {
key.set(entry.getElement());
value.set(entry.getCount());
context.write(key, value);
}
}
因此,您将单词分组到本地工作块中,从而通过使用一位RAM来减少网络使用。您也可以使用<code>组合器
要获得前N名,只需将本地< code>HashMultiset中的前N名写入输出收集器,并在reduce端以正常方式聚合结果。这也为您节省了大量的网络带宽,唯一的缺点是您需要在清理方法中对字数元组进行排序。
部分代码可能如下所示:
Set<String> elementSet = wordCountSet.elementSet();
String[] array = elementSet.toArray(new String[elementSet.size()]);
Arrays.sort(array, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
// sort descending
return Long.compare(wordCountSet.count(o2), wordCountSet.count(o1));
}
});
Text key = new Text();
LongWritable value = new LongWritable();
// just emit the first n records
for(int i = 0; i < N, i++){
key.set(array[i]);
value.set(wordCountSet.count(array[i]));
context.write(key, value);
}
我希望你能领会在本地尽可能多地使用这个词的要点,然后只汇总前N个中的前N个;)
我是hadoop领域的新手,正在努力完成一个简单的任务。 有人能告诉我如何仅使用Map duce代码技术来获取字数示例的前n个值吗? 我不想为这个简单的任务使用任何hadoop命令。
嗨,我有下面的map-reduce代码,我试图通过它解析我的XML文件并在输出中创建一个CSV。 我还有一个名为Connect_Home的类,在这个类中,我使用JAXB解析数据,提取数据。但当我运行代码时,会出现以下错误:
问题内容: 我刚刚开始使用hadoop / hbase MapReduce工作(使用cloudera),但我有以下问题: 假设我们有一个带有主要和静态viariable的java类。该类定义与Mapper和Reducer任务相对应的内部类。在启动作业之前,主程序初始化静态变量。在Mapper类中读取此变量。然后使用群集上的“ hadoop jar”启动该类。 我的问题:我看不到其他节点上的Map和
我在单个节点上使用hadoop 1.0.1,并尝试使用python 2.7流式传输制表符分隔的文件。我可以让Michael Noll的字数计数脚本使用hadoop/python运行,但无法让这个极其简单的映射器和减速器工作,只是复制文件。这是映射器: 这是减速器: 以下是输入文件的一部分: mapper和reducer在linux中运行良好: 但在我修改映射器和reducer之后,将输入文件移动到
我在一个大约50个节点的集群上运行2.2.0上的hadoop,我的工作是64个map任务和20个reduce任务。map在大约30分钟内完成,然后所有reduce任务都在运行,但是我发现一个奇怪的日志是这样的:
问题内容: 我的问题对于HADOOP用户而言似乎很愚蠢。但是我对在地图减少问题中使用泛型感到困惑,例如“ WORD COUNT”。 我知道,泛型被基本用于类型转换和类型安全。但是我不能在这里将这个概念联系起来。 在字数问题上, 请任何人在这里让我明白泛型的使用 。如果我在问这个问题时犯了任何错误,请纠正我。 现在,我了解将泛型用于键值对(KEY IN,VALUE IN,KEY OUT,VALUE