标签: blockingqueue

在Java中实现多个使用者多生产者的问题

我用一个阻塞队列编写了一个简单的消费者 - 生产者问题,该队列有多个生产者和多个消费者接受并将整数放在队列中.但是,当我尝试测试它时,结果并不理想,例如队列的大小不正确.我不认为消费者和生产者规模正在同步.此外,我对生产者和消费者都进行了2秒钟的睡眠,但在测试时,每两秒就打印出2个生产者和2个消费者的结果.有谁知道我做错了什么?也许我开始错误的线程?我评论了另一种方式,但结果仍然是错误的.

结果:

run:
Producing 425     Thread-0 size left 0
Consuming 890     Thread-3 size left 0
Consuming 425     Thread-2 size left 0
Producing 890     Thread-1 size left 0
Consuming 192     Thread-2 size left 0
Consuming 155     Thread-3 size left 0
Producing 155     Thread-1 size left 0
Producing 192     Thread-0 size left 0
Consuming 141     Thread-2 size left 1
Producing 141     Thread-0 size left 0
Producing 919     Thread-1 size left 0
Consuming 919     Thread-3 size left 0
Producing 361     Thread-1 …
Run Code Online (Sandbox Code Playgroud)

java multithreading producer-consumer blockingqueue

3
推荐指数
1
解决办法
5624
查看次数

ThreadPoolExecutor中的SynchronousQueue

我试图了解队列中的行为ThreadPoolExecutor.在下面的程序中,当我使用时LinkedBlockingQueue,我一次只能向线程池提交一个任务.但是,如果我取代LinkedBlockingQueueSynchronousQueue,我可以在瞬间提交所有5个任务到池中.如何SynchronousQueue不同于LinkedBlockingQueue在这种情况下?

Java程序:

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Sample {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Runnable> threadPoolQueue = new LinkedBlockingQueue<>();
//      SynchronousQueue<Runnable> threadPoolQueue = new SynchronousQueue<>();
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, threadPoolQueue, threadFactory);
        Runnable np;

        for (int i = 1; i <= 5; i++) {
            np = new SampleWorker("ThreadPoolWorker " …
Run Code Online (Sandbox Code Playgroud)

java multithreading blockingqueue threadpool threadpoolexecutor

3
推荐指数
1
解决办法
2126
查看次数

如何正确终止正在阻塞的线程(Lparallel Common Lisp)

在Lparallel API中,终止所有线程任务的推荐方法是使用停止内核(lparallel:end-kernel)。但是,当线程正在阻塞时(例如,(pop-queue queue1)等待某个项目出现在队列中),当内核停止时,该线程仍将处于活动状态。在这种情况下(至少在SBCL中),内核关闭偶尔(但并非每次)失败,并显示以下信息:

debugger invoked on a SB-KERNEL:BOUNDING-INDICES-BAD-ERROR in thread
#<THREAD "lparallel" RUNNING {1002F04973}>:
  The bounding indices 1 and NIL are bad for a sequence of length 0.
See also:
  The ANSI Standard, Glossary entry for "bounding index designator"
  The ANSI Standard, writeup for Issue SUBSEQ-OUT-OF-BOUNDS:IS-AN-ERROR

debugger invoked on a SB-SYS:INTERACTIVE-INTERRUPT in thread
#<THREAD "main thread" RUNNING {10012E0613}>:
  Interactive interrupt at #x1001484328.
Run Code Online (Sandbox Code Playgroud)

我假设这与阻塞线程无法正确终止有关。关闭内核之前,应该如何正确终止阻塞线程?(API表示kill-tasks仅应在特殊情况下使用,我认为这种情况不适用于这种“正常”关闭情况。)

multithreading common-lisp blockingqueue

3
推荐指数
1
解决办法
94
查看次数

轮询是否阻止LinkedBlockingQueue中的其他操作?

在下面的伪代码中,我有一个poll()在主线程中永远调用的函数.如果我在没有sleep()声明的情况下执行此操作,则poll()每分钟只有2-3个项目被另一个线程添加到队列中.这是否意味着轮询会阻止该put()声明?

我怎么解决这个问题?

public class Test extends Thread{
    private LinkedBlockingQueue<Object> queue = null;

    Test(){
        queue = new LinkedBlockingQueue<Object>(10);
    }

    public void run(){
        // Do stuff, get incoming object from network
        queue.put(o);
    }

    public Object poll(){
        Object o = queue.poll();
        sleep(1000);
        return o;
    }
}
Run Code Online (Sandbox Code Playgroud)

java multithreading blockingqueue

2
推荐指数
1
解决办法
9295
查看次数

notify()而不是notifyAll()用于阻塞队列

我试图找出是否有可能有一个多生产者/多个消费者队列,我可以使用notify()而不是notifyAll().例如,在下面的实现中(源代码:这里),你不能只是简单地切换notifyAll()for notify().你不能切换的原因并不是很明显,所以我会把它作为预告片给那些想要帮助我理解这个问题的人.

所以下面的代码被破坏了:

public class BlockingQueue {

  private Object lock = new Object();

  private List queue = new LinkedList();
  private int  limit = 10;

  public BlockingQueue(int limit){
    this.limit = limit;
  }


  public void enqueue(Object item)
  throws InterruptedException  {
   synchronized(lock) {
    while(this.queue.size() == this.limit) {
      lock.wait();
    }
    if(this.queue.size() == 0) {
      lock.notify();
    }
    this.queue.add(item);
   }
  }


  public Object dequeue()
  throws InterruptedException{
   synchronized(lock) {
    while(this.queue.size() == 0){
      lock.wait();
    }
    if(this.queue.size() == this.limit){
      lock.notify(); …
Run Code Online (Sandbox Code Playgroud)

java queue multithreading blockingqueue data-structures

2
推荐指数
1
解决办法
2299
查看次数

创建阻塞队列

有时,这种实施和执行BlockingQueue确实有效。有时会出现段错误。知道为什么吗?

#include <thread>
using std::thread;
#include <mutex>
using std::mutex;
#include <iostream>
using std::cout;
using std::endl;
#include <queue>
using std::queue;
#include <string>
using std::string;
using std::to_string;
#include <functional>
using std::ref;

template <typename T>
class BlockingQueue {
private:
    mutex mutex_;
    queue<T> queue_;
public:
    T pop() {
        this->mutex_.lock();
        T value = this->queue_.front();
        this->queue_.pop();
        this->mutex_.unlock();
        return value;
    }

    void push(T value) {
        this->mutex_.lock();
        this->queue_.push(value);
        this->mutex_.unlock();
    }

    bool empty() {
        this->mutex_.lock();
        bool check = this->queue_.empty();
        this->mutex_.unlock();
        return check;
    }
};

void fillWorkQueue(BlockingQueue<string>& workQueue) …
Run Code Online (Sandbox Code Playgroud)

c++ multithreading blockingqueue c++11

2
推荐指数
1
解决办法
1万
查看次数

多生产者多消费者多线程Java

我正在尝试多生产者 - 生产者 - 消费者问题的多个消费者使用案例.我正在使用BlockingQueue在多个生产者/消费者之间共享公共队列.

以下是我的代码.
制片人

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private BlockingQueue inputQueue;
    private static volatile int i = 0;
    private volatile boolean isRunning = true;

    public Producer(BlockingQueue q){
        this.inputQueue=q;
    }

    public synchronized void run() {

        //produce messages
        for(i=0; i<10; i++) 
        {
            try {
                inputQueue.put(new Integer(i));

                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Produced "+i);
        }
        finish();
    }

    public void finish() {
        //you can also clear here if you wanted
        isRunning = false;
    } …
Run Code Online (Sandbox Code Playgroud)

java multithreading producer-consumer blockingqueue

2
推荐指数
2
解决办法
1万
查看次数

PriorityBlockingQueue 何时对元素进行排序?

我有一个PriorityBlockingQueue包含元素列表的元素。我已经实现了 Comparable 接口并覆盖了compareTo()以定义哪个元素小于,等于 o 大于其他元素。

所以我想知道优先队列是如何工作的,也就是说,它什么时候对它的元素进行排序?自动处理队列中的任何事件(添加、删除、修改)?

谁能向我解释优先队列是如何工作的?我不清楚。

java priority-queue blockingqueue

2
推荐指数
1
解决办法
1119
查看次数

即使队列为空,queue.IsCompleted 也会返回 false?

以下代码永远不会返回。调试表明,即使队列为空, queue.IsCompleted 也会返回 false。我错过了什么吗?

var workers = new Task[1];
using (var queue = new BlockingCollection<QueuePayload>(20))
{
    workers[0] = Task.Run(() => Consume(queue));
    queue.Add(new QueuePayload{....});
    Task.WaitAll(workers);
}

void Consume(BlockingCollection<QueuePayload> queue))
{
    while (!queue.IsCompleted)
    {
        var i = new QueuePayload();
        try
        {
            i = queue.Take();
        }
        catch (InvalidOperationException)
        {
            break;
        }
    ......
Run Code Online (Sandbox Code Playgroud)

c# blockingqueue task-parallel-library

2
推荐指数
1
解决办法
1231
查看次数

当我已经有 max.Queue.poolSize 时,Hystrix 中的queueSizeRejectionThreshold 有什么用?

为什么我们需要queueSizeRejectionThreshold在 Hystrix 之外呢maxQueueSize

根据定义,queueSizeRejectionThreshold <= maxQueueSize. maxQueueSize但我不明白为什么在线程满时不拒绝线程,为什么要引入这个术语queueSizeRejectionThreshold

blockingqueue hystrix

2
推荐指数
1
解决办法
3096
查看次数