使用Kafka作为(CQRS)Eventstore.好主意?

Gee*_*Jan 200 cqrs event-sourcing dddd apache-kafka

虽然我之前遇到过Kafka,但我最近才意识到Kafka可能会被用作CQRS,eventstore(的基础).

Kafka支持的要点之一:

  • 事件捕获/存储,当然都是HA.
  • 发布/子结构
  • 能够重放事件日志,允许新订户在事后注册系统.

诚然,我不是100%精通CQRS /事件采购,但这看起来非常接近eventstore应该是什么.有趣的是:我真的找不到关于Kafka被用作事件存储的那么多,所以也许我必须遗漏一些东西.

那么,卡夫卡缺少什么东西才能成为一个好的活动商店?会有用吗?用它生产?对洞察力,链接等感兴趣

基本上,系统的状态是根据系统收到的事务/事件保存的,而不是仅仅保存系统的当前状态/快照,这是通常所做的.(将其视为会计总帐:所有交易最终都会累加到最终状态)这允许各种很酷的事情,但只需阅读所提供的链接.

小智 270

我是卡夫卡的原作者之一.Kafka将作为事件采购的日志工作得非常好.它具有容错能力,可扩展到巨大的数据大小,并具有内置的分区模型.

我们在LinkedIn使用此表单的几个用例.例如,我们的开源流处理系统Apache Samza 内置了对事件源的支持.

我认为你没有太多关于使用Kafka进行事件采购的原因,主要是因为事件采购术语在Kafka最受欢迎的消费者网络空间中似乎并不普遍.

我已经写了一些关于这种风格卡夫卡的使用这里.

  • @Jay:既然我已经对这个话题重新表现出兴趣了,你能否详细说明Kafka*似乎*被设计为让它的已发布消息在一段时间后过期?如果使用Kafka作为事件源,则应无限期地存储消息.它可能是可配置的,但这会造成问题吗? (6认同)
  • 我也对@ Geert-Jan对Jay的问题感兴趣.Kafka不适合实际的事件采购交易方,因为每个域聚合需要一个事件流(主题)(想想数百万).但是,它非常适合从GetEventStore中获取事件.但这只适用于无限保留的事件(在我们的例子中),除了一些简短的评论之外,这似乎不是Kafka支持的用例?我错了吗?例如,Samza假设只有两种情况:基于时间的保留或基于密钥的保留.还有其他.. (4认同)
  • @eulerfx假设我们想使用Kafka作为事件源系统的存储器,应该如何实现乐观锁定/并发? (3认同)
  • 打算发布那个链接:)真棒博客文章.能够发表评论会很好,因为我有很多问题.@ Geert-Jan还看一看"Lambda架构",这个很相似,名字来自Storm作者,大多使用某种基于hadoop的事件日志,很多例子 (2认同)
  • kafka和eventstore之间有什么比较吗?具体来说,我喜欢在事件存储中称为Projections的FRP上的关注。在卡夫卡/萨姆扎有类似的东西吗? (2认同)
  • 事件源要求对日志条目进行排序。据我了解,Kafka不支持跨分区的消息排序,而分区是扩展主题的方法。那么,如何在基于Kafka的事件源系统中与按分区缩放的事件日志一起最佳地处理消息排序? (2认同)

eul*_*rfx 110

Kafka是一个消息传递系统,它与事件存储有很多相似之处,但引用它们的介绍:

Kafka群集保留所有已发布的消息 - 无论它们是否已被消耗 - 在可配置的时间段内.例如,如果保留设置为两天,那么在发布消息后的两天内,它可供消费,之后将被丢弃以释放空间.Kafka的性能在数据大小方面实际上是恒定的,因此保留大量数据不是问题.

因此,虽然可以无限期地保留消息,但期望它们将被删除.这并不意味着您不能将其用作事件存储,但使用其他东西可能更好.看看EventStore的另一种选择.

UPDATE

卡夫卡文件:

事件源是一种应用程序设计风格,其中状态更改被记录为按时间排序的记录序列.Kafka对非常大的存储日志数据的支持使其成为以这种风格构建的应用程序的出色后端.

更新2

使用Kafka进行事件采购的一个问题是所需主题的数量.通常在事件源中,每个实体(例如用户,产品等)存在事件流(主题).这样,可以通过重新应用流中的所有事件来重构实体的当前状态.每个Kafka主题由一个或多个分区组成,每个分区都存储为文件系统上的目录.随着znode数量的增加,ZooKeeper也会有压力.

  • 我正在看Kafka并有另一个担忧:我没有注意到乐观并发的任何事情.理想情况下,我可以说:"仅当对象的最新事件仍为N时,才将此事件添加为项目N + 1." (16认同)
  • @John我想如果你已经对非冲突事件进行了权威排序,这意味着他们居住的地方就是你的实际事件存储技术,而Kafka只是被用作二级系统来分发它们. (4认同)
  • 这里也有有价值的信息:https://groups.google.com/forum/#!topic/dddcqrs/rm02iCfffUY (3认同)
  • @Darien:我可能正在使用Redis喂Kafka的设置(使用[Redis Notifications](http://redis.io/topics/notifications))。由于Redis允许乐观并发(使用Watch / multi-exec),因此应该可以 (2认同)
  • @Darien我不是事件源专家,但是我的理解是,一般而言,您不需要乐观并发,因为事件根据定义是对已经发生过的事情的记录。 (2认同)

Kas*_*man 40

我一直回到这个QA.而且我没有发现现有的答案有细微差别,所以我添加了这个.

TL; DR.是或否,取决于您的活动采购用途.

我知道有两种主要的事件源系统.

下游事件处理器=是

在这种系统中,事件发生在现实世界中并被记录为事实.如仓库系统,以跟踪产品的托盘.基本上没有冲突的事件.一切都已经发生,即使它是错的.(即托盘123456放在卡车A上,但预定用于卡车B.)然后通过报告机制检查事实是否异常.Kafka似乎非常适合这种下游事件处理应用程序.

在这种情况下,为什么Kafka人们将其作为事件采购解决方案提倡是可以理解的.因为它与已经使用的方式非常相似,例如,点击流.然而,使用术语事件采购(与流处理相对)的人可能指的是第二种用法......

应用程序控制的事实来源=否

由于用户请求通过业务逻辑,这种应用程序声明自己的事件.由于两个主要原因,卡夫卡在这种情况下效果不佳.

缺乏实体隔离

此方案需要能够为特定实体加载事件流.这样做的常见原因是为业务逻辑构建一个临时写模型,用于处理请求.这样做在卡夫卡是不切实际的.使用每个实体主题可以允许这样做,但是当可能有数千或数百万个实体时,这是非启动性的.这是由于Kafka/Zookeeper的技术限制.建议使用每个类型的主题代替Kafka,但这需要为该类型的每个实体加载事件,以便为单个实体获取事件.由于您无法通过日志位置判断哪些事件属于哪个实体.即使使用快照从已知的日志位置开始,这也可能是大量的事件.但快照无法帮助您更改代码.因为向业务逻辑添加新功能可能会使以前的快照在结构上不兼容.因此,在这些情况下仍然需要进行主题重放以构建新模型.使用瞬态写入模型而不是持久性写入模型的主要原因之一是使业务逻辑变更便宜且易于部署.

缺乏冲突检测

其次,由于针对同一实体的并发请求,用户可以创建竞争条件.保存冲突事件并在事后解决它们可能是非常不受欢迎的.因此,能够防止冲突事件非常重要.为了扩展请求负载,通常使用无状态服务,同时使用条件写入防止写入冲突(仅在最后一个实体事件为#x时写入).Aka乐观并发.Kafka不支持乐观并发.即使它在主题级别支持它,它也需要一直到实体级别才能生效.要使用Kafka并防止冲突事件,您需要在应用程序级别使用有状态的序列化编写器.这是一个重要的架构要求/限制.

更多的信息


每条评论更新

该评论已被删除,但问题是:人们用什么来进行事件存储呢?

似乎大多数人在现有数据库之上推出自己的事件存储实现.对于非分布式的场景,像内部后端或独立的产品,它充分证明了如何创建一个基于SQL的事件存储.并且在各种数据库之上有可用的库.还有EventStore,它是为此目的而构建的.

在分布式场景中,我看到了几种不同的实现.Jet的Panther项目使用Azure CosmosDB,通过Change Feed功能通知侦听器.我在AWS上听到的另一个类似的实现是使用DynamoDB及其Streams功能来通知侦听器.分区键可能应该是用于最佳数据分发的流ID(以减少过度配置的数量).但是,在Dynamo中跨流的完整重播是昂贵的(阅读和成本方面).因此,这个impl也被设置为Dynamo Streams将事件转储到S3.当一个新的监听器上线,或者一个现有的监听器想要一个完整的重放时,它会先读取S3以便赶上.

我目前的项目是一个多租户场景,我在Postgres上推出了自己的项目.像Citus这样的东西似乎适合于可扩展性,可以通过tentant + stream进行分区.

Kafka在分布式场景中仍然非常有用.将每个服务的事件暴露给其他服务是一个非常重要的问题.事件存储不是为此而构建的,但这正是Kafka所做的.每个服务都有自己的内部事实来源(可能是事件存储或其他),但是听听Kafka知道"外部"发生了什么.该团队还可以将其服务活动发布到Kafka,以告知服务所做的有趣事情的"外部".

  • @Dominik 我在更新部分(第 2 段)中提到了 EventStore。我会回去链接它。我已经尝试过了,它的性能令人印象深刻。对于我们的小团队来说,暂时不引入另一个数据库被认为更重要,因此 Postgres(也用于视图)。我们有可能在未来或未来的产品中转向EventStore。 (2认同)
  • @KaseySpeakman 主题与分区不同。一个主题有一个或多个分区。分区保证在任何给定时刻每个组只有一个消费者。以利用它的方式对您的实体进行分区。您不需要每个实体的主题,甚至不需要每个实体的分区。您只需要以这样一种方式对它们进行分区,以确保所有发送到同一实体的命令都进入同一分区。 (2认同)
  • @AndrewLarsson如果您不按实体进行分区,那么您将如何防止实体级别的冲突事件?既然我们已经绕了一圈又回到了并发冲突,那么也许您应该在媒体上发布您自己的文章,或者关于如何在生产中使用 Kafka 进行事件溯源(而不是流处理)的文章。如何通过按类型分区且没有实体级并发控制来完成此任务。我会读它,如果我不同意,我什至不会在评论中攻击你。 (2认同)
  • @KaseySpeakman 以这种方式使用 Kafka 无论如何都不容易。但是,如果您已经认真考虑过 CQRS 和事件溯源,那么您就无法以简单的方式做事。您的并发模型对您的规模有直接影响 - 不要随意选择一个。此外,HTTP 并不是一种可靠的传输方式,而且,如果您处于这种规模,您就无法花时间解决丢失和/或重复的消息问题。这都可以通过在客户端和命令处理器之间使用 Kafka 来解决,但是,这是以复杂性为代价的。 (2认同)

Ale*_*rev 31

所有现有的答案似乎都非常全面,但有一个术语问题,我想在我的答案中解决。

什么是事件溯源?

似乎如果你看五个不同的地方,你会得到这个问题的五个不同答案。

然而,如果你看一下 Greg Young 2010 年的论文,它从第 32 页开始很好地总结了这个想法,但它没有包含最终的定义,所以我敢自己制定它。

事件溯源是一种持久状态的方法。您不必因状态突变而将一种状态替换为另一种状态,而是保留代表该突变的事件。因此,您始终可以通过读取所有实体事件并按顺序应用这些状态突变来获取实体的当前状态。通过这样做,当前实体状态将成为该实体所有事件的左折叠

“好的”事件存储(数据库)意味着什么?

任何持久化机制都需要执行两个基本操作:

  • 将新的实体状态保存到数据库
  • 从数据库中检索实体状态

这就是 Greg 谈论实体流概念的地方,其中每个实体都有自己的事件流,由实体 id 唯一标识。当您拥有一个能够通过实体 ID 读取所有实体事件(读取流)的数据库时,使用事件溯源并不是一个难题。

正如 Greg 的论文在 CQRS 背景下提到事件溯源一样,他解释了为什么这两个概念可以很好地配合。尽管您有一个充满一堆实体的原子状态突变的数据库,但查询多个实体的当前状态是一项艰巨的工作。通过分离用作事实来源的事务(事件源)存储和报告(查询、读取)存储(用于跨多个实体报告和查询当前系统状态)来解决该问题。查询存储不包含任何事件,它包含根据查询数据的需要组成的多个实体的预计状态。它不一定需要包含每个实体的快照,您可以自由选择查询模型的形状和形式,只要您可以将事件投影到该模型即可。

因此,“正确的”事件数据库需要支持我们所说的实时订阅,它将向查询模型提供新的(和历史的,如果我们需要重播)事件以进行项目。

我们还知道,在做出有关允许的状态转换的决策时,我们需要掌握实体状态。例如,已经执行的汇款不应执行两次。由于查询模型根据定义是过时的(即使是几毫秒),因此当您对过时数据做出决策时,它会变得很危险。因此,在对实体执行操作时,我们使用事务(事件)存储中最新且完全一致的状态来重建实体状态。

有时,您还想从数据库中删除整个实体,这意味着删除其所有事件。例如,这可能是遵守 GDPR 的要求。

那么,用作事件存储的数据库需要哪些属性才能使事件溯源系统正常工作?一些:

  • 使用实体 id 作为键将事件附加到有序的仅附加日志
  • 使用实体 id 作为键,按有序顺序加载单个实体的所有事件
  • 使用实体 id 作为键删除给定实体的所有事件
  • 支持实时订阅项目事件查询模型

卡夫卡是什么?

Kafka 是一个高度可扩展的消息代理,基于仅附加日志。Kafka 中的消息是根据主题生成的,现在一个主题通常包含一种消息类型,以便与模式注册表很好地配合。一个主题可以是类似于cpu-load 的主题,我们可以在其中生成许多服务器的 CPU 负载的时间序列测量值。

Kafka 主题可以分区。分区允许您并行生成和使用消息。消息仅在单个分区内排序,并且您通常需要使用可预测的分区键,以便 Kafka 可以跨分区分发消息。

现在,让我们看一下清单:

  • 可以将事件附加到 Kafka 吗?是的,它叫做“农产品”。您可以使用实体 id 作为键来附加事件吗?并非如此,因为分区键用于跨分区分发消息,所以它实际上只是一个分区键。另一个答案中提到的一件事是乐观并发。如果您使用关系数据库,您可能会使用该Version列。对于 NoSQL 数据库,您可能使用过文档 eTag。两者都允许您确保更新处于您了解的状态的实体,并且它在操作过程中没有发生变化。Kafka不为您提供任何支持此类状态转换的乐观并发性。
  • 您可以使用实体 ID 作为键,从 Kafka 主题中读取单个实体的所有事件吗?不,你不能。由于 Kafka 不是数据库,它的主题没有索引,因此从主题检索消息的唯一方法就是使用它们。
  • 您可以使用实体 id 作为键从 Kafka 中删除事件吗?,这是不可能的。仅当消息的保留期到期后,才会从主题中删除消息。
  • 您能否订阅 Kafka 主题以按顺序接收实时(和历史)事件,以便将它们投影到您的查询模型中?是的,因为主题是分区的,所以您可以扩展您的预测以提高性能。

Kafka 和 Sansa(或 Kafka Streams)怎么样?

将事件折叠为状态的某种表示形式,并将该状态异步存储在另一个数据库中的能力是事件溯源的一个附带功能。我们通常将这些操作称为“投影”,因为您可以以多种不同的方式折叠事件以进行陈述。这是一个有用的功能,因为您可以随意构建特定于用例的查询模型,并从开始或从某个时间点开始重建它们,因为您可以使用日志中的事件的完整历史记录。

然而,这不是事件溯源的目的,因为您可以使用队列执行完全相同的操作,并且没有人说过通过队列传递消息并更新消息使用者中的数据库记录是“事件溯源”。

其他需要考虑的事情

  • 将事件投影到持久状态是单向操作。如果您犯了错误,则无法恢复它,因为状态已经持续存在。您必须停止系统,重新投影所有内容,然后再次启动系统。这可能需要几个小时,甚至几周的时间。
  • 您无法查看单个实体的历史记录,因为可能有数百万个实体的所有事件都存储在单个主题中。您需要扫描整个主题才能弄清楚该实体发生了什么。
  • 当您收到用户删除其数据的请求时,如果不重新删除主题之间的事件,您将无法执行此操作。基本上,您必须提前考虑并应用相当复杂的模式,例如提前进行加密粉碎,否则将面临不符合当地隐私法规的风险。忽视这些规定可能最终会让您的公司破产。

那么,为什么人们继续这样做呢?

我相信,很多人声称 Kafka 是事件溯源系统的事件存储的不错选择的原因是他们将事件溯源与简单的发布-订阅(您可以使用炒作词“EDA”,或者改为事件驱动架构)。使用消息代理将事件扇出到其他系统组件是几十年来众所周知的模式。“经典”代理的问题是,消息一旦被消耗就会消失,因此您无法构建类似于从历史记录构建的查询模型之类的东西。另一个问题是,在投影事件时,您希望它们以与生成事件相同的顺序被消费,而“经典”代理通常旨在支持竞争消费者模式,该模式根据定义不支持有序消息处理。毫无疑问,Kafka支持竞争消费者,它限制每个或多个分区有一个消费者,但反之则不然。Kafka很好地解决了排序问题和历史消息保留问题。因此,您现在可以根据通过 Kafka 推送的事件构建查询模型。但这并不是事件溯源的最初想法,而是我们今天所说的 EDA。一旦这种分离变得清晰,我们希望不再看到这样的说法:任何仅附加事件日志都是事件源系统的事件存储数据库的良好候选者。


ken*_*sai 14

您可以使用Kafka作为事件存储,但我不建议这样做,尽管它可能看起来不错:

  • Kafka仅保证至少一次交付,并且事件存储中存在无法删除的重复项. 更新: 在这里你可以阅读为什么Kafka如此难以及最新消息如何最终实现这种行为:https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how -apache -卡夫卡不-IT /
  • 由于不变性,当应用程序发展并且需要转换事件时,没有办法操纵事件存储(当然有像upcasting这样的方法,但是......).一旦可能会说你永远不需要转换事件,但这不是正确的假设,可能会有你原始备份的情况,但你将它们升级到最新版本.这是事件驱动架构中的有效要求.
  • 没有地方可以保留实体/聚合的快照并且重放将变得越来越慢.从长远角度来看,创建快照是事件存储的必备功能.
  • 鉴于Kafka分区是分布式的,并且与数据库相比,它们很难管理和备份.数据库简单易懂:-)

所以,在你做出选择之前,你要三思而后行.事件存储作为应用程序层接口(监视和管理),SQL/NoSQL存储和Kafka作为代理的组合是比Kafka处理这两个角色以创建完整功能完整解决方案更好的选择.

事件存储是一项复杂的服务,如果您认真考虑在事件驱动架构中应用事件源,CQRS,Sagas和其他模式并保持高性能,则需要更多Kafka所能提供的服务.

随意挑战我的答案!您可能不喜欢我对您最喜欢的具有大量重叠功能的经纪人所说的话,但Kafka仍然不是设计为事件存储,而是更多地作为高性能代理和缓冲区同时处理快速生产者与慢速消费者场景,例如.

请查看eventuate.io微服务开源框架,以发现有关潜在问题的更多信息:http://eventuate.io/

自2018年2月8日起更新

我没有收录评论中的新信息,但同意其中的一些方面.此更新更多是关于微服务事件驱动平台的一些建议.如果您认真对待微服务稳健设计和最高性能,我将为您提供一些您可能感兴趣的提示.

  1. 不要使用Spring - 它很棒(我自己经常使用它),但同时又重又慢.它根本不是微服务平台.它只是一个框架来帮助你实现一个(这背后的很多工作......).其他框架"只是"轻量级REST或JPA或不同的框架.我推荐可能是一流的开源完整微服务平台,它可以回归纯Java根源:https: //github.com/networknt

如果您对性能有疑问,可以将自己与现有的基准测试套件进行比较. https://github.com/networknt/microservices-framework-benchmark

  1. 根本不要使用Kafka :-))这是半开玩笑.我的意思是,虽然卡夫卡很棒,但它是另一个以经纪人为中心的系统.我认为未来是在无需经纪人的邮件系统中.你可能会感到惊讶,但有比Kafka系统更快的速度:-),当然你必须降到更低的水平.看看纪事.

  2. 对于事件存储,我建议使用名为TimescaleDB的高级Postgresql扩展,它专注于大容量的高性能时间序列数据处理(事件是时间序列).当然CQRS,事件采购(重放等功能)都是在light4j框架中构建的,它使用Postgres作为低存储空间.

  3. 对于消息传递,请尝试查看Chronicle Queue,Map,Engine,Network.我的意思是摆脱这种老式的经纪人中心解决方案,并采用微信息系统(嵌入式系统).Chronicle Queue实际上比Kafka更快.但我同意这不是一个解决方案,你需要做一些开发,否则你去购买企业版(付费一个).最后,通过消除维护Kafka集群的负担,将从Chronicle构建您自己的消息传递层.

  • 顺便说一下,Kafka 现在支持一次交付。更新项目符号 1 (2认同)

Dmi*_*sky 7

是的,您可以将Kafka用作事件存储。它工作得很好,尤其是在引入Kafka Streams的情况下尤其如此,它提供了Kafka本机的方式将事件处理为可以查询的累积状态

关于:

重播事件日志的能力,使新订户可以在事件发生后向系统注册。

这可能很棘手。我在这里详细介绍了这一点:https : //stackoverflow.com/a/48482974/741970