我正在尝试使用 concurrent.futures 模块将一个需要很长时间的进程拆分为多个进程。附上下面的代码
主功能:
with concurrent.futures.ProcessPoolExecutor() as executor:
for idx, score in zip([idx for idx in range(dataframe.shape[0])],executor.map(get_max_fuzzy_score,[dataframe[idx:idx+1] for idx in range(dataframe.shape[0])])):
print('processing '+str(idx+1)+' of '+str(dataframe.shape[0]+1))
dataframe['max_row_score'].iloc[idx] = score
Run Code Online (Sandbox Code Playgroud)
get_max_fuzzy_score 函数:
def get_max_fuzzy_score(picklepath_or_list, df):
import numpy as np
extracted_text_columns = list(df.filter(regex='extracted_text').columns)
data_list = [df[data].iloc[0] for data in extracted_text_columns if not df[data].isnull().values.any()]
try:
size = len(picklepath_or_list)
section_snippet_list = picklepath_or_list
except:
section_snippet_list = pickle.load(open(picklepath_or_list,'rb'))
scores = []
for section_snippet in section_snippet_list:
for data in data_list:
scores.append(fuzz.partial_ratio(data,section_snippet))
score = max(scores)
return score
Run Code Online (Sandbox Code Playgroud)
该函数采用几列的值,并从先前构建的列表中返回最大模糊分数。
这是我得到的错误: …
parallel-processing distributed-computing python-3.x concurrent.futures python-multiprocessing
我正在阅读 Python 文档和PyMotW书籍,试图学习 Async/ Await、Futures 和 Tasks。
通常不需要在应用程序级代码创建 Future 对象。
从未来的文档中,它说明了以下内容:
loop.create_future()
创建一个附加到事件循环的 asyncio.Future 对象。
这是在 asyncio 中创建 Futures 的首选方式。这让第三方事件循环提供 Future 对象的替代实现(具有更好的性能或检测)。
然而,在PyMotW 关于 Future 的章节中,作者创建了一个future这样的对象:
all_done = asyncio.Future()
Run Code Online (Sandbox Code Playgroud)
我猜是因为这本书比 Python 的当前版本稍有落后。为了纠正这个问题,我做了以下事情:
future_Obj = event_loop.create_future()
Run Code Online (Sandbox Code Playgroud)
所以作者的完整代码变成了:
import asyncio
def mark_done(future, result):
print('setting future result to {!r}'.format(result))
future.set_result(result)
event_loop = asyncio.get_event_loop()
try:
future_Obj = event_loop.create_future()
print('scheduling mark_done')
event_loop.call_soon(mark_done, future_Obj, 'the result')
print('entering event loop')
result = event_loop.run_until_complete(future_Obj)
print('returned result: {!r}'.format(result)) …Run Code Online (Sandbox Code Playgroud) 我想使用 python 中的模块并发.futures 并行更新全局变量
事实证明,使用 ThreadPoolExecutor 可以更新我的全局变量,但 CPU 没有充分利用其潜力(始终在 5-10%),速度太慢
ProcessPoolExecutor 可以使用所有核心(100%),但我的全局变量无法更新,因为它们不共享相同的全局变量
如何在并发.futures 模型中使用 ProcessPoolExecutor 共享我的全局变量。非常感谢您的帮助
我正在运行一段 python 代码,其中多个线程通过线程池执行程序运行。每个线程都应该执行一项任务(例如获取网页)。我希望能够做的是终止所有线程,即使其中一个线程失败。例如:
with ThreadPoolExecutor(self._num_threads) as executor:
jobs = []
for path in paths:
kw = {"path": path}
jobs.append(executor.submit(start,**kw))
for job in futures.as_completed(jobs):
result = job.result()
print(result)
def start(*args,**kwargs):
#fetch the page
if(success):
return True
else:
#Signal all threads to stop
Run Code Online (Sandbox Code Playgroud)
有可能这样做吗?除非所有线程都成功,否则线程返回的结果对我来说是无用的,因此即使其中一个失败,我也想节省其余线程的一些执行时间并立即终止它们。实际代码显然正在执行相对冗长的任务,有几个故障点。
我想让我的类方法并行运行,但是它只会产生某种我无法解决的错误。我的代码是:
import concurrent.futures as futures
samples = ['asfd', 'zxcv', 'asf', 'qwer']
class test:
def __init__(self, samples):
maturedb = {}
with futures.ProcessPoolExecutor() as exe:
for samplename, dResult in exe.map(self.make_readdb, samples):
maturedb[samplename] = dResult
print(maturedb)
def make_readdb(self, samplename):
return samplename, 1
test(samples)
Run Code Online (Sandbox Code Playgroud)
如果我在Ubuntu计算机上运行此代码,则会发生如下错误:
Traceback (most recent call last):
File "/usr/lib/python3.2/multiprocessing/queues.py", line 272, in _feedsend(obj)
_pickle.PicklingError: Can't pickle <class 'method'>: attribute lookup builtins.method failed
Run Code Online (Sandbox Code Playgroud)
方法make_readdb只是简化为一个示例,但是它是实际代码中的瓶颈,我需要使其平行。请帮忙。
python parallel-processing class class-method concurrent.futures
我知道Function的apply方法同步返回一个对象,AsyncFunction的应用程序异步运行并返回一个Future.
你能举个例子说明何时选择什么.
我看到的一个代码片段看起来像这样:
Futures.transform(someFuture, new AsyncFunction<A, B>() {
public B apply(A a) {
if (a != null) {
return Futures.immediateFuture(a.getData())
} else {
return Futures.immediateFailedFuture(checkException(());
}
});
});
Run Code Online (Sandbox Code Playgroud)
由于AsyncFunction内的值是立即返回的,为什么需要AsyncFunction呢?或者这只是我遇到的一个不好的例子?
我正在尝试使用Future从光滑动作返回的s将一些基本函数链接在一起,并且我正在尝试一些非常微不足道的绊脚石.
无论是andThen和onSuccess方法需要一个PartialFunction作为参数传递.我的理解可能是相当有缺陷的,但在阅读了匿名函数后,似乎andThen需要知道你的匿名函数,以满足任何Success或Failure输入.
鉴于onSuccess已经只满足了这个Success案例的原因,为什么它仍然需要PartialFunction?
这段代码我指出了我遇到的问题:
val db = Database.forConfig("h2mem1")
try {
val f = db.run(setupCommands)
.onSuccess { println(_) }
Await.ready(f, 10.seconds )
}
finally db.close
Run Code Online (Sandbox Code Playgroud)
我收到编译错误:
[error] found : Unit => Unit
[error] required: PartialFunction[Unit,?]
[error] .onSuccess { println(_) }
Run Code Online (Sandbox Code Playgroud) 我在我的并发编程课程中学习期货.我的教授在她的幻灯片中说明了这一点:
"Valid" futures are future objects associated to a
shared state, and are constructed by calling one of the following functions:
async
promise::get_future
packaged_task::get_future
Run Code Online (Sandbox Code Playgroud)
future对象仅在它们有效时才有用.默认构造的未来对象无效(除非移动分配有效的未来).
我无法理解上述的含义,尤其是"除非移动指定有效的未来"部分.有人可以用简单的术语解释一下,也许还会展示一些示例代码吗?
我试图让asyncio与子进程和限制一起工作.我已经以功能的方式实现了这一点,但是当我尝试在opp风格中实现相同的逻辑时,出现了几个问题.主要是不能腌制coroutine/generator错误.我跟踪了一些theese,但不是全部
import asyncio
from concurrent.futures import ProcessPoolExecutor
from itertools import islice
from random import randint
class async_runner(object):
def __init__(self):
self.futures = [] # container to store current futures
self.futures_total = []
self.loop = asyncio.get_event_loop() # main event_loop
self.executor = ProcessPoolExecutor()
self.limit = 1
def run(self, func, *args):
temp_loop = asyncio.new_event_loop()
try:
coro = func(*args)
asyncio.set_event_loop(temp_loop)
ret = temp_loop.run_until_complete(coro)
return ret
finally:
temp_loop.close()
def limit_futures(self, futures, limit):
self.futures_total = iter(futures)
self.futures = [future for future in islice(self.futures_total,0,limit)]
async def first_to_finish():
while …Run Code Online (Sandbox Code Playgroud) python python-asyncio concurrent.futures python-multiprocessing
试图以便宜的方式编写基于进程的超时(同步),如下所示:
from concurrent.futures import ProcessPoolExecutor
def call_with_timeout(func, *args, timeout=3):
with ProcessPoolExecutor(max_workers=1) as pool:
future = pool.submit(func, *args)
result = future.result(timeout=timeout)
Run Code Online (Sandbox Code Playgroud)
但似乎timeout传递给future.result的参数并没有像宣传的那样真正起作用。
>>> t0 = time.time()
... call_with_timeout(time.sleep, 2, timeout=3)
... delta = time.time() - t0
... print('wall time:', delta)
wall time: 2.016767978668213
Run Code Online (Sandbox Code Playgroud)
好的。
>>> t0 = time.time()
... call_with_timeout(time.sleep, 5, timeout=3)
... delta = time.time() - t0
... print('wall time:', delta)
# TimeoutError
Run Code Online (Sandbox Code Playgroud)
不正常 - 5 秒后解锁,而不是 3 秒。
相关问题显示了如何使用线程池或使用信号来做到这一点。如何在n秒后使提交到池的进程超时,而不使用任何多处理的 _private …
python ×6
future ×3
c++ ×1
class ×1
class-method ×1
futuretask ×1
guava ×1
java ×1
python-3.x ×1
scala ×1
timeout ×1
validation ×1