我想学习如何使用Python的multiprocessing
包,但我不明白之间的差别map_async
和imap
.我注意到,这两个map_async
和imap
是异步执行的.那我什么时候应该使用另一个呢?我应该如何检索返回的结果map_async
?
我应该使用这样的东西吗?
def test():
result = pool.map_async()
pool.close()
pool.join()
return result.get()
result=test()
for i in result:
print i
Run Code Online (Sandbox Code Playgroud) 使用Python 3的函数注释,可以指定同类列表(或其他集合)中包含的项目类型,以便在PyCharm和其他IDE中进行类型提示?
int列表的伪python代码示例:
def my_func(l:list<int>):
pass
Run Code Online (Sandbox Code Playgroud)
我知道可以使用Docstring ...
def my_func(l):
"""
:type l: list[int]
"""
pass
Run Code Online (Sandbox Code Playgroud)
...但如果有可能,我更喜欢注释样式.
如何await
在构造函数或类体中定义类?
例如我想要的:
import asyncio
# some code
class Foo(object):
async def __init__(self, settings):
self.settings = settings
self.pool = await create_pool(dsn)
foo = Foo(settings)
# it raises:
# TypeError: __init__() should return None, not 'coroutine'
Run Code Online (Sandbox Code Playgroud)
或者类body属性的示例:
class Foo(object):
self.pool = await create_pool(dsn) # Sure it raises syntax Error
def __init__(self, settings):
self.settings = settings
foo = Foo(settings)
Run Code Online (Sandbox Code Playgroud)
我的解决方案(但我希望看到更优雅的方式)
class Foo(object):
def __init__(self, settings):
self.settings = settings
async def init(self):
self.pool = await create_pool(dsn)
foo = Foo(settings)
await foo.init()
Run Code Online (Sandbox Code Playgroud) 几乎每个人都知道他们第一次看到Python中的线程时,GIL会让那些真正希望并行处理的人生活变得悲惨 - 或者至少给它一个机会.
我目前正在研究像Reactor模式这样的实现.实际上我想在一个线程上监听传入的套接字连接,当有人试图连接时,接受该连接并将其传递给另一个线程进行处理.
我(还)不确定我可能会遇到什么样的负担.我知道目前设置的传入消息上限为2MB.从理论上讲,我们每秒可以获得数千(虽然我不知道我们是否已经看到过类似的东西).处理消息所花费的时间并不是非常重要,但显然更快会更好.
我正在研究Reactor模式,并开发了一个使用该multiprocessing
库的小例子(至少在测试中)似乎工作得很好.但是,现在/很快我们将有asyncio库可用,它将为我处理事件循环.
是否有任何东西可以通过组合asyncio
和咬我multiprocessing
?
python multithreading asynchronous multiprocessing python-asyncio
我正在使用I/O非阻塞python服务器Tornado.我有一类GET
请求可能需要很长时间才能完成(想想在5-10秒的范围内).问题是Tornado会阻止这些请求,以便随后的快速请求被保留,直到缓慢的请求完成.
我查看了:https://github.com/facebook/tornado/wiki/Threading-and-concurrency,得出结论我想要#3(其他进程)和#4(其他线程)的某种组合.#4本身有问题,当有另一个线程正在进行"重举"时,我无法获得可靠的控制回ioloop.(我假设这是由于GIL以及heavy_lifting任务具有高CPU负载并且不断控制远离主ioloop的事实,但这是猜测).
所以我一直在原型化如何通过GET
在单独的进程中在这些缓慢的请求中执行"繁重的"任务来解决这个问题,然后在完成该请求的过程中将回调放回到Tornado ioloop中.这释放了ioloop来处理其他请求.
我创建了一个演示可能解决方案的简单示例,但我很想从社区获得反馈.
我的问题有两个方面:如何简化当前的方法?它可能存在哪些陷阱?
利用Tornado的内置asynchronous
装饰器,允许请求保持打开状态并继续ioloop.
使用python的multiprocessing
模块为"繁重的"任务生成一个单独的过程.我首先尝试使用该threading
模块,但无法将任何可靠的放弃控制权交还给ioloop.它似乎mutliprocessing
也会利用多核.
使用threading
正在工作的模块在主ioloop进程中启动一个"观察者"线程,multiprocessing.Queue
以便在完成时查看"繁重"任务的结果.这是必要的,因为我需要一种方法来知道重载任务已经完成,同时仍能通知ioloop此请求现已完成.
确保'观察者'线程经常通过time.sleep(0)
调用放弃对主ioloop循环的控制,以便继续处理其他请求.
当队列中有结果时,从"观察者"线程添加回调,使用tornado.ioloop.IOLoop.instance().add_callback()
该回调记录是从其他线程调用ioloop实例的唯一安全方法.
请务必调用finish()
回调以完成请求并移交回复.
下面是一些显示此方法的示例代码. multi_tornado.py
是实现上述大纲的服务器,call_multi.py
是一个示例脚本,它以两种不同的方式调用服务器来测试服务器.两个测试都调用服务器3个慢GET
请求,然后是20个快速GET
请求.结果显示在打开和不打开线程的情况下运行.
在使用"无线程"运行它的情况下,3个慢速请求阻塞(每个需要花费一点多秒才能完成).20个快速请求中的一些请求在ioloop中的一些慢速请求之间挤压(不完全确定如何发生 - 但可能是我在同一台机器上运行服务器和客户端测试脚本的工件).这里的要点是所有快速请求都被保持不同程度.
如果在启用线程的情况下运行它,则20个快速请求立即首先完成,然后三个慢速请求在几乎同时完成,因为它们各自并行运行.这是期望的行为.三个慢速请求并行完成需要2.5秒 - 而在非线程情况下,三个慢速请求总共需要3.5秒.所以总体上加速了大约35%(我假设由于多核共享).但更重要的是 - 快速请求立即以慢速列表处理.
我对多线程编程没有很多经验 - 所以虽然这看起来很有用,但我很想知道:
有没有更简单的方法来实现这一目标?在这种方法中潜藏着什么怪物?
(注意:未来的权衡可能是使用反向代理运行更多Tornado实例,如nginx进行负载平衡.无论我将使用负载均衡器运行多个实例 - 但我担心只是抛出硬件来解决这个问题因为看起来硬件在阻塞方面与问题直接相关.)
multi_tornado.py
(样本服务器):
import time
import threading
import multiprocessing
import math
from tornado.web import RequestHandler, Application, asynchronous …
Run Code Online (Sandbox Code Playgroud) 我正在尝试以下代码:
import asyncio
@asyncio.coroutine
def func_normal():
print("A")
yield from asyncio.sleep(5)
print("B")
return 'saad'
@asyncio.coroutine
def func_infinite():
i = 0
while i<10:
print("--"+str(i))
i = i+1
return('saad2')
loop = asyncio.get_event_loop()
tasks = [
asyncio.async(func_normal()),
asyncio.async(func_infinite())]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
Run Code Online (Sandbox Code Playgroud)
我无法弄清楚如何从这些函数中获取变量中的值.我不能这样做:
asyncio.async(a = func_infinite())
Run Code Online (Sandbox Code Playgroud)
因为这会使这成为一个关键字参数.我该如何完成这项工作?
多处理是python中一个强大的工具,我想更深入地理解它.我想知道何时使用常规 锁和队列以及何时使用多处理管理器在所有进程之间共享这些.
我提出了以下测试场景,其中包含四种不同的多处理条件:
使用池和NO管理器
使用池和管理器
使用单个流程和NO Manager
使用单个进程和Manager
所有条件都执行作业功能the_job
.the_job
由一些由锁固定的印刷组成.此外,函数的输入只是放入队列(以查看它是否可以从队列中恢复).该输入是一个简单的索引idx
从range(10)
在称为主脚本创建start_scenario
(在底部示出).
def the_job(args):
"""The job for multiprocessing.
Prints some stuff secured by a lock and
finally puts the input into a queue.
"""
idx = args[0]
lock = args[1]
queue=args[2]
lock.acquire()
print 'I'
print 'was '
print 'here '
print '!!!!'
print '1111'
print 'einhundertelfzigelf\n'
who= ' By run %d \n' …
Run Code Online (Sandbox Code Playgroud) 我对我编写的一些代码感到非常困惑.我惊讶地发现:
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(f, iterable))
Run Code Online (Sandbox Code Playgroud)
和
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = list(map(lambda x: executor.submit(f, x), iterable))
Run Code Online (Sandbox Code Playgroud)
产生不同的结果.第一个产生任何类型f
返回的列表,第二个产生一个concurrent.futures.Future
对象列表,然后需要使用它们的result()
方法进行评估,以获得f
返回的值.
我主要担心的是,这意味着executor.map
无法利用concurrent.futures.as_completed
,这似乎是一种非常方便的方法来评估我正在进行的数据库长期运行调用的结果.
关于concurrent.futures.ThreadPoolExecutor
对象是如何工作的我一点都不清楚- 天真地,我更喜欢(稍微冗长一点):
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
result_futures = list(map(lambda x: executor.submit(f, x), iterable))
results = [f.result() for f in futures.as_completed(result_futures)]
Run Code Online (Sandbox Code Playgroud)
executor.map
为了利用可能的性能提升,更简洁.我错了吗?
python multithreading python-multithreading python-3.x concurrent.futures
对于ASYNCIO文档给出了如何打印的"Hello World"每两秒钟两个例子: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio-hello-world-callback https://开头docs.python.org/3/library/asyncio-task.html#asyncio-hello-world-coroutine
我可以从解释器运行那些,但如果我这样做,我将无法访问解释器.是否可以在后台运行asyncio事件循环,以便我可以在解释器中输入命令?
我有一个非常小的测试程序,除了执行asyncio
事件循环之外什么都不做:
import asyncio
asyncio.get_event_loop().run_forever()
Run Code Online (Sandbox Code Playgroud)
当我在Linux上运行此程序并按Ctrl+时C,程序将正常终止,但有KeyboardInterrupt
异常.在Windows上按Ctrl+ C什么也没做(用Python 3.4.2测试).即使在Windows上,一个简单的无限循环也能正确time.sleep()
引发KeyboardInterrupt
:
import time
while True:
time.sleep(3600)
Run Code Online (Sandbox Code Playgroud)
为什么asyncio的事件循环会抑制Windows上的KeyboardInterrupt?
python ×10
python-3.x ×4
python-3.4 ×2
asynchronous ×1
concurrency ×1
locking ×1
pycharm ×1
tornado ×1
type-hinting ×1
windows ×1