dou*_*gle 29 python testing multithreading timer race-condition
上下文
我最近发布了一个计时器类,用于审查Code Review.我有一种直觉感觉有并发错误,因为我曾经看过1单元测试失败,但无法重现失败.因此,我的代码审查发布.
我得到了一些很好的反馈,突出了代码中的各种竞争条件.(我想)我理解了问题和解决方案,但在进行任何修复之前,我想通过单元测试来暴露这些错误.当我尝试时,我意识到这很困难.各种堆栈交换答案建议我必须控制线程的执行以暴露bug,并且任何人为的时间不一定可移植到不同的机器上.这似乎是我试图解决的问题之外的许多偶然的复杂性.
相反,我尝试使用最好的静态分析(SA)工具python,PyLint,看看它是否会挑出任何错误,但它不能.为什么人类可以通过代码审查(实质上是SA)找到错误,但SA工具不能?
由于害怕尝试让Valgrind使用python(听起来像牦牛皮),我决定在修复错误时不要先复制它们.现在我在泡菜.
这是现在的代码.
from threading import Timer, Lock
from time import time
class NotRunningError(Exception): pass
class AlreadyRunningError(Exception): pass
class KitchenTimer(object):
'''
Loosely models a clockwork kitchen timer with the following differences:
You can start the timer with arbitrary duration (e.g. 1.2 seconds).
The timer calls back a given function when time's up.
Querying the time remaining has 0.1 second accuracy.
'''
PRECISION_NUM_DECIMAL_PLACES = 1
RUNNING = "RUNNING"
STOPPED = "STOPPED"
TIMEUP = "TIMEUP"
def __init__(self):
self._stateLock = Lock()
with self._stateLock:
self._state = self.STOPPED
self._timeRemaining = 0
def start(self, duration=1, whenTimeup=None):
'''
Starts the timer to count down from the given duration and call whenTimeup when time's up.
'''
with self._stateLock:
if self.isRunning():
raise AlreadyRunningError
else:
self._state = self.RUNNING
self.duration = duration
self._userWhenTimeup = whenTimeup
self._startTime = time()
self._timer = Timer(duration, self._whenTimeup)
self._timer.start()
def stop(self):
'''
Stops the timer, preventing whenTimeup callback.
'''
with self._stateLock:
if self.isRunning():
self._timer.cancel()
self._state = self.STOPPED
self._timeRemaining = self.duration - self._elapsedTime()
else:
raise NotRunningError()
def isRunning(self):
return self._state == self.RUNNING
def isStopped(self):
return self._state == self.STOPPED
def isTimeup(self):
return self._state == self.TIMEUP
@property
def timeRemaining(self):
if self.isRunning():
self._timeRemaining = self.duration - self._elapsedTime()
return round(self._timeRemaining, self.PRECISION_NUM_DECIMAL_PLACES)
def _whenTimeup(self):
with self._stateLock:
self._state = self.TIMEUP
self._timeRemaining = 0
if callable(self._userWhenTimeup):
self._userWhenTimeup()
def _elapsedTime(self):
return time() - self._startTime
Run Code Online (Sandbox Code Playgroud)
题
在这个代码示例的上下文中,我如何公开竞争条件,修复它们并证明它们已被修复?
加分
适用于其他实现和问题的测试框架的额外点,而不是专门针对此代码.
带走
我的想法是,重现已识别的竞争条件的技术解决方案是控制两个线程的同步,以确保它们按照将暴露错误的顺序执行.这里重点是他们已经确定了竞争条件.我发现识别竞争条件的最佳方法是将代码用于代码审查,并鼓励更多专家分析它.
传统上,使用信号量强制执行多线程代码中的竞争条件,因此您可以强制线程等待,直到另一个线程在继续之前达到某个边缘条件.
例如,start如果对象已在运行,则您的对象有一些代码可以检查是否未调用.您可以通过执行以下操作来强制此条件以确保其行为符合预期:
KitchenTimerAlreadyRunningError要做到这一点,您可能需要扩展KitchenTimer类.正式单元测试通常使用模拟对象,这些对象被定义为在关键时刻阻塞.模拟对象是一个比我在这里可以解决的更大的主题,但谷歌搜索"python模拟对象"将出现很多文档和许多实现可供选择.
这是一种可以强制代码抛出的方法AlreadyRunningError:
import threading
class TestKitchenTimer(KitchenTimer):
_runningLock = threading.Condition()
def start(self, duration=1, whenTimeUp=None):
KitchenTimer.start(self, duration, whenTimeUp)
with self._runningLock:
print "waiting on _runningLock"
self._runningLock.wait()
def resume(self):
with self._runningLock:
self._runningLock.notify()
timer = TestKitchenTimer()
# Start the timer in a subthread. This thread will block as soon as
# it is started.
thread_1 = threading.Thread(target = timer.start, args = (10, None))
thread_1.start()
# Attempt to start the timer in a second thread, causing it to throw
# an AlreadyRunningError.
try:
thread_2 = threading.Thread(target = timer.start, args = (10, None))
thread_2.start()
except AlreadyRunningError:
print "AlreadyRunningError"
timer.resume()
timer.stop()
Run Code Online (Sandbox Code Playgroud)
阅读代码,找出你想要测试的一些边界条件,然后考虑你需要暂停计时器以强制出现这种情况的位置,并添加条件,信号量,事件等来实现它.例如,如果计时器运行whenTimeUp回调,另一个线程试图阻止它会发生什么?您可以通过使计时器在输入_whenTimeUp后立即等待来强制执行该条件:
import threading
class TestKitchenTimer(KitchenTimer):
_runningLock = threading.Condition()
def _whenTimeup(self):
with self._runningLock:
self._runningLock.wait()
KitchenTimer._whenTimeup(self)
def resume(self):
with self._runningLock:
self._runningLock.notify()
def TimeupCallback():
print "TimeupCallback was called"
timer = TestKitchenTimer()
# The timer thread will block when the timer expires, but before the callback
# is invoked.
thread_1 = threading.Thread(target = timer.start, args = (1, TimeupCallback))
thread_1.start()
sleep(2)
# The timer is now blocked. In the parent thread, we stop it.
timer.stop()
print "timer is stopped: %r" % timer.isStopped()
# Now allow the countdown thread to resume.
timer.resume()
Run Code Online (Sandbox Code Playgroud)
对要测试的类进行子类化并不是一种很好的方法来测试它:为了测试每个方法中的竞争条件,你必须基本上覆盖所有方法,并且在这一点上有一个很好的论据是使你没有真正测试原始代码.相反,您可能会发现将信号量放在KitchenTimer对象中更加清晰,但默认情况下初始化为None,并if testRunningLock is not None:在获取或等待锁之前检查方法.然后,您可以强制执行您提交的实际代码.
关于Python模拟框架的一些阅读可能会有所帮助.事实上,我不确定模拟是否有助于测试此代码:它几乎完全是自包含的,并且不依赖于许多外部对象.但是模拟教程有时会触及这些问题.我没有使用过这些,但是关于这些的文档就像一个开始的好地方:
测试线程(un)安全代码的最常见解决方案是启动大量线程并希望获得最佳.问题我,我可以想象其他人,有这个问题,它依赖于机会,它使测试'重'.
正如我前一段时间遇到的那样,我想要精确而不是蛮力.结果是一条测试代码通过让线程颈部到颈部来引起竞争条件.
spam = []
def set_spam():
spam[:] = foo()
use(spam)
Run Code Online (Sandbox Code Playgroud)
如果set_spam从多个线程调用,则在修改和使用之间存在竞争条件spam.让我们尝试一致地重现它.
class TriggeredThread(threading.Thread):
def __init__(self, sequence=None, *args, **kwargs):
self.sequence = sequence
self.lock = threading.Condition()
self.event = threading.Event()
threading.Thread.__init__(self, *args, **kwargs)
def __enter__(self):
self.lock.acquire()
while not self.event.is_set():
self.lock.wait()
self.event.clear()
def __exit__(self, *args):
self.lock.release()
if self.sequence:
next(self.sequence).trigger()
def trigger(self):
with self.lock:
self.event.set()
self.lock.notify()
Run Code Online (Sandbox Code Playgroud)
然后演示这个线程的用法:
spam = [] # Use a list to share values across threads.
results = [] # Register the results.
def set_spam():
thread = threading.current_thread()
with thread: # Acquires the lock.
# Set 'spam' to thread name
spam[:] = [thread.name]
# Thread 'releases' the lock upon exiting the context.
# The next thread is triggered and this thread waits for a trigger.
with thread:
# Since each thread overwrites the content of the 'spam'
# list, this should only result in True for the last thread.
results.append(spam == [thread.name])
threads = [
TriggeredThread(name='a', target=set_spam),
TriggeredThread(name='b', target=set_spam),
TriggeredThread(name='c', target=set_spam)]
# Create a shifted sequence of threads and share it among the threads.
thread_sequence = itertools.cycle(threads[1:] + threads[:1])
for thread in threads:
thread.sequence = thread_sequence
# Start each thread
[thread.start() for thread in threads]
# Trigger first thread.
# That thread will trigger the next thread, and so on.
threads[0].trigger()
# Wait for each thread to finish.
[thread.join() for thread in threads]
# The last thread 'has won the race' overwriting the value
# for 'spam', thus [False, False, True].
# If set_spam were thread-safe, all results would be true.
assert results == [False, False, True], "race condition triggered"
assert results == [True, True, True], "code is thread-safe"
Run Code Online (Sandbox Code Playgroud)
我想我已经解释了这个结构,所以你可以根据自己的情况实现它.我认为这非常适合"加分"部分:
适用于其他实现和问题的测试框架的额外点,而不是专门针对此代码.
每个线程问题都以自己特定的方式解决.在上面的例子中,我通过在线程之间共享一个值来引发竞争条件.使用全局变量(例如模块属性)时可能会出现类似问题.解决此类问题的关键可能是使用线程本地存储:
# The thread local storage is a global.
# This may seem weird at first, but it isn't actually shared among threads.
data = threading.local()
data.spam = [] # This list only exists in this thread.
results = [] # Results *are* shared though.
def set_spam():
thread = threading.current_thread()
# 'get' or set the 'spam' list. This actually creates a new list.
# If the list was shared among threads this would cause a race-condition.
data.spam = getattr(data, 'spam', [])
with thread:
data.spam[:] = [thread.name]
with thread:
results.append(data.spam == [thread.name])
# Start the threads as in the example above.
assert all(results) # All results should be True.
Run Code Online (Sandbox Code Playgroud)
常见的线程问题是多线程同时读取和/或写入数据持有者的问题.通过实现读写锁来解决此问题.读写锁的实际实现可能不同.您可以选择先读锁定,先写入锁定或随机选择.
我确信有一些例子描述了这种锁定技术.我可能稍后会写一个例子,因为这已经是一个很长的答案了.;-)
看一下线程模块文档并稍微进行实验.由于每个线程问题都不同,因此适用不同的解决方案.
关于线程的主题,请看一下Python GIL(全局解释器锁).重要的是要注意,线程实际上可能不是优化性能的最佳方法(但这不是您的目标).我发现这个演示文稿非常好:https://www.youtube.com/watch?v = zEaosS1U5qY