是否可以在不设置/检查任何标志/信号量/等的情况下终止正在运行的线程?
您使用什么Python事件系统?我已经知道了pydispatcher,但我想知道还能找到什么,或者是常用的?
我对作为大型框架一部分的事件管理器不感兴趣,我宁愿使用一个我可以轻松扩展的小型简单解决方案.
请帮助我根据功能的不同来澄清这两个python语句的概念:
sys.exit(0)
os._exit(0)
我使用pthread_create(&thread1, &attrs, //... , //...);并需要一些条件发生需要杀死这个线程如何杀死这个?
我想创建一个运行多个轻量级线程的程序,但是将其自身限制为一个恒定的,预定义数量的并发运行任务,就像这样(但没有竞争条件的风险):
import threading
def f(arg):
global running
running += 1
print("Spawned a thread. running=%s, arg=%s" % (running, arg))
for i in range(100000):
pass
running -= 1
print("Done")
running = 0
while True:
if running < 8:
arg = get_task()
threading.Thread(target=f, args=[arg]).start()
Run Code Online (Sandbox Code Playgroud)
实现这一目标的最安全/最快的方法是什么?
我正在使用Python 2.5并尝试excepthook在我的程序中使用自定义.在主线程中,它完美地运行.但是在一个以线程模块开始的线程中,通常excepthook会调用它.
这是一个显示问题的示例.取消注释注释会显示所需的行为.
import threading, sys
def myexcepthook(type, value, tb):
print 'myexcepthook'
class A(threading.Thread, object):
def __init__(self):
threading.Thread.__init__(self, verbose=True)
# raise Exception('in main')
self.start()
def run(self):
print 'A'
raise Exception('in thread')
if __name__ == "__main__":
sys.excepthook = myexcepthook
A()
Run Code Online (Sandbox Code Playgroud)
那么,我如何excepthook在一个线程中使用自己的?
有一些类似的问题,但没有提供我需要的答案.
如果我通过创建线程threading.Thread然后抛出未处理的异常,则终止这些线程.我希望使用堆栈跟踪保留缺省打印的异常详细信息,但也要关闭整个过程.
我认为有可能捕获线程中的所有异常,并在主线程对象上重新加载它们,或者可能手动执行默认的异常处理,然后SystemExit在主线程上引发一个.
最好的方法是什么?
Python concurrent.futures和ProcessPoolExecutor提供了一个简洁的界面来安排和监视任务.期货甚至提供 .cancel()方法:
cancel():尝试取消通话.如果当前正在执行调用且无法取消,则该方法将返回False,否则将取消调用并且该方法将返回True.
不幸的是,在一个类似的问题(关于asyncio)中,回答声称运行任务是不可取消的,使用这个剪切的文档,但文档不要说,只有它们运行和不可解决.
向进程提交multiprocessing.Events也是不可能的(通过参数执行此操作,如在multiprocess.Process中返回RuntimeError)
我想做什么?我想分区搜索空间并为每个分区运行任务.但它足以拥有一个解决方案,而且这个过程是CPU密集型的.那么有一种实际的舒适方式可以通过使用ProcessPool开始来抵消收益吗?
例:
from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait
# function that profits from partitioned search space
def m_run(partition):
for elem in partition:
if elem == 135135515:
return elem
return False
futures = []
# used to create the partitions
steps = 100000000
with ProcessPoolExecutor(max_workers=4) as pool:
for i in range(4):
# run 4 tasks with a partition, but only *one* solution is needed
partition = range(i*steps,(i+1)*steps) …Run Code Online (Sandbox Code Playgroud) 我目前正在使用 pytest 来测试现有的(根据文档的unittest测试套件)。我目前正在编写一个线程,等待分配 IP 地址,然后将其返回给回调函数,并且我正在编写单元测试来配合它。
这是我编写的测试用例类。
class TestGetIpAddressOnNewThread(unittest.TestCase):
def test_get_existing_ip(self):
def func(ip):
assert ip == "192.168.0.1" # Not the real IP
# Even when I introduce an assert statement that should fail, test still passes
assert ip == "shouldn't be the ip"
ip_check = GetInstanceIpThread(instance, func)
ip_check.start()
ip_check.join()
if __name__ == '__main__':
pytest.main()
Run Code Online (Sandbox Code Playgroud)
这是GetInstanceIpThread伪定义:
class GetInstanceIpThread(threading.Thread):
def __init__(self, instance, callback):
threading.Thread.__init__(self)
self.event = threading.Event()
self.instance = instance
self.callback = callback
def run(self):
self.instance.wait_until_running()
ip = self.instance.ip_address
self.callback(ip) …Run Code Online (Sandbox Code Playgroud) 我正在使用unittest框架来自动化多线程python代码,外部硬件和嵌入式C的集成测试.尽管我公然滥用单元测试框架进行集成测试,但它的效果非常好.除了一个问题:如果从任何生成的线程引发异常,我需要测试失败.这是单元测试框架的可能吗?
一个简单但不可行的解决方案是:a)重构代码以避免多线程或b)分别测试每个线程.我无法做到这一点,因为代码与外部硬件异步交互.我还考虑过实现某种消息传递来将异常转发给主单元测试线程.这将需要对正在测试的代码进行重大的测试相关更改,我想避免这种情况.
是时候了.我是否可以在不修改x.ExceptionRaiser类的情况下修改my_thread中引发的异常,从而修改下面的测试脚本?
import unittest
import x
class Test(unittest.TestCase):
def test_x(self):
my_thread = x.ExceptionRaiser()
# Test case should fail when thread is started and raises
# an exception.
my_thread.start()
my_thread.join()
if __name__ == '__main__':
unittest.main()
Run Code Online (Sandbox Code Playgroud) python ×9
exception ×2
c ×1
dispatcher ×1
events ×1
kill ×1
multiprocess ×1
posix ×1
pthreads ×1
pytest ×1
systemexit ×1
terminate ×1
testing ×1