Python在类中进行线程化

Nie*_*els 33 python multithreading

我最近开始使用python的线程模块.经过一些试验和错误后,我设法使用大多数教程中给出的以下示例代码来使基本线程工作.

class SomeThread(threading.Thread):
    def __init__(self, count):
        threading.Thread.__init__(self)

    def run(self):
        print "Do something"
Run Code Online (Sandbox Code Playgroud)

我的问题是:我有一个具有类变量的类和一个我希望在一个单独的线程中运行的函数.但是,该函数使用类变量并写入类变量.像这样:

class MyClass:
    somevar = 'someval'

    def func_to_be_threaded(self):
        # Uses other class functions
        # Do something with class variables
Run Code Online (Sandbox Code Playgroud)

那么我将如何将线程类放入MyClass中呢?因此,如果调用MyClass().func_to_threaded(),它将在一个线程中运行.

fre*_*ish 85

如果我理解正确,你想在一个单独的线程中运行一个函数?有几种方法可以做到这一点.但基本上你像这样包装你的函数:

class MyClass:
    somevar = 'someval'

    def _func_to_be_threaded(self):
        # main body

    def func_to_be_threaded(self):
        threading.Thread(target=self._func_to_be_threaded).start()
Run Code Online (Sandbox Code Playgroud)

它可以用装饰器缩短:

def threaded(fn):
    def wrapper(*args, **kwargs):
        threading.Thread(target=fn, args=args, kwargs=kwargs).start()
    return wrapper

class MyClass:
    somevar = 'someval'

    @threaded
    def func_to_be_threaded(self):
        # main body
Run Code Online (Sandbox Code Playgroud)

使用句柄编辑更新版本:

def threaded(fn):
    def wrapper(*args, **kwargs):
        thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
        thread.start()
        return thread
    return wrapper

class MyClass:
    somevar = 'someval'

    @threaded
    def func_to_be_threaded(self):
        print 'xyz'
Run Code Online (Sandbox Code Playgroud)

这可以使用如下:

>>> my_obj = MyClass()
>>> handle = my_obj.func_to_be_threaded()
>>> handle.join()
Run Code Online (Sandbox Code Playgroud)

现在,如果您希望从函数返回值,则可以进一步扩展它.考虑一下:

from threading import Thread
from concurrent.futures import Future

def call_with_future(fn, future, args, kwargs):
    try:
        result = fn(*args, **kwargs)
        future.set_result(result)
    except Exception as exc:
        future.set_exception(exc)

def threaded(fn):
    def wrapper(*args, **kwargs):
        future = Future()
        Thread(target=call_with_future, args=(fn, future, args, kwargs)).start()
        return future
    return wrapper


class MyClass:
    @threaded
    def get_my_value(self):
        return 1

>>> my_obj = MyClass()
>>> fut = my_obj.get_my_value()  # this will run in a separate thread
>>> fut.result()  # will block until result is computed
1
Run Code Online (Sandbox Code Playgroud)

如果你没有concurrent.futures.Future类(因为你使用的是Python2.7或更早版本),那么你可以使用这个简化的实现:

from threading import Event

class Future(object):
    def __init__(self):
        self._ev = Event()

    def set_result(self, result):
        self._result = result
        self._ev.set()

    def set_exception(self, exc):
        self._exc = exc
        self._ev.set()

    def result(self):
        self._ev.wait()
        if hasattr(self, '_exc'):
            raise self._exc
        return self._result
Run Code Online (Sandbox Code Playgroud)

我建议阅读concurrent.futures模块,因为它有很多简洁的工具.例如,Thread类应该用ThreadPoolExecutor实例替换以限制并发(例如,您不希望垃圾邮件10k线程).此外,ThreadPoolExecutor代码更简单(并且更不容易出错):

from concurrent.futures import ThreadPoolExecutor

tp = ThreadPoolExecutor(10)  # max 10 threads

def threaded(fn):
    def wrapper(*args, **kwargs):
        return tp.submit(fn, *args, **kwargs)  # returns Future object
    return wrapper
Run Code Online (Sandbox Code Playgroud)

记住你tp.shutdown()完成所有并行工作后必须要做的事情.

  • 是否有允许它的注释版本?如果可以的话,这将非常酷. (3认同)
  • @jeremyjjbrown对于这个极其迟到的回应,我感到非常抱歉.不知怎的,我错过了你的评论,我刚刚通过我的旧帖子注意到它.我的错.:(无论如何,我正在给予扩展实现. (3认同)

ndp*_*dpu 6

您可以将类实例传递给线程:

class SomeThread(threading.Thread):
    def __init__(self, count, instance):
        threading.Thread.__init__(self)
        self.instance = instance

    def run(self):
        print "Do something"
        self.instance.some_param = data
        self.instance.some_function()
Run Code Online (Sandbox Code Playgroud)