小编Fac*_*ano的帖子

用于芹菜单元测试的内存代理

我有一个用Django编写的REST API,其中和端点在发布芹菜任务时对其进行排队.响应包含任务ID,我想用它来测试任务是否已创建并获得结果.所以,我想做的事情如下:

def test_async_job():
    response = self.client.post("/api/jobs/", some_test_data, format="json")
    task_id = response.data['task_id']
    result = my_task.AsyncResult(task_id).get()
    self.assertEquals(result, ...)
Run Code Online (Sandbox Code Playgroud)

我显然不想经营一个芹菜工人来进行单元测试,我希望以某种方式嘲笑它.我不能使用CELERY_ALWAYS_EAGER因为这似乎完全绕过经纪人,防止我使用AsyncResult通过其ID来获得任务(如说在这里).

通过芹菜和kombu文档,我发现有一个内存传输单元测试,这将做我正在寻找的.我尝试覆盖BROKER_URL设置以在测试中使用它:

@override_settings(BROKER_URL='memory://')
def test_async_job():
Run Code Online (Sandbox Code Playgroud)

但行为与ampq代理相同:它会阻止测试等待结果.任何想法我怎么配置这个经纪人让它在测试中工作?

django celery kombu

28
推荐指数
3
解决办法
7838
查看次数

使用Server-Sent事件进行双向客户端 - 服务器通信(而不是WebSockets)的缺点

最近我发现Server-Sent事件是WebSockets的一个更简单的替代方法,用于从服务器进行推送.大多数比较它们的地方(比如这里,这里这里)说,如果你不需要客户端和服务器之间的全双工通信,那么WebSockets就太过分了,SSE就足够了.

我的问题是当你需要双向通信(例如聊天),使用常规的ajax请求从客户端发送消息和服务器流接收它们时,使用SSE的缺点是什么?考虑到我必须在服务器端做很少甚至没有配置来使用SSE,它似乎是一个更吸引人的选择.

real-time server-push websocket server-sent-events

13
推荐指数
1
解决办法
5141
查看次数

重置Celery任务的倒计时

我需要在倒计时后运行芹菜任务,但能够在某些条件下重置倒计时.例如,我要打电话apply_asynccountdown=15,但如果某些事件发生时执行任务之前,我想再次设定倒计时15秒.

从文档和谷歌上搜索我想我可以通过保存的任务ID,然后撤销,如果事件发生重建任务的完成这个任务.我想知道是否有更优雅的方法.

django celery celery-task

8
推荐指数
1
解决办法
631
查看次数