我正在Scala中实现Spark Streaming,我从Kafka主题中提取JSON字符串,并希望将其加载到数据帧中。有没有一种方法可以让Spark根据RDD[字符串]自己推断模式?
您可以使用下面的代码从Kafka读取消息流,提取JSON值并将它们转换为DataFrame:
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
messages.foreachRDD { rdd =>
//extracting the values only
val df = sqlContext.read.json(rdd.map(x => x._2))
df.show()
}
在Spark 1.4中,您可以尝试以下方法从rdd生成Dataframe:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val yourDataFrame = hiveContext.createDataFrame(yourRDD)
是的,您可以使用以下内容:
sqlContext.read
//.schema(schema) //optional, makes it a bit faster, if you've processed it before you can get the schema using df.schema
.json(jsonRDD) //RDD[String]
我现在也在尝试做同样的事情。不过,我很好奇你是如何从Kafka中获得RDD[String]的,我仍然觉得Spark Kafka只做流式传输,而不是“取出现在里面的东西”一次性批次。:)
我开发了一个Python Kafka生成器,它将多个json记录作为nd-json二进制字符串发送到一个Kafka主题。然后,我尝试用PySpark在Spark结构化流媒体中读取这些消息,如下所示:
OS:Red Hat Enterprise Linux Server 6.5版JRE:Oracle 1.8.0.144-b01 spark-streaming2.11:2.1.0 spark-streaming-kafka-0-102.11:2.1.0 Spark stream Kafka jar由Spark提交-提交到独立的Spark集群,并运行良好几天。但是最近,我们发现没有为流生成新的作业,
我有一个火花数据框,我需要写入MongoDB。我想知道如何在mongoDB中将数据框的一些列写成嵌套/分层JSON。假设数据框有6列,col1,col2,…… col5,col6我想要col1,col2,col3作为第一层次结构,其余列col4到col6作为第二层次结构。像这样的东西, 我如何在pyspark中实现这一点?
我正在做星火流媒体项目。从Kafka那里得到数据。我想限制Spark-Streaming消耗的记录。关于Kafka的资料非常多。我已经使用属性来限制Spark中的记录。但在5分钟的批处理中,我收到了13400条消息。我的星火程序每5分钟不能处理超过1000条消息。Kafka主题有三个分区。我的spark驱动程序内存是5GB,有3个执行器,每个3GB。如何限制Kafka在spark Streamin
本文向大家介绍JavaScript从JSON数据创建数组?,包括了JavaScript从JSON数据创建数组?的使用技巧和注意事项,需要的朋友参考一下 要根据JSON数据创建数组,请使用from JavaScript的概念。假设以下是我们的数据- 以下是根据上述数据创建数组的代码- 示例 要运行上述程序,您需要使用以下命令- 在这里,我的文件名为demo82.js。 输出结果 这将产生以下输出-
我确信这是一个简单的SQLContext问题,但我在Spark docs或Stackoverflow中找不到任何答案 我想从MySQL上的SQL查询创建一个Spark数据框 例如,我有一个复杂的MySQL查询,如 我想要一个带有X、Y和Z列的数据帧 我想出了如何将整个表加载到Spark中,然后可以将它们全部加载,然后在那里进行连接和选择。然而,这是非常低效的。我只想加载SQL查询生成的表。 这是我