我尝试从Kafka加载数据,这是成功的,但我无法转换为火花RDD,
val kafkaParams = Map("metadata.broker.list" -> "IP:6667,IP:6667")
val offsetRanges = Array(
OffsetRange("first_topic", 0,1,1000)
)
val ssc = new StreamingContext(new SparkConf, Seconds(60))
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
现在如何读取此流对象???我的意思是将其转换为Spark数据帧并执行一些计算
我尝试转换到dataframe
stream.foreachRDD { rdd =>
println("Hello")
import sqlContext.implicits._
val dataFrame = rdd.map {case (key, value) => Row(key, value)}.toDf()
}
但是toDf不工作错误:value toDf不是org.apache.spark.rdd.RDD的成员[org.apache.spark.sql.行]
val kafkaParams = Map("metadata.broker.list" -> "IP:6667,IP:6667")
val offsetRanges = Array(
OffsetRange("first_topic", 0,1,1000)
)
val ssc = new StreamingContext(new SparkConf, Seconds(60))
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
val lines = stream.map(_.value)
val words = lines.flatMap(_.split(" ")).print() //def createDataFrame(words: RDD[Row], Schema: StructType)
// Start your computation then
ssc.start()
ssc.awaitTermination()
它很旧,但我认为您在从行创建df时忘记了添加模式:
val df = sc.parallelize(List(1,2,3)).toDF("a")
val someRDD = df.rdd
val newDF = spark.createDataFrame(someRDD, df.schema)
(在spark shell 2.2.0中测试)
我刚刚补充道 对这个项目。我有<code>suspend fun foo():Flow 我需要获得
我想用可选的。由于只能连接流,我有以下问题: 如何将可选 转换为流 ? 示例:
问题内容: //或将多部分文件保存到数据库的任何其他解决方案。我尝试用这种方式,但出现错误。 问题答案:
问题内容: 我有一个第三方图书馆给我一个图书馆。我想像Java 8那样懒惰地使用该枚举,并调用诸如此类的东西。 有没有现成的图书馆?我已经在引用Guava和Apache Commons,所以如果其中任何一个都有理想的解决方案。 另外,在保留所有内容的懒惰性质的同时将a 变成最佳/最简单的方法是什么? 问题答案: 这个答案已经提供了一个解决方案,可以解决以下问题: 应当强调的是,由此而来 的 懒任何
我已经了解了一些关于流的知识,并且知道它们可以用来代替循环。对于这个玩具示例,我使用一个图形数据库来存储一组字符串。数据库将它们存储为顶点。我想检索这些顶点,并将它们转换为字符串,而是使用流。每个顶点都有一组性质;我给它一个键,它返回一个值。如果一个顶点具有我正在寻找的属性,我将它添加到列表中。如果没有,我存储顶点ID。 我有一个for循环,但我不确定如何使用流来代替。代码如下:
我正在寻找一种简洁的方法来将转换为或者更具体地说,将迭代器作为流“查看”。 出于性能原因,我希望避免在新列表中出现迭代器的副本: 基于评论中的一些建议,我还尝试使用: 但是,我得到一个(因为没有调用) 我查看了和,但没有找到任何东西。