当前位置: 首页 > 知识库问答 >
问题:

如何在Spark中将数据序列化为AVRO模式(用Java)?

章稳
2023-03-14

我定义了一个AVRO模式,并使用AVRO工具为这些模式生成了一些类。现在,我想将数据序列化到磁盘。我找到了一些关于scala的答案,但不适用于Java。类<code>文章

下面是我如何尝试这样做的代码的简化版本:

JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<Article> processingFiles = filesRDD.map(fileNameContent -> {
    // The name of the file
    String fileName = fileNameContent._1();
    // The content of the file
    String fileContent = fileNameContent._2();

    // An object from my avro schema
    Article a = new Article(fileContent);

    Processing processing = new Processing();
    // .... some processing of the content here ... //

    processing.serializeArticleToDisk(avroFileName);

    return a;
});

其中序列化ArticleToDisk(avroFileName)定义如下:

public void serializeArticleToDisk(String filename) throws IOException{
    // Serialize article to disk
    DatumWriter<Article> articleDatumWriter = new SpecificDatumWriter<Article>(Article.class);
    DataFileWriter<Article> dataFileWriter = new DataFileWriter<Article>(articleDatumWriter);
    dataFileWriter.create(this.article.getSchema(), new File(filename));
    dataFileWriter.append(this.article);
    dataFileWriter.close();
}

其中文章是我的avro模式。

现在,映射器向我抛出错误:

java.io.FileNotFoundException: hdfs:/...path.../avroFileName.avro (No such file or directory)   
at java.io.FileOutputStream.open0(Native Method)    
at java.io.FileOutputStream.open(FileOutputStream.java:270)     
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)   
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)   
at org.apache.avro.file.SyncableFileOutputStream.<init>(SyncableFileOutputStream.java:60)   
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at sentences.ProcessXML.serializeArticleToDisk(ProcessXML.java:207)     
. . . rest of the stacktrace ... 

虽然文件路径是正确的。

之后我使用了 collect() 方法,因此 map 函数中的其他所有内容都可以正常工作(序列化部分除外)。

我对Spark很陌生,所以我不确定这是否可能是微不足道的事情。我怀疑我需要使用一些写入函数,而不是在映射器中进行写入(尽管不确定这是否属实)。有什么想法如何解决这个问题?

编辑:

我展示的错误堆栈跟踪的最后一行实际上在这一部分:

dataFileWriter.create(this.article.getSchema(), new File(filename));

这是抛出实际错误的部分。我假设< code>dataFileWriter需要用其他东西来替换。有什么想法吗?

共有2个答案

曹茂材
2023-03-14

看来你使用Spark的方式不对。

< code>Map是一个转换函数。仅调用< code>map不会调用< code>RDD的计算。您必须调用像< code>forEach()或< code>collect()这样的操作。

还要注意,提供给< code > map lambda将在驱动程序中序列化,并传输到群集中的某个< code >节点。

添加

尝试使用 Spark SQL 和 Spark-Avro 以 Avro 格式保存 Spark DataFrame

// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("/examples/people.txt")
    .map(Person::parse);

// Apply a schema to an RDD
DataFrame peopleDF = sqlContext.createDataFrame(people, Person.class);
peopleDF.write()
    .format("com.databricks.spark.avro")
    .save("/output");
王宜
2023-03-14

此解决方案不使用数据帧并且不会抛出任何错误:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.NullWritable;
import org.apache.avro.mapred.AvroKey;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

   .  .  .  .  .

// Serializing to AVRO
JavaPairRDD<AvroKey<Article>, NullWritable> javaPairRDD = processingFiles.mapToPair(r -> {    
    return new Tuple2<AvroKey<Article>, NullWritable>(new AvroKey<Article>(r), NullWritable.get());
});
Job job = AvroUtils.getJobOutputKeyAvroSchema(Article.getClassSchema());
javaPairRDD.saveAsNewAPIHadoopFile(outputDataPath, AvroKey.class, NullWritable.class, AvroKeyOutputFormat.class, 
        job.getConfiguration());

其中AvroUtils.getJobOutputKeyAvroSchema是:

public static Job getJobOutputKeyAvroSchema(Schema avroSchema) {
    Job job;

    try {
        job = new Job();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }

    AvroJob.setOutputKeySchema(job, avroSchema);
    return job;
}

Spark Avro的类似内容可以在这里找到 -

 类似资料:
  • 我正在使用kafka从源接收数据,我正在使用用< code>Node.js编写的消费者应用程序,并使用< code>kafka-node连接到kafka服务器。另一方面,生产者是用< code>Java编写的,他们使用一些kafka流库来产生带有模式的avro消息。我可以接收消息,但它们是avro序列化的,下面是我接收的序列化消息格式- 我正在尝试反序列化它,但无法使用 npm模块,因为avsc只

  • 我一直在尝试将avro通用记录进行串行化,并生成avro串行化的数据发送给Kafka。主要目标是不使用合并模式注册表存储模式,而是将模式与序列化数据一起发送,以便从kafka主题中提取并反序列化。 下面是AvroSerializer用于生成Avro数据的部分。 Kafka中出现的序列化数据如下所示。

  • 当我尝试基于avro模式将avro数据写入s3时 DF数据类型: finaldf.write().option(“avroschema”,string.valueof(inAvroSchema)).format(“com.databricks.spark.avro”).mode(“overwrite”).save(“target_s3_path”); 我得到了错误:

  • 我有Flume Avro水槽和SparkStreams程序来读取水槽。CDH 5.1、Flume 1.5.0、Spark 1.0,使用Scala作为Spark上的程序lang 我能够制作Spark示例并计算Flume Avro事件。 但是我无法将 Flume Avro 事件反序列化为字符串\文本,然后解析结构行。 有人能举例说明如何使用Scala做到这一点吗?

  • 我收到Kafka主题中的二进制Avro文件,我必须对它们进行反序列化。在Kafka收到的消息中,我可以在每条消息的开头看到一个模式。我知道不嵌入模式并将其与实际的Avro文件分离是一种更好的做法,但我无法控制生产者,也无法更改。 我的代码在Apache Storm上运行。首先,我创建一个读卡器: 然后,我尝试反序列化消息,但不声明架构: 但当消息到达时,我会收到一个错误: 我看到的所有答案都是关于

  • 我正在从Cloudera包裹中运行带有Spark 0.9.0的CDH 4.4。 我有一堆Avro文件是通过Pig的AvroStorage UDF创建的。我想在 Spark 中加载这些文件,使用通用记录或载入 Avro 文件的架构。到目前为止,我已经尝试过这个: 这适用于一个文件,但它不能扩展——我将所有数据加载到本地RAM中,然后从那里跨spark节点分发。