Wor*_*bee 6 .net c# apache-kafka ksqldb
我目前正在 .NET 中使用 Kafka 消费者处理大量 Kafka 消息。
我的处理过程的第一步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息。
我不想首先处理(特别是不下载)那些不需要的消息。
看起来 kSql 查询(写为推送查询)可以有效地过滤出我需要处理的消息。
我如何通过 .NET 使用这些?我看到一些文档提到了 REST API,但我怀疑这是一个好主意,我需要在一天的高峰时间每分钟处理超过 100 000 条记录。(如果我可以选择性地下载和处理消息,我只会正在处理当前体积的大约三分之一。)
不幸的是,我无法控制发布者,因此我无法更改消息的发布内容/方式。
是的,您可以使用 ksqlDB 来执行此操作
-- Declare a stream on the source topic
-- Because it's JSON you'll need to specify the schema
CREATE STREAM my_source (COL1 VARCHAR, COL2 INT)
WITH (KAFKA_TOPIC='my_source_topic', VALUE_FORMAT='JSON');
-- Apply the filter to the stream, with the results written
-- to a new stream (backed by a new topic)
CREATE STREAM target WITH (KAFKA_TOPIC='my_target_topic') AS
SELECT * FROM my_source WHERE COL1='FOO';
Run Code Online (Sandbox Code Playgroud)
然后使用应用程序中的REST API运行推送查询,该查询将仅使用过滤后的消息:
SELECT * FROM target EMIT CHANGES;
Run Code Online (Sandbox Code Playgroud)
除了 ksqlDB 之外,您可能还想看看社区最近发布的项目: https: //github.com/LGouellec/kafka-streams-dotnet
| 归档时间: |
|
| 查看次数: |
4823 次 |
| 最近记录: |