设置要在线程否则完成时执行的行为

Gre*_*Guy 5 python multithreading thread-safety

我的模块中有两个功能:do_something()change_behavior()

该函数默认do_something()执行ThingA。之后change_behavior()被调用,do_something()事情乙代替。

我希望此过渡是特定于线程的。也就是说,任何新线程在调用时都会发生Thing Ado_something(),但是如果该线程调用change_behavior(),则在继续调用Thing B时会发生do_something()

每个线程应该是独立的,以便一个线程调用change_behavior()不会影响do_something()for其他线程的行为。


我本能的解决方案是将行为绑定到线程的ID(通过进行评估threading.get_ident())。该函数do_something()检查本地表中是否包含线程的ID,并相应地调整其行为。同时,该函数change_behavior()只是将当前线程添加到该注册表中。这在任何给定时间都是可行的,因为永远不会有两个具有相同ID的并发线程。

当当前线程集加入并经过时间,并且父线程使一大堆线程更多时,就会出现问题。新线程之一具有与先前线程之一相同的ID,因为有时会重复使用线程ID。这线程调用do_something(),因为它已经在注册表中,它的东西乙,而不是事情

为了解决这个问题,我需要从注册表中删除线程ID,介于具有该ID的第一个线程何时结束与具有该ID的第二个线程开始之间。我提出了一些假想的想法:

  • 定期检查每个线程ID是否仍处于活动状态。这是有问题的,因为它既浪费CPU资源,又可能在线程被破坏然后在两次更新之间重新创建线程时丢失
  • 附加一个方法钩子,以便在线程加入时被调用。除了下一个想法,我不确定如何执行此操作
  • 作为的一部分change_behavior(),劫持/替换当前线程的._quit()方法,该方法首先从注册表中删除线程的ID。这似乎是一种不好的做法,并且可能会中断。

我的用例的另一个方面是,如果可能的话,我希望新线程继承其父线程的当前行为,以便用户不必手动设置他们创建的每个标志-但这与与线程结束时相比,我如何存储有关胎面状态的信息,这使得它与该特定问题的相关性略微降低。

我正在寻找有关这些特定解决方案中的任何一种是理想的,标准的还是惯用的,以及在此用例中是否有想要做的事情的指南。


threading.local()@TarunLalwani在评论中建议使用。我已经对其进行了研究,但它很有用,但是它不能解决我要处理的其他用例-当父线程创建新的子线程时,我希望它们继承父线程的状态。我当时在考虑通过替换来完成此操作Thread.__init__(),但是使用local()通常与该用例不兼容,因为我无法将变量从父线程传递给子线程。


我也一直在尝试,更简单地将属性保存到线程本身中,从而更加成功:

current_thread = threading.current_thread()
setattr(current_thread, my_reference, new_value)
Run Code Online (Sandbox Code Playgroud)

这个问题是,对于这完全令我迷惑,原因在模块的名字空间,其值是目前任何其他变量current_thread.my_reference也被设置为new_value。我不知道为什么,而且我一直无法在MVE中复制问题(尽管即使在重新启动后,它在我的IDE中也一直发生)。正如我的另一个当前活动问题所暗示的那样,我在此处设置的对象是对输出流的引用(我在该答案中描述的每个对中间IO流的实例的引用都已被该方法所使用的文件描述符替换了。 ),如果这与它有任何关系,但是我无法想象为什么在这种情况下对象的类型会影响引用的工作方式。

jfe*_*ard 4

我的回答是对你的问题的一个非常简单的回答,因此我想知道我是否错过了什么。基本上,我认为您应该避免在模块中存储外部对象的当前状态。

您需要将状态(如果change_behavior被调用,也许还有一些其他数据)存储在某处。您有两个主要选择:将状态存储在模块中或将状态存储在线程本身中。除了在模块中存储状态时遇到的问题之外,人们期望模块(主要)是无状态的,因此我认为您应该坚持后者并将数据存储在线程中。

版本1

如果将状态存储在字段中,则创建的属性名称与现有属性名称之间存在冲突的风险很小,但如果文档清晰并且选择了一个好的名称,那么这不应该是一个错误。问题。

一个简单的概念证明,没有setattror hasattr(我没有检查 CPython 的源代码,但奇怪的行为可能来自setattr):

模块1.py

import threading
import random
import time

_lock = threading.Lock()

def do_something():
    with _lock:
        t = threading.current_thread()
        try:
            if t._my_module_s:
                print(f"DoB ({t})")
            else:
                print(f"DoA ({t})")
        except AttributeError:
            t._my_module_s = 0
            print(f"DoA ({t})")

    time.sleep(random.random()*2)

def change_behavior():
    with _lock:
        t = threading.current_thread()
        print(f"Change behavior of: {t}")
        t._my_module_s = 1
Run Code Online (Sandbox Code Playgroud)

测试1.py

import random
import threading
from module1 import *

class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        n = random.randint(1, 10)
        for i in range(n):
            do_something()
        change_behavior()
        for i in range(10-n):
            do_something()

thread_1 = MyThread()
thread_2 = MyThread()
thread_1.start()
thread_2.start()
thread_1.join()
thread_2.join()
Run Code Online (Sandbox Code Playgroud)

输出1

DoA (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
DoA (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-1, started 140155115792128)>)
Change behavior of: <MyThread(Thread-1, started 140155115792128)>
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
Change behavior of: <MyThread(Thread-2, started 140155107399424)>
DoB (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoB (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-2, started 140155107399424)>)
Run Code Online (Sandbox Code Playgroud)

版本2

如果您确定最终用户将在线程内使用您的模块,您可以为他/她提供一种方便的方法来执行此操作。这个想法是自己处理线程。只需将用户函数包装在一个线程中,并将线程的状态存储在这个线程中,如上所示。不同之处在于,您是子类的所有者Thread,并且避免了名称冲突的风险。另外,在我看来,代码变得更干净:

模块2.py

import threading
import random
import time

_lock = threading.Lock()

def do_something():
    with _lock:
        t = threading.current_thread()
        t.do_something() # t must be a _UserFunctionWrapper
    time.sleep(random.random()*2)

def change_behavior():
    with _lock:
        t = threading.current_thread()
        t.change_behavior() # t must be a _UserFunctionWrapper

def wrap_in_thread(f):
    return _UserFunctionWrapper(f)

class _UserFunctionWrapper(threading.Thread):
    def __init__(self, user_function):
        threading.Thread.__init__(self)
        self._user_function = user_function
        self._s = 0

    def change_behavior(self):
        print(f"Change behavior of: {self}")
        self._s = 1

    def do_something(self):
        if self._s:
            print(f"DoB ({self})")
        else:
            print(f"DoA ({self})")

    def run(self):
        self._user_function()
Run Code Online (Sandbox Code Playgroud)

测试2.py

import random
from module2 import *

def user_function():
    n = random.randint(1, 10)
    for i in range(n):
        do_something() # won't work if the function is not wrapped
    change_behavior()
    for i in range(10-n):
        do_something()

thread_1 = wrap_in_thread(user_function)
thread_2 = wrap_in_thread(user_function)
thread_1.start()
thread_2.start()
thread_1.join()
thread_2.join()
Run Code Online (Sandbox Code Playgroud)

输出2

DoA (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
Change behavior of: <_UserFunctionWrapper(Thread-1, started 140193896072960)>
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
Change behavior of: <_UserFunctionWrapper(Thread-2, started 140193887680256)>
DoB (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
Run Code Online (Sandbox Code Playgroud)

缺点是即使不需要也必须使用线程。