当前位置: 首页 > 知识库问答 >
问题:

如何使用用户定义的类和toDF将RDD转换为数据帧

岑和风
2023-03-14

当我尝试通过sbt pack创建以下包时:

import org.apache.spark.sql.SparkSession

class Log(val cip: String, val scstatus: Int) {
    var src: String = cip
    var status: Int = scstatus
}

object IISHttpLogs {
  def main(args: Array[String]) {
    val logFiles = "D:/temp/tests/wwwlogs" 
    val spark = SparkSession.builder.appName("LogParser").getOrCreate()
    val sc = spark.sparkContext;
    sc.setLogLevel("ERROR")

    val logs = sc.textFile(logFiles)        

    import spark.implicits._
    val rowDF = logs.filter(l => !l.startsWith("#"))
        .map(l => l.split(" "))
        .map(c => new Log(c(8), c(11).trim.toInt))
        .toDF();
    println(s"line count: ${rowDF.count()}")        
    rowDF.createOrReplaceTempView("rows")
    val maxHit = spark.sql("SELECT top 1 src, count(*) FROM rows group by src order by count(*) desc")
    maxHit.show()

    spark.stop()
  }
}

我得到以下错误:

值toDF不是组织的成员。阿帕奇。火花rdd。RDD[日志]

我尝试了以下几种方法:

  • 托德弗洛格

我就是不能编译我的代码

欢迎任何线索来克服这个错误。

我很好地阅读了从case类生成Spark StructType/Schema,并编写了:

val schema =
    StructType(
        StructField("src", StringType, false) ::
        StructField("status", IntegerType, true) :: Nil)

val rowRDD = logs.filter(l => !l.startsWith("#"))
    .map(l => l.split(" "))
    .map(c => Row(c(8), c(11).trim.toInt));

val rowDF = spark.sqlContext.createDataFrame(rowRDD, schema); 

但这样做时,我不使用Log类。我想知道是否有一种方法可以通过使用定义的日志类获取数据帧,或者官方/最佳方法是使用行类?

例如,我不会写:

val rowRDD = logs.filter(l => !l.startsWith("#"))
    .map(l => l.split(" "))
    .map(c => new Log(c(8), c(11).trim.toInt));
val rowDF = spark.sqlContext.createDataFrame(
    rowRDD,
    ScalaReflection.schemaFor[Log].dataType.asInstanceOf[StructType]);

我就是不明白为什么?

共有1个答案

谭奕
2023-03-14

你必须使用案例类。至少对我有用:

case class Log(cip: String,  scstatus: Int)
//...
.map(c =>  Log(c(8), c(11).trim.toInt) // ommit 'new'
.toDF()

我不太确定这是否是一般规则。但是在Dataset API的公告中,明确提到了case类的使用:

Spark 1.6支持为各种类型自动生成编码器,包括基本类型(例如字符串、整数、长)、Scala case类和Java bean。(https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html)

如果不能使用case类,这个答案似乎是合适的。

 类似资料:
  • 我有地图的RDD,我想把它转换成数据帧,这里是RDD的输入格式 有没有办法转换成数据帧像 df.show

  • 我尝试使用以下代码获取数据帧的分区数量: 按照我的理解,dataframe通过元数据给rdd增加了一个结构层。那么,为什么在转换成rdd时要花这么多时间呢?

  • 我正在尝试将RDD转换为数据帧,但失败并出现错误: org.apache.spark.SparkException:由于阶段失败而中止作业:阶段2.0中的任务0失败4次,最近一次失败:阶段2.0中丢失任务0.3(TID 11,10.139.64.5,执行器0) 这是我的代码:

  • 我用Avro(序列化器和反序列化器)收到Kafka主题的推文。然后,我创建了一个spark consumer,它在RDD[GenericRecord]的数据流中提取推文。现在,我想将每个rdd转换为数据帧,通过SQL分析这些推文。有什么解决方案可以将RDD[GenericRecord]转换为数据帧吗?

  • 我的RDD的结构由三列组成,基于Tuple3,其中签名是: TemperatureRecord类型=(LocalDate,Location,Double) 字段LocalDate是来自包Java.time.LocalDate的Java对象。 字段Location是一个定制类型,由两个具有以下签名的Double(GPS坐标)组成: 关于我的应用程序/环境的一些详细信息: Scala:2.11.8 火

  • 我正在尝试将RDD[String]转换为数据框。字符串是逗号分隔的,所以我希望逗号之间的每个值都有一列。为此,我尝试了以下步骤: 但我明白了: 这不是这篇文章的副本(如何将rdd对象转换为火花中的数据帧),因为我要求RDD[字符串]而不是RDD[行]。 而且它也不是火花加载CSV文件作为DataFrame的副本?因为这个问题不是关于将CSV文件读取为DataFrame。