当前位置: 首页 > 知识库问答 >
问题:

包装类型在Hadoop中如何工作?

戚哲
2023-03-14

我的困惑从这里开始,假设我们在HDFS中有一些数据需要使用下面运行在hadoop代码中的Java代码来处理

org.apache.hadoop.io.IntWritable;
org.apache.hadoop.io.LongWritable;
org.apache.hadoop.io.Text;
org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
public class WordCountMapper
{
extends Mapper<LongWritable,Text,Text,IntWritable>
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
}
}
String line = value.toString();
for (String word : line.split(" ")){
if(word.length()>0){
context.write(new Text(word),new IntWritable(1));
}

在这段代码中,Hadoop的类型类似于LongWritable,Text,intwritable。
让我们选择包裹在Java字符串类型周围的Text类型(如果是错误的请纠正我)。
我的疑问是,当我们在上面的代码中将这些参数传递给我们的方法映射时,这些参数如何与导入包中的代码(即org.apache.hadoop.io.Text;)交互

下面是文本类代码

package org.apache.hadoop.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.MalformedInputException;
import java.text.CharacterIterator;
import java.text.StringCharacterIterator;
import java.util.Arrays;
import org.apache.avro.reflect.Stringable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;



@Stringable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Text
  extends BinaryComparable
  implements WritableComparable<BinaryComparable>
{
  private static final Log LOG = LogFactory.getLog(Text.class);

  private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal()
  {
    protected CharsetEncoder initialValue() {
      return Charset.forName("UTF-8").newEncoder().onMalformedInput(CodingErrorAction.REPORT).onUnmappableCharacter(CodingErrorAction.REPORT);
    }
  };



  private static ThreadLocal<CharsetDecoder> DECODER_FACTORY = new ThreadLocal()
  {
    protected CharsetDecoder initialValue() {
      return Charset.forName("UTF-8").newDecoder().onMalformedInput(CodingErrorAction.REPORT).onUnmappableCharacter(CodingErrorAction.REPORT);
    }
  };



  private static final byte[] EMPTY_BYTES = new byte[0];
  private byte[] bytes;
  private int length;

  public Text()
  {
    bytes = EMPTY_BYTES;
  }


  public Text(String string)
  {
    set(string);
  }

  public Text(Text utf8)
  {
    set(utf8);
  }


  public Text(byte[] utf8)
  {
    set(utf8);
  }




  public byte[] getBytes()
  {
    return bytes;
  }

  public int getLength()
  {
    return length;
  }








  public int charAt(int position)
  {
    if (position > length) return -1;
    if (position < 0) { return -1;
    }
    ByteBuffer bb = (ByteBuffer)ByteBuffer.wrap(bytes).position(position);
    return bytesToCodePoint(bb.slice());
  }

  public int find(String what) {
    return find(what, 0);
  }


  public int find(String what, int start)
  {
    try
    {
      ByteBuffer src = ByteBuffer.wrap(bytes, 0, length);
      ByteBuffer tgt = encode(what);
      byte b = tgt.get();
      src.position(start);

      while (src.hasRemaining()) {
        if (b == src.get()) {
          src.mark();
          tgt.mark();
          boolean found = true;
          int pos = src.position() - 1;
          while (tgt.hasRemaining()) {
            if (!src.hasRemaining()) {
              tgt.reset();
              src.reset();
              found = false;

            }
            else if (tgt.get() != src.get()) {
              tgt.reset();
              src.reset();
              found = false;
            }
          }

          if (found) return pos;
        }
      }
      return -1;
    }
    catch (CharacterCodingException e) {
      e.printStackTrace(); }
    return -1;
  }

  public void set(String string)
  {
    try
    {
      ByteBuffer bb = encode(string, true);
      bytes = bb.array();
      length = bb.limit();
    } catch (CharacterCodingException e) {
      throw new RuntimeException("Should not have happened " + e.toString());
    }
  }


  public void set(byte[] utf8)
  {
    set(utf8, 0, utf8.length);
  }

  public void set(Text other)
  {
    set(other.getBytes(), 0, other.getLength());
  }






  public void set(byte[] utf8, int start, int len)
  {
    setCapacity(len, false);
    System.arraycopy(utf8, start, bytes, 0, len);
    length = len;
  }






  public void append(byte[] utf8, int start, int len)
  {
    setCapacity(length + len, true);
    System.arraycopy(utf8, start, bytes, length, len);
    length += len;
  }



  public void clear()
  {
    length = 0;
  }










  private void setCapacity(int len, boolean keepData)
  {
    if ((bytes == null) || (bytes.length < len)) {
      if ((bytes != null) && (keepData)) {
        bytes = Arrays.copyOf(bytes, Math.max(len, length << 1));
      } else {
        bytes = new byte[len];
      }
    }
  }



  public String toString()
  {
    try
    {
      return decode(bytes, 0, length);
    } catch (CharacterCodingException e) {
      throw new RuntimeException("Should not have happened " + e.toString());
    }
  }

  public void readFields(DataInput in)
    throws IOException
  {
    int newLength = WritableUtils.readVInt(in);
    setCapacity(newLength, false);
    in.readFully(bytes, 0, newLength);
    length = newLength;
  }

  public static void skip(DataInput in) throws IOException
  {
    int length = WritableUtils.readVInt(in);
    WritableUtils.skipFully(in, length);
  }




  public void write(DataOutput out)
    throws IOException
  {
    WritableUtils.writeVInt(out, length);
    out.write(bytes, 0, length);
  }

  public boolean equals(Object o)
  {
    if ((o instanceof Text))
      return super.equals(o);
    return false;
  }

我的第二个疑问是:当数据以文本类型存储时,那么谁会踢它开始进行serilization呢?我的意思是,一旦数据到达网络上的目的地,谁调用这个write(DataOutput out),谁调用readFields(DataInput in)?
它是如何工作的,我需要查看哪里?

我希望我问的是清楚的。

共有1个答案

晏阳飙
2023-03-14

像所有的网络或磁盘操作一样,一切都以字节的形式传输。Text类将字节反序列化为UTF-8。可写性决定数据的表示方式,可比性决定数据的排序方式。

作业中设置的InputFormat确定将哪些可写内容分配给map或reduce任务。

InputSplit决定如何拆分原始字节流并将其读入可写文件

 类似资料:
  • 问题内容: 我正在尝试在Hadoop流作业中包括一个python软件包(NLTK),但不确定如何通过CLI参数“ -file”手动不包括每个文件来执行此操作。 编辑:一种解决方案是在所有从站上安装此软件包,但是我目前没有该选项。 问题答案: 我会将软件包压缩为a或a,然后将整个tarball或归档文件作为选项传递给hadoop命令。过去我是用Perl做到的,而Python却没有。 也就是说,如果你

  • 我目前正在读一本名为Java的完整参考的书。第18章介绍了基元类型包装器。 正如本书第一部分所提到的,出于性能原因,Java使用基本类型,如int和char。这些数据类型不是对象层次结构的一部分。它们通过值传递给方法,不能直接通过引用传递。另外,有时两个方法无法引用同一个int实例,您需要为这些基元类型之一创建对象表示。例如,第19章讨论的集合类只处理对象;若要在这些类中的一个中存储基元类型,需要

  • 问题内容: 我有使用Maven创建jar文件的要求,但是需要使用“ foobar”扩展名将它们安装到存储库中,如果它们可以具有自己的打包类型,那么这很好,以便我们可以通过打包来识别这些工件。 我可以设置新的包装类型吗? 问题答案: 要做到像你描述,创建包装Maven项目 罐子 (如说在这里,因为不会有魔力的定义)。在src / main / resources / META-INF / plexu

  • 类具有类似类型的方法(让我们称其为收集器)。我认为这些方法可以通过使其返回一个包装泛型值的类来实现。 这是示例代码 此代码产生错误 我知道我可以通过使用泛型值创建收集器类来解决这个问题。我希望Collector类是单例的,我是指bean。所以我不能这么做。 有办法吗?

  • 问题内容: 这是我的问题和我的项目结构 我有一个要使用的类型 但是,当我尝试将其添加到导入中时,这些问题会出现: 虽然我在函数声明中使用了它 这是什么问题? 我试图用导入 问题答案: 您不能“ 从 包中导入”。您所能做的就是“导入 整个 程序包”。这意味着,如果您和该程序包在开始时就声明自己被称为foo ,则该程序包中的所有内容都必须由以下字符限定: 如果调用了包,则声明变量将使整个包变色:因此,

  • 为了便于操作基本类型值,JavaScript 还提供了3个特殊的引用类型:Boolean、Number 和 String。实际上,每当读取一个基本类型值的时候,后台就会创建一个对应的基本包装类型的对象,从而让我们能够调用一些方法来操作这些数据。来看下面的例子。 var s1 = "some text"; var s2 = s1.substring(2); 这个例子中的变量 s1 包含一个字符串,