我multiprocessing.Pool()
用来并行化一些繁重的计算。
目标函数返回大量数据(庞大的列表)。我的RAM用完了。
如果不使用multiprocessing
,我只需yield
将生成的元素依次计算出来,就将目标函数更改为生成器。
我了解多处理不支持生成器-
它等待整个输出并立即返回,对吗?没有屈服。有没有一种方法可以使Pool
工作人员在数据可用时立即产生数据,而无需在RAM中构造整个结果数组?
简单的例子:
def target_fnc(arg):
result = []
for i in xrange(1000000):
result.append('dvsdbdfbngd') # <== would like to just use yield!
return result
def process_args(some_args):
pool = Pool(16)
for result in pool.imap_unordered(target_fnc, some_args):
for element in result:
yield element
这是Python 2.7。
这听起来像是队列的理想用例:http :
//docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-
processes
只需将结果从汇集的工作人员输入队列中,然后将其摄入主服务器即可。
请注意,除非排干队列的速度几乎与工人填充队列的速度一样快,否则您仍然可能会遇到内存压力问题。您可以限制队列大小(队列中可容纳的最大对象数),在这种情况下,池化工作器将阻塞queue.put
语句,直到队列中有可用空间为止。这将限制内存使用量。
但是, 如果执行此操作,则可能是时候重新考虑是否需要全部池化和/或使用更少的工作程序是否有意义。
问题内容: 我有一个Java应用程序,它需要显示大量数据(大约一百万个数据点)。数据并不需要全部同时显示,而仅在用户请求时才显示。该应用程序是桌面应用程序,未与应用程序服务器一起运行或未与任何集中式数据库连接。 我的想法是在计算机上运行数据库并在其中加载数据。在大多数时候,数据库都是只读的,因此我应该能够建立索引以帮助优化查询。如果我在本地系统上运行,则不确定是否应该尝试实现一些缓存(我不确定查询
问题内容: 我正在尝试使用多重处理来并行化应用程序,该处理程序会处理一个非常大的csv文件(64MB至500MB),逐行执行一些工作,然后输出一个固定大小的小文件。 目前,我正在执行,不幸的是,它已完全加载到内存中(我认为),然后我将该列表分成了n个部分,n是我要运行的进程数。然后,我在分类列表上执行。 与单线程,仅打开文件并迭代的方法相比,这似乎具有非常非常糟糕的运行时。有人可以提出更好的解决方
问题内容: 在我的代码中,用户可以上传一个excel文档,希望其中包含电话联系人列表。作为开发人员,我应阅读excel文件,将其转换为dataTable并将其插入数据库。问题是某些客户拥有大量的联系人,例如说5000个和更多的联系人,而当我尝试将这种数据量插入数据库时,它崩溃了,并给了我一个超时异常。避免这种异常的最佳方法是什么?它们的任何代码都可以减少insert语句的时间,从而使用户不必等
好吧,我对使用Scala/Spark还比较陌生,我想知道是否有一种设计模式可以在流媒体应用程序中使用大量数据帧(几个100k)? 在我的示例中,我有一个SparkStreaming应用程序,其消息负载类似于: 因此,当用户id为123的消息传入时,我需要使用特定于相关用户的SparkSQL拉入一些外部数据,并将其本地缓存,然后执行一些额外的计算,然后将新数据持久保存到数据库中。然后对流外传入的每条
我正在尝试用H2O(3.14)训练机器学习模型。我的数据集大小是4Gb,我的计算机RAM是2Gb,带有2G交换,JDK 1.8。参考本文,H2O可以使用2Gb RAM处理大型数据集。 关于大数据和GC的说明:当Java堆太满时,我们会进行用户模式的磁盘交换,即,您使用的大数据比物理DRAM多。我们不会因GC死亡螺旋而死亡,但我们会降级到核心外的速度。我们将以磁盘允许的速度运行。我个人测试过将12G
hive怎么进行增量更新呢?看到很多人是先分区例如根据create_time分区。每天根据create_time 新增数据。但是如果我的数据是会经常变动的呢?例如去年的数据,今年修改了。我应该如何更新这条数据进去。假设我现在数据是上亿的,应该怎么处理。 假设数据初始数据: 1 2024-08-10 15:18:00.000 wang 2 2024-08-10 15:18:00.000 xxx 3