版本冲突,当前版本 [2] 与提供的版本 [1] 不同

ali*_*ina 2 java elasticsearch apache-kafka apache-spark

我有一个 Kafka 主题和一个 Spark 应用程序。Spark 应用程序从 Kafka 主题获取数据,对其进行预聚合并将其存储在 Elastic Search 中。听起来很简单,对吧?

一切都按预期工作正常,但是当我将“spark.cores”属性设置为 1 以外的值时,我开始得到

version conflict, current version [2] is different than the one provided [1]
Run Code Online (Sandbox Code Playgroud)

经过一番研究后,我认为错误是因为多个核心可以同时拥有相同的文档,因此,当一个核心完成其聚合并尝试写回文档时,它会收到此错误

说实话,我对这种行为感到有点惊讶,因为我认为 Spark 和 ES 会自己处理这个问题。这让我相信,也许我的方法有问题。

我怎样才能解决这个问题?我需要遵循某种“同步”或“锁定”概念吗?

干杯!

Wil*_*urn 5

听起来队列中有几条消息,它们都更新同一个 ES 文档,并且这些消息正在同时处理。有两种可能的解决方案:

\n

首先,可以使用Kafka分区来确保更新同一个ES文档的所有消息都按顺序处理。这假设\xe2\x80\x99s 消息中有一些\xe2\x80\x99s 属性,Kafka 可以使用这些属性来确定消息如何映射到 ES 文档。

\n

另一种方法是处理乐观并发冲突的标准方法:重试事务。如果你有一些来自Kafka消息的数据需要添加到ES文档中,并且ES中当前的文档是版本1,那么你可以尝试更新它并保存回版本2。但是如果其他人已经写了版本2 ,您可以使用版本 2 作为起点,添加新数据并保存版本 3 来重试。

\n

如果这些方法中的任何一种破坏了您期望从 Kafka 和 Spark 获得的并发性,那么您可能需要重新考虑您的方法。您可能必须引入一个新的处理阶段,该阶段会执行一些繁重的工作,但\xe2\x80\x99不会实际写入ES,然后在单独的步骤中进行ES更新。

\n