构建微服务事件总线和 REST api (python / Flask)

G. *_*and 5 python rest event-bus microservices

背景

我正在使用微服务架构构建我的第一个应用程序。我将主要使用 Flask 在 Python 中工作。

我正在考虑实现事件/消息总线来协调服务之间的操作。我打算实现的一些服务是:Auth、Users、Posts 和 Chat。该应用程序有两个实体(“用户”和“组”),几乎每个服务都使用它们。我为每个服务都有一个单独的数据库,每个数据库都有自己的usersgroups来管理特定于该服务的用户/组数据。现在,当我考虑创建新用户之类的事件时,每个服务都需要在表中创建一个新条目users,这就是我考虑使用事件总线的原因。

我阅读了这篇文章,其中讨论了 CQRS 和使用 HTTP (REST) 进行服务之间的外部通信,同时使用事件总线进行内部通信。服务处理 (HTTP) 请求,并发出有关数据更改的事件(例如,通过 Auth 服务创建新用户)。其他服务使用可能触发其他进程(和更多事件)的事件。

问题

我困惑的是如何实际实现(在 Python 中)一个服务,该服务侦听 HTTP 请求以及一组订阅通道中的新事件。我知道您需要使用像 redis/rabbitMQ 这样的工具,但是是否可以在同一进程中处理两种类型的请求,或者您是否需要运行两个服务器(一个用于 REST 请求,另一个用于事件处理)?

另外,如果您对上述一般方法/架构有任何意见,我洗耳恭听。

G. *_*and 7

因此,在进行更多研究并构建原型之后,单个服务器可以侦听来自消息代理的 HTTP 请求和事件。但是,它需要运行两个单独的进程(一个用于侦听 HTTP 的 Web 服务器进程,一个用于侦听消息代理的事件进程)。

这是我为原型开发的架构: 在此输入图像描述

核心模块(由文件夹图标表示)代表服务的核心,这是实际更改数据的所有代码。HTTP Server 和Event Worker 都调用核心模块中的方法。HTTP Server 或 Event Worker 都不产生事件,只有核心模块产生事件。

这是一个文件结构:

Project
 |-Foo
 |  |- foo.py
 |  |- web.py
 |  |- worker.py
 |  |- revent.py
 |-Bar
 |  |- bar.py
 |  |- web.py
 |  |- worker.py
 |  |- revent.py
Run Code Online (Sandbox Code Playgroud)

这些web.py文件是简单的烧瓶应用程序:

# bar.py
from flask import Flask, request
from bar import Bar


app = Flask(__name__)

@app.route('/bar')
def bar():
    return Bar.bar_action()

if __name__ == "__main__":
    app.run(port=5001, debug=1)
Run Code Online (Sandbox Code Playgroud)

对于事件工作人员和核心模块,我使用了revent.py我创建的模块(redis + 事件)。它由三个类组成:

  1. 事件——事件的抽象
  2. 生产者——核心模块使用的服务/类,将事件生成到其事件流中。
  3. Worker——一个事件服务器,您可以将事件映射到函数(有点像 Flask 中的路由 HTTP 端点),它还运行事件循环来侦听事件。

在底层,该模块使用redis Streams。我将粘贴下面的代码revent.py

但首先,这里有一个示例 exmaple for bar.py,它由 http 服务器和工作线程调用来完成工作,并向 redis 中的“bar”流发出有关其正在执行的工作的事件。

# Bar/bar.py
from revent import Producer
import redis

class Bar():
    ep = Producer("bar", host="localhost", port=6379, db=0)

    @ep.event("update")
    def bar_action(self, foo, **kwargs):
        print("BAR ACTION")
        #ep.send_event("update", {"test": str(True)})
        return "BAR ACTION"

if __name__ == '__main__':
    Bar().bar_action("test", test="True")
Run Code Online (Sandbox Code Playgroud)

最后,这是一个示例工作程序,它将侦听“bar”流上的事件Foo/worker.py

# Foo/worker.py
from revent import Worker

worker = Worker()

@worker.on('bar', "update")
def test(foo, test=False):
    if bool(test) == False:
        print('test')
    else:
        print('tested')

if __name__ == "__main__":
    worker.listen(host='127.0.0.1', port=6379, db=0)

Run Code Online (Sandbox Code Playgroud)

正如所承诺的,这是我构建的模块的代码revent.py。可能值得向 pypl 添加一个更进一步开发的版本,但我只是使用符号链接来保持两个版本同步。

# revent.py
import redis
from datetime import datetime
import functools

class Worker:
    # streams = {
    #   "bar": {
    #       "update": Foo.foo_action
    #   },
    # }

    def __init__(self):
        self._events = {}


    def on(self, stream, action, **options):
        """
        Wrapper to register a function to an event
        """
        def decorator(func):
            self.register_event(stream, action, func, **options)
            return func
        return decorator

    def register_event(self, stream, action, func, **options):
        """
        Map an event to a function
        """
        if stream in self._events.keys():
            self._events[stream][action] = func
        else:
            self._events[stream] = {action: func}

    def listen(self, host, port, db):
        """ 
        Main event loop
        Establish redis connection from passed parameters
        Wait for events from the specified streams
        Dispatch to appropriate event handler
        """
        self._r = redis.Redis(host=host, port=port, db=db)
        streams = " ".join(self._events.keys())
        while True:
            event = self._r.xread({streams: "$"}, None, 0) 
            # Call function that is mapped to this event
            self._dispatch(event)

    def _dispatch(self, event):
        """
        Call a function given an event

        If the event has been registered, the registered function will be called with the passed params.
        """
        e = Event(event=event)
        if e.action in self._events[e.stream].keys():
            func = self._events[e.stream][e.action]
            print(f"{datetime.now()} - Stream: {e.stream} - {e.event_id}: {e.action} {e.data}")
            return func(**e.data)


class Event():
    """
    Abstraction for an event 
    """
    def __init__(self, stream="", action="", data={}, event=None):
        self.stream = stream
        self.action = action
        self.data = data
        self.event_id=None
        if event:
            self.parse_event(event)

    def parse_event(self, event):
        # event = [[b'bar', [(b'1594764770578-0', {b'action': b'update', b'test': b'True'})]]]
        self.stream = event[0][0].decode('utf-8')
        self.event_id = event[0][1][0][0].decode('utf-8')
        self.data = event[0][1][0][1]
        self.action = self.data.pop(b'action').decode('utf-8')
        params = {}
        for k, v in self.data.items():
            params[k.decode('utf-8')] = v.decode('utf-8')
        self.data = params

    def publish(self, r):
        body = {
            "action": self.action
        }
        for k, v in self.data.items():
            body[k] = v
        r.xadd(self.stream, body)

class Producer:
    """
    Abstraction for a service (module) that publishes events about itself

    Manages stream information and can publish events
    """
    # stream = None
    # _r = redis.Redis(host="localhost", port=6379, db=0)

    def __init__(self, stream_name, host, port, db):
        self.stream = stream_name
        self._r = redis.Redis(host="localhost", port=6379, db=0)

    def send_event(self, action, data):
        e = Event(stream=self.stream, action=action, data=data)
        e.publish(self._r)

    def event(self, action, data={}):
        def decorator(func):
            @functools.wraps(func)
            def wrapped(*args, **kwargs):
                result = func(*args, **kwargs)
                arg_keys = func.__code__.co_varnames[1:-1]
                for i in range(1, len(args)):
                    kwargs[arg_keys[i-1]] = args[i]
                self.send_event(action, kwargs)
                return result           
            return wrapped
        return decorator


Run Code Online (Sandbox Code Playgroud)

所以,把它们放在一起。和foo.py模块bar.py分别执行 Foo 和 Bar 服务的实际工作。它们的方法由 HTTP 服务器和事件工作者调用来处理请求/事件。在完成工作时,这两个模块会发出有关其状态更改的事件,以便其他感兴趣的服务可以采取相应的行动。HTTP 服务器只是一个使用 Flask 等的普通 Web 应用程序。事件工作者在概念上类似于 Web 服务器,它监听 Redis 中的事件而不是 http 请求。这两个进程(Web 服务器和事件工作线程)需要单独运行。因此,如果您在本地开发,则需要在不同的终端窗口中运行它们或使用容器/进程编排器。

那是很多。我希望它对某人有所帮助,如果您有疑问,请在评论中告诉我。

编辑

我将 revent.py 文件作为包上传到 pypi - redisevents。我将在本周晚些时候添加更多有关如何使用/扩展它的文档。