Bas*_*deh 4 django orm cqrs event-sourcing apache-kafka
我正在使用 Django 为微服务架构构建身份验证服务。它更像是一个实验,而不是现实世界的实施,以了解更多有关事件溯源的信息。
我确实知道 Django 有一个 ORM,它与框架紧密结合,使用 Flask 将是一种更简单的方法,但我正在尝试找到一种解决方法。
用例非常简单。用户注册后,他会收到一封电子邮件来激活他的帐户。用户再次被激活,我发送一封电子邮件,通知他他的帐户已激活。
根据我的理解,这是在事件源系统中。事件被触发并存储,我们可以获取最新状态并将其存储在数据库中。就我而言,它将是使用 Django ORM 的 Postgres。
我使用 django 中的信号将事件发布到 kafka。pre_save() 信号。然后模型将保存该对象。
如果有尚未实施的更新,请参见下文。我只会发布更新的字段。并更新 Django 中的对象。
有人看到这种方法有任何警告吗?或者在模型的保存方法中实现它会更好吗?
我很想听听您对此的反馈。
# app/services.py
class KafkaService:
def __init__(self):
try:
self.producer = KafkaProducer(bootstrap_servers=settings.KAFKA_BROKERS,
value_serializer=lambda m: json.dumps(m).encode('ascii'))
except KafkaError as ke:
logger.exception(f"Something went wrong creating a kafka producer: {ke}")
self.producer = None
except Exception as ex:
logger.exception(f"Something went wrong creating a kafka producer: {ex}")
self.producer = None
def publish(self, topic, key, data):
if not self.producer:
logger.error(f"Kafka Producer could not establish a connection")
pass
try:
self.producer.send(topic, key=bytes(key, encoding='utf-8'), value=data)
self.producer.flush()
logger.info("Message has been published to Kafka")
except Exception as ex:
logger.info(f"Errored while publishing to Kafka: {ex}")
Run Code Online (Sandbox Code Playgroud)
# app/events.py
class UserEvent:
def __init__(self, event_store): # event store here is Kafka
""" A user event class which is an injectable. I am using here.
I need a key for kafka to place the correct event in the correct partition.
:parameters:
- event_store: a class form example :class:`KafkaService` which publishes data
"""
self.event_store = event_store
def user_created(self, email, data):
""" Publish an event to the event store when a user is created
:param email: string
:param data: string - json
"""
self.event_store.publish('user-created', email, data)
def user_activated(self, email, data):
""" Publish an event to the event store when a user is activated """
self.event_store.publish('user-activated', email, data)
Run Code Online (Sandbox Code Playgroud)
# app/signals.py
kafka_service = KafkaService()
user_event = UserEvent(kafka_service)
def user_added_event(sender, instance, created, **kwargs):
if created:
from users.api.v1.serializers import UserSerializer # Avoid (Apps Not Read)
value = UserSerializer(instance).data
user_event.user_created(instance.email, value)
else:
logger.info("User Updated")
Run Code Online (Sandbox Code Playgroud)
有人看到这种方法有任何警告吗?或者在模型的保存方法中实现它会更好吗?
通常的设计将领域模型(也称为业务逻辑)与持久性问题分开。
如果事件序列是您的事实来源,则需要遵循两件事:首先,您需要确保在事件发布之前成功存储它们。其次,您要确保您的写入语义是“第一个写入者获胜”。
如果你做对了,并且每个人都明白数据库中的模型可以及时“落后于”事件存储中的事件,那么你就处于良好状态。
最终一致的系统使“读你自己写的”期望变得具有挑战性。所以你可能会有一些额外的工作。