有没有办法使用 .NET 中的 Kafka Ksql Push 查询

Wor*_*bee 6 .net c# apache-kafka ksqldb

我目前正在 .NET 中使用 Kafka 消费者处理大量 Kafka 消息。

我的处理过程的第一步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息。

我不想首先处理(特别是不下载)那些不需要的消息。

看起来 kSql 查询(写为推送查询)可以有效地过滤出我需要处理的消息。

我如何通过 .NET 使用这些?我看到一些文档提到了 REST API,但我怀疑这是一个好主意,我需要在一天的高峰时间每分钟处理超过 100 000 条记录。(如果我可以选择性地下载和处理消息,我只会正在处理当前体积的大约三分之一。)

不幸的是,我无法控制发布者,因此我无法更改消息的发布内容/方式。

Rob*_*att 2

是的,您可以使用 ksqlDB 来执行此操作

-- Declare a stream on the source topic
-- Because it's JSON you'll need to specify the schema
CREATE STREAM my_source (COL1 VARCHAR, COL2 INT) 
  WITH (KAFKA_TOPIC='my_source_topic', VALUE_FORMAT='JSON');

-- Apply the filter to the stream, with the results written
-- to a new stream (backed by a new topic)
CREATE STREAM target WITH (KAFKA_TOPIC='my_target_topic') AS
  SELECT * FROM my_source WHERE COL1='FOO';
Run Code Online (Sandbox Code Playgroud)

然后使用应用程序中的REST API运行推送查询,该查询将仅使用过滤后的消息:

SELECT * FROM target EMIT CHANGES;
Run Code Online (Sandbox Code Playgroud)

除了 ksqlDB 之外,您可能还想看看社区最近发布的项目: https: //github.com/LGouellec/kafka-streams-dotnet

  • @罗宾·莫法特。谢谢。我看到了你提到的流项目,但看起来还很早期。仍然值得一试。您认为通过 REST API 高速处理大量数据真的可行吗?对我来说这似乎有点狡猾 - 但我不是这方面的专家。(如果我必须进行大量 REST 调用,我主要担心速度) (2认同)