当前位置: 首页 > 知识库问答 >
问题:

在Spark中处理大量数据框/数据集/RDD

花俊雄
2023-03-14

好吧,我对使用Scala/Spark还比较陌生,我想知道是否有一种设计模式可以在流媒体应用程序中使用大量数据帧(几个100k)?

在我的示例中,我有一个SparkStreaming应用程序,其消息负载类似于:

{"user_id":123, "data":"ABC"}
{"user_id":456, "data":"DEF"}
{"user_id":123, "data":"GHI"}

因此,当用户id为123的消息传入时,我需要使用特定于相关用户的SparkSQL拉入一些外部数据,并将其本地缓存,然后执行一些额外的计算,然后将新数据持久保存到数据库中。然后对流外传入的每条消息重复该过程。

现在我的问题是,我想缓存为每个用户拉入的数据,然后在每次需要为该用户处理来自流的消息时重用用户数据的缓存副本(如果存在)。我有数百万可能的用户,在任何给定时刻都有大约10万个活动用户,我将跨多个(大约50个)执行器处理这些数据。

我知道缓存的数据帧/RDD会耗尽内存,但如果我将每个用户缓存的计算数据存储在映射中,以便在每个执行器上本地快速查找和检索,例如:

Map[id: INT, user_data: DataFrame]

我是否会创建这样一个场景,即我保留对旧数据帧的引用,而这些旧数据帧永远不会得到GC,因为我有对它们的活动引用,并且最终会耗尽内存?

我是否遗漏了一些基本的东西,是否有更好更有效的方法来实现这一点?

谢谢,任何帮助都非常感谢!

共有1个答案

周麒
2023-03-14

如果用户元数据是静态的,我只需保留一个包含所有用户id和元数据的地图,并将其广播给工作人员。当您处理这样的小地图时,这是最有效的解决方案。更困难的情况是,这个“地图”需要及时更新。在这种情况下,我会定期将数据加载到rdd中,并将其作为管道的一部分与流数据连续连接。

 类似资料:
  • 问题内容: 我有一个Java应用程序,它需要显示大量数据(大约一百万个数据点)。数据并不需要全部同时显示,而仅在用户请求时才显示。该应用程序是桌面应用程序,未与应用程序服务器一起运行或未与任何集中式数据库连接。 我的想法是在计算机上运行数据库并在其中加载数据。在大多数时候,数据库都是只读的,因此我应该能够建立索引以帮助优化查询。如果我在本地系统上运行,则不确定是否应该尝试实现一些缓存(我不确定查询

  • 我正在寻找一种方法,将我的大型spark数据集划分为组/批,并在某些函数中处理这组行。所以基本上,这组行应该被输入到我的函数中,输出是我的单位,因为我不想聚合或更新输入记录,只是执行一些计算。 为了理解,假设我有以下输入。 假设我需要按col1和col2分组,这将给我以下分组 (1, A,1),(1, A,4),(1, A,5)--- (1,B,2)--- (1,C,3),(1,C,6)--- (

  • 如果我只有一个内存为25 GB的执行器,并且如果它一次只能运行一个任务,那么是否可以处理(转换和操作)1 TB的数据?如果可以,那么将如何读取它以及中间数据将存储在哪里? 同样对于相同的场景,如果hadoop文件有300个输入拆分,那么RDD中会有300个分区,那么在这种情况下这些分区会在哪里?它会只保留在hadoop磁盘上并且我的单个任务会运行300次吗?

  • 问题内容: 在我的代码中,用户可以上传一个excel文档,希望其中包含电话联系人列表。作为开发人员,我应阅读excel文件,将其转换为dataTable并将其插入数据库。问题是某些客户拥有大量的联系人,例如说5000个和更多的联系人,而当我尝试将这种数据量插入数据库时​​,它崩溃了,并给了我一个超时异常。避免这种异常的最佳方法是什么?它们的任何代码都可以减少insert语句的时间,从而使用户不必等

  • 问题内容: 我用来并行化一些繁重的计算。 目标函数返回大量数据(庞大的列表)。我的RAM用完了。 如果不使用,我只需将生成的元素依次计算出来,就将目标函数更改为生成器。 我了解多处理不支持生成器- 它等待整个输出并立即返回,对吗?没有屈服。有没有一种方法可以使工作人员在数据可用时立即产生数据,而无需在RAM中构造整个结果数组? 简单的例子: 这是Python 2.7。 问题答案: 这听起来像是队列

  • 我正在尝试用H2O(3.14)训练机器学习模型。我的数据集大小是4Gb,我的计算机RAM是2Gb,带有2G交换,JDK 1.8。参考本文,H2O可以使用2Gb RAM处理大型数据集。 关于大数据和GC的说明:当Java堆太满时,我们会进行用户模式的磁盘交换,即,您使用的大数据比物理DRAM多。我们不会因GC死亡螺旋而死亡,但我们会降级到核心外的速度。我们将以磁盘允许的速度运行。我个人测试过将12G