从多个线程同时使用同一个 FluxSink 是否安全

kaq*_*qao 1 java concurrency spring project-reactor reactive-streams

我知道 aPublisher不能同时发布,但是如果我使用Flux#create(FluxSink),我可以安全地FluxSink#next同时调用吗?

换句话说,即使FluxSink#next并发调用,Spring 是否具有确保事件正确串行发布的内部魔法?

public class FluxTest {

    private final Map<String, FluxSink<Item>> sinks = new ConcurrentHashMap<>();

    // Store a new sink for the given ID
    public void start(String id) {
        Flux.create(sink -> sinks.put(id, sink));
    }

    // Called from different threads
    public void publish(String id, Item item) {
        sinks.get(id).next(item); //<----------- Is this safe??
    }
}
Run Code Online (Sandbox Code Playgroud)

的声音,我喜欢这一段的官方指南中指出,上述确实是安全的,但我不是在我的理解非常有信心。

create 是一种更高级的 Flux 编程创建形式,适用于每轮多次发射,甚至来自多个线程。

Sim*_*slé 6

是的,Flux.create产生一个SerializedSink可以从多个线程安全使用的next调用

  • @Dachstein(StandaloneFluxSink 来自过时的 Milestone 或 RC)默认情况下,`Sinks.Many` 会_检测_多线程使用并失败。他们有一个“tryEmitNext” API,它将立即返回一个枚举,其中一个表示尝试并行发射的事实。然后,您可以选择乐观地重试/忙循环,或者将其视为错误。 (3认同)
  • 我认为该评论更多的是关于这些“FluxSink”是根据“订阅”创建的事实,因此在对“Flux.create(...)”进行多个订阅的情况下,在字段中捕获一个可能会产生误导。请注意,在 3.4 中,我们引入了一种新风格的“Sinks”,它更适合这种“在具有多个可能订阅的字段中独立”的使用模式。线程安全注释仍然有效,例如。可以从“Flux.create”生成多个线程,并将每个线程的数据提供给同一个“FluxSink”。 (2认同)
  • 仅由 Flux 创建。它旨在为每个订阅创建一个接收器 (2认同)