我有几个<code>(标题,文本)
现在我想在这些有序对的文本字段上实现单词计数。
所以我的最终输出应该是这样的:
(title-a , word-a-1 , count-a-1 , word-a-2 , count-a-2 ....)
(title-b , word-b-1, count-b-1 , word-b-2 , count-b-2 ....)
.
.
.
.
(title-x , word-x-1, count-x-1 , word-x-2 , count-x-2 ....)
总而言之,我想在第一个mapduce的输出记录上单独实现wordcount。有人能给我一个好方法吗?或者我如何链接第二个map duce作业来创建上述输出或更好地格式化它?
下面是代码,从github借用并做了一些更改
package com.org;
import javax.xml.stream.XMLStreamConstants;//XMLInputFactory;
import java.io.*;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import javax.xml.stream.*;
public class XmlParser11
{
public static class XmlInputFormat1 extends TextInputFormat {
public static final String START_TAG_KEY = "xmlinput.start";
public static final String END_TAG_KEY = "xmlinput.end";
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit split, TaskAttemptContext context) {
return new XmlRecordReader();
}
/**
* XMLRecordReader class to read through a given xml document to output
* xml blocks as records as specified by the start tag and end tag
*
*/
// @Override
public static class XmlRecordReader extends
RecordReader<LongWritable, Text> {
private byte[] startTag;
private byte[] endTag;
private long start;
private long end;
private FSDataInputStream fsin;
private DataOutputBuffer buffer = new DataOutputBuffer();
private LongWritable key = new LongWritable();
private Text value = new Text();
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
FileSplit fileSplit = (FileSplit) split;
// open the file and seek to the start of the split
start = fileSplit.getStart();
end = start + fileSplit.getLength();
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
fsin = fs.open(fileSplit.getPath());
fsin.seek(start);
}
@Override
public boolean nextKeyValue() throws IOException,
InterruptedException {
if (fsin.getPos() < end) {
if (readUntilMatch(startTag, false)) {
try {
buffer.write(startTag);
if (readUntilMatch(endTag, true)) {
key.set(fsin.getPos());
value.set(buffer.getData(), 0,
buffer.getLength());
return true;
}
} finally {
buffer.reset();
}
}
}
return false;
}
@Override
public LongWritable getCurrentKey() throws IOException,
InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException,
InterruptedException {
return value;
}
@Override
public void close() throws IOException {
fsin.close();
}
@Override
public float getProgress() throws IOException {
return (fsin.getPos() - start) / (float) (end - start);
}
private boolean readUntilMatch(byte[] match, boolean withinBlock)
throws IOException {
int i = 0;
while (true) {
int b = fsin.read();
// end of file:
if (b == -1)
return false;
// save to buffer:
if (withinBlock)
buffer.write(b);
// check if we're matching:
if (b == match[i]) {
i++;
if (i >= match.length)
return true;
} else
i = 0;
// see if we've passed the stop point:
if (!withinBlock && i == 0 && fsin.getPos() >= end)
return false;
}
}
}
}
public static class Map extends Mapper<LongWritable, Text,Text, Text> {
@Override
protected void map(LongWritable key, Text value,
Mapper.Context context)
throws
IOException, InterruptedException {
String document = value.toString();
System.out.println("'" + document + "'");
try {
XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new
ByteArrayInputStream(document.getBytes()));
String propertyName = "";
String propertyValue = "";
String currentElement = "";
while (reader.hasNext()) {
int code = reader.next();
switch (code) {
case XMLStreamConstants.START_ELEMENT: //START_ELEMENT:
currentElement = reader.getLocalName();
break;
case XMLStreamConstants.CHARACTERS: //CHARACTERS:
if (currentElement.equalsIgnoreCase("title")) {
propertyName += reader.getText();
//System.out.println(propertyName);
} else if (currentElement.equalsIgnoreCase("text")) {
propertyValue += reader.getText();
//System.out.println(propertyValue);
}
break;
}
}
reader.close();
context.write(new Text(propertyName.trim()), new Text(propertyValue.trim()));
}
catch(Exception e){
throw new IOException(e);
}
}
}
public static class Reduce
extends Reducer<Text, Text, Text, Text> {
@Override
protected void setup(
Context context)
throws IOException, InterruptedException {
context.write(new Text("<Start>"), null);
}
@Override
protected void cleanup(
Context context)
throws IOException, InterruptedException {
context.write(new Text("</Start>"), null);
}
private Text outputKey = new Text();
public void reduce(Text key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
for (Text value : values) {
outputKey.set(constructPropertyXml(key, value));
context.write(outputKey, null);
}
}
public static String constructPropertyXml(Text name, Text value) {
StringBuilder sb = new StringBuilder();
sb.append("<property><name>").append(name)
.append("</name><value>").append(value)
.append("</value></property>");
return sb.toString();
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
conf.set("xmlinput.start", "<page>");
conf.set("xmlinput.end", "</page>");
Job job = new Job(conf);
job.setJarByClass(XmlParser11.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(XmlParser11.Map.class);
job.setReducerClass(XmlParser11.Reduce.class);
job.setInputFormatClass(XmlInputFormat1.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
我们在线找到的字数代码对所有文件进行字数统计,并给出输出。我想分别对每个文本字段进行字数计算。上面的映射器用于从XML文档中提取标题和文本。是否有任何方法可以在同一映射器中进行字数统计。如果这样做,我的下一个疑问是如何将它与已经存在的键值对(标题、文本)一起传递给reducer。对不起,我不能正确地表达我的问题,但我想读者一定有一些想法
我建议你可以用正则表达式
并执行映射和分组。在 hadoop 示例 jar 文件中提供 Grep 类,您可以使用正则表达式执行 hdfs 数据的映射。并对映射的数据进行分组。
我不确定我是否理解正确。所以我有很多问题和我的答案。
首先,编写此代码的人可能正在尝试展示如何编写自定义InputFormat以使用MR处理xml数据。我不知道它与您的问题有何关系。
总而言之,我想对第一个mapreduce的输出记录单独实现wordcount。有人能给我建议一个做这件事的好方法吗
读取第一个MR生成的输出文件并执行。
或者我如何链接第二个map reduce作业来创建上面的输出或更好地格式化它?
您绝对可以通过编写多个驱动程序方法(每个方法对应一个)以这种方式将作业链接在一起。有关更多详细信息,请参阅此内容以获取示例。
我想单独为每个文本字段进行字数统计。
你所说的分开是什么意思?在传统的字计数程序中,每个字的计数是独立于其他字计算的。
有什么方法可以让我在同一个映射器中进行字数统计?
我希望您正确理解了wordcount程序。在传统的wordcount程序中,您一次读取一行输入文件,将该行分割成多个字,然后将每个字作为键发出,值为1。所有这些都发生在映射器中,映射器本质上是同一个映射器。然后在作业的缩减部分中确定每个单词的总计数。如果您希望从映射器本身发出单词及其总计数,则必须读取映射器本身中的整个文件并进行计数。为此,您需要将InputFormat中的isSplittable设置为false,这样输入文件将作为一个整体读取,并只转到一个映射器。
当您从Mapper发出一些东西时,如果它不是Map唯一的作业,您的Mapper的输出会自动转到Reduce er。您还需要其他东西吗?
减速器正在计算所有相同的值: 然而,当我在hadoop上运行一个更大的数据集时,似乎丢失了一半的结果。当我在本地机器上使用cat input mapper.py sort reducer.py>out-local测试它时,如果输入合理地很小,它工作得很好,但是在更大的数据集上(例如1M个条目),本地输出文件的条目几乎是在Hadoop上运行mapreduce作业的两倍。代码有错误吗?还是我漏掉了什么
我写了映射和Reduce程序,其中reducer的输出键和值不同于它的输入或映射器的输出。我在司机班上做了适当的改变。下面是我在运行它时得到的异常: 信息MapReduce.job:任务Id:Attribut_1550670375771_4211_M_0000032,状态:失败错误:java.io.ioException:map中的值类型不匹配:expected org.apache.hadoop
给定这9个单词,在页面上显示与其所选数字对应的单词1.mercury2.venus3.earth4.mars5.jupiter6.saturn7.uranus8.neptune9.pluto 我不确定我在这里错过了什么,我做了很多尝试,一个错误,似乎没有什么工作。 我尝试使用NumEntry作为所有if语句的比较,但它不起作用。当我使var NumEntry=true;只有水星会显示。当我做var
我希望我的第一个reduce任务生成类似smth的(当然, 请注意,我无法循环访问
MapReduce传递和发出键值对的基本信息。我需要弄清楚我们通过什么和发出什么。以下是我的关注点:MapReduce输入和输出: 1.Map() 方法 - 它是否采用单个或列表的键值对并发出什么?2.对于每个输入键值对,发出什么映射器?相同类型还是不同类型 ?3.对于每个中间键,减速器会发出什么?类型是否有任何限制?4.减速机接收所有具有相同键的值。值将如何排序,如排序或轨道排序?该顺序是否因运
本文向大家介绍Hadoop MapReduce多输出详细介绍,包括了Hadoop MapReduce多输出详细介绍的使用技巧和注意事项,需要的朋友参考一下 Hadoop MapReduce多输出 FileOutputFormat及其子类产生的文件放在输出目录下。每个reducer一个文件并且文件由分区号命名:part-r-00000,part-r-00001,等等。有时可能要对输出的文件名进行控制