假设我有一个非常大的列表,我正在执行这样的操作:
for item in items:
try:
api.my_operation(item)
except:
print 'error with item'
Run Code Online (Sandbox Code Playgroud)
我的问题有两个方面:
我想使用多线程一次启动一堆api.my_operations,这样我就可以同时处理5个或10个甚至100个项目.
如果my_operation()返回一个异常(因为我可能已经处理过该项) - 那没关系.它不会破坏任何东西.循环可以继续到下一个项目.
注意:这适用于Python 2.7.3
我正在使用I/O非阻塞python服务器Tornado.我有一类GET请求可能需要很长时间才能完成(想想在5-10秒的范围内).问题是Tornado会阻止这些请求,以便随后的快速请求被保留,直到缓慢的请求完成.
我查看了:https://github.com/facebook/tornado/wiki/Threading-and-concurrency,得出结论我想要#3(其他进程)和#4(其他线程)的某种组合.#4本身有问题,当有另一个线程正在进行"重举"时,我无法获得可靠的控制回ioloop.(我假设这是由于GIL以及heavy_lifting任务具有高CPU负载并且不断控制远离主ioloop的事实,但这是猜测).
所以我一直在原型化如何通过GET在单独的进程中在这些缓慢的请求中执行"繁重的"任务来解决这个问题,然后在完成该请求的过程中将回调放回到Tornado ioloop中.这释放了ioloop来处理其他请求.
我创建了一个演示可能解决方案的简单示例,但我很想从社区获得反馈.
我的问题有两个方面:如何简化当前的方法?它可能存在哪些陷阱?
利用Tornado的内置asynchronous装饰器,允许请求保持打开状态并继续ioloop.
使用python的multiprocessing模块为"繁重的"任务生成一个单独的过程.我首先尝试使用该threading模块,但无法将任何可靠的放弃控制权交还给ioloop.它似乎mutliprocessing也会利用多核.
使用threading正在工作的模块在主ioloop进程中启动一个"观察者"线程,multiprocessing.Queue以便在完成时查看"繁重"任务的结果.这是必要的,因为我需要一种方法来知道重载任务已经完成,同时仍能通知ioloop此请求现已完成.
确保'观察者'线程经常通过time.sleep(0)调用放弃对主ioloop循环的控制,以便继续处理其他请求.
当队列中有结果时,从"观察者"线程添加回调,使用tornado.ioloop.IOLoop.instance().add_callback()该回调记录是从其他线程调用ioloop实例的唯一安全方法.
请务必调用finish()回调以完成请求并移交回复.
下面是一些显示此方法的示例代码. multi_tornado.py是实现上述大纲的服务器,call_multi.py是一个示例脚本,它以两种不同的方式调用服务器来测试服务器.两个测试都调用服务器3个慢GET请求,然后是20个快速GET请求.结果显示在打开和不打开线程的情况下运行.
在使用"无线程"运行它的情况下,3个慢速请求阻塞(每个需要花费一点多秒才能完成).20个快速请求中的一些请求在ioloop中的一些慢速请求之间挤压(不完全确定如何发生 - 但可能是我在同一台机器上运行服务器和客户端测试脚本的工件).这里的要点是所有快速请求都被保持不同程度.
如果在启用线程的情况下运行它,则20个快速请求立即首先完成,然后三个慢速请求在几乎同时完成,因为它们各自并行运行.这是期望的行为.三个慢速请求并行完成需要2.5秒 - 而在非线程情况下,三个慢速请求总共需要3.5秒.所以总体上加速了大约35%(我假设由于多核共享).但更重要的是 - 快速请求立即以慢速列表处理.
我对多线程编程没有很多经验 - 所以虽然这看起来很有用,但我很想知道:
有没有更简单的方法来实现这一目标?在这种方法中潜藏着什么怪物?
(注意:未来的权衡可能是使用反向代理运行更多Tornado实例,如nginx进行负载平衡.无论我将使用负载均衡器运行多个实例 - 但我担心只是抛出硬件来解决这个问题因为看起来硬件在阻塞方面与问题直接相关.)
multi_tornado.py (样本服务器):
import time
import threading
import multiprocessing
import math
from tornado.web import RequestHandler, Application, asynchronous …Run Code Online (Sandbox Code Playgroud) 关于将线程设置为守护进程意味着什么,我有点困惑.文档说明了这一点:
线程可以标记为"守护程序线程".这个标志的意义在于,当只剩下守护进程线程时,整个Python程序都会退出.初始值继承自创建线程.可以通过守护程序属性设置该标志.
我不确定是什么让这与普通线程不同.这是说这个程序永远不会完成吗?
def threadfunc():
while True:
time.sleep(1)
threading.Thread(target=threadfunc).start()
Run Code Online (Sandbox Code Playgroud)
即使主线程完成它的执行.而
def threadfunc():
while True:
time.sleep(1)
th = threading.Thread(target=threadfunc)
th.daemon = True
th.start()
Run Code Online (Sandbox Code Playgroud)
马上完成?
我问,因为我有一个情况,在我的主线程中我正在调用sys.exit(),并且进程只是挂起而我的其他线程正在运行,因为我可以看到日志.这与使用线程活动调用的sys.exit()有什么关系吗?
某些功能应在Web服务器上异步运行.发送电子邮件或数据后处理是典型的用例.
编写装饰器函数以异步运行函数的最佳(或最pythonic)方法是什么?
我的设置很常见:Python,Django,Gunicorn或Waitress,AWS EC2标准Linux
例如,这是一个开始:
from threading import Thread
def postpone(function):
def decorator(*args, **kwargs):
t = Thread(target = function, args=args, kwargs=kwargs)
t.daemon = True
t.start()
return decorator
Run Code Online (Sandbox Code Playgroud)
所需用法:
@postpone
def foo():
pass #do stuff
Run Code Online (Sandbox Code Playgroud) python django multithreading decorator python-multithreading
我想创建一个运行多个轻量级线程的程序,但是将其自身限制为一个恒定的,预定义数量的并发运行任务,就像这样(但没有竞争条件的风险):
import threading
def f(arg):
global running
running += 1
print("Spawned a thread. running=%s, arg=%s" % (running, arg))
for i in range(100000):
pass
running -= 1
print("Done")
running = 0
while True:
if running < 8:
arg = get_task()
threading.Thread(target=f, args=[arg]).start()
Run Code Online (Sandbox Code Playgroud)
实现这一目标的最安全/最快的方法是什么?
我对我编写的一些代码感到非常困惑.我惊讶地发现:
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(f, iterable))
Run Code Online (Sandbox Code Playgroud)
和
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = list(map(lambda x: executor.submit(f, x), iterable))
Run Code Online (Sandbox Code Playgroud)
产生不同的结果.第一个产生任何类型f返回的列表,第二个产生一个concurrent.futures.Future对象列表,然后需要使用它们的result()方法进行评估,以获得f返回的值.
我主要担心的是,这意味着executor.map无法利用concurrent.futures.as_completed,这似乎是一种非常方便的方法来评估我正在进行的数据库长期运行调用的结果.
关于concurrent.futures.ThreadPoolExecutor对象是如何工作的我一点都不清楚- 天真地,我更喜欢(稍微冗长一点):
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
result_futures = list(map(lambda x: executor.submit(f, x), iterable))
results = [f.result() for f in futures.as_completed(result_futures)]
Run Code Online (Sandbox Code Playgroud)
executor.map为了利用可能的性能提升,更简洁.我错了吗?
python multithreading python-multithreading python-3.x concurrent.futures
我正在尝试编写一个倒计时到给定时间的方法,除非给出重启命令,否则它将执行任务.但我认为Python threading.Timer类不允许计时器被取消.
import threading
def countdown(action):
def printText():
print 'hello!'
t = threading.Timer(5.0, printText)
if (action == 'reset'):
t.cancel()
t.start()
Run Code Online (Sandbox Code Playgroud)
我知道上面的代码是错误的.非常感谢这里的一些指导.
我想在创建Thread对象时将命名参数传递给目标函数.
以下是我写的代码:
import threading
def f(x=None, y=None):
print x,y
t = threading.Thread(target=f, args=(x=1,y=2,))
t.start()
Run Code Online (Sandbox Code Playgroud)
我在第6行得到了"x = 1"的语法错误.我想知道如何将关键字参数传递给目标函数.
python multithreading kwargs python-multithreading python-2.7
对于一些简单的线程相关代码,即:
\nimport threading\n\n\na = 0\nthreads = []\n\n\ndef x():\n global a\n for i in range(1_000_000):\n a += 1\n\n\nfor _ in range(10):\n thread = threading.Thread(target=x)\n threads.append(thread)\n thread.start()\n\n\nfor thread in threads:\n thread.join()\n\n\nprint(a)\nassert a == 10_000_000\nRun Code Online (Sandbox Code Playgroud)\n根据 Python 版本,我们得到了不同的行为。
\n对于 3.10,输出为:
\n\xe2\x9d\xaf python3.10 b.py\n10000000\nRun Code Online (Sandbox Code Playgroud)\n对于 3.9,输出为:
\n\xe2\x9d\xaf python3.9 b.py\n2440951\nTraceback (most recent call last):\n File "/Users/romka/t/threads-test/b.py", line 24, in <module>\n assert a == 10_000_000\nAssertionError\nRun Code Online (Sandbox Code Playgroud)\n由于我们没有获取任何锁,对我来说,3.9 的结果是显而易见的并且是预期的。问题是为什么 3.10 得到了“正确”的结果,而不应该得到“正确”的结果?
\n我正在查看 Python 3.10 的变更日志,没有任何与线程或 GIL 相关的内容可以带来这样的结果。
\npython multithreading python-multithreading python-internals
假设我有一个看起来像这样的函数:
def _thread_function(arg1, arg2=None, arg3=None):
#Random code
Run Code Online (Sandbox Code Playgroud)
现在我想使用该函数创建一个线程,并给它arg2而不是arg3.我正在尝试如下:
#Note: in this code block I have already set a variable called arg1 and a variable called arg2
threading.Thread(target=self._thread_function, args=(arg1, arg2=arg2), name="thread_function").start()
Run Code Online (Sandbox Code Playgroud)
上面的代码给了我一个语法错误.我如何修复它以便我可以将参数作为arg2传递给线程?
python syntax multithreading syntax-error python-multithreading
python ×10
python-2.7 ×2
daemon ×1
decorator ×1
django ×1
kwargs ×1
python-3.x ×1
syntax ×1
syntax-error ×1
timer ×1
tornado ×1