标签: concurrent.futures

AttributeError: 当我在 python 3.6 中尝试并行处理时,模块 'concurrent' 没有属性 'futures'

我正在尝试使用 concurrent.futures 模块将一个需要很长时间的进程拆分为多个进程。附上下面的代码

主功能:

with concurrent.futures.ProcessPoolExecutor() as executor:
for idx, score in zip([idx for idx in range(dataframe.shape[0])],executor.map(get_max_fuzzy_score,[dataframe[idx:idx+1] for idx in range(dataframe.shape[0])])):
    print('processing '+str(idx+1)+' of '+str(dataframe.shape[0]+1))
    dataframe['max_row_score'].iloc[idx] = score
Run Code Online (Sandbox Code Playgroud)

get_max_fuzzy_score 函数:

def get_max_fuzzy_score(picklepath_or_list, df):
import numpy as np
extracted_text_columns = list(df.filter(regex='extracted_text').columns)
data_list = [df[data].iloc[0] for data in extracted_text_columns if not df[data].isnull().values.any()]
try:
    size = len(picklepath_or_list)
    section_snippet_list = picklepath_or_list
except:
    section_snippet_list = pickle.load(open(picklepath_or_list,'rb'))
scores = []
for section_snippet in section_snippet_list:
    for data in data_list:
        scores.append(fuzz.partial_ratio(data,section_snippet))
score = max(scores)

return score
Run Code Online (Sandbox Code Playgroud)

该函数采用几列的值,并从先前构建的列表中返回最大模糊分数。

这是我得到的错误: …

parallel-processing distributed-computing python-3.x concurrent.futures python-multiprocessing

8
推荐指数
1
解决办法
6387
查看次数

正确使用 loop.create_future

我正在阅读 Python 文档和PyMotW书籍,试图学习 Async/ Await、Futures 和 Tasks。

协程和任务文档

通常不需要在应用程序级代码创建 Future 对象。

未来的文档中,它说明了以下内容:

loop.create_future()

创建一个附加到事件循环的 asyncio.Future 对象。

这是在 asyncio 中创建 Futures 的首选方式。这让第三方事件循环提供 Future 对象的替代实现(具有更好的性能或检测)。

然而,在PyMotW 关于 Future 的章节中,作者创建了一个future这样的对象:

all_done = asyncio.Future()
Run Code Online (Sandbox Code Playgroud)

我猜是因为这本书比 Python 的当前版本稍有落后。为了纠正这个问题,我做了以下事情:

future_Obj = event_loop.create_future()
Run Code Online (Sandbox Code Playgroud)

所以作者的完整代码变成了:

import asyncio


def mark_done(future, result):
    print('setting future result to {!r}'.format(result))
    future.set_result(result)


event_loop = asyncio.get_event_loop()

try:

    future_Obj = event_loop.create_future()
    print('scheduling mark_done')
    event_loop.call_soon(mark_done, future_Obj, 'the result')

    print('entering event loop')
    result = event_loop.run_until_complete(future_Obj)
    print('returned result: {!r}'.format(result)) …
Run Code Online (Sandbox Code Playgroud)

python python-asyncio concurrent.futures

8
推荐指数
1
解决办法
3951
查看次数

concurrent.futures.ProcessPoolExecutor() python 中的共享变量

我想使用 python 中的模块并发.futures 并行更新全局变量

事实证明,使用 ThreadPoolExecutor 可以更新我的全局变量,但 CPU 没有充分利用其潜力(始终在 5-10%),速度太慢

ProcessPoolExecutor 可以使用所有核心(100%),但我的全局变量无法更新,因为它们不共享相同的全局变量

如何在并发.futures 模型中使用 ProcessPoolExecutor 共享我的全局变量。非常感谢您的帮助

python concurrent.futures

8
推荐指数
1
解决办法
1万
查看次数

Python ThreadPoolExecutor 终止所有线程

我正在运行一段 python 代码,其中多个线程通过线程池执行程序运行。每个线程都应该执行一项任务(例如获取网页)。我希望能够做的是终止所有线程,即使其中一个线程失败。例如:

with ThreadPoolExecutor(self._num_threads) as executor:
    jobs = []
    for path in paths:
        kw = {"path": path}
        jobs.append(executor.submit(start,**kw))
    for job in futures.as_completed(jobs):
        result = job.result()
        print(result)
def start(*args,**kwargs):
    #fetch the page
    if(success):
        return True
    else:
        #Signal all threads to stop
Run Code Online (Sandbox Code Playgroud)

有可能这样做吗?除非所有线程都成功,否则线程返回的结果对我来说是无用的,因此即使其中一个失败,我也想节省其余线程的一些执行时间并立即终止它们。实际代码显然正在执行相对冗长的任务,有几个故障点。

python multithreading threadpoolexecutor concurrent.futures

8
推荐指数
1
解决办法
2319
查看次数

为什么我不能在类方法中使用python模块current.futures?

我想让我的类方法并行运行,但是它只会产生某种我无法解决的错误。我的代码是:

import concurrent.futures as futures

samples = ['asfd', 'zxcv', 'asf', 'qwer']

class test:
    def __init__(self, samples):
        maturedb = {}
        with futures.ProcessPoolExecutor() as exe:
            for samplename, dResult in exe.map(self.make_readdb, samples):
                maturedb[samplename] = dResult
        print(maturedb)

    def make_readdb(self, samplename):
        return samplename, 1

test(samples)
Run Code Online (Sandbox Code Playgroud)

如果我在Ubuntu计算机上运行此代码,则会发生如下错误:

Traceback (most recent call last):
    File "/usr/lib/python3.2/multiprocessing/queues.py", line 272, in _feedsend(obj)
    _pickle.PicklingError: Can't pickle <class 'method'>: attribute lookup builtins.method failed
Run Code Online (Sandbox Code Playgroud)

方法make_readdb只是简化为一个示例,但是它是实际代码中的瓶颈,我需要使其平行。请帮忙。

python parallel-processing class class-method concurrent.futures

7
推荐指数
1
解决办法
4027
查看次数

在Futures.transform中,使用Function和AsyncFunction有什么区别

我知道Function的apply方法同步返回一个对象,AsyncFunction的应用程序异步运行并返回一个Future.

你能举个例子说明何时选择什么.

我看到的一个代码片段看起来像这样:

Futures.transform(someFuture, new AsyncFunction<A, B>() {
  public B apply(A a) {
    if (a != null) {
      return Futures.immediateFuture(a.getData())
    } else {
      return Futures.immediateFailedFuture(checkException(());
    }
  });
});
Run Code Online (Sandbox Code Playgroud)

由于AsyncFunction内的值是立即返回的,为什么需要AsyncFunction呢?或者这只是我遇到的一个不好的例子?

java future guava concurrent.futures

7
推荐指数
2
解决办法
8077
查看次数

为什么Future.onSuccess需要部分功能

我正在尝试使用Future从光滑动作返回的s将一些基本函数链接在一起,并且我正在尝试一些非常微不足道的绊脚石.

无论是andThenonSuccess方法需要一个PartialFunction作为参数传递.我的理解可能是相当有缺陷的,但在阅读了匿名函数后,似乎andThen需要知道你的匿名函数,以满足任何SuccessFailure输入.

鉴于onSuccess已经只满足了这个Success案例的原因,为什么它仍然需要PartialFunction

这段代码我指出了我遇到的问题:

val db = Database.forConfig("h2mem1")

try {
  val f = db.run(setupCommands)
    .onSuccess { println(_) }

  Await.ready(f, 10.seconds )
}
finally db.close
Run Code Online (Sandbox Code Playgroud)

我收到编译错误:

[error]  found   : Unit => Unit
[error]  required: PartialFunction[Unit,?]
[error]         .onSuccess { println(_) }
Run Code Online (Sandbox Code Playgroud)

scala future concurrent.futures

7
推荐指数
1
解决办法
1845
查看次数

有效期货与违约构造期货

我在我的并发编程课程中学习期货.我的教授在她的幻灯片中说明了这一点:

"Valid" futures are future objects associated to a 
shared state, and are constructed by calling one of the following functions:

async
promise::get_future
packaged_task::get_future
Run Code Online (Sandbox Code Playgroud)

future对象仅在它们有效时才有用.默认构造的未来对象无效(除非移动分配有效的未来).

我无法理解上述的含义,尤其是"除非移动指定有效的未来"部分.有人可以用简单的术语解释一下,也许还会展示一些示例代码吗?

c++ validation future futuretask concurrent.futures

7
推荐指数
1
解决办法
840
查看次数

在类中使用ProcessPoolExecutor时,无法pickle coroutine对象

我试图让asyncio与子进程和限制一起工作.我已经以功能的方式实现了这一点,但是当我尝试在opp风格中实现相同的逻辑时,出现了几个问题.主要是不能腌制coroutine/generator错误.我跟踪了一些theese,但不是全部

import asyncio
from concurrent.futures import ProcessPoolExecutor
from itertools import islice
from random import randint

class async_runner(object):
    def __init__(self):
        self.futures = [] # container to store current futures
        self.futures_total = []
        self.loop = asyncio.get_event_loop() # main event_loop
        self.executor = ProcessPoolExecutor()
        self.limit = 1

    def run(self, func, *args):
        temp_loop = asyncio.new_event_loop()
        try:
            coro = func(*args)
            asyncio.set_event_loop(temp_loop)
            ret = temp_loop.run_until_complete(coro)
            return ret
        finally:
            temp_loop.close()
    def limit_futures(self, futures, limit):
        self.futures_total = iter(futures)
        self.futures = [future for future in islice(self.futures_total,0,limit)]
        async def first_to_finish():
            while …
Run Code Online (Sandbox Code Playgroud)

python python-asyncio concurrent.futures python-multiprocessing

7
推荐指数
1
解决办法
2024
查看次数

任何 concurrent.futures 超时实际工作?

试图以便宜的方式编写基于进程的超时(同步),如下所示:

from concurrent.futures import ProcessPoolExecutor

def call_with_timeout(func, *args, timeout=3):
    with ProcessPoolExecutor(max_workers=1) as pool:
        future = pool.submit(func, *args)
        result = future.result(timeout=timeout)
Run Code Online (Sandbox Code Playgroud)

但似乎timeout传递给future.result的参数并没有像宣传的那样真正起作用。

>>> t0 = time.time()
... call_with_timeout(time.sleep, 2, timeout=3)
... delta = time.time() - t0
... print('wall time:', delta)
wall time: 2.016767978668213
Run Code Online (Sandbox Code Playgroud)

好的。

>>> t0 = time.time()
... call_with_timeout(time.sleep, 5, timeout=3)
... delta = time.time() - t0
... print('wall time:', delta)
# TimeoutError
Run Code Online (Sandbox Code Playgroud)

不正常 - 5 秒后解锁,而不是 3 秒。

相关问题显示了如何使用线程池或使用信号来做到这一点。如何在n秒后使提交到池的进程超时,而不使用任何多处理的 _private …

python timeout multiprocessing concurrent.futures

7
推荐指数
1
解决办法
2856
查看次数