当前位置: 首页 > 知识库问答 >
问题:

如何使用Avro文件上的模式在Spark中加载Avro?

段干帅
2023-03-14

我正在从Cloudera包裹中运行带有Spark 0.9.0的CDH 4.4。

我有一堆Avro文件是通过Pig的AvroStorage UDF创建的。我想在 Spark 中加载这些文件,使用通用记录或载入 Avro 文件的架构。到目前为止,我已经尝试过这个:

import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import java.net.URI
import java.io.BufferedInputStream
import java.io.File
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.mapred.FsInput

val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"
val inURI = new URI(input)
val inPath = new Path(inURI)

val fsInput = new FsInput(inPath, sc.hadoopConfiguration)
val reader =  new GenericDatumReader[GenericRecord]
val dataFileReader = DataFileReader.openReader(fsInput, reader)
val schemaString = dataFileReader.getSchema

val buf = scala.collection.mutable.ListBuffer.empty[GenericRecord]
while(dataFileReader.hasNext)  {
  buf += dataFileReader.next
}
sc.parallelize(buf)

这适用于一个文件,但它不能扩展——我将所有数据加载到本地RAM中,然后从那里跨spark节点分发。

共有2个答案

傅志用
2023-03-14

这对我很有效:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

...
val path = "hdfs:///path/to/your/avro/folder"
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
廖琨
2023-03-14

回答我自己的问题:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapred.AvroInputFormat
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import java.io.BufferedInputStream
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.FsInput
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.hadoop.mapred.JobConf
import java.io.File
import java.net.URI

// spark-shell -usejavacp -classpath "*.jar"

val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"

val jobConf= new JobConf(sc.hadoopConfiguration)
val rdd = sc.hadoopFile(
  input,
  classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
  classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
  classOf[org.apache.hadoop.io.NullWritable],
  10)
val f1 = rdd.first
val a = f1._1.datum
a.get("rawLog") // Access avro fields
 类似资料:
  • 我通过安装了spark ~/miniconda3/envs/audience/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(应答,gateway_client,target_id,name)318 Rise Py4JJavaError(319“调用{0}{1}{2}时出错.\n”。-->320格式(target_i

  • 我一直在尝试使用Spark中的加载本地文件。 我已经读过[问题]:如何在sc.textfile中加载本地文件,而不是HDFS Centos 7.0上中有本地文件 使用时,出现如下错误。 16/12/27 12:15:56警告TaskSetManager:stage 5.0中丢失任务0.0(TID 36,):java.io.FileNotFoundException:File File:/home/

  • 有没有办法从Apache spark生成无模式的avro?我可以看到一种使用apache avro库通过Java/Scala和融合avro生成它的方法。当我用下面的方式从Spark编写Avro时,它用模式创建了Avro。我想在没有模式的情况下创建,以减少最终数据集的大小。

  • 当我在spack-2.2.0中加载xml文件时,如下所示: 它向我展示了一个错误: JAVAlang.ClassNotFoundException:未能找到数据源:xml。请在http://spark.apache.org/third-party-projects.html在org。阿帕奇。火花sql。处决数据源。数据源$。org上的lookUpdateSource(DataSource.scal

  • 问题内容: 我想将Spark参数(例如输入文件,输出文件)存储到Java属性文件中,然后将该文件传递到Spark Driver中。我正在使用spark- submit提交作业,但是找不到参数来传递属性文件。你有什么建议吗? 问题答案: 在这里,我找到了一种解决方案: props文件 :(mypropsfile.conf)// 注意:密钥的前缀为“ spark”。 否则道具将被忽略。 发射 如何调用

  • 根据Avro文档中“default”属性的定义:“此字段的默认值,用于读取缺少此字段的实例(可选)。” 这意味着,如果缺少相应的字段,则采用默认值。 但事实似乎并非如此。考虑下面的<代码>学生<代码>模式: 模式表示:如果“年龄”字段丢失,则将值视为-1。“名称”字段也是如此。 现在,如果我尝试从以下JSON构建学生模型: 我得到一个例外: 看起来默认设置未按预期工作。那么,违约的作用到底是什么呢