标签: java.util.concurrent

为消费者-生产者使用 ExecutorService 和 PipedReader/PipedWriter(或 PipedInputStream/PipedOutputStream)的 Java 示例

我正在寻找一个简单的生产者 - Java 中的消费者实现,不想重新发明轮子

我找不到同时使用新并发包和 Piped 类的示例

是否有为此使用PipedInputStream和新的 Java 并发包的示例?

有没有更好的方法而不使用 Piped 类来完成这样的任务?

java producer-consumer executorservice java.util.concurrent

0
推荐指数
1
解决办法
4348
查看次数

使用BlockingQueue时的线程同步

我正在尝试处理从MQ基础结构获得的一些消息.我有两个阻塞队列,sharedQueuepubQueue.在sharedqueue得到与我从MQ基础设施below.It得到会把消息给邮件填满sharedQueue.

client.setCallback(new CallBack("inst",sharedQueue));

messagemanipulator线程将从中读取sharedQueue,处理它并将响应放入以pubQueue供稍后发布.

新的MessageManipulatorThread(sharedQueue,pubQueue).run();

发布者线程将从pubQueueMQ基础结构中获取消息并将其发布到MQ基础结构.

new PublisherThread(pubQueue).run();

以下是完整代码:

 public class ArrayBlockingQueueExample {

 private BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<>(64);
 private BlockingQueue<String> pubQueue = new ArrayBlockingQueue<>(64);


public static void main(String[] args) throws MqttException, Exception {

    new ArrayBlockingQueueExample().startThreads();

}

public void startThreads() throws MqttException, Exception{

    MqttClient client =  new MQTTClientFactory().getInstance();
    client.setCallback(new CallBack("inst", sharedQueue));

    new MessageManipulatorThread(sharedQueue,pubQueue).run();
    new PublisherThread(pubQueue).run();


}



 public MessageManipulatorThread( BlockingQueue<String> sharedQueue , BlockingQueue<String> pubQueue){

    this.sharedQueue = sharedQueue; …
Run Code Online (Sandbox Code Playgroud)

java multithreading java.util.concurrent mqtt

0
推荐指数
1
解决办法
434
查看次数

哪个Java集合类更适合在多线程环境中使用

嗨,我是Java多线程环境的新手.我必须在一个线程中添加,更新和删除集合中的对象集.同时我将检查并在另一个线程上迭代该对象.

列表和设置不是线程安全的.任何人都可以分享一下,哪个集合/ Map类更适合在多线程环境中使用?

java collections concurrency multithreading java.util.concurrent

0
推荐指数
1
解决办法
6356
查看次数

Java:notifyObservers不兼容?

我有4个观察者正在监听可观察的数据.然而,我的一个观察者是较慢的,可以采取.我刚看到notifyObserver的代码为: -

  132       public void notifyObservers(Object arg) {
  133           /*
  134            * a temporary array buffer, used as a snapshot of the state of
  135            * current Observers.
  136            */
  137           Object[] arrLocal;
  138   
  139           synchronized (this) {
/**comment removed for clarity **/

  152               if (!changed)
  153                   return;
  154               arrLocal = obs.toArray();
  155               clearChanged();
  156           }
  157   
  158           for (int i = arrLocal.length-1; i>=0; i--)
  159               ((Observer)arrLocal[i]).update(this, arg);
  160       }
Run Code Online (Sandbox Code Playgroud)

从代码中可以清楚地看到观察者是一个接一个地被调用的.由于设计观察者在执行中是独立的.不应该同时调用它们arg作为最终的功能吗?

执行的时间本t1+t2+t3+t4应该是max(t1,t2,t3,t4).我可以使 …

java concurrency design-patterns java.util.concurrent observer-pattern

0
推荐指数
1
解决办法
372
查看次数

在Java中建立关系

我知道,有两种方法可以在java中建立before-before关系:synchronized块和方法,volatile关键字.(如果我是正确的,它不适用于最终字段).我的问题是:并发包中的原子变量是否相似?可以发生 - 之前由他们建立?

java concurrency multithreading java.util.concurrent happens-before

0
推荐指数
1
解决办法
191
查看次数

为什么不执行 CompletableFuture.runAsync?

我认为主线程必须在子线程之后结束。但是下面的代码显示在打印“异步结束”之前完成的过程。是什么原因?谁能解释一下?谢谢。

import java.util.concurrent.CompletableFuture;

public class Test {
    public static void main(String[] args) {
        CompletableFuture.runAsync(() -> {
            try {
                System.out.println("async start");
                Thread.sleep(3000);
                System.out.println("async end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        System.out.println("main end");
    }
}
Run Code Online (Sandbox Code Playgroud)

java java.util.concurrent

0
推荐指数
1
解决办法
1450
查看次数

为什么调用 CompletableFuture::cancel 会导致立即 CancellationException

我想打电话给cancelCompletableFuture

从文档看来:

如果尚未完成,则使用 CancellationException 完成此 CompletableFuture。尚未完成的从属 CompletableFutures 也将异常完成,此 CancellationException 会导致 CompletionException。

它应该异常地完成它们,这正是我所期望的,但相反,它会立即抛出 CancellationException。

这是一个示例代码

CompletableFuture<?> f = CompletableFuture.supplyAsync(() -> false);
f.cancel(true);  // Line 7.
f.join();
Run Code Online (Sandbox Code Playgroud)

使用重现:https : //www.mycompiler.io/view/2v1ME4u

Exception in thread "main" java.util.concurrent.CancellationException
    at java.base/java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2396)
    at Main.main(Main.java:7)
Run Code Online (Sandbox Code Playgroud)

7号线是f.cancel(true);线。

java concurrency java.util.concurrent java-8 completable-future

0
推荐指数
1
解决办法
46
查看次数

如何将 AtomicInteger 增加两次?

我需要将 AtomicInteger 增加两次,例如:++i; ++i; 通过无尽的循环,我想将计数器增加两次并检查奇偶校验。但我总是得到增加一次的变量。如何解决?

java thread-safety java.util.concurrent

-2
推荐指数
1
解决办法
9208
查看次数