当前位置: 首页 > 知识库问答 >
问题:

处理scala中spark数据集中的行组

锺离伟彦
2023-03-14

我正在寻找一种方法,将我的大型spark数据集划分为组/批,并在某些函数中处理这组行。所以基本上,这组行应该被输入到我的函数中,输出是我的单位,因为我不想聚合或更新输入记录,只是执行一些计算。

为了理解,假设我有以下输入。

假设我需要按col1和col2分组,这将给我以下分组

(1, A,1),(1, A,4),(1, A,5)---

(1,B,2)---

(1,C,3),(1,C,6)---

(2,X,7),(2,X,8)---

所以我想把这些组传递给我的函数来执行一些逻辑。现在,假设我在那个方法中求和Col3。(这不是我的要求,但是让我们假设我想在我单独的方法中求和)。生成以下o/p。

如何实现这一点,基于一些建议,我试图研究UDAF,但找不到使用它的方法。请注意,我的真实输入数据集有超过5亿条记录。谢谢

共有1个答案

翟永春
2023-03-14

下面是一个基于您的输入的简单示例,让您开始:

    from pyspark.sql.types import IntegerType
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    
    spark = SparkSession.builder.getOrCreate()
    
    data = [
        (1, "A",  1),
        (1, "B",  2),
        (1, "C",  3),
        (1, "A",  4),
        (1, "A",  5),
        (1, "C",  6),
        (2, "X",  7),
        (2, "X",  8),
    ]
    
    df = spark.createDataFrame(data, ["col1", "col2", "col3"])
    df.show()
    
    +----+----+----+
    |col1|col2|col3|
    +----+----+----+
    |   1|   A|   1|
    |   1|   B|   2|
    |   1|   C|   3|
    |   1|   A|   4|
    |   1|   A|   5|
    |   1|   C|   6|
    |   2|   X|   7|
    |   2|   X|   8|
    +----+----+----+
    
    # define your function - pure Python here, no Spark needed
    def dummy_f(xs):
      return sum(xs)
    
    
    # apply your function as UDF - needs input function and return type (integer here)
    (
      df
      .groupBy(F.col("col1"), F.col("col2"))
      .agg(F.collect_list(F.col("col3").cast("int")).alias("col3"))
      .withColumn("col3sum", F.udf(dummy_f, IntegerType())(F.col("col3")))
    ).show()
    
    +----+----+---------+-------+
    |col1|col2|     col3|col3sum|
    +----+----+---------+-------+
    |   1|   A|[1, 4, 5]|     10|
    |   1|   B|      [2]|      2|
    |   1|   C|   [3, 6]|      9|
    |   2|   X|   [7, 8]|     15|
    +----+----+---------+-------+

根据输入函数的需要聚合列是关键。您可以使用create_map创建dict或collect_list,如下所示。

 类似资料:
  • 我有一个特定的要求,其中,我需要检查空的数据文件。如果为空,则填充默认值。这是我尝试过但没有得到我想要的东西。 这个想法是,如果df不是空的,就得到它。如果为空,则填写默认值为零。这似乎不起作用。以下是我得到的。 请帮忙。

  • 我试图在Spark中创建成批的行。为了保持发送到服务的记录数量,我想对项目进行批处理,这样我就可以保持数据发送的速率。对于, 对于给定的我想创建 例如,如果输入有100条记录,那么输出应该像一样,其中每个应该是记录(Person)的列表。 我试过了,但没用。 我想在Hadoop集群上运行此作业。有人能帮我吗?

  • 问题内容: 我使用Scala将PostgreSQL表导入到spark作为数据框。数据框看起来像 我正在将此数据帧转换为log_dt的数据格式为。为此,我使用了以下代码,使用函数将log_dt转换为时间戳格式。 当我使用命令打印以打印tablereader1数据帧时,得到以下结果 如何保留微秒作为时间戳的一部分?任何建议表示赞赏。 问题答案: 毫秒 您可以使用接受Java SimpleDateFor

  • 好吧,我对使用Scala/Spark还比较陌生,我想知道是否有一种设计模式可以在流媒体应用程序中使用大量数据帧(几个100k)? 在我的示例中,我有一个SparkStreaming应用程序,其消息负载类似于: 因此,当用户id为123的消息传入时,我需要使用特定于相关用户的SparkSQL拉入一些外部数据,并将其本地缓存,然后执行一些额外的计算,然后将新数据持久保存到数据库中。然后对流外传入的每条

  • 如何在Scala中对Spark StructType执行常规处理,如按名称选择字段、在映射/列表字段上迭代等? 在spark dataframe中,我有类型为“ArrayType”的列“instances”,具有以下模式: 我需要将ArrayType列“instances”转换为类型为的列“totalExperience” 注:(5 3=8和12 9=21) 等效psuedo代码: 我为此编写了U

  • 问题内容: 当我仔细观察时,我唯一提出的疑问是: 找不到适用于实际参数“ org.apache.spark.unsafe.types.UTF8String”的适用构造函数/方法;候选者为:“ public void sparkSQL.Tweet.setId(long)” 问题答案: 正如@ user9718686所写,id字段具有不同的类型:在json文件和类定义中。当您将其读入时,Spark会从