小编Dim*_*der的帖子

如何从 Transformer 输出多条记录?

给定:带有KStream::transform. 作为Transformer::transform执行的一部分,从输入一 ( KeyValue<String, Message>)生成多条消息。

我可能可以KeyValue<String, List<Message>>从返回对象Transformer::transform并将其应用flatMapValues为拓扑中的下一个处理器来平整列表。但是我想知道是否可以ProcessorContext::forward用于相同的目标,即

public KeyValue<String, Message> transform(String key, Message message) {
        Iterable<Message> messages = generateMultipleFromOne(message);
        messages.forEach(m->context.forward(key, m));

        return null;
}
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

3
推荐指数
1
解决办法
929
查看次数

使用 selectKey 和转换对 DSL 拓扑进行流重新分区

我觉得我可能缺少一些非常基本的东西,但我还是会问。

有多个分区的输入主题。我使用 selectKey 作为 DSL 拓扑的一部分。selectKey 始终返回相同的值。我的期望是,在 selectKey() 触发的内部重新分区之后,拓扑中的下一个处理器将在同一个分区上为同一个键调用。然而,transform() 的下一个处理器在不同分区上为相同的键调用。

拓扑:

    Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();


        builder
            .stream("in-topic", Consumed.with(Serdes.String(), new JsonSerde<>(CatalogEvent.class)))
            .selectKey((k,v) -> "key")
            .transform(() -> new Processor())
            .print();

        return builder.build();
    }
Run Code Online (Sandbox Code Playgroud)

转换使用的处理器类

public class Processor implements Transformer<String, CatalogEvent, KeyValue<String, DispEvent>> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, DispEvent> transform(String key, CatalogEvent catalogEvent) {
        System.out.println("key:" + key + " partition:" + context.partition());
        return null;
    }

    @Override …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
817
查看次数

标签 统计

apache-kafka ×2

apache-kafka-streams ×2

java ×1