Celery包含一个能够使用amqp或其他一些芹菜后端发出异步HTTP请求的模块.我正在使用tornado-celery生产者进行异步消息发布.据我所知,龙卷风芹菜使用鼠兔为此.问题是如何使celery.task.http.URL适应龙卷风(使其无阻塞).基本上有两个地方需要改进:
HttpDispatch.make_request() 必须使用tornado async http客户端实现;URL.get_async(**kw)或者URL.post_async(**kw)必须使用tornado API重新实现相应的非阻塞代码.例如:
class NonBlockingURL(celery.task.http.URL):
@gen.coroutine
def post_async(self, **kwargs):
async_res = yield gen.Task(self.dispatcher.delay,
str(self), 'POST', **kwargs)
raise gen.Return(async_res)
Run Code Online (Sandbox Code Playgroud)但我无法理解如何以恰当和简洁的方式做到这一点.如何使其完全像非异步的非阻塞?顺便说一句,我正在使用amqp后端.
请给我一个很好的指导,甚至更好,一个例子.
我有一个棘手而有趣的问题.
在通过Twisted,Tornado中的某些传输层处理I/O任务(例如协议实现)时,我发现了类似的场景或模式.该模式相当通用而不是抽象.例如,当您使用类似MODEM的设备时,您可以向他发送命令并接收结果.
但是,有时您需要使用新命令对最后一个命令的调制解调器的响应做出反应.例如,假设调制解调器是M, - >是通信运算符,它接受一个参数,消息密钥,服务器是S.
1. s ->(a) M
1.1 M ->(b) S # modem reacts on `a` as `b`; so next we should send him command B
1.2 M ->(c) S # modem responses on `a` as `c`; so next we should send him C
2. s ->(b) M
2.1 M ->(g) S
2.2 M -> (f) S
...
2.N M -> (x) S
...
Run Code Online (Sandbox Code Playgroud)
所以,它看起来像FSM行为.在龙卷风中实现这种情况会很好,同时使用非阻塞I/O(通过流对象).通过简单地提供跟踪场景作为输入并覆盖输入中描述的状态(事件)的处理程序,我们可以达到良好的有限状态机行为.
输入可能有以下表示法:
{
a: (b, c, d),
b: (c, 'exit|silence'),
c: (a, …Run Code Online (Sandbox Code Playgroud) 在龙卷风中使用Periodic Callback创建调度程序的最佳方式(模式)是什么:在特定日期时间执行预定作业