当前位置: 首页 > 知识库问答 >
问题:

带有udf的Pyspark groupby:在本地机器上的性能较差

史高阳
2023-03-14

我正在尝试对由几个每日文件(每个15GB)组成的巨大数据集进行一些分析。为了更快,仅出于测试目的,我创建了一个非常小的数据集,其中包括所有相关场景。我必须分析每个用户的正确操作顺序(即类似于日志或审计)。

为了做到这一点,我定义了一个udf函数,然后应用了一个groupby。下面的代码重现了我的用例:

import pyspark
from pyspark import SparkContext
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
import time
sc = SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession.builder.appName('example').getOrCreate()

d = spark.createDataFrame(
    [(133515, "user1", 100, 'begin'),
     (133515, "user1", 125, 'ok'),
     (133515, "user1", 150, 'ok'),
     (133515, "user1", 200, 'end'),
     (133515, "user1", 250, 'begin'),
     (133515, "user1", 300, 'end'),
     (133515, "user1", 310, 'begin'),
     (133515, "user1", 335, 'ok'),
     (133515, "user1", 360, 'ok'),
     # user1 missing END and STOPPED
     (789456, "user2", 150, 'begin'),
     (789456, "user2", 175, 'ok'),
     (789456, "user2", 200, 'end'),
     # user2 stopped
     (712346, "user3", 100, 'begin'),
     (712346, "user3", 125, 'ok'),
     (712346, "user3", 150, 'ok'),
     (712346, "user3", 200, 'end'),
     #user3 stopped
     (789456, "user4", 150, 'begin'),
     (789456, "user4", 300, 'end'),
     (789456, "user4", 350, 'begin'),
     (789456, "user4", 375, 'ok'),
     (789456, "user4", 450, 'end'),
     (789456, "user4", 475, 'ok'),
     #user4 missing BEGIN but ALIVE

    ], ("ID", "user", "epoch", "ACTION")).orderBy(F.col('epoch'))
d.show()
zip_lists = F.udf(lambda x, y: [list(z) for z in zip(x, y)], ArrayType(StringType()))

start=time.time()
d2 = d.groupBy(F.col('ID'), F.col('user'))\
.agg(zip_lists(F.collect_list('epoch'), F.collect_list('ACTION')).alias('couples'))
d2.show(50, False)
end = time.time()
print(end-start)

这给我带来了以下结果:

+------+-----+--------------------------------------------------------------------------------------------------------------+
|ID    |user |couples                                                                                                       |
+------+-----+--------------------------------------------------------------------------------------------------------------+
|789456|user4|[[150, begin], [300, end], [350, begin], [375, ok], [450, end], [475, ok]]                                    |
|712346|user3|[[100, begin], [125, ok], [150, ok], [200, end]]                                                              |
|133515|user1|[[100, begin], [125, ok], [150, ok], [200, end], [250, begin], [300, end], [310, begin], [335, ok], [360, ok]]|
|789456|user2|[[150, begin], [175, ok], [200, end]]                                                                         |
+------+-----+--------------------------------------------------------------------------------------------------------------+

189.9082863330841

是不是太慢了?

我正在用一台现代笔记本电脑和康达。我使用conda navigator安装了pyspark。

我做错什么了吗?对于如此小的数据集来说,这太多了

共有1个答案

邢臻
2023-03-14

我没有对两列进行聚合,而是尝试创建一个新列并对其进行收集:

start=time.time()

d2 = d.groupBy(F.col('ID'), F.col('user'))\
      .agg(zip_lists(F.collect_list('epoch'), F.collect_list('ACTION')).alias('couples'))\
      .collect()

end = time.time()
print('first solution:', end-start)



start = time.time()

d3 = d.select(d.ID, d.user, F.struct([d.epoch, d.ACTION]).alias('couple'))
d4 = d3.groupBy(d3.ID, d3.user)\
       .agg(F.collect_list(d3.couple).alias('couples'))\
       .collect()

end = time.time()
print('second solution:', end-start)

在我的机器上,此更改使结果更好!: D:

first solution: 2.247227907180786
second solution: 0.8280930519104004
 类似资料:
  • 问题内容: 我正在尝试建立一个PHP网站,并且想要测试我的PHP文件而不将其上传到主机。在上载它们之前,基本上在我自己的机器上对其进行测试。我怎么做? 问题答案: 安装并运行XAMPP:http : //www.apachefriends.org/en/xampp.html

  • 我一直在尝试用py函数在pyspark中实现udf,如下所示: 它采用了我之前训练过的bin模型。 input_text列包含普通文本,df是包含整个数据的数据框。 我在哪里得到以下错误: ​ Fasttext当前正在运行,python函数在同一个笔记本上运行没有任何问题。 谢谢你的帮助,

  • 我有一个RMI服务器在本地主机上运行,导出类型的对象: 然后我想启动一个客户端并获取该对象的存根。似乎找到了注册表,但随后在块中抛出了: java.rmi.NotBoundexception:M在sun.rmi.registry.registryimpl.lookup(registryimpl.java:136)在sun.rmi.server.unicastserverref.olddispatc

  • 设置:NEO4J官方docker图像 neo4j_auth none 端口localhost:7474 localhost:7687 版本neo4j-community-4.3.3-unix.tar.gz NODEJS图像 null 我可以通过http://localhost:7474/browser/远程到neo4j映像,所以它正在运行。 我不能使用服务器映像调用本地neo4j实例。 当我调用服

  • 我已经在SO上浏览了近10篇类似的帖子,但我仍然对我得到的结果感到困惑:在42K文档集合和19条记录集合之间的单个$查找聚合上,在外部字段上进行排序需要5秒。也就是798K的总向量积。 不幸的是,在这里,非规范化不是一个很好的选择,因为“to”集合中的文档被大量共享,并且在进行更改时需要在整个数据库中进行大量更新。 话虽如此,我似乎不明白为什么接下来要花这么长时间。我觉得我一定做错了什么。 上下文