多处理/for循环随机跳过元素

cod*_*ous 6 multiprocessing python-3.x

该数据集每对都有数十亿个数据点。我尝试了多处理循环以使其更快。为什么多重处理/for 循环会跳过对中的一些元素?一旦我再次运行,这会随机跳过一些其他名称,并且代码结束。


import pandas as pd
import pickle
import time
import concurrent.futures

start = time.perf_counter()
pairs = ['GBPUSD', 'AUDUSD', 'EURUSD', 'EURJPY', 'GBPJPY', 'USDJPY', 'USDCAD', 'EURGBP']


def pickling_joined(p):
    df = pd.read_csv(f'C:\\Users\\Ghosh\\Downloads\\dataset\\data_joined\\{p}.csv')
    df['LTP'] = (df['Bid'] + df['Ask']) / 2
    print(f'\n=====>> Converting Date format for {p} ....')
    df['Date'] = df['Date'].apply(pd.to_datetime)
    print(f'\n=====>> Date format converted for {p} ....')
    df.set_index('Date', inplace=True)
    df = pd.DataFrame(df)
    with open(f'C:\\Users\\Ghosh\\Downloads\\dataset\\data_pickled\\{p}.pkl', 'wb') as pickle_file:
        pickle.dump(df, pickle_file)
    print(f'\n=====>> Pickling done for {p} !!!')


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(pickling_joined, pairs)
    finish = time.perf_counter()
    print(f'Finished in {finish - start} seconds')
Run Code Online (Sandbox Code Playgroud)

Sud*_*osh 6

Python 不能很好地处理大文件的线程/多重处理,我在这里推荐 DASK。DASK 使用集群,其工作方式类似于多处理,需要更少的时间,然后您还可以使用多处理来更快地运行它。

def pickling_joined(p):
    df = dd.read_csv(f'C:\\Users\\Ghosh\\Downloads\\dataset\\data_joined\\{p}.csv')
    df['LTP'] = (df['Bid'] + df['Ask']) / 2
    print(f'\n=====>> Converting Date format for {p} ....')
    df['Date'] = dd.to_datetime(df.Date)
    print(f'\n=====>> Date format converted for {p} ....')
    df = df.set_index('Date', sorted=True)
    df = df.compute()
    with open(f'C:\\Users\\Ghosh\\Downloads\\dataset\\data_pickled\\{p}.pkl', 'wb') as pickle_file:
        pickle.dump(df, pickle_file)
    print(f'\n=O=O=O=O=O>> Pickling done for {p} !!!')


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(pickling_joined, pairs)
    finish = time.perf_counter()
    print(f'\nFinished in {finish - start} seconds')
Run Code Online (Sandbox Code Playgroud)