如何在 kafka 流应用程序中进行 REST API 调用/

Mac*_*Mac 6 rest model serving apache-kafka apache-kafka-streams

时间序列数据是在 kafka 主题中生成的。我需要读取每条记录并用数据库中的一些数据进行修饰,并最终调用 REST API。收到响应后,输出到 kafka 主题。如何使用 kafka Streams API 高效且可扩展地做到这一点?

脚步 -

  • 开始阅读输入主题
  • 调用mapvalues进行数据库调用并用附加数据装饰记录
  • 使用输入请求进行 REST api 调用,获取响应。
  • 输出kafka主题中的记录

我认为,上述算法有两个瓶颈——

  • 进行数据库调用会减慢速度。这可以通过缓存元数据并在存在错误或使用状态存储时加载元数据来避免。

  • 同步进行 REST API 调用会减慢速度。

    final KStream<String, String> records = builder.stream(InputTopic);

    //This is bad
    final KStream<String, String> output = records
      .mapValues(value ->  { //cache hit otherwise database call});
      .mapValues(value ->  { //prepare http request and convert the http resonse };
    output.to(OutputTopic)

Run Code Online (Sandbox Code Playgroud)

如果数据库调用或 REST API 需要较长时间才能完成,上面的代码将对吞吐量产生依赖性和不利影响。具有相同键的记录不应乱序处理。预计吞吐量约为1m/分钟。当一条记录到达 REST API 时,可以并发进行数据库调用。

不知道如何编写可以在这种情况下扩展的拓扑。我是卡夫卡流的新手。