消费者 - 网络服务器中的生产者问题流式传输数据

smu*_*kes 6 java concurrency producer-consumer java.util.concurrent

制片人 - 消费者博客文章指出:

"2)制片人不需要知道谁是消费者或消费者有多少消费者.消费者也是如此."

我的问题是我需要尽快从Web服务器到客户端获取一系列数据.客户可以出现在计算中期.不同时间的多个客户端可以请求数据数组.一旦计算完成,它就会被缓存,然后就可以简单地读取它.

Exmaple用例:在计算过程中,我希望尽快为数组的每个数据提供服务.我不能使用BlockingQueue,因为如果第二个客户端开始请求数组,而第一个客户端已经在数组的前半部分使用了.take().然后第二个客户错过了一半的数据!我需要一个BlockingQueue,你不必带(),但你可以只读(int index).

解?我的数组上有大量的写入,所以我不想使用CopyOnWriteArrayList?Vector类应该工作但效率低下?是否最好像这样使用ThreadSafeList 并添加一个waitForElement()函数?我只是不想重新发明轮子,我更喜欢针对多线程问题的人群测试解决方案......

Sle*_*idi 2

据我了解,您需要将broadcast数据传输到subscribers/clients. 以下是我所知道的一些处理方法。

  • 纯 Java 解决方案,每个客户端都有一个,BlockingQueue每次广播消息时,都会将其放入每个队列中。

    for(BlockingQueue client: clients){
      client.put(msg);
    }
    
    Run Code Online (Sandbox Code Playgroud)
  • RxJava提供了一种反应式方法subscribers每当您发送消息时,客户emit都会收到通知,订阅者可以选择取消订阅

    Observable<String> observable = Observable.create(sub->{
        String[] msgs = {"msg1","msg2","msg3"};
        for (String msg : msgs) {
            if(!sub.isUnsubscribed()){
                sub.onNext(msg);
            }
        }
        if (!sub.isUnsubscribed()) { // completes
            sub.onCompleted();
        }
    
    });
    
    Run Code Online (Sandbox Code Playgroud)

    现在多个订阅者可以选择接收消息。

    observable.subscribe(System.out::println);
    observable.subscribe(System.out::println);
    
    Run Code Online (Sandbox Code Playgroud)

    Observables 有点functional,他们可以选择他们需要的。

    observable.filter(msg-> msg.equals("msg2")).map(String::length)
       .subscribe(msgLength->{
        System.out.println(msgLength); // or do something useful
    });
    
    Run Code Online (Sandbox Code Playgroud)
  • Akka提供广播路由器