Art*_*hka 4 scala stream apache-kafka-streams
有一个 kafka 集群,我从中消费两个主题并加入它。使用 join 的结果,我对数据库进行了一些操作。对 DB 的所有操作都是异步的,因此它们返回给我一个 Future(scala.concurrent.Future,但无论如何它与 java.util.concurrent.CompletableFuture 相同)。所以结果我得到了这样的代码:
val firstSource: KTable[String, Obj]
val secondSource: KTable[String, Obj2]
def enrich(data: ObjAndObj2): Future[EnrichedObj]
def saveResultToStorage(enrichedData: Future[EnrichedObj]): Future[Unit]
firstSource.leftJoin(secondSource, joinFunc)
.mapValues(enrich)
.foreach(saveResultToStorage)
Run Code Online (Sandbox Code Playgroud)
我可以在流中使用未来值进行操作,还是有更好的方法来处理异步任务(例如 Akka 流中的 .mapAsync)?
小智 5
我有同样的问题。据我所知,Kafka Streams 的设计目的不是像 Akka Streams 那样处理多速率流。Kafka Streams 没有 Akka 具有的多速率原语等价物,如 mapAsync、throttle、conflate、buffer、batch 等。Kafka Streams 擅长处理主题和有状态数据聚合之间的连接。Akka Streams 擅长多速率和异步处理。
您有几种选择来处理这个问题:
| 归档时间: |
|
| 查看次数: |
1622 次 |
| 最近记录: |