我正在使用以下代码将CSV文件拆分为多个块(源自此处)
def worker(chunk):
print len(chunk)
def keyfunc(row):
return row[0]
def main():
pool = mp.Pool()
largefile = 'Counseling.csv'
num_chunks = 10
start_time = time.time()
results = []
with open(largefile) as f:
reader = csv.reader(f)
reader.next()
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
但是,无论我选择使用多少个块,似乎块的数量始终保持不变.例如,无论我选择有1个还是10个块,我总是在处理样本文件时获得此输出.理想情况下,我想将文件分块以便公平分配.
请注意,我正在分块的真实文件超过1300万行,这就是为什么我要一块一块地处理它.这是必须的!
6
7
1
...
1
1
94
--- 0.101687192917 …
Run Code Online (Sandbox Code Playgroud) 现在我正在编写一些Python代码来处理大量的twitter文件.这些文件非常大,无法容纳在内存中.为了与他们合作,我基本上有两个选择.
我可以将文件拆分成可以放入内存的较小文件.
我可以逐行处理大文件,所以我永远不需要将整个文件同时放入内存中.我希望后者易于实现.
但是,我想知道在整个文件中读取内存然后从那里操作它是否更快.似乎从磁盘上逐行读取文件似乎很慢.但话说回来,我并不完全理解这些过程在Python中是如何工作的.有谁知道逐行文件读取是否会导致我的代码比我将整个文件读入内存并从那里操作它更慢?
我正在尝试实现该multiprocessing
模块以使用大型csv文件.我正在使用Python 2.7并按照此处的示例进行操作.
我运行了未经修改的代码(为方便起见,下面复制了代码)并注意到函数中的print
语句worker
不起作用.无法print
理解流程和调试.
任何人都可以解释为什么print
不在这里工作?pool.map不执行打印命令吗?我在网上搜索但没有找到任何表明这一点的文件.
import multiprocessing as mp
import itertools
import time
import csv
def worker(chunk):
# `chunk` will be a list of CSV rows all with the same name column
# replace this with your real computation
print(chunk) # <----- nothing prints
print 'working' # <----- nothing prints
return len(chunk)
def keyfunc(row):
# `row` is one row of the CSV file.
# replace this with the name …
Run Code Online (Sandbox Code Playgroud)