我想并行化以下代码:
for row in df.iterrows():
idx = row[0]
k = row[1]['Chromosome']
start,end = row[1]['Bin'].split('-')
sequence = sequence_from_coordinates(k,1,start,end) #slow download form http
df.set_value(idx,'GC%',gc_content(sequence,percent=False,verbose=False))
df.set_value(idx,'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
df.set_value(idx,'max flexibility',max([item[1] for item in dna_flex(sequence,verbose=False)]))
multiprocessing.Pool()
由于每一行都可以独立处理,因此我尝试使用它,但是我不知道如何共享DataFrame。我也不确定这是否是与熊猫并行化的最佳方法。有什么帮助吗?
就像@Khris在他的评论中说的那样,您应该将数据帧分成几个大块,并并行地遍历每个块。您可以将数据帧任意分成随机大小的块,但是根据您计划使用的进程数将数据帧分成大小相等的块更有意义。幸运的是,已经有人想出了如何为我们做这部分工作:
# don't forget to import
import pandas as pd
import multiprocessing
# create as many processes as there are CPUs on your machine
num_processes = multiprocessing.cpu_count()
# calculate the chunk size as an integer
chunk_size = int(df.shape[0]/num_processes)
# this solution was reworked from the above link.
# will work even if the length of the dataframe is not evenly divisible by num_processes
chunks = [df.ix[df.index[i:i + chunk_size]] for i in range(0, df.shape[0], chunk_size)]
这将创建一个列表,其中包含我们的数据框。现在,我们需要将其与将操纵数据的函数一起传递到池中。
def func(d):
# let's create a function that squares every value in the dataframe
return d * d
# create our pool with `num_processes` processes
pool = multiprocessing.Pool(processes=num_processes)
# apply our function to each chunk in the list
result = pool.map(func, chunks)
此时,result
将是一个列表,其中包含每个已被操作的块。在这种情况下,所有值均已平方。现在的问题是原始数据框尚未修改,因此我们必须用池中的结果替换其所有现有值。
for i in range(len(result)):
# since result[i] is just a dataframe
# we can reassign the original dataframe based on the index of each chunk
df.ix[result[i].index] = result[i]
现在,我对数据框进行操作的功能已经过矢量化处理,如果我将其简单地应用于整个数据框而不是拆分成块的话,可能会更快。但是,在您的情况下,您的函数将遍历每个块的每一行,然后返回该块。这使您可以一次处理num_process
行。
def func(d):
for row in d.iterrow():
idx = row[0]
k = row[1]['Chromosome']
start,end = row[1]['Bin'].split('-')
sequence = sequence_from_coordinates(k,1,start,end) #slow download form http
d.set_value(idx,'GC%',gc_content(sequence,percent=False,verbose=False))
d.set_value(idx,'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
d.set_value(idx,'max flexibility',max([item[1] for item in dna_flex(sequence,verbose=False)]))
# return the chunk!
return d
然后,您可以在原始数据帧中重新分配值,并成功并行化了此过程。
您的最佳性能将取决于此问题的答案。而“所有过程!!!!”
是一个答案,更好的答案更细微。在某一点之后,在一个问题上投入更多的流程实际上会产生超出其价值的开销。这就是阿姆达尔定律。再次,我们很幸运
一个很好的默认值是使用multiprocessing.cpu_count()
,这是的默认行为multiprocessing.Pool
。
根据文档“如果进程为None,则使用cpu_count()返回的数字。”
这就是为什么我一num_processes
开始就设置为multiprocessing.cpu_count()
。这样,如果您使用功能更强大的机器,则无需num_processes
直接更改变量即可从中受益。
问题内容: 我曾经在之后进行并行化,例如: 但是,有没有人想出如何并行化返回DataFrame的函数?如预期,此代码对于失败。 问题答案: 尽管确实应该将其内置到熊猫中,但这似乎可行
问题内容: 我需要将多行合并为单行,这将是简单的带有空间的连接 我希望数据帧像这样转换:(空格分隔)tempx值 我试过的 这行得通,但是它将所有列中的行合并在一起,如下所示: 有什么优雅的解决方案吗? 问题答案: 您可以使用和功能:
我有两排像这样的, 我希望将它们合并为一个单独的: 不知道如何在熊猫身上做到这一点。任何提示都将受到高度赞赏!提前谢谢
问题内容: 如何在两个数据框中找出同名列之间的区别?我的意思是,我有一个名为X的数据框A和一个名为X的数据框B,如果这样做的话,我将获得A和B的通用X值,但是我如何获得“非通用”的X值? 问题答案: 如果将合并类型更改为,这将添加一列以告诉您这些值是否仅是左/左右/右: 然后,您可以在col上过滤结果合并的df : 您也可以使用和否定掩码以查找不在B中的值:
问题内容: Python 3.4和Pandas 0.15.0 df是一个数据框,而col1是一列。使用下面的代码,我正在检查是否存在值10,并将此类值替换为1000。 这是另一个例子。这次,我将基于索引更改col2中的值。 这两种都会产生以下警告: 最后, 这会产生类似的警告,并带有以下建议: 我不确定我是否理解警告中指出的讨论。编写这三行代码的更好方法是什么? 请注意,该操作有效。 问题答案:
问题内容: 我想将 大于任意数(在这种情况下为100)的值替换为(因为如此大的值表示实验失败)。以前,我使用它来替换不需要的值: 但是,出现以下错误: 从这个StackExchange问题来看,有时似乎可以忽略此警告,但是我不能很好地跟踪讨论,无法确定这是否适用于我的情况。警告基本上是让我知道我将覆盖我的某些值吗? 编辑:据我所知,一切都按其应有的方式进行。作为后续措施,我的替换值方法是否非标