扭曲/执行异步http请求

use*_*284 3 python asynchronous twisted python-2.7

我有一个扭曲的反应堆,监听传入的数据。我有第二个反应堆,在一定的时间间隔内执行http请求,将结果发送到第一个反应堆。两者都运行良好。

现在,我想将其组合在一起以在一个反应​​堆中运行,但是我不知道如何实现这一目标。类似于-每60秒执行一次HTTP请求。在第一个侦听“主”反应堆内以不规则的方式

我目前所拥有的是:

# main reactor listening for incoming data forever
...
reactor.listenTCP(8123, TCPEventReceiverFactory())
Run Code Online (Sandbox Code Playgroud)

http反应堆用于twisted.internet.defer.DeferredSemaphore()执行几个http检查:

# create semaphore to manage the deferreds
semaphore = twisted.internet.defer.DeferredSemaphore(2)

# create a list with all urls to check
dl = list()
# append deferreds to list
for url in self._urls:
    # returns deferred
    dl.append(semaphore.run(self._getPage, url))

# get a DefferedList
dl = twisted.internet.defer.DeferredList(dl)
# add some callbacks for error handling
dl.addCallbacks(lambda x: reactor.stop(), self._handleError)

# start the reactor    
reactor.run()
Run Code Online (Sandbox Code Playgroud)

如何将定时的http检查添加到“主”反应堆中,以便它们以异步方式执行?究竟如何DeferredSemaphore运作?

谁能帮我这个?

[这是一种处理http checkresults的轻量级监视系统。我是Twisted和异步编程的新手。我使用的是运行Python 2.7的Xubuntu 12.04]

Sin*_*ion 5

您不需要多个反应堆。只需使用同一反应堆执行所有不同的动作即可。

如果您正在调用reactor.stop(),则可能是在做错什么,所以让我们摆脱它,并将其全部绑定到一个函数中(我们可以将其用作回调);由于它正在执行异步工作,因此它还应该返回一个延迟的,我们将使用DeferredList您已经在使用的。

def thing_that_does_http():
    # create semaphore to manage the deferreds
    semaphore = twisted.internet.defer.DeferredSemaphore(2)

    # create a list with all urls to check
    dl = DeferredList()
    # append deferreds to list
    for url in self._urls:
        # returns deferred
        dl.append(semaphore.run(self._getPage, url))

    # get a DefferedList
    dl = twisted.internet.defer.DeferredList(dl)
    # add some callbacks for error handling
    dl.addErrback(self._handleError)
    return dl
Run Code Online (Sandbox Code Playgroud)

“ 在特定时间间隔内执行x ”的自然方法是使用循环调用。有了这个回调函数,我们不需要做很多事情

reactor.listenTCP(8123, TCPEventReceiverFactory())
loop_http = twisted.intertnet.task.LoopingCall(thing_that_does_http)
# run once per minute, starting now.
loop_http.start(60)
Run Code Online (Sandbox Code Playgroud)

反应器LoopingCallgetPage将用它来达到自己的目的是twisted.internet.reactor,如果你使用的是不同的反应器,例如,如果你正在做单元测试,你需要重写默认。

对于LoopingCall,这很简单,在构造之后(但调用其start()方法之前),设置其clock属性:

from twisted.internet.task import Clock
fake_reactor = Clock()
loop_http.clock = fake_reactor
fake_reactor.advance(120)  # move time forward two minutes...
Run Code Online (Sandbox Code Playgroud)

不幸的是,这种情况getPage()不太好。您不能将任何其他反应堆与该接口一起使用。您需要使用更新的shinier t.w.c.Agent。从许多方面讲Agent,它是优越的,但是当您只想将原始响应主体作为字符串时,它就不那么方便了。

除了要求将显式反应堆传递给其构造函数外,它还涉及到对请求/响应周期的精细控制,而不是getPage提供的便利。因此,它主要是根据Producers和Protocols实现的。对于前者,我们可以通过一个便利助手,FileBodyProducer以最小的麻烦发送请求正文;在后者中,我们将需要一个简单的协议来缓冲所有数据块,直到获得所有数据为止。

这是一段代码,可以getPage用大致相同的接口替换,但是将的实例Agent作为第一个参数

from cStringIO import StringIO
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol
from twisted.web.client import ResponseDone
from twisted.web.client import FileBodyProducer


class GetPageProtocol(Protocol):
    def __init__(self):
        self.deferred = Deferred()
        self.data = []

    def dataReceived(self, data):
        self.data.append(data)

    def connectionLost(self, reason):
        reason.trap(ResponseDone)
        data = ''.join(self.data)
        del self.data
        self.deferred.callback(data)


def agentGetPage(agent, url,
                 method="GET",
                 headers=None,
                 postdata=None):
    if postdata is not None:
        bodyProducer = FileBodyProducer(StringIO(postdata))
    else:
        bodyProducer = None

    def _getPageResponded(response):
        if response.length != 0:
            proto = GetPageProtocol()
            response.deliverBody(proto)
            return proto.deferred
        else:
            return None

    d = agent.request(method, url, headers, bodyProducer)
    d.addCallback(_getPageResponded)
    return d
Run Code Online (Sandbox Code Playgroud)

在单元测试中,其外观类似于:

from twisted.test.proto_helpers import MemoryReactor
from twisted.web.client import Agent
fake_reactor = MemoryReactor()
agent = Agent(fake_reactor)
d = agentGetPage(agent, "http://example.com")

assert fake_reactor.tcpClients  # or some such, exercise the code by manipulating the reactor
Run Code Online (Sandbox Code Playgroud)

编辑:我最初想略过这一点,以给出ectomorph,少为混淆;但是这也是一个不错的主意,那就是尽早处理好反应堆,并避免以后不必要的痛苦。

  • 好的答案,但并不完全正确:)。“ LoopingCall”实际上会使用“ self.clock”,它只是默认情况下被初始化为“ twisted.internet.reactor”。更改它的能力很重要,尤其是对于测试。(可悲的是,`getPage`被有效地硬编码到了它,这就是为什么我们现在推荐`twisted.web.client.Agent`的原因之一。) (2认同)
  • @Glyph:更新:在那里,我认为使用反应堆的内容要轻描淡写。 (2认同)