AppEngine中的单元测试任务队列

Swi*_*ler 17 google-app-engine unit-testing django-nonrel

很长一段时间以来,我一直在AppEngine上使用任务队列来安排任务,就像我应该的那样.

但我一直想知道的是如何为此编写测试?到目前为止,我只是进行了测试,以确保在排队任务的API上不会发生错误,然后为执行任务的API编写更合适的测试.

然而,最近我开始感到有点不满意,我正在寻找一种方法来实际测试正确的任务已被添加到正确的队列.希望通过部署代码并希望获得最佳代码,可以做得更好.

我正在使用django-nonrel,如果这与答案有关.

回顾一下:如何编写单元测试以确认任务已排队?

JJ *_*wax 18

如果您使用的google.appengine.ext.testbed是GAE Testbed(GAE Testbed现已弃用并被移入ext.testbed),您可以执行以下操作:

import base64
import unittest2

from google.appengine.ext import deferred
from google.appengine.ext import testbed


class TestTasks(unittest2.TestCase):
  def setUp(self):
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_taskqueue_stub()
    self.taskqueue_stub = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)

  def tearDown(self):
    self.testbed.deactivate()

  def test_send_contact_request(self):
    # Make the request to your app that "defers" something:
    response = ...
    self.assertEqual(response.status_int, 200)

    # Get the task out of the queue
    tasks = self.taskqueue_stub.get_filtered_tasks()
    self.assertEqual(1, len(tasks))

    # Run the task
    task = tasks[0]
    deferred.run(task.payload)

    # Assert that other things happened (ie, if the deferred was sending mail...)
    self.assertEqual(...)
Run Code Online (Sandbox Code Playgroud)


Luk*_*ncl 13

使用GAE Test Bed将允许您存根任务队列.

如果你继承FunctionalTestCaseTaskQueueTestCase,你会得到像get_tasks和的方法assertTasksInQueue.

您也可以实际执行任务.如何操作取决于您是使用任务还是延迟.

对于延迟,我有一些这样的代码:

from google.appengine.ext import deferred
import base64

# gets the most recent task -- since the task queue is reset between tests,
# this is usually what you want
def get_task(self):
    for task in self.get_task_queue_stub().GetTasks('default'):
        return task

# decode and execute the deferred method
def run_deferred(task):
    deferred.run(base64.b64decode(task['body']))
Run Code Online (Sandbox Code Playgroud)

运行任务类似,但在您获取任务后,您使用WebTest(GAE测试床构建在其上)作为POST请求以其参数提交到任务的URL.