我想通过Tornado流式传输长数据库结果集.我显然需要一个服务器游标,因为它不可能将整个查询加载到内存中.
所以我有以下代码:
class QueryStreamer(RequestHandler):
def get(self):
cursor.execute("Select * from ...")
chunk = cursor.fetch(1000)
while chunk:
self.write(chunk)
self.flush()
chunk = cursor.fetch(1000)
self.finish()
cursor.close()
Run Code Online (Sandbox Code Playgroud)
如果有人直到最后才读到我的请求?(即curl ... |head),该get方法很乐意将我的数据流式传输到任何地方.我希望SIGPIPE在某些时候得到并关闭数据库游标(没有将它运行到最后没有任何结果).
如何在Tornado中捕获写入错误?
更新:根据答案中的建议我尝试了以下内容:
import tornado.ioloop
import tornado.web
import time
class PingHandler(tornado.web.RequestHandler):
def get(self):
for i in range(600):
self.write("pong\n")
self.flush()
time.sleep(1)
print "pong"
self.finish()
print "ponged"
def on_connection_close(self):
print "closed"
if __name__ == "__main__":
application = tornado.web.Application([ ("/ping", PingHandler), ])
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
Run Code Online (Sandbox Code Playgroud)
我在终端1和终端2中运行此文件我调用:
curl -s http://localhost:8888/ping
Run Code Online (Sandbox Code Playgroud)
在第一次回复后我点击了CTRL-C.但是在终端1中,我看到它很高兴地保持着"乒乓球",并且on_connection_close永远不会被召唤.
底线 - 仍然不起作用.
您需要使处理程序异步并使用ioloop.add_timeout而不是 time.sleep,因为这会阻止循环:
import tornado.ioloop
import tornado.web
import tornado.gen
class PingHandler(tornado.web.RequestHandler):
connection_closed = False
def on_connection_close(self):
print "closed"
self.connection_closed = True
@tornado.gen.coroutine # <= async handler
def get(self):
for i in range(600):
if self.connection_closed:
# `on_connection_close()` has been called,
# break out of the loop
break
self.write("pong %s\n" % i)
self.flush()
# Add a timeout. Similar to time.sleep(1), but non-blocking:
yield tornado.gen.Task(
tornado.ioloop.IOLoop.instance().add_timeout,
tornado.ioloop.IOLoop.instance().time() + 1,
)
self.finish()
print "finished"
if __name__ == "__main__":
application = tornado.web.Application([("/ping", PingHandler), ])
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
Run Code Online (Sandbox Code Playgroud)