我有两种不同类型的Avro数据,它们有一些公共字段。我想在映射器中读取这些公共字段。我想通过在集群中生成单个作业来阅读本文。
下面是示例avro模式
模式1:
{“type”:“record”,“name”:“Test”,“namespace”:“com.abc.schema.SchemaOne”,“doc”:“Avro使用MR.存储模式”,“fields”:[{“name”:“EE”,“type”:“string”,“default”:null},{“name”:“AA”,“type”:[“null”,“long”],“default”:null},{“name”:“BB”,“type”:[“null”,“string”],“default”:null},{“name”:“CC”,“type”:[“null”,“string”;“null”,“string”;“default
模式2:
{“type”:“record”,“name”:“Test”,“namespace”:“com.abc.schema.SchemaTwo”,“doc”:“Avro使用MR.存储模式”,“fields”:[{“name”:“EE”,“type”:“string”,“default”:null},{“name”:“AA”,“type”:[“null”,“long”],“default”:null},{“name”:“CC”,“type”:[“null”,“string”],“default”:null},{“name”:“DD
驾驶员等级:
package com.mango.schema.aggrDaily;
import java.util.Date;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class AvroDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(super.getConf(), getClass());
conf.setJobName("DF");
args[0] = "hdfs://localhost:9999/home/hadoop/work/alok/aggrDaily/data/avro512MB/part-m-00000.avro";
args[1] = "/home/hadoop/work/alok/tmp"; // temp location
args[2] = "hdfs://localhost:9999/home/hadoop/work/alok/tmp/10";
FileInputFormat.addInputPaths(conf, args[0]);
FileOutputFormat.setOutputPath(conf, new Path(args[2]));
AvroJob.setInputReflect(conf);
AvroJob.setMapperClass(conf, AvroMapper.class);
AvroJob.setOutputSchema(
conf,
Pair.getPairSchema(Schema.create(Schema.Type.STRING),
Schema.create(Schema.Type.INT)));
RunningJob job = JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
long startTime = new Date().getTime();
System.out.println("Start Time :::::" + startTime);
Configuration conf = new Configuration();
int exitCode = ToolRunner.run(conf, new AvroDriver(), args);
long endTime = new Date().getTime();
System.out.println("End Time :::::" + endTime);
System.out.println("Total Time Taken:::"
+ new Double((endTime - startTime) * 0.001) + "Sec.");
System.exit(exitCode);
}
}
映射器类:
package com.mango.schema.aggrDaily;
import java.io.IOException;
import org.apache.avro.generic.GenericData;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.mapred.Reporter;
public class AvroMapper extends
AvroMapper<GenericData, Pair<CharSequence, Integer>> {
@Override
public void map(GenericData record,
AvroCollector<Pair<CharSequence, Integer>> collector, Reporter reporter) throws IOException {
System.out.println("record :: " + record);
}
}
通过设置输入模式,我可以用这段代码读取Avro数据。
AvroJob。setInputSchema(conf,new AggrDaily()。getSchema())
由于Avro数据在数据中内置了模式,我不想将特定的模式显式地传递给作业。我在猪身上做到了这一点。但现在我想在MapReduce中实现同样的效果。
有人能帮我通过MR代码实现这一点吗?或者让我知道我哪里出错了?
我们不能使用org.apache.hadoop.mapreduce.lib.input.MultipleIn来读取多个avro数据,因为每个avro输入都有一个与其关联的模式,并且当前上下文只能存储其中一个输入的模式。所以其他映射器将无法读取数据。
HCatInputFormat也是如此(因为每个输入都有一个与之关联的模式)。但是,在Hcatalog 0.14以后的版本中,有相同的规定。
AvroMultipleIn可以用来完成相同的。它只适用于特定映射和反射映射。它可从版本1.7.7起。
由*组织。阿帕奇。hadoop。mapreduce。自由党。输入多输入类我们可以通过单个MR作业读取多个avro数据
我有一个主题,从中我可以接收不同类型的JSON。然而,当使用者试图读取消息时,我似乎得到了一个异常。我尝试添加其他bean名称,但没有成功。它似乎试图从主题中阅读,并试图转换到从主题中阅读的所有类型。是否有一种方法可以指定只对特定输入类型启用特定工厂。还有其他方法可以解决这个问题吗。 服务
我有一个建立在Kafka之上的事件源应用程序。目前,我有一个主题中有多个消息类型。所有序列化/反序列化的JSON。 那么这种方法如何与Kafka流媒体应用程序一起工作呢?在该应用程序中,您需要指定一个键和值serde? 我是不是应该忘了Avro而改用protobuff呢?
问题内容: 我已经在一个类的单个方法中初始化了InputStream,并将其传递给下一个方法进行处理。InputStream本质上封装了CSV文件以进行处理。 另一个方法调用传入同一个InputStream的2个不同方法,一个用于检索标头,另一个用于处理内容。该结构如下所示: 我在这里做错什么了吗?有什么方法可以在不同的方法调用之间重用InputStream。 我正在提出可以模仿以下问题的完整程序
我有以下avro模式 然而,当我通过kafka流式传输一些事件以与此模式产生火花时,流式数据帧将数据字段描述为一个结构,其成员具有模式中指定的DataType,如下图所示。
问题内容: 我通过本机方法获得了。 在3个开始s,则仅包含双打。第三个告诉我接下来的双打次数。 我能够阅读前三秒。 为什么在我尝试读取双打时代码崩溃? 获得前三个整数的相关代码: 获得剩余双打的相关代码: 问题答案: 在您发布的代码中,您将其称为: 这会将12加到 ,而不是数字。 返回一个地址;由于前3 个字节各为4个字节,因此我相信您正确地添加了12个字节,但未 在正确的位置添加它 。 您可能打
问题内容: 很抱歉,如果这是重复的,尽管我无法在任何地方找到确切的答案: 有没有办法在 postgreSQL中 创建一个包含多个数据类型的数组? 我有一个类型的列(类型文本数组);尽管我想从type插入三个条目,然后插入第四个条目到此数组中。 有办法吗?如果是这样,怎么办? 问题答案: 我不认为有一种方法可以声明具有多个类型的数组。但是,我认为您可以使用复合类型来完成您想做的事情,例如, 然后,您