当前位置: 首页 > 知识库问答 >
问题:

并行处理数据帧[副本]

林鹭洋
2023-03-14

我有一个进程,它要求处理dataframe的每一行,然后向每一行追加一个新值。这是一个很大的数据帧,一次处理一个数据帧需要几个小时。

如果我有一个将每一行发送到一个函数的迭代罗循环,我可以并行处理以加快速度吗?行的结果不相关

基本上我的代码是这样的

for index, row in df.iterrows():
   row['data'] = function[row]

有没有一种简单的方法可以这样做来加快处理速度?

共有1个答案

方博学
2023-03-14

虽然对行进行迭代不是好的实践,并且可以使用Grouby/Transform聚合等替换逻辑,但如果在最坏的情况下您确实需要这样做,请遵循答案。此外,您可能不需要重新实现这里的所有内容,您可以使用Dask这样的库,它构建在pandas之上。

但是为了给出概念,您可以将多处理pool.map)与结合使用。读取chunk中的csv(或如答案末尾所述制作chucks)并将其映射到池,在处理每个chunk时添加新行(或将其添加到列表中并制作新chunk)并从函数返回。

最后,在执行所有池时组合数据流。

import pandas as pd
import numpy as np
import multiprocessing


def process_chunk(df_chunk):
        
        for index, row in df_chunk.reset_index(drop = True).iterrows():
                    #your logic for updating this chunk or making new chunk here
                         
                    print(row)
                    
                    print("index is " + str(index))
        #if you can added to same df_chunk, return it, else if you appended
        #rows to have list_of_rows, make a new df with them and return
        #pd.Dataframe(list_of_rows)  

        return df_chunk   


if __name__ == '__main__':
            #use all available cores , otherwise specify the number you want as an argument,
            #for example if you have 12 cores,  leave 1 or 2 for other things
            pool = multiprocessing.Pool(processes=10) 
            
            results = pool.map(process_chunk, [c for c in pd.read_csv("your_csv.csv", chunksize=7150)])
            pool.close()
            pool.join()
            
            #make new df by concatenating
            
            concatdf = pd.concat(results, axis=0, ignore_index=True)
            

注意:您可以通过相同的逻辑传递chuck,而不是读取csv,为了计算块大小,您可能需要类似round_of((df的长度)/(可用核心数-2))的内容,例如100000/14=round(7142.85)=7150行每块

 results = pool.map(process_chunk,
        [df[c:c+chunk_size] for c in range(0,len(df),chunk_size])
 类似资料:
  • 译者:bat67 最新版会在译者仓库首先同步。 在这个教程里,我们将学习如何使用数据并行(DataParallel)来使用多GPU。 PyTorch非常容易的就可以使用GPU,可以用如下方式把一个模型放到GPU上: device = torch.device("cuda:0") model.to(device) 然后可以复制所有的张量到GPU上: mytensor = my_tensor.to(

  • 我有两个数据帧,都有一个键列,可能有重复项,但数据帧大部分都有相同的重复键。我希望在该键上合并这些数据帧,但这样做的方式是,当两者具有相同的副本时,这些副本将分别合并。此外,如果一个数据帧的键的副本比另一个多,我希望它的值填充为NaN。例如: 我正在尝试获得以下输出 所以基本上,我想把复制的K2键当作K2_1,K2_2。。。然后在数据帧上进行how='outer'合并。你知道我怎样才能做到这一点吗

  • 数据帧: 我有一段代码逐列遍历数据帧: 我需要删除它是NaN的行。我该怎么做?我已经尝试了. isnull()和. Notnull(),但它们返回错误

  • 我有两个不同的DataFrame:A、B。column事件有相似的数据,我正在使用这些数据来比较这两个DataFrames。我想给Dataframe A一个新列dfa.newContext#。 为此,我需要使用Event列。我想遍历Dataframe A以找到事件的匹配项,并将dfb.context#分配给dfa.newcontext# 我认为循环是最好的方法,因为我有一些条件需要检查。 这可能要

  • 我有一个数据帧在下面的形状。

  • 环境:Scala、spark、结构化流媒体、Kafka 我有一个来自Kafka流的DF,具有以下模式 DF: 我希望使用spark并行处理每一行,并使用 我需要从值列中提取值到它自己的数据框中进行处理。我有困难与Dataframe通用行对象... 是否有办法将每个执行器中的单行转换为自己的Dataframe(使用固定模式?)在固定的地点写字?有没有更好的方法来解决我的问题? 编辑澄清: DF im