Parallel python iteration

gre*_*ane 9 python pandas python-multiprocessing

I want to create instances of a class based on the values in a pandas.DataFrame. I have failed.

import itertools
import multiprocessing as mp
import pandas as pd

class Toy:
    id_iter = itertools.count(1)

    def __init__(self, row):
        # Python 3: use next() on the iterator (itertools.count has no .next() method)
        self.id = next(self.id_iter)
        self.type = row['type']

if __name__ == "__main__":

    table = pd.DataFrame({
        'type': ['a', 'b', 'c'],
        'number': [5000, 4000, 30000]
        })

    for index, row in table.iterrows():
        [Toy(row) for _ in range(row['number'])]

Multiprocessing attempt

I've been able to parallelize this (sort of) by adding the following:

pool = mp.Pool(processes=mp.cpu_count())
m = mp.Manager()
q = m.Queue()

for index, row in table.iterrows():
    pool.apply_async([Toy(row) for _ in range(row['number'])])
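As an aside, apply_async takes a callable and its arguments separately. Here the list comprehension is evaluated serially in the parent process before the call, and the pool then fails when it tries to call the resulting list; because the AsyncResult is never collected with .get(), that failure is silent. A corrected sketch, assuming a module-level helper named create_toys:

def create_toys(row):
    return [Toy(row) for _ in range(row['number'])]

# submit one task per row and keep the AsyncResult handles
results = [pool.apply_async(create_toys, (row,)) for _, row in table.iterrows()]
# .get() re-raises any worker-side error and returns that row's Toys
toys = [toy for r in results for toy in r.get()]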

This seems to be faster when the values of row['number'] are significantly larger than the length of table. But in my actual case, table is thousands of rows long, and each row['number'] is relatively small.

Breaking table into cpu_count() chunks and iterating within each chunk seems smarter. But now we're at the edge of my Python skills.
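For reference, the splitting step itself is a one-liner with numpy (the answer below takes the same approach); a minimal sketch:

import numpy as np

# split the DataFrame into roughly equal row-chunks, one per core
chunks = np.array_split(table, mp.cpu_count())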

I've tried things that make the Python interpreter scream at me, like:

pool.apply_async(
        for index, row in table.iterrows(): 
        [Toy(row) for _ in range(row['number'])]
        )

And things that give "can't be pickled" errors:

Parallel(n_jobs=4)(
    delayed(Toy)([row for _ in range(row['number'])]) \
            for index, row in table.iterrows()
)
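In general, the process-based backends have to pickle both the callable and its arguments to ship them to workers, so everything involved must be defined at module level and be picklable. Note also that [row for _ in range(row['number'])] passes a list of repeated rows as Toy's single row argument, so row['type'] inside __init__ would fail even if pickling succeeded.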

Edit

This might get me a little closer, but still no. I create the class instances in a separate function,

def create_toys(row):
    [Toy(row) for _ in range(row['number'])]

....

Parallel(n_jobs=4, backend="threading")(
    (create_toys)(row) for i, row in table.iterrows()
)

But I am told that 'NoneType' object is not iterable.
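That error is consistent with the missing delayed wrapper: (create_toys)(row) calls the function immediately, and since create_toys has no return statement it yields None, which Parallel then tries to unpack as a (function, args, kwargs) triple. A sketch of the likely fix, using the same names:

from joblib import Parallel, delayed

def create_toys(row):
    return [Toy(row) for _ in range(row['number'])]  # return the list instead of discarding it

results = Parallel(n_jobs=4, backend="threading")(
    delayed(create_toys)(row) for _, row in table.iterrows()
)
toys = [toy for chunk in results for toy in chunk]

Note that the threading backend shares the GIL, so pure-Python work like this may not actually run faster with it; a process-based backend needs create_toys and Toy to be picklable (i.e. defined at module level).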

max*_*moo 3

I'm a little unclear on what output you're expecting. Do you just want one big list of the form

[Toy(row_1) ... Toy(row_n)]

where each Toy(row_i) appears with multiplicity row_i.number?

Based on the answer mentioned by @JD Long, I think you can do something like this:

import multiprocessing as mp

import numpy as np
import pandas as pd

def process(df):
    # build the Toy instances for one chunk of the table
    L = []
    for index, row in df.iterrows():  # iterate over the chunk passed in, not the full table
        L += [Toy(row) for _ in range(row['number'])]
    return L

table = pd.DataFrame({
    'type': ['a', 'b', 'c']*10,
    'number': [5000, 4000, 30000]*10
    })

p = mp.Pool(processes=8)
split_dfs = np.array_split(table, 8)
pool_results = p.map(process, split_dfs)
p.close()
p.join()

# merging parts processed by different processes
result = [a for L in pool_results for a in L]
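One caveat worth flagging (an assumption about what Toy.id is meant to guarantee): each worker process inherits its own copy of Toy.id_iter, so ids will repeat across chunks. If globally unique ids matter, renumbering after the merge is one option:

# renumber in the parent process so ids are unique across all chunks
for new_id, toy in enumerate(result, start=1):
    toy.id = new_id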