当前位置: 首页 > 知识库问答 >
问题:

如何动态定义流数据集的模式以写入CSV?

丌官信厚
2023-03-14

我有一个流数据集,阅读Kafka并试图写到CSV

case class Event(map: Map[String,String])
def decodeEvent(arrByte: Array[Byte]): Event = ...//some implementation
val eventDataset: Dataset[Event] = spark
  .readStream
  .format("kafka")
  .load()
  .select("value")
  .as[Array[Byte]]
  .map(decodeEvent)
val columns = List("year","month","date","topic","field1","field2")
val schema = new StructType() //Prepare schema programmatically
columns.foreach { field => schema.add(field, "string") }
val rowRdd = eventDataset.rdd.map { event => Row.fromSeq(
     columns.map(c => event.getOrElse(c, "")
)}
val df = spark.sqlContext.createDataFrame(rowRdd, schema)
eventDataset.map(event => columns.map(c => event.getOrElse(c,""))
.toDF(columns:_*)

有没有一种方法可以通过编程模式和结构化流数据集来实现这一点?

共有1个答案

葛深
2023-03-14

我会使用更简单的方法:

import org.apache.spark.sql.functions._

eventDataset.select(columns.map(
  c => coalesce($"map".getItem(c), lit("")).alias(c)
): _*).writeStream.format("csv").start(path)

但是如果您想要更接近当前解决方案的东西,请跳过RDD转换

import org.apache.spark.sql.catalyst.encoders.RowEncoder

eventDataset.rdd.map(event =>
  Row.fromSeq(columns.map(c => event.getOrElse(c,"")))
)(RowEncoder(schema)).writeStream.format("csv").start(path)
 类似资料:
  • 问题内容: 我有这样的功能: 如何定义的,而不打字了他们中的每一个?当然,我宁愿传递一个参数,但是出于我的目的,这是不可能的。 问题答案: 您是否要在源文件中静态地单独定义它们?那么最好的选择是编写一个脚本来生成它们。 另一方面,如果您希望在运行时使用这些功能,则可以使用更高阶的功能。例如 您可以生成这些列表并再次在运行时进行存储。

  • 由于我刚接触DataFlow/Beam,概念还不太清楚(或者至少我在开始编写代码时有困难),我有很多问题: 什么是最好的模板或模式,我可以用来做到这一点?我应该先执行BigQuery的PTransform(然后执行PubSub的PTransform)还是先执行PubSub的PTransform? 我怎么做加入?比如? PubSub的最佳窗口设置是什么?BigQuery的PTransform部分的窗

  • 我尝试在Google Cloud数据流中运行Apache Beam管道(Python),由Google Cloud Coomposer中的DAG触发。 我的dags文件夹在各自的GCS桶中的结构如下: setup.py是非常基本的,但是根据Apache Beam文档和SO上的答案: 在DAG文件(dataflow.py)中,我设置了选项并将其传递给Dataflow: 在管道文件(pipeline.

  • 作为以下问答的后续问题: https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey 我想与谷歌数据流工程团队(@jkff)确认尤金提出的第三个选项是否有可能使用谷歌数据流:

  • 据Beam网站报道, 通常,对管道代码执行本地单元测试比调试管道的远程执行更快更简单。 出于这个原因,我想对写到Bigtable的Beam/DataFlow应用程序使用测试驱动开发。 但是,在Beam测试文档之后,我遇到了一个僵局--Passert并不有用,因为输出PCollection包含org.apache.hadoop.hbase.client.Put对象,这些对象不重写equals方法。