如何在 Reactor 中将 Context 与 flatMap() 一起使用?

Hub*_*ert 4 java project-reactor

我在理解Context 时遇到问题。所以文档说Context是:

通过上下文协议在运算符等组件之间传播的键/值存储。上下文非常适合传输正交信息,例如跟踪或安全令牌。

伟大的。

现在让我们假设我们想使用Context传播一些东西,让它无处不在。要调用另一个异步代码,我们只需使用flatMap()方法。

问题如何访问被调用方法内的上下文?

示例(简单)代码:

public class TestFlatMap {
    public static void main(final String ...args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
            .flatMap(TestFlatMap::nameToGreeting)
            .subscriberContext(context ->
                Context.of("greetingWord", "Hello")  // context initialized
            );
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.just("Hello " + name + " !!!");  // ALERT: we don't have Context here
    }
}
Run Code Online (Sandbox Code Playgroud)

被调用的方法可以(并且很可能会)在另一个类中。

提前感谢您的帮助!

编辑:删除了一些代码,使问题更加简洁明了。

Ole*_*uka 12

链接您的Publishers 并可能Context与您同在

在这种情况下,您连接了所有Publishers(这包括flatMap/concatMap和类似运算符中的连接),您将Context在整个流运行时之间正确传播。

ContextnameToGreeting方法中访问,Mono.subscribeContext如果方法似乎不相关,您可以调用和检索存储的信息事件。下面显示了提到的概念:

public class TestFlatMap {
    public static void main(final String ...args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
                                           .flatMap(TestFlatMap::nameToGreeting)
                                           .subscriberContext(context ->
                                                   Context.of("greetingWord", "Hello")  // context initialized
                                           );
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.subscriberContext()
                   .filter(c -> c.hasKey("greetingWord"))
                   .map(c -> c.get("greetingWord"))
                   .flatMap(greetingWord -> Mono.just(greetingWord + " " + name + " " + "!!!"));// ALERT: we have Context here !!!
    }
}
Run Code Online (Sandbox Code Playgroud)

此外,您可以通过以下方式使用zip运算符执行相同操作,以便稍后合并结果:

public class TestFlatMap {
    public static void main(final String ...args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
                                           .flatMap(TestFlatMap::nameToGreeting)
                                           .subscriberContext(context ->
                                                   Context.of("greetingWord", "Hello")  // context initialized
                                           );
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.zip(
            Mono.subscriberContext()
                .filter(c -> c.hasKey("greetingWord"))
                .map(c -> c.get("greetingWord")), // ALERT: we have Context here !!!
            Mono.just(name),
            (greetingWord, receivedName) -> greetingWord + " " + receivedName + " " + "!!!"
        );
    }
}
Run Code Online (Sandbox Code Playgroud)

那么,它为什么有效?

正如我们从上面的示例中看到的,nameToGreeting是在 main 的上下文中调用的Flux。在幕后-> (Here Some FluxFlatMap internals),每个映射Publisher都由FlatMapInner. 如果我们查看FlatMapInner并寻找我们将看到currentContext覆盖,它FlatMapInner使用了 parent Context,这意味着如果父级有一个 Reactor Context- 那么这个上下文将被传播到每个 inner Publisher

因此,该nameToGreeting方法返回的Mono将与Context它的父级相同


小智 11

Reactor-Core v3.4 引入了Mono.deferContextualFlux.deferContextual,取代了v3.3 中引入的Mono.deferWithContextFlux.deferWithContext

使用这些方法,Oleh Dokukas zip 示例可以简化为

public class TestFlatMap {
    public static void main(final String ...args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
                .flatMap(TestFlatMap::nameToGreeting)
                .subscriberContext(context ->
                        Context.of("greetingWord", "Hello"));  // context initialized
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.deferContextual(c -> Mono.just(name)
                .filter(x -> c.hasKey("greetingWord"))
                .map(n -> c.get("greetingWord") + " " + n + " " + "!!!"));
    }
}
Run Code Online (Sandbox Code Playgroud)