在实时网络服务器中进行批处理和排队

ank*_*kit 7 python queue webserver batch-processing

添加了服务器设计我需要一个Web服务器,它将传入的请求路由到后端工作者,每0.5秒对它们进行一次批处理,或者当它有50个http请求时,以先发生者为准.在python/tornado或任何其他语言中实现它的好方法是什么?

我的想法是将传入的请求发布到rabbitMQ队列,然后以某种方式将它们一起批处理,然后再发送到后端服务器.我无法弄清楚的是如何从rabbitMq队列中选择多个请求.有人能指出我正确的方向或建议一些替代的apporach?

Cas*_*mon 1

我建议使用简单的 python 微型网络框架,例如 Bottle。然后,您将通过队列将请求发送到后台进程(从而允许连接结束)。

然后,后台进程将有一个连续的循环来检查您的条件(时间和数量),并在满足条件后执行工作。

编辑:

下面是一个示例 Web 服务器,它在将项目发送到您想要使用的任何排队系统之前对其进行批处理(在我看来,使用 Python 来使用 RabbitMQ 总是过于复杂。我之前使用过 Celery 和其他更简单的排队系统)。这样,后端只需从队列中获取一个“项目”,其中将包含所有必需的 50 个请求。

import bottle
import threading
import Queue


app = bottle.Bottle()

app.queue = Queue.Queue()


def send_to_rabbitMQ(items):
    """Custom code to send to rabbitMQ system"""
    print("50 items gathered, sending to rabbitMQ")


def batcher(queue):
    """Background thread that gathers incoming requests"""
    while True:
        batcher_loop(queue)


def batcher_loop(queue):
    """Loop that will run until it gathers 50 items,
    then will call then function 'send_to_rabbitMQ'"""
    count = 0
    items = []
    while count < 50:
        try:
            next_item = queue.get(timeout=.5)
        except Queue.Empty:
            pass
        else:
            items.append(next_item)
            count += 1

    send_to_rabbitMQ(items)


@app.route("/add_request", method=["PUT", "POST"])
def add_request():
    """Simple bottle request that grabs JSON and puts it in the queue"""
    request = bottle.request.json['request']
    app.queue.put(request)


if __name__ == '__main__':
    t = threading.Thread(target=batcher, args=(app.queue, ))
    t.daemon = True  # Make sure the background thread quits when the program ends
    t.start()

    bottle.run(app)
Run Code Online (Sandbox Code Playgroud)

用于测试它的代码:

import requests
import json

for i in range(101):
    req = requests.post("http://localhost:8080/add_request",
                        data=json.dumps({"request": 1}),
                        headers={"Content-type": "application/json"})
Run Code Online (Sandbox Code Playgroud)

  • 谢谢@CasualDemon,当后端完成处理时,我还需要将结果响应给浏览器。如何才能做到这一点? (3认同)