当我尝试通过sbt pack
创建以下包时:
import org.apache.spark.sql.SparkSession
class Log(val cip: String, val scstatus: Int) {
var src: String = cip
var status: Int = scstatus
}
object IISHttpLogs {
def main(args: Array[String]) {
val logFiles = "D:/temp/tests/wwwlogs"
val spark = SparkSession.builder.appName("LogParser").getOrCreate()
val sc = spark.sparkContext;
sc.setLogLevel("ERROR")
val logs = sc.textFile(logFiles)
import spark.implicits._
val rowDF = logs.filter(l => !l.startsWith("#"))
.map(l => l.split(" "))
.map(c => new Log(c(8), c(11).trim.toInt))
.toDF();
println(s"line count: ${rowDF.count()}")
rowDF.createOrReplaceTempView("rows")
val maxHit = spark.sql("SELECT top 1 src, count(*) FROM rows group by src order by count(*) desc")
maxHit.show()
spark.stop()
}
}
我得到以下错误:
值toDF不是组织的成员。阿帕奇。火花rdd。RDD[日志]
我尝试了以下几种方法:
我就是不能编译我的代码。
欢迎任何线索来克服这个错误。
我很好地阅读了从case类生成Spark StructType/Schema,并编写了:
val schema =
StructType(
StructField("src", StringType, false) ::
StructField("status", IntegerType, true) :: Nil)
val rowRDD = logs.filter(l => !l.startsWith("#"))
.map(l => l.split(" "))
.map(c => Row(c(8), c(11).trim.toInt));
val rowDF = spark.sqlContext.createDataFrame(rowRDD, schema);
但这样做时,我不使用Log类。我想知道是否有一种方法可以通过使用定义的日志类获取数据帧,或者官方/最佳方法是使用行类?
例如,我不会写:
val rowRDD = logs.filter(l => !l.startsWith("#"))
.map(l => l.split(" "))
.map(c => new Log(c(8), c(11).trim.toInt));
val rowDF = spark.sqlContext.createDataFrame(
rowRDD,
ScalaReflection.schemaFor[Log].dataType.asInstanceOf[StructType]);
我就是不明白为什么?
你必须使用案例类。至少对我有用:
case class Log(cip: String, scstatus: Int)
//...
.map(c => Log(c(8), c(11).trim.toInt) // ommit 'new'
.toDF()
我不太确定这是否是一般规则。但是在Dataset API的公告中,明确提到了case类的使用:
Spark 1.6支持为各种类型自动生成编码器,包括基本类型(例如字符串、整数、长)、Scala case类和Java bean。(https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html)
如果不能使用case类,这个答案似乎是合适的。
我有地图的RDD,我想把它转换成数据帧,这里是RDD的输入格式 有没有办法转换成数据帧像 df.show
我尝试使用以下代码获取数据帧的分区数量: 按照我的理解,dataframe通过元数据给rdd增加了一个结构层。那么,为什么在转换成rdd时要花这么多时间呢?
我正在尝试将RDD转换为数据帧,但失败并出现错误: org.apache.spark.SparkException:由于阶段失败而中止作业:阶段2.0中的任务0失败4次,最近一次失败:阶段2.0中丢失任务0.3(TID 11,10.139.64.5,执行器0) 这是我的代码:
我用Avro(序列化器和反序列化器)收到Kafka主题的推文。然后,我创建了一个spark consumer,它在RDD[GenericRecord]的数据流中提取推文。现在,我想将每个rdd转换为数据帧,通过SQL分析这些推文。有什么解决方案可以将RDD[GenericRecord]转换为数据帧吗?
我的RDD的结构由三列组成,基于Tuple3,其中签名是: TemperatureRecord类型=(LocalDate,Location,Double) 字段LocalDate是来自包Java.time.LocalDate的Java对象。 字段Location是一个定制类型,由两个具有以下签名的Double(GPS坐标)组成: 关于我的应用程序/环境的一些详细信息: Scala:2.11.8 火
我正在尝试将RDD[String]转换为数据框。字符串是逗号分隔的,所以我希望逗号之间的每个值都有一列。为此,我尝试了以下步骤: 但我明白了: 这不是这篇文章的副本(如何将rdd对象转换为火花中的数据帧),因为我要求RDD[字符串]而不是RDD[行]。 而且它也不是火花加载CSV文件作为DataFrame的副本?因为这个问题不是关于将CSV文件读取为DataFrame。