我定义了一个AVRO模式,并使用AVRO工具为这些模式生成了一些类。现在,我想将数据序列化到磁盘。我找到了一些关于scala的答案,但不适用于Java。类<code>文章
下面是我如何尝试这样做的代码的简化版本:
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<Article> processingFiles = filesRDD.map(fileNameContent -> {
// The name of the file
String fileName = fileNameContent._1();
// The content of the file
String fileContent = fileNameContent._2();
// An object from my avro schema
Article a = new Article(fileContent);
Processing processing = new Processing();
// .... some processing of the content here ... //
processing.serializeArticleToDisk(avroFileName);
return a;
});
其中序列化ArticleToDisk(avroFileName)
定义如下:
public void serializeArticleToDisk(String filename) throws IOException{
// Serialize article to disk
DatumWriter<Article> articleDatumWriter = new SpecificDatumWriter<Article>(Article.class);
DataFileWriter<Article> dataFileWriter = new DataFileWriter<Article>(articleDatumWriter);
dataFileWriter.create(this.article.getSchema(), new File(filename));
dataFileWriter.append(this.article);
dataFileWriter.close();
}
其中文章
是我的avro模式。
现在,映射器向我抛出错误:
java.io.FileNotFoundException: hdfs:/...path.../avroFileName.avro (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at org.apache.avro.file.SyncableFileOutputStream.<init>(SyncableFileOutputStream.java:60)
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at sentences.ProcessXML.serializeArticleToDisk(ProcessXML.java:207)
. . . rest of the stacktrace ...
虽然文件路径是正确的。
之后我使用了 collect()
方法,因此 map
函数中的其他所有内容都可以正常工作(序列化部分除外)。
我对Spark很陌生,所以我不确定这是否可能是微不足道的事情。我怀疑我需要使用一些写入函数,而不是在映射器中进行写入(尽管不确定这是否属实)。有什么想法如何解决这个问题?
编辑:
我展示的错误堆栈跟踪的最后一行实际上在这一部分:
dataFileWriter.create(this.article.getSchema(), new File(filename));
这是抛出实际错误的部分。我假设< code>dataFileWriter需要用其他东西来替换。有什么想法吗?
看来你使用Spark的方式不对。
< code>Map是一个转换函数。仅调用< code>map不会调用< code>RDD的计算。您必须调用像< code>forEach()或< code>collect()这样的操作。
还要注意,提供给< code > map lambda将在驱动程序中序列化,并传输到群集中的某个< code >节点。
添加
尝试使用 Spark SQL 和 Spark-Avro 以 Avro 格式保存 Spark DataFrame
:
// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("/examples/people.txt")
.map(Person::parse);
// Apply a schema to an RDD
DataFrame peopleDF = sqlContext.createDataFrame(people, Person.class);
peopleDF.write()
.format("com.databricks.spark.avro")
.save("/output");
此解决方案不使用数据帧并且不会抛出任何错误:
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.NullWritable;
import org.apache.avro.mapred.AvroKey;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;
. . . . .
// Serializing to AVRO
JavaPairRDD<AvroKey<Article>, NullWritable> javaPairRDD = processingFiles.mapToPair(r -> {
return new Tuple2<AvroKey<Article>, NullWritable>(new AvroKey<Article>(r), NullWritable.get());
});
Job job = AvroUtils.getJobOutputKeyAvroSchema(Article.getClassSchema());
javaPairRDD.saveAsNewAPIHadoopFile(outputDataPath, AvroKey.class, NullWritable.class, AvroKeyOutputFormat.class,
job.getConfiguration());
其中AvroUtils.getJobOutputKeyAvroSchema
是:
public static Job getJobOutputKeyAvroSchema(Schema avroSchema) {
Job job;
try {
job = new Job();
} catch (IOException e) {
throw new RuntimeException(e);
}
AvroJob.setOutputKeySchema(job, avroSchema);
return job;
}
Spark Avro的类似内容可以在这里找到 -
我正在使用kafka从源接收数据,我正在使用用< code>Node.js编写的消费者应用程序,并使用< code>kafka-node连接到kafka服务器。另一方面,生产者是用< code>Java编写的,他们使用一些kafka流库来产生带有模式的avro消息。我可以接收消息,但它们是avro序列化的,下面是我接收的序列化消息格式- 我正在尝试反序列化它,但无法使用 npm模块,因为avsc只
我一直在尝试将avro通用记录进行串行化,并生成avro串行化的数据发送给Kafka。主要目标是不使用合并模式注册表存储模式,而是将模式与序列化数据一起发送,以便从kafka主题中提取并反序列化。 下面是AvroSerializer用于生成Avro数据的部分。 Kafka中出现的序列化数据如下所示。
当我尝试基于avro模式将avro数据写入s3时 DF数据类型: finaldf.write().option(“avroschema”,string.valueof(inAvroSchema)).format(“com.databricks.spark.avro”).mode(“overwrite”).save(“target_s3_path”); 我得到了错误:
我有Flume Avro水槽和SparkStreams程序来读取水槽。CDH 5.1、Flume 1.5.0、Spark 1.0,使用Scala作为Spark上的程序lang 我能够制作Spark示例并计算Flume Avro事件。 但是我无法将 Flume Avro 事件反序列化为字符串\文本,然后解析结构行。 有人能举例说明如何使用Scala做到这一点吗?
我收到Kafka主题中的二进制Avro文件,我必须对它们进行反序列化。在Kafka收到的消息中,我可以在每条消息的开头看到一个模式。我知道不嵌入模式并将其与实际的Avro文件分离是一种更好的做法,但我无法控制生产者,也无法更改。 我的代码在Apache Storm上运行。首先,我创建一个读卡器: 然后,我尝试反序列化消息,但不声明架构: 但当消息到达时,我会收到一个错误: 我看到的所有答案都是关于
我正在从Cloudera包裹中运行带有Spark 0.9.0的CDH 4.4。 我有一堆Avro文件是通过Pig的AvroStorage UDF创建的。我想在 Spark 中加载这些文件,使用通用记录或载入 Avro 文件的架构。到目前为止,我已经尝试过这个: 这适用于一个文件,但它不能扩展——我将所有数据加载到本地RAM中,然后从那里跨spark节点分发。