我正在寻找一个简单的生产者 - Java 中的消费者实现,不想重新发明轮子
我找不到同时使用新并发包和 Piped 类的示例
是否有为此使用PipedInputStream和新的 Java 并发包的示例?
有没有更好的方法而不使用 Piped 类来完成这样的任务?
我正在尝试处理从MQ基础结构获得的一些消息.我有两个阻塞队列,sharedQueue和pubQueue.在sharedqueue得到与我从MQ基础设施below.It得到会把消息给邮件填满sharedQueue.
client.setCallback(new CallBack("inst",sharedQueue));
messagemanipulator线程将从中读取sharedQueue,处理它并将响应放入以pubQueue供稍后发布.
新的MessageManipulatorThread(sharedQueue,pubQueue).run();
发布者线程将从pubQueueMQ基础结构中获取消息并将其发布到MQ基础结构.
new PublisherThread(pubQueue).run();
以下是完整代码:
public class ArrayBlockingQueueExample {
private BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<>(64);
private BlockingQueue<String> pubQueue = new ArrayBlockingQueue<>(64);
public static void main(String[] args) throws MqttException, Exception {
new ArrayBlockingQueueExample().startThreads();
}
public void startThreads() throws MqttException, Exception{
MqttClient client = new MQTTClientFactory().getInstance();
client.setCallback(new CallBack("inst", sharedQueue));
new MessageManipulatorThread(sharedQueue,pubQueue).run();
new PublisherThread(pubQueue).run();
}
public MessageManipulatorThread( BlockingQueue<String> sharedQueue , BlockingQueue<String> pubQueue){
this.sharedQueue = sharedQueue; …Run Code Online (Sandbox Code Playgroud) 嗨,我是Java多线程环境的新手.我必须在一个线程中添加,更新和删除集合中的对象集.同时我将检查并在另一个线程上迭代该对象.
列表和设置不是线程安全的.任何人都可以分享一下,哪个集合/ Map类更适合在多线程环境中使用?
java collections concurrency multithreading java.util.concurrent
我有4个观察者正在监听可观察的数据.然而,我的一个观察者是较慢的,可以采取.我刚看到notifyObserver的代码为: -
132 public void notifyObservers(Object arg) {
133 /*
134 * a temporary array buffer, used as a snapshot of the state of
135 * current Observers.
136 */
137 Object[] arrLocal;
138
139 synchronized (this) {
/**comment removed for clarity **/
152 if (!changed)
153 return;
154 arrLocal = obs.toArray();
155 clearChanged();
156 }
157
158 for (int i = arrLocal.length-1; i>=0; i--)
159 ((Observer)arrLocal[i]).update(this, arg);
160 }
Run Code Online (Sandbox Code Playgroud)
从代码中可以清楚地看到观察者是一个接一个地被调用的.由于设计观察者在执行中是独立的.不应该同时调用它们arg作为最终的功能吗?
执行的时间本t1+t2+t3+t4应该是max(t1,t2,t3,t4).我可以使 …
java concurrency design-patterns java.util.concurrent observer-pattern
我知道,有两种方法可以在java中建立before-before关系:synchronized块和方法,volatile关键字.(如果我是正确的,它不适用于最终字段).我的问题是:并发包中的原子变量是否相似?可以发生 - 之前由他们建立?
java concurrency multithreading java.util.concurrent happens-before
我认为主线程必须在子线程之后结束。但是下面的代码显示在打印“异步结束”之前完成的过程。是什么原因?谁能解释一下?谢谢。
import java.util.concurrent.CompletableFuture;
public class Test {
public static void main(String[] args) {
CompletableFuture.runAsync(() -> {
try {
System.out.println("async start");
Thread.sleep(3000);
System.out.println("async end");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("main end");
}
}
Run Code Online (Sandbox Code Playgroud) 我想打电话给cancel上CompletableFuture。
从文档看来:
如果尚未完成,则使用 CancellationException 完成此 CompletableFuture。尚未完成的从属 CompletableFutures 也将异常完成,此 CancellationException 会导致 CompletionException。
它应该异常地完成它们,这正是我所期望的,但相反,它会立即抛出 CancellationException。
这是一个示例代码
CompletableFuture<?> f = CompletableFuture.supplyAsync(() -> false);
f.cancel(true); // Line 7.
f.join();
Run Code Online (Sandbox Code Playgroud)
使用重现:https : //www.mycompiler.io/view/2v1ME4u
Exception in thread "main" java.util.concurrent.CancellationException
at java.base/java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2396)
at Main.main(Main.java:7)
Run Code Online (Sandbox Code Playgroud)
7号线是f.cancel(true);线。
java concurrency java.util.concurrent java-8 completable-future
我需要将 AtomicInteger 增加两次,例如:++i; ++i; 通过无尽的循环,我想将计数器增加两次并检查奇偶校验。但我总是得到增加一次的变量。如何解决?