当前位置: 首页 > 知识库问答 >
问题:

在Spark数据帧行上并行操作

漆雕正奇
2023-03-14

环境:Scala、spark、结构化流媒体、Kafka

我有一个来自Kafka流的DF,具有以下模式

DF:

BATCH ID: 0
+-----------------------+-----+---------+------+
|                  value|topic|partition|offset|
+-----------------------+-----+---------+------+
|{"big and nested json"}|  A  |        0|     0|
|{"big and nested json"}|  B  |        0|     0|
+-----------------------+-----+---------+------+

我希望使用spark并行处理每一行,并使用

DF.repartition(Number).foreach(row=> processRow(row))

我需要从值列中提取值到它自己的数据框中进行处理。我有困难与Dataframe通用行对象...

是否有办法将每个执行器中的单行转换为自己的Dataframe(使用固定模式?)在固定的地点写字?有没有更好的方法来解决我的问题?

编辑澄清:

DF im接收是使用自spark2以来存在的writeStream功能的forEachBatch功能作为一个批来的。4

目前,将DF拆分为行将使行被平均拆分为我的所有执行器,我想将单个GenericRow对象转换为数据帧,以便使用我创建的函数进行处理

例如,我会将行发送到函数

processRow(row:row)

获取值和主题,并将其转换回单行DF

+-----------------------+-----+
|                  value|topic|
+-----------------------+-----+
|{"big and nested json"}|  A  |
+-----------------------+-----+

以便进一步处理

共有2个答案

诸正谊
2023-03-14

在这种情况下,它更适合使用。映射而不是。foreach。原因是map返回一个新的数据集,而foreach只是一个函数,不返回任何内容。

另一件可以帮助您的事情是解析JSON中的模式。

我最近也有类似的要求。我的JSON对象对于主题AB都有一个类似的模式。如果情况并非如此,您可能需要在下面的解决方案中通过按主题分组来创建多个数据帧

val sanitiseJson: String => String = value => value
  .replace("\\\"", "\"")
  .replace("\\\\", "\\")
  .replace("\n", "")
  .replace("\"{", "{")
  .replace("}\"", "}")

val parsed = df.toJSON
  .map(sanitiseJson)

这将为您提供如下信息:

{
    "value": { ... },
    "topic": "A"
}

然后您可以将其传递到一个新的read函数:

var dfWithSchema = spark.read.json(parsed)

此时,您将访问嵌套JSON中的值:

dfWithSchema.select($"value.propertyInJson")

如果需要,在sanitiseJson方面可以做一些优化。

公西星海
2023-03-14

我猜你一次消耗多个Kafka数据。

首先,您需要为所有Kafka主题准备schema,例如,这里我在value列中使用了两种不同的JSON。

scala> val df = Seq(("""{"name":"Srinivas"}""","A"),("""{"age":20}""","B")).toDF("value","topic")
scala> df.show(false)
+-------------------+-----+
|value              |topic|
+-------------------+-----+
|{"name":"Srinivas"}|A    |
|{"age":20}         |B    |
+-------------------+-----+
scala> import org.apache.spark.sql.types._

主题A的模式

scala> val topicASchema = DataType.fromJson("""{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]

主题B的模式

scala> val topicBSchema = DataType.fromJson("""{"type":"struct","fields":[{"name":"age","type":"long","nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]

组合主题

scala> val topicSchema = Seq(("A",topicASchema),("B",topicBSchema)) // Adding Topic & Its Schema.

处理数据帧

scala> topicSchema
.par
.map(d => df.filter($"topic" === d._1).withColumn("value",from_json($"value",d._2)))
.foreach(_.show(false)) // Using .par & filtering dataframe based on topic & then applying schema to value column.
+----------+-----+
|value     |topic|
+----------+-----+
|[Srinivas]|A    |
+----------+-----+

+-----+-----+
|value|topic|
+-----+-----+
|[20] |B    |
+-----+-----+

写入hdfs

scala> topicSchema
.par
.map(d => df.filter($"topic" === d._1).withColumn("value",from_json($"value",d._2)).write.format("json").save(s"/tmp/kafka_data/${d._1}"))

最终数据存储在hdfs中

scala> import sys.process._
import sys.process._

scala> "tree /tmp/kafka_data".!
/tmp/kafka_data
├── A
│   ├── part-00000-1e854106-49de-44b3-ab18-6c98a126c8ca-c000.json
│   └── _SUCCESS
└── B
    ├── part-00000-1bd51ad7-cfb6-4187-a374-4e2d4ce9cc50-c000.json
    └── _SUCCESS

2 directories, 4 files
 类似资料:
  • Narrow转换(映射、过滤器等)的SparkSQL数据帧是否有“spark.default.parallelism”等价物? 显然,RDD和DataFrame之间的分区控制是不同的。数据帧具有spark。sql。洗牌用于控制分区的分区(如果我理解正确的话,则为宽转换)和“spark.default.parallelism”将没有效果。 Spark数据帧洗牌如何影响分区 但洗牌与分区有什么关系呢?

  • 我是新的火花,我想,使用scala,枢轴数据帧的单行如下: 我的旋转数据帧应该如下所示 我尝试使用以下方法,但我不确定我是否正确地得到了聚合表达式:

  • 在scala火花数据帧中是否有的替代方案。我想从火花数据帧的列中选择特定的行。例如,在R等效代码中的第100行

  • 我有一个熊猫数据框,如下所示。 我根据按数据帧分组。分组数据框在概念上如下所示。 现在,我正在寻找一个内置API,它将给我最大作业数的。对于上面的示例,-2具有最大计数。 更新:我希望具有最大作业计数,而不是具有最大作业计数的。对于上述示例,如果,则输出为。这能做到吗?

  • 我有一个进程,它要求处理dataframe的每一行,然后向每一行追加一个新值。这是一个很大的数据帧,一次处理一个数据帧需要几个小时。 如果我有一个将每一行发送到一个函数的迭代罗循环,我可以并行处理以加快速度吗?行的结果不相关 基本上我的代码是这样的 有没有一种简单的方法可以这样做来加快处理速度?

  • 我试图从mysql读取数据,并将其写回s3中的parquet文件,具体分区如下: 我的问题是,它只打开一个到mysql的连接(而不是4个),并且在从mysql获取所有数据之前,它不会写入parquert,因为mysql中的表很大(100M行),进程在OutOfMemory上失败。 有没有办法将Spark配置为打开多个到mysql的连接并将部分数据写入镶木地板?