相关疑难解决方法(0)

如何使用Kafka Stream DSL使用处理器过滤密钥和值

我有一个与StateStore交互的处理器,用于过滤并对消息执行复杂的逻辑.在process(key,value)context.forward(key,value)用来发送我需要的键和值的方法中.出于调试目的,我也打印出来.

我有一个KStream mergedStream,它来自另外两个流的连接.我想将处理器应用于该流的记录.我实现了这个:mergedStream.process(myprocessor,"stateStoreName")

当我启动这个程序时,我可以看到要打印到我的控制台的正确值.但是,如果我使用主题上mergedStream.to("topic")的值将mergedStream发送到主题,则不是我在处理器中转发的那个,而是原始的.

我使用kafka-streams 0.10.1.0.

将我在处理器中转发的值转换为另一个流的最佳方法是什么?

是否可以将Processor APIKStream DSL创建的流混合使用?

apache-kafka-streams

6
推荐指数
1
解决办法
3398
查看次数

如何将自定义StateStore添加到Kafka Streams DSL处理器?

对于我的一个Kafka流应用程序,我需要使用DSL和Processor API的功能.我的流媒体应用流程是

source -> selectKey -> filter -> aggregate (on a window) -> sink
Run Code Online (Sandbox Code Playgroud)

在聚合之后,我需要向接收器发送SINGLE聚合消息.所以我将拓扑定义如下

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
      .filterNot((k,v) -> k.equals("UnknownGroup"))
      .process(() -> new MyProcessor());
Run Code Online (Sandbox Code Playgroud)

我定义了一个自定义StateStore并将其注册到我的处理器,如下所示

public class MyProcessor implements Processor<String, String> {

    private ProcessorContext context = null;
    Serde<HashMapStore> invSerde = Serdes.serdeFrom(invJsonSerializer, invJsonDeserializer);


    KeyValueStore<String, HashMapStore> invStore = (KeyValueStore) Stores.create("invStore")
        .withKeys(Serdes.String())
        .withValues(invSerde)
        .persistent()
        .build()
        .get();

    public MyProcessor() {
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.register(invStore, false, …
Run Code Online (Sandbox Code Playgroud)

apache-kafka-streams

3
推荐指数
1
解决办法
4324
查看次数

标签 统计

apache-kafka-streams ×2