Ali*_*ani 11 python queue concurrency multithreading python-3.x
Using Python 3's concurrent.futures module to do work in parallel is fairly easy, as shown below.
```python
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_to = {executor.submit(do_work, item, 60): item for item in dictionary}
    for future in concurrent.futures.as_completed(future_to):
        data = future.result()
```
Inserting items into a queue and retrieving them is also very convenient.
```python
import queue

q = queue.Queue()
for task in tasks:
    q.put(task)
while not q.empty():
    q.get()
```
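q.empty() is only a snapshot. Once a background listener is putting items while another thread consumes them, its answer can be stale by the time q.get() runs, and a bare q.get() on a queue that has just been emptied will block. A minimal sketch of a non-blocking drain, assuming a single consumer wants whatever is queued right now, uses get_nowait() and catches queue.Empty; drain is a hypothetical helper name, not part of the question's code:

```python
import queue


def drain(q):
    """Return every item currently in q without blocking (hypothetical helper)."""
    items = []
    while True:
        try:
            # get_nowait() raises queue.Empty instead of blocking
            items.append(q.get_nowait())
        except queue.Empty:
            return items


q = queue.Queue()
for task in ['a', 'b', 'c']:
    q.put(task)
print(drain(q))  # ['a', 'b', 'c']
```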
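I have a script running in the background that listens for updates. Now, hypothetically, as these updates arrive, I would queue them and process them concurrently using a ThreadPoolExecutor.
Individually, all of these components make sense in isolation, but how do I use them together? I don't know whether it's possible to feed a ThreadPoolExecutor work from a queue in real time, unless the data to work on is known in advance.
In short, all I want to do is receive updates at, say, 4 messages per second, push them onto a queue, and have concurrent.futures work on them. If I can't, I'm stuck with a sequential approach, which is slow.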
```python
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
```
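The list URLS is fixed. Is it possible to feed this list in real time and have the workers process the URLs as they arrive, perhaps pulling them from a queue for management purposes? I'm confused about whether my approach is even possible.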
Ste*_*uch 12
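Here is the example from the Python documentation, extended to take its work from a queue. Note that this code uses concurrent.futures.wait instead of concurrent.futures.as_completed, so that new work can be started while waiting for other work to complete.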
```python
import concurrent.futures
import urllib.request
import time
import queue

q = queue.Queue()

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']


def feed_the_workers(spacing):
    """ Simulate outside actors sending in work to do, request each url twice """
    for url in URLS + URLS:
        time.sleep(spacing)
        q.put(url)
    return "DONE FEEDING"


def load_url(url, timeout):
    """ Retrieve a single page and report the URL and contents """
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

    # start a future for a thread which sends work in through the queue
    future_to_url = {
        executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'}

    while future_to_url:
        # check for status of the futures which are currently working
        done, not_done = concurrent.futures.wait(
            future_to_url, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)

        # if there is incoming work, start a new future
        while not q.empty():

            # fetch a url from the queue
            url = q.get()

            # Start the load operation and mark the future with its URL
            future_to_url[executor.submit(load_url, url, 60)] = url

        # process any completed futures
        for future in done:
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                if url == 'FEEDER DONE':
                    print(data)
                else:
                    print('%r page is %d bytes' % (url, len(data)))

            # remove the now completed future
            del future_to_url[future]
```
Output from fetching each URL twice:
```
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
'http://www.bbc.co.uk/' page is 193780 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
DONE FEEDING
'http://www.bbc.co.uk/' page is 193605 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://europe.wsj.com/' page is 874649 bytes
'http://europe.wsj.com/' page is 874649 bytes
```
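To try the queue-plus-wait pattern without network access, here is a minimal sketch that keeps the same loop structure but swaps load_url for a hypothetical fake_load that just sleeps, and collects results instead of printing them. The names feed, fake_load and run are illustrative assumptions, not part of the answer above; results are keyed by item, so the sketch assumes the items are unique.

```python
import concurrent.futures
import queue
import time


def feed(q, items, spacing):
    """Simulate an outside producer that puts work on the queue over time."""
    for item in items:
        time.sleep(spacing)
        q.put(item)
    return 'DONE FEEDING'


def fake_load(item):
    """Hypothetical stand-in for load_url: pretend to do I/O, then return a value."""
    time.sleep(0.05)
    return item * 10


def run(items, spacing=0.01, max_workers=4):
    """Process items as the feeder delivers them; returns {item: result}."""
    q = queue.Queue()
    results = {}
    feeder = object()  # unique tag marking the feeder's own future
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = {executor.submit(feed, q, items, spacing): feeder}
        while pending:
            # Return as soon as any future finishes, or after the timeout,
            # so work that arrives on the queue is picked up promptly.
            done, _ = concurrent.futures.wait(
                pending, timeout=0.05,
                return_when=concurrent.futures.FIRST_COMPLETED)
            # Submit everything that is on the queue right now.
            while True:
                try:
                    item = q.get_nowait()
                except queue.Empty:
                    break
                pending[executor.submit(fake_load, item)] = item
            # Collect finished futures and stop tracking them.
            for future in done:
                tag = pending.pop(future)
                if tag is not feeder:
                    # result() would re-raise any exception from fake_load
                    results[tag] = future.result()
    return results


print(sorted(run([1, 2, 3]).items()))  # [(1, 10), (2, 20), (3, 30)]
```

Because the feeder puts its last item before its future completes, that item is already on the queue when the loop drains it after wait() reports the feeder as done, so nothing is left behind when pending becomes empty.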
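At work I came across a situation where I wanted to do parallel work on an unbounded stream of data. I created a small library inspired by the excellent answer Stephen Rauch already provided.
I originally approached the problem by thinking of two separate threads: one that submits work to a queue, and one that monitors the queue for any completed tasks and makes room for new work to come in. This is similar to what Stephen Rauch proposed, where he consumes the stream with a feed_the_workers function running in a separate thread.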
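For comparison, here is a minimal sketch of a closely related two-thread arrangement, assuming the producer marks the end of the stream with a sentinel: a producer thread puts items on a queue.Queue, and a dispatcher loop blocks on q.get() and hands each item straight to the executor. A threading.BoundedSemaphore (an assumption of this sketch, not something either answer uses) caps how many items are in flight, which plays the role of "making room for new work". dispatch, SENTINEL, square and producer are hypothetical names.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

SENTINEL = object()  # hypothetical end-of-stream marker put by the producer


def dispatch(q, task, num_workers):
    """Run task on each item taken from q, until SENTINEL arrives.

    Returns the results in submission order; task exceptions are re-raised.
    Keeps every future for simplicity, so it suits finite demos rather than
    an endless stream.
    """
    # One permit per worker: the dispatcher blocks instead of piling
    # unbounded work into the executor's internal queue.
    slots = threading.BoundedSemaphore(num_workers)
    futures = []
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        while True:
            item = q.get()  # blocks until the producer supplies something
            if item is SENTINEL:
                break
            slots.acquire()  # wait for a free slot
            future = executor.submit(task, item)
            # Give the slot back when the task finishes (success or failure).
            future.add_done_callback(lambda _f: slots.release())
            futures.append(future)
    # Leaving the with-block has already waited for all submitted work.
    return [f.result() for f in futures]


def square(x):
    return x * x


work = queue.Queue()


def producer():
    """Simulate updates arriving from a background listener."""
    for i in range(5):
        work.put(i)
    work.put(SENTINEL)


threading.Thread(target=producer).start()
print(dispatch(work, square, num_workers=3))  # [0, 1, 4, 9, 16]
```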
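Talking with one of my colleagues, he helped me realize that you can do all of the work in a single thread if you define a buffered iterator that lets you control how many elements are released from the input stream each time you are ready to submit more work to the thread pool.
So we introduce the BufferedIter class: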
```python
class BufferedIter:
    def __init__(self, iterator):
        self.iter = iterator

    def nextN(self, n):
        # Raises StopIteration if the stream runs out; this is fine for the
        # infinite streams this class is meant for.
        vals = []
        for _ in range(n):
            vals.append(next(self.iter))
        return vals
```
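The same "take up to n items" step can also be written with itertools.islice, which simply yields fewer than n items once the underlying iterator is exhausted, instead of raising StopIteration the way nextN does. A minimal sketch; next_n is a hypothetical helper name:

```python
import itertools


def next_n(iterator, n):
    """Return a list of at most n items taken from iterator (hypothetical helper).

    Unlike BufferedIter.nextN, this returns a short (possibly empty) list when
    the iterator runs out. Pass a real iterator (e.g. from iter()); a plain
    list would be restarted from the beginning on every call.
    """
    return list(itertools.islice(iterator, n))


stream = iter(range(5))
print(next_n(stream, 3))  # [0, 1, 2]
print(next_n(stream, 3))  # [3, 4]
print(next_n(stream, 3))  # []
```

An empty (or short) list is then an unambiguous end-of-stream signal that the caller can check, which matters if the input is ever finite.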
This lets us define a stream processor as follows:
```python
import logging
import queue
import signal
import sys
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

level = logging.DEBUG
log = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
handler.setLevel(level)
log.addHandler(handler)
log.setLevel(level)

WAIT_SLEEP = 1  # second, adjust this based on the timescale of your tasks


def stream_processor(input_stream, task, num_workers):

    # Use a queue to signal shutdown.
    shutting_down = queue.Queue()

    def shutdown(signum, frame):
        log.warning('Caught signal %d, shutting down gracefully ...' % signum)
        # Put an item in the shutting down queue to signal shutdown.
        shutting_down.put(None)

    # Register the signal handler
    signal.signal(signal.SIGTERM, shutdown)
    signal.signal(signal.SIGINT, shutdown)

    def is_shutting_down():
        return not shutting_down.empty()

    futures = dict()
    buffer = BufferedIter(input_stream)
    with ThreadPoolExecutor(num_workers) as executor:
        num_success = 0
        num_failure = 0
        while True:
            idle_workers = num_workers - len(futures)

            if not is_shutting_down():
                items = buffer.nextN(idle_workers)
                for data in items:
                    futures[executor.submit(task, data)] = data

            done, _ = wait(futures, timeout=WAIT_SLEEP, return_when=ALL_COMPLETED)
            for f in done:
                data = futures[f]
                try:
                    f.result(timeout=0)
                except Exception as exc:
                    log.error('future encountered an exception: %r, %s' % (data, exc))
                    num_failure += 1
                else:
                    log.info('future finished successfully: %r' % data)
                    num_success += 1

                del futures[f]

            if is_shutting_down() and len(futures) == 0:
                break

        log.info("num_success=%d, num_failure=%d" % (num_success, num_failure))
```
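stream_processor is written for an endless stream: it waits with ALL_COMPLETED, so new items are drawn only once the whole current batch has finished (or WAIT_SLEEP has passed), and it relies on a signal to stop. Here is a minimal sketch of a variant under different assumptions: it refills a worker as soon as any task finishes (FIRST_COMPLETED), stops when the input is exhausted, and takes an optional threading.Event as the stop flag in place of the signal handler and shutdown queue. process_stream, its parameters and square_or_fail are hypothetical, not part of the library described here.

```python
import itertools
import threading
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


def process_stream(iterable, task, num_workers, stop=None):
    """Run task over iterable with at most num_workers tasks in flight.

    Hypothetical variant: returns (successes, failures) as lists of input
    items. It stops drawing input once the iterable is exhausted or `stop`
    is set, then lets in-flight work finish before returning.
    """
    if stop is None:
        stop = threading.Event()  # never set: run until input is exhausted
    source = iter(iterable)
    exhausted = False
    futures = {}
    successes, failures = [], []
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        while True:
            if not exhausted and not stop.is_set():
                idle = num_workers - len(futures)
                batch = list(itertools.islice(source, idle))
                if len(batch) < idle:
                    exhausted = True  # islice came up short: input is used up
                for item in batch:
                    futures[executor.submit(task, item)] = item
            if not futures:
                break  # nothing running and nothing more to submit
            # Block until at least one task finishes, then loop to refill its slot.
            done, _ = wait(futures, return_when=FIRST_COMPLETED)
            for f in done:
                item = futures.pop(f)
                if f.exception() is None:
                    successes.append(item)
                else:
                    failures.append(item)
    return successes, failures


def square_or_fail(x):
    if x == 4:
        raise ValueError('bad luck')
    return x * x


ok, bad = process_stream(range(10), square_or_fail, num_workers=3)
print(sorted(ok), bad)  # [0, 1, 2, 3, 5, 6, 7, 8, 9] [4]
```

The trade-off is responsiveness versus simplicity: with FIRST_COMPLETED a slot is reused immediately, while the ALL_COMPLETED loop above keeps workers in lock-step, which fits the batches of three visible in the example output.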
Here is an example of how to use the stream processor:
```python
import itertools


def integers():
    """Simulate an infinite stream of work."""
    for i in itertools.count():
        yield i


def task(x):
    """The task we would like to perform in parallel.
    With some delay to simulate a time consuming job.
    With a baked in exception to simulate errors.
    """
    time.sleep(3)
    if x == 4:
        raise ValueError('bad luck')
    return x * x


stream_processor(integers(), task, num_workers=3)
```
The output of this example looks like this:
```
2019-01-15 22:34:40,193 future finished successfully: 1
2019-01-15 22:34:40,193 future finished successfully: 0
2019-01-15 22:34:40,193 future finished successfully: 2
2019-01-15 22:34:43,201 future finished successfully: 5
2019-01-15 22:34:43,201 future encountered an exception: 4, bad luck
2019-01-15 22:34:43,202 future finished successfully: 3
2019-01-15 22:34:46,208 future finished successfully: 6
2019-01-15 22:34:46,209 future finished successfully: 7
2019-01-15 22:34:46,209 future finished successfully: 8
2019-01-15 22:34:49,215 future finished successfully: 11
2019-01-15 22:34:49,215 future finished successfully: 10
2019-01-15 22:34:49,215 future finished successfully: 9
^C <=== THIS IS WHEN I HIT Ctrl-C
2019-01-15 22:34:50,648 Caught signal 2, shutting down gracefully ...
2019-01-15 22:34:52,221 future finished successfully: 13
2019-01-15 22:34:52,222 future finished successfully: 14
2019-01-15 22:34:52,222 future finished successfully: 12
2019-01-15 22:34:52,222 num_success=14, num_failure=1
```