当前位置: 首页 > 知识库问答 >
问题:

reducer中的Hadoop组

莫誉
2023-03-14

我决定创建自己的WritableComparable类来学习Hadoop如何使用它。因此,我创建了一个带有两个实例变量(orderNumber cliente)的Order类,并实现了所需的方法。我还为getters/setters/hashcode/equals/toString使用了Eclipse生成器。

相比较而言,我决定只使用orderNumber变量。

我创建了一个简单的MapReduce作业,仅用于统计数据集中订单的出现次数。错误地,我的一个测试记录是Ita而不是Itá,正如您可以在这里看到的:

123 Ita
123 Itá
123 Itá
345 Carol
345 Carol
345 Carol
345 Carol
456 Iza Smith

根据我的理解,第一条记录应该被视为不同的顺序,因为记录1的hashCode与记录2和3的hashCode不同。

Order [cliente=Ita, orderNumber=123]    3
Order [cliente=Carol, orderNumber=345]  4
Order [cliente=Iza Smith, orderNumber=456]  1

我想它应该有一个行的itárecords与count 2和Ita应该有count 1。

由于我在compareTo中只使用了orderNumber,所以我尝试在这个方法中使用html" target="_blank">字符串cliente(下面对代码进行了评论)。然后,它就像我所期待的那样奏效了。

那么,那是意料之中的结果吗?hadoop难道不应该只使用hashCode对key及其值进行分组吗?

public class Order implements WritableComparable<Order>
{
private String cliente;
private long orderNumber;


@Override
public void readFields(DataInput in) throws IOException 
{
    cliente = in.readUTF();
    orderNumber = in.readLong();

}


@Override
public void write(DataOutput out) throws IOException 
{
    out.writeUTF(cliente);
    out.writeLong(orderNumber);

}

@Override
public int compareTo(Order o) {
    long thisValue = this.orderNumber;
    long thatValue = o.orderNumber;
    return (thisValue < thatValue ? -1 :(thisValue == thatValue ? 0 :1));
    //return this.cliente.compareTo(o.cliente);
}

@Override
public int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + ((cliente == null) ? 0 : cliente.hashCode());
    result = prime * result + (int) (orderNumber ^ (orderNumber >>> 32));
    return result;
}


@Override
public boolean equals(Object obj) {
    if (this == obj)
        return true;
    if (obj == null)
        return false;
    if (getClass() != obj.getClass())
        return false;
    Order other = (Order) obj;
    if (cliente == null) {
        if (other.cliente != null)
            return false;
    } else if (!cliente.equals(other.cliente))
        return false;
    if (orderNumber != other.orderNumber)
        return false;
    return true;
}


@Override
public String toString() {
    return "Order [cliente=" + cliente + ", orderNumber=" + orderNumber + "]";
}
public class TesteCustomClass extends Configured implements Tool
{
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Order, LongWritable>
{
    LongWritable outputValue = new LongWritable();
    String[] campos;
    Order order = new Order();

        @Override
    public void configure(JobConf job)
    {
    }

    @Override
    public void map(LongWritable key, Text value, OutputCollector<Order, LongWritable> output, Reporter reporter) throws IOException 
            {
        campos = value.toString().split("\t");

            order.setOrderNumber(Long.parseLong(campos[0]));
        order.setCliente(campos[1]);

        outputValue.set(1L);
        output.collect(order, outputValue);
    }
}

public static class Reduce extends MapReduceBase implements Reducer<Order, LongWritable, Order,LongWritable>
{

    @Override
    public void reduce(Order key, Iterator<LongWritable> values,OutputCollector<Order,LongWritable> output, Reporter reporter) throws IOException 
    {
        LongWritable value = new LongWritable(0);
        while (values.hasNext())
        {
            value.set(value.get() + values.next().get());
        }
        output.collect(key, value);
    }
}

@Override
public int run(String[] args) throws Exception {

    JobConf conf = new JobConf(getConf(),TesteCustomClass.class);

    conf.setMapperClass(Map.class);
    //  conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
    conf.setJobName("Teste - Custom Classes");

    conf.setOutputKeyClass(Order.class);
    conf.setOutputValueClass(LongWritable.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));


    JobClient.runJob(conf);

    return 0;

}

public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(),new TesteCustomClass(),args);
    System.exit(res);
}
}

共有1个答案

曹茂材
2023-03-14

默认的分区器是hashpartitioner,它使用hashcode方法来确定将K、V对发送到哪个还原器。

在reducer中(或者如果您使用的是运行映射端的组合器),compareTo方法用于对键进行排序,然后(默认情况下)还用于比较是否应该将顺序键分组在一起,并在同一迭代中减少它们的关联值。

如果您不使用cliente键变量,而在compareTo方法中只使用ordernumber变量,那么具有相同ordernumber的任何键的值都将一起被缩小--而不管cliente值(这就是您当前观察的值)

 类似资料:
  • 我只是使用3机器集群测试单词计数示例。我的代码与此示例相同,但以下代码除外:

  • 我在使用MapReduce作业时遇到了问题。我的函数确实运行并产生了所需的输出。但是,函数没有运行。该函数似乎从未被调用过。我使用Text作为键,Text作为值。但我不认为这会导致问题。 输入文件的格式如下: 我想将一行的第二个日期提取为<code>Text<code>并将其用作reduce的键。键的值将是同一行中最后两个<code>float<code>值的组合 即: 以便可以将值部分视为由空白

  • 我正在学习一些MapReduce,但是我遇到了一些问题,情况是这样的:我有两个文件:“users”包含一个用户列表,其中包含一些用户数据(性别、年龄、国家等)...)文件看起来像这样: “歌曲”包含所有用户收听的歌曲的数据(用户ID,收听日期和时间,艺术家ID,艺术家姓名,歌曲ID,歌曲标题): 目标是在某些国家找到k首最受欢迎的歌曲。k和输入中提供的国家列表。 我决定为映射器使用Multiple

  • 减速器 自定义可写类

  • 我试图将批量加载map-reduce定制到HBase中,我遇到了reducer的问题。起初我认为我没有写好reducer,但是在reducer中抛出运行时异常并看到代码工作时,我意识到reducer根本没有运行。到目前为止,我看不出这个问题的一些常见答案有什么问题; 我的配置将mapoutput和output分开。 我的减速器和映射器具有覆盖功能。 我有Iterable,我的reducer输入是(

  • 我试图运行WordCount示例的一个变体,这个变体是,映射器输出文本作为键和文本作为值,而还原器输出文本作为键和NullWritable作为值。 除了地图,减少签名,我把主要的方法是这样的: