当前位置: 首页 > 知识库问答 >
问题:

如何在单个MapReaver中读取多种类型的Avro数据

姚乐家
2023-03-14

我有两种不同类型的Avro数据,它们有一些公共字段。我想在映射器中读取这些公共字段。我想通过在集群中生成单个作业来阅读本文。

下面是示例avro模式

模式1:

{“type”:“record”,“name”:“Test”,“namespace”:“com.abc.schema.SchemaOne”,“doc”:“Avro使用MR.存储模式”,“fields”:[{“name”:“EE”,“type”:“string”,“default”:null},{“name”:“AA”,“type”:[“null”,“long”],“default”:null},{“name”:“BB”,“type”:[“null”,“string”],“default”:null},{“name”:“CC”,“type”:[“null”,“string”;“null”,“string”;“default

模式2:

{“type”:“record”,“name”:“Test”,“namespace”:“com.abc.schema.SchemaTwo”,“doc”:“Avro使用MR.存储模式”,“fields”:[{“name”:“EE”,“type”:“string”,“default”:null},{“name”:“AA”,“type”:[“null”,“long”],“default”:null},{“name”:“CC”,“type”:[“null”,“string”],“default”:null},{“name”:“DD

驾驶员等级:

package com.mango.schema.aggrDaily;

import java.util.Date;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {

        JobConf conf = new JobConf(super.getConf(), getClass());
        conf.setJobName("DF");

        args[0] = "hdfs://localhost:9999/home/hadoop/work/alok/aggrDaily/data/avro512MB/part-m-00000.avro";
        args[1] = "/home/hadoop/work/alok/tmp"; // temp location
        args[2] = "hdfs://localhost:9999/home/hadoop/work/alok/tmp/10";

        FileInputFormat.addInputPaths(conf, args[0]);
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));

        AvroJob.setInputReflect(conf);
        AvroJob.setMapperClass(conf, AvroMapper.class);

        AvroJob.setOutputSchema(
                conf,
                Pair.getPairSchema(Schema.create(Schema.Type.STRING),
                        Schema.create(Schema.Type.INT)));

        RunningJob job = JobClient.runJob(conf);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        long startTime = new Date().getTime();
        System.out.println("Start Time :::::" + startTime);
        Configuration conf = new Configuration();
        int exitCode = ToolRunner.run(conf, new AvroDriver(), args);
        long endTime = new Date().getTime();
        System.out.println("End Time :::::" + endTime);
        System.out.println("Total Time Taken:::"
                + new Double((endTime - startTime) * 0.001) + "Sec.");
        System.exit(exitCode);
    }
}

映射器类:

package com.mango.schema.aggrDaily;

import java.io.IOException;
import org.apache.avro.generic.GenericData;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.mapred.Reporter;

public class AvroMapper extends
        AvroMapper<GenericData, Pair<CharSequence, Integer>> {

    @Override
    public void map(GenericData record,
        AvroCollector<Pair<CharSequence, Integer>> collector, Reporter reporter) throws IOException {
        System.out.println("record :: " + record);
    }

}

通过设置输入模式,我可以用这段代码读取Avro数据。

AvroJob。setInputSchema(conf,new AggrDaily()。getSchema())

由于Avro数据在数据中内置了模式,我不想将特定的模式显式地传递给作业。我在猪身上做到了这一点。但现在我想在MapReduce中实现同样的效果。

有人能帮我通过MR代码实现这一点吗?或者让我知道我哪里出错了?


共有2个答案

罗新
2023-03-14

我们不能使用org.apache.hadoop.mapreduce.lib.input.MultipleIn来读取多个avro数据,因为每个avro输入都有一个与其关联的模式,并且当前上下文只能存储其中一个输入的模式。所以其他映射器将无法读取数据。

HCatInputFormat也是如此(因为每个输入都有一个与之关联的模式)。但是,在Hcatalog 0.14以后的版本中,有相同的规定。

AvroMultipleIn可以用来完成相同的。它只适用于特定映射和反射映射。它可从版本1.7.7起。

章飞章
2023-03-14

由*组织。阿帕奇。hadoop。mapreduce。自由党。输入多输入类我们可以通过单个MR作业读取多个avro数据

 类似资料:
  • 我有一个主题,从中我可以接收不同类型的JSON。然而,当使用者试图读取消息时,我似乎得到了一个异常。我尝试添加其他bean名称,但没有成功。它似乎试图从主题中阅读,并试图转换到从主题中阅读的所有类型。是否有一种方法可以指定只对特定输入类型启用特定工厂。还有其他方法可以解决这个问题吗。 服务

  • 我有一个建立在Kafka之上的事件源应用程序。目前,我有一个主题中有多个消息类型。所有序列化/反序列化的JSON。 那么这种方法如何与Kafka流媒体应用程序一起工作呢?在该应用程序中,您需要指定一个键和值serde? 我是不是应该忘了Avro而改用protobuff呢?

  • 问题内容: 我已经在一个类的单个方法中初始化了InputStream,并将其传递给下一个方法进行处理。InputStream本质上封装了CSV文件以进行处理。 另一个方法调用传入同一个InputStream的2个不同方法,一个用于检索标头,另一个用于处理内容。该结构如下所示: 我在这里做错什么了吗?有什么方法可以在不同的方法调用之间重用InputStream。 我正在提出可以模仿以下问题的完整程序

  • 我有以下avro模式 然而,当我通过kafka流式传输一些事件以与此模式产生火花时,流式数据帧将数据字段描述为一个结构,其成员具有模式中指定的DataType,如下图所示。

  • 问题内容: 我通过本机方法获得了。 在3个开始s,则仅包含双打。第三个告诉我接下来的双打次数。 我能够阅读前三秒。 为什么在我尝试读取双打时代码崩溃? 获得前三个整数的相关代码: 获得剩余双打的相关代码: 问题答案: 在您发布的代码中,您将其称为: 这会将12加到 ,而不是数字。 返回一个地址;由于前3 个字节各为4个字节,因此我相信您正确地添加了12个字节,但未 在正确的位置添加它 。 您可能打

  • 问题内容: 很抱歉,如果这是重复的,尽管我无法在任何地方找到确切的答案: 有没有办法在 postgreSQL中 创建一个包含多个数据类型的数组? 我有一个类型的列(类型文本数组);尽管我想从type插入三个条目,然后插入第四个条目到此数组中。 有办法吗?如果是这样,怎么办? 问题答案: 我不认为有一种方法可以声明具有多个类型的数组。但是,我认为您可以使用复合类型来完成您想做的事情,例如, 然后,您