ank*_*kit 7 python queue webserver batch-processing
我需要一个Web服务器,它将传入的请求路由到后端工作者,每0.5秒对它们进行一次批处理,或者当它有50个http请求时,以先发生者为准.在python/tornado或任何其他语言中实现它的好方法是什么?
我的想法是将传入的请求发布到rabbitMQ队列,然后以某种方式将它们一起批处理,然后再发送到后端服务器.我无法弄清楚的是如何从rabbitMq队列中选择多个请求.有人能指出我正确的方向或建议一些替代的apporach?
我建议使用简单的 python 微型网络框架,例如 Bottle。然后,您将通过队列将请求发送到后台进程(从而允许连接结束)。
然后,后台进程将有一个连续的循环来检查您的条件(时间和数量),并在满足条件后执行工作。
编辑:
下面是一个示例 Web 服务器,它在将项目发送到您想要使用的任何排队系统之前对其进行批处理(在我看来,使用 Python 来使用 RabbitMQ 总是过于复杂。我之前使用过 Celery 和其他更简单的排队系统)。这样,后端只需从队列中获取一个“项目”,其中将包含所有必需的 50 个请求。
import bottle
import threading
import Queue
app = bottle.Bottle()
app.queue = Queue.Queue()
def send_to_rabbitMQ(items):
"""Custom code to send to rabbitMQ system"""
print("50 items gathered, sending to rabbitMQ")
def batcher(queue):
"""Background thread that gathers incoming requests"""
while True:
batcher_loop(queue)
def batcher_loop(queue):
"""Loop that will run until it gathers 50 items,
then will call then function 'send_to_rabbitMQ'"""
count = 0
items = []
while count < 50:
try:
next_item = queue.get(timeout=.5)
except Queue.Empty:
pass
else:
items.append(next_item)
count += 1
send_to_rabbitMQ(items)
@app.route("/add_request", method=["PUT", "POST"])
def add_request():
"""Simple bottle request that grabs JSON and puts it in the queue"""
request = bottle.request.json['request']
app.queue.put(request)
if __name__ == '__main__':
t = threading.Thread(target=batcher, args=(app.queue, ))
t.daemon = True # Make sure the background thread quits when the program ends
t.start()
bottle.run(app)
Run Code Online (Sandbox Code Playgroud)
用于测试它的代码:
import requests
import json
for i in range(101):
req = requests.post("http://localhost:8080/add_request",
data=json.dumps({"request": 1}),
headers={"Content-type": "application/json"})
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
611 次 |
| 最近记录: |