将 Flux 条目与 previous 和 map 相结合

Wes*_*Wes 0 java project-reactor

问题

使用 Flux 如何访问前一个元素?

背景

我有一个外部事件流,它按顺序提供事件,该流的顺序是调度一个事件,然后立即调度另一个事件。然而,第二个事件的元数据位于第一个事件中。

请注意,事件数量并不总是偶数。

我想做的是将事件组合成事件流以供下游使用。

Flux#zip看起来很有希望,但这意味着返回外部事件类型的对象。

初始代码

到目前为止我所拥有的是。

    BinaryLogClient client = new BinaryLogClient(host, port, username, password);
    Flux<Event> bridge = Flux.create(sink -> {
        EventListener fluxListener = event -> {
            sink.next(event);
        };

        client.registerEventListener(fluxListener);
    });

    bridge.subscribe(DemoApplication::printEvent);
    bridge.map(new EventPairMemorizer());


public class EventPair  {
    private final Event previous;
    private final Event current;

    public EventPair(Event previous, Event current) {
        this.previous = previous;
        this.current = current;
    }

    /**
     * @return `null` if no previous events.
     */
    public Event getPrevious() {
        return previous;
    }

    public Event getCurrent() {
        return current;
    }
}

/**
 * Not thread safe has to go on a single thread
 */
public class EventPairMemorizer implements Function<Event, EventPair> {
    Event previous = null;

    EventPair toPair(Event e) {
        EventPair pair = new EventPair(previous, e);
        previous = e;
        return pair;
    }

    @Override
    public EventPair apply(Event current) {
        return toPair(current);
    }
}
Run Code Online (Sandbox Code Playgroud)

这部分是一个学习练习,部分是一个概念验证。

不相关的细节

我正在尝试使用 mysql-binlog-connector-java 来获取有关数据库中更改内容的流。

因此,如果我收到一个EXT_WRITE_ROWS事件,则前一个事件就是一个TABLE_MAP事件。然后我想对TABLE_MAP事件进行列查找(使用 jdbc)。然后转换为一些 JSON 友好的内部结构。

这同样适用于EXT_UPDATE_ROWS事件。

所以想法代码看起来像

  1. onExternalEvent 推送到 Flux
  2. 检查事件类型。如果使用 Mono 在 jdbc 线程上匹配调用 jdbc
  3. 结合 Mono 和当前事件。
  4. 映射到内部类型。
  5. 发送到不同的流。
  6. 利润

Sim*_*slé 6

重叠缓冲区怎么样?

您将为buffer(2, 1)每个元素打开一个缓冲区,每个缓冲区将包含 2 个元素。

然后,您可以忽略不以您感兴趣的事件结尾的缓冲区,并获取您感兴趣的事件的先前值...