Gre*_*dot 6 python csv multithreading executor
我是Python的多线程新手,目前正在编写一个附加到csv文件的脚本.如果我要将多个线程提交给一个concurrent.futures.ThreadPoolExecutor
将行附加到csv文件的行.如果附加是这些线程唯一与文件相关的操作,我该怎么做才能保证线程安全?
我的代码的简化版本:
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
for count,ad_id in enumerate(advertisers):
downloadFutures.append(executor.submit(downloadThread, arguments.....))
time.sleep(random.randint(1,3))
Run Code Online (Sandbox Code Playgroud)
我的线程类是:
def downloadThread(arguments......):
#Some code.....
writer.writerow(re.split(',', line.decode()))
Run Code Online (Sandbox Code Playgroud)
我应该设置一个单独的单线程执行程序来处理写入,还是担心我是否只是附加?
编辑:我应该详细说明,当写入操作发生时,下一次附加文件之间的分钟数差别很大,我只关心在测试我的脚本时没有发生这种情况,我宁愿为此加以覆盖.
Cla*_*diu 11
我不确定是否csvwriter
是线程安全的.该文件没有明确规定,因此是安全的,如果多个线程使用同一个对象,你应该保护的使用有threading.Lock
:
# create the lock
import threading
csv_writer_lock = threading.Lock()
def downloadThread(arguments......):
# pass csv_writer_lock somehow
# Note: use csv_writer_lock on *any* access
# Some code.....
with csv_writer_lock:
writer.writerow(re.split(',', line.decode()))
Run Code Online (Sandbox Code Playgroud)
话虽这么说,downloadThread
向执行程序提交写入任务可能确实更优雅,而不是明确地使用这样的锁.
kun*_*phu 10
Way-late-to-the-party注意:您可以通过让单个写入器从共享队列中消费,并通过执行处理的线程将行推送到队列来以不同的方式处理此问题,而无需锁定。
from threading import Thread
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
# CSV writer setup goes here
queue = Queue()
def consume():
while True:
if not queue.empty():
i = queue.get()
# Row comes out of queue; CSV writing goes here
print(i)
if i == 4999:
return
consumer = Thread(target=consume)
consumer.setDaemon(True)
consumer.start()
def produce(i):
# Data processing goes here; row goes into queue
queue.put(i)
with ThreadPoolExecutor(max_workers=10) as executor:
for i in range(5000):
executor.submit(produce, i)
consumer.join()
Run Code Online (Sandbox Code Playgroud)
这是一些代码,它还处理了令人头疼的 unicode 问题:
def ensure_bytes(s):
return s.encode('utf-8') if isinstance(s, unicode) else s
class ThreadSafeWriter(object):
'''
>>> from StringIO import StringIO
>>> f = StringIO()
>>> wtr = ThreadSafeWriter(f)
>>> wtr.writerow(['a', 'b'])
>>> f.getvalue() == "a,b\\r\\n"
True
'''
def __init__(self, *args, **kwargs):
self._writer = csv.writer(*args, **kwargs)
self._lock = threading.Lock()
def _encode(self, row):
return [ensure_bytes(cell) for cell in row]
def writerow(self, row):
row = self._encode(row)
with self._lock:
return self._writer.writerow(row)
def writerows(self, rows):
rows = (self._encode(row) for row in rows)
with self._lock:
return self._writer.writerows(rows)
# example:
with open('some.csv', 'w') as f:
writer = ThreadSafeWriter(f)
writer.write([u'??', 'bar'])
Run Code Online (Sandbox Code Playgroud)
更详细的解决方案在这里
归档时间: |
|
查看次数: |
5890 次 |
最近记录: |