smu*_*kes 6 java concurrency producer-consumer java.util.concurrent
"2)制片人不需要知道谁是消费者或消费者有多少消费者.消费者也是如此."
我的问题是我需要尽快从Web服务器到客户端获取一系列数据.客户可以出现在计算中期.不同时间的多个客户端可以请求数据数组.一旦计算完成,它就会被缓存,然后就可以简单地读取它.
Exmaple用例:在计算过程中,我希望尽快为数组的每个数据提供服务.我不能使用BlockingQueue,因为如果第二个客户端开始请求数组,而第一个客户端已经在数组的前半部分使用了.take().然后第二个客户错过了一半的数据!我需要一个BlockingQueue,你不必带(),但你可以只读(int index).
解?我的数组上有大量的写入,所以我不想使用CopyOnWriteArrayList?Vector类应该工作但效率低下?是否最好像这样使用ThreadSafeList 并添加一个waitForElement()函数?我只是不想重新发明轮子,我更喜欢针对多线程问题的人群测试解决方案......
据我了解,您需要将broadcast数据传输到subscribers/clients. 以下是我所知道的一些处理方法。
纯 Java 解决方案,每个客户端都有一个,BlockingQueue每次广播消息时,都会将其放入每个队列中。
for(BlockingQueue client: clients){
client.put(msg);
}
Run Code Online (Sandbox Code Playgroud)RxJava提供了一种反应式方法。subscribers每当您发送消息时,客户emit都会收到通知,订阅者可以选择取消订阅
Observable<String> observable = Observable.create(sub->{
String[] msgs = {"msg1","msg2","msg3"};
for (String msg : msgs) {
if(!sub.isUnsubscribed()){
sub.onNext(msg);
}
}
if (!sub.isUnsubscribed()) { // completes
sub.onCompleted();
}
});
Run Code Online (Sandbox Code Playgroud)
现在多个订阅者可以选择接收消息。
observable.subscribe(System.out::println);
observable.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
Observables 有点functional,他们可以选择他们需要的。
observable.filter(msg-> msg.equals("msg2")).map(String::length)
.subscribe(msgLength->{
System.out.println(msgLength); // or do something useful
});
Run Code Online (Sandbox Code Playgroud)