使用线程时Django中的数据库错误

Jos*_*iño 11 python django postgresql multithreading

我正在使用Django Web应用程序,它需要查询PostgreSQL数据库.使用Python 线程接口实现并发时,我收到DoesNotExist查询项的错误.当然,顺序执行查询时不会发生这些错误.

让我展示一个单元测试,我写这个测试来演示意外的行为:

class ThreadingTest(TestCase):
    fixtures = ['demo_city',]

    def test_sequential_requests(self):
        """
        A very simple request to database, made sequentially.

        A fixture for the cities has been loaded above. It is supposed to be
        six cities in the testing database now. We will made a request for
        each one of the cities sequentially.
        """
        for number in range(1, 7):
            c = City.objects.get(pk=number)
            self.assertEqual(c.pk, number)

    def test_threaded_requests(self):
        """
        Now, to test the threaded behavior, we will spawn a thread for
        retrieving each city from the database.
        """

        threads = []
        cities = []

        def do_requests(number):
            cities.append(City.objects.get(pk=number))

        [threads.append(threading.Thread(target=do_requests, args=(n,))) for n in range(1, 7)]

        [t.start() for t in threads]
        [t.join() for t in threads]

        self.assertNotEqual(cities, [])
Run Code Online (Sandbox Code Playgroud)

如您所见,第一个测试按顺序执行一些数据库请求,这确实没有问题.但是,第二个测试执行的请求完全相同,但每个请求都是在一个线程中生成的.这实际上是失败的,返回DoesNotExist异常.

执行此单元测试的输出如下:

test_sequential_requests (cesta.core.tests.threadbase.ThreadingTest) ... ok
test_threaded_requests (cesta.core.tests.threadbase.ThreadingTest) ...

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/jose/Work/cesta/trunk/src/cesta/core/tests/threadbase.py", line 45, in do_requests
    cities.append(City.objects.get(pk=number))
  File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/manager.py", line 132, in get
    return self.get_query_set().get(*args, **kwargs)
  File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/query.py", line 349, in get
    % self.model._meta.object_name)
DoesNotExist: City matching query does not exist.
Run Code Online (Sandbox Code Playgroud)

...其他线程返回类似的输出...

Exception in thread Thread-6:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/jose/Work/cesta/trunk/src/cesta/core/tests/threadbase.py", line 45, in do_requests
    cities.append(City.objects.get(pk=number))
  File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/manager.py", line 132, in get
    return self.get_query_set().get(*args, **kwargs)
  File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/query.py", line 349, in get
    % self.model._meta.object_name)
DoesNotExist: City matching query does not exist.


FAIL

======================================================================
FAIL: test_threaded_requests (cesta.core.tests.threadbase.ThreadingTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/jose/Work/cesta/trunk/src/cesta/core/tests/threadbase.py", line 52, in test_threaded_requests
    self.assertNotEqual(cities, [])
AssertionError: [] == []

----------------------------------------------------------------------
Ran 2 tests in 0.278s

FAILED (failures=1)
Destroying test database for alias 'default' ('test_cesta')...
Run Code Online (Sandbox Code Playgroud)

请记住,所有这些都发生在PostgreSQL数据库中,该数据库应该是线程安全的,而不是SQLite或类似的.测试也使用PostgreSQL运行.

在这一点上,我完全迷失了可能失败的东西.有什么想法或建议吗?

谢谢!

编辑:我写了一个小视图只是为了检查它是否在测试中运行.以下是视图的代码:

def get_cities(request):
    queue = Queue.Queue()

    def get_async_cities(q, n):
        city = City.objects.get(pk=n)
        q.put(city)

    threads = [threading.Thread(target=get_async_cities, args=(queue, number)) for number in range(1, 5)]

    [t.start() for t in threads]
    [t.join() for t in threads]

    cities = list()

    while not queue.empty():
        cities.append(queue.get())

    return render_to_response('async/cities.html', {'cities': cities},
        context_instance=RequestContext(request))
Run Code Online (Sandbox Code Playgroud)

(请不要考虑在视图代码中编写应用程序逻辑的愚蠢.请记住,这只是一个概念证明,并不会永远不会出现在真正的应用程序中.)

结果是代码运行良好,请求在线程中成功完成,视图最终在调用URL后显示城市.

所以,我认为使用线程进行查询只会在需要测试代码时出现问题.在生产中,它将毫无问题地工作.

有没有成功测试这种代码的有用建议?

Tis*_*sho 12

尝试使用TransactionTestCase:

class ThreadingTest(TransactionTestCase):
Run Code Online (Sandbox Code Playgroud)

TestCase将数据保存在内存中,不会向数据库发出COMMIT.可能线程正在尝试直接连接到DB,而数据尚未提交.请点击此处:https://docs.djangoproject.com/en/dev/topics/testing/ from = olddocs #django.test.TransactionTestCase

TransactionTestCase和TestCase是相同的,除了数据库重置为已知状态的方式以及测试代码测试提交和回滚效果的能力.TransactionTestCase通过截断所有表并重新加载初始数据,在测试运行之前重置数据库.TransactionTestCase可以调用commit和rollback,并观察这些调用对数据库的影响.


Dan*_*man 2

这听起来像是交易的问题。如果您在当前请求(或测试)中创建元素,则它们几乎肯定处于未提交的事务中,无法从其他线程中的单独连接访问。您可能需要手动管理交易才能使其发挥作用。