Gab*_*ava 7 python multithreading pytest xdist
我正在尝试使用 pytest-xdist 以使我的测试并行运行,问题是每个线程都将转到与所有测试共享的夹具并根据线程数执行它。
它给我带来了一个问题,因为该夹具的作用是为我的测试创建数据,一旦它已经创建,我就会得到和错误,因为它已经创建(通过 REST)。
conftest.py:
lock = threading.Lock()
@pytest.fixture(scope=session)
def init_data(request):
lock.acquire()
try:
data = []
if checking_data_not_created():
data.append(some_rest_command_creating_data_1())
data.append(some_rest_command_creating_data_2())
finally:
lock.release()
yield data
lock.acquire()
try:
remove_all_data()
finally:
lock.release()
Run Code Online (Sandbox Code Playgroud)
测试类.py:
class classTests():
def first_test(init_data):
test body and validation....
def second_test(init_data):
test body and validation....
Run Code Online (Sandbox Code Playgroud)
我正在使用命令:pytest -v -n2
假设第一个线程应该运行 first_test() 并且第二个线程应该运行 second_test() 其中一个总是会失败,因为第一个已经在夹具部分创建了数据,另一个线程将出现异常并且他应该运行的所有测试都将失败。
如您所见,我尝试使用锁来同步线程,但它也不起作用。
知道如何克服这个问题吗?
谢谢。
这种方法不适用于 pytest-xdist,因为它使用多处理而不是多线程,但是它可以使用--tests-per-worker 选项与pytest-parallel一起使用,它将使用多个线程运行测试。
在使用以下固定装置的多线程 pytest 执行中,数据仅设置一次并清理一次:
conftest.py:
lock = threading.Lock()
threaded_count = 0
@pytest.fixture(scope='session')
def init_data():
global lock
global threaded_count
lock.acquire()
threaded_count += 1
try:
if threaded_count == 1:
# Setup Data Once
data = setup_data()
finally:
lock.release()
yield data
lock.acquire()
threaded_count -= 1
try:
if threaded_count == 0:
# Cleanup Data Once
data = cleaup_data()
finally:
lock.release()
Run Code Online (Sandbox Code Playgroud)
命令:
pytest -v --每个工人测试 2
归档时间: |
|
查看次数: |
3593 次 |
最近记录: |