我用一个阻塞队列编写了一个简单的消费者 - 生产者问题,该队列有多个生产者和多个消费者接受并将整数放在队列中.但是,当我尝试测试它时,结果并不理想,例如队列的大小不正确.我不认为消费者和生产者规模正在同步.此外,我对生产者和消费者都进行了2秒钟的睡眠,但在测试时,每两秒就打印出2个生产者和2个消费者的结果.有谁知道我做错了什么?也许我开始错误的线程?我评论了另一种方式,但结果仍然是错误的.
结果:
run:
Producing 425 Thread-0 size left 0
Consuming 890 Thread-3 size left 0
Consuming 425 Thread-2 size left 0
Producing 890 Thread-1 size left 0
Consuming 192 Thread-2 size left 0
Consuming 155 Thread-3 size left 0
Producing 155 Thread-1 size left 0
Producing 192 Thread-0 size left 0
Consuming 141 Thread-2 size left 1
Producing 141 Thread-0 size left 0
Producing 919 Thread-1 size left 0
Consuming 919 Thread-3 size left 0
Producing 361 Thread-1 …Run Code Online (Sandbox Code Playgroud) 我试图了解队列中的行为ThreadPoolExecutor.在下面的程序中,当我使用时LinkedBlockingQueue,我一次只能向线程池提交一个任务.但是,如果我取代LinkedBlockingQueue用SynchronousQueue,我可以在瞬间提交所有5个任务到池中.如何SynchronousQueue不同于LinkedBlockingQueue在这种情况下?
Java程序:
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Sample {
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<Runnable> threadPoolQueue = new LinkedBlockingQueue<>();
// SynchronousQueue<Runnable> threadPoolQueue = new SynchronousQueue<>();
ThreadFactory threadFactory = Executors.defaultThreadFactory();
ThreadPoolExecutor tpe = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, threadPoolQueue, threadFactory);
Runnable np;
for (int i = 1; i <= 5; i++) {
np = new SampleWorker("ThreadPoolWorker " …Run Code Online (Sandbox Code Playgroud) java multithreading blockingqueue threadpool threadpoolexecutor
在Lparallel API中,终止所有线程任务的推荐方法是使用停止内核(lparallel:end-kernel)。但是,当线程正在阻塞时(例如,(pop-queue queue1)等待某个项目出现在队列中),当内核停止时,该线程仍将处于活动状态。在这种情况下(至少在SBCL中),内核关闭偶尔(但并非每次)失败,并显示以下信息:
debugger invoked on a SB-KERNEL:BOUNDING-INDICES-BAD-ERROR in thread
#<THREAD "lparallel" RUNNING {1002F04973}>:
The bounding indices 1 and NIL are bad for a sequence of length 0.
See also:
The ANSI Standard, Glossary entry for "bounding index designator"
The ANSI Standard, writeup for Issue SUBSEQ-OUT-OF-BOUNDS:IS-AN-ERROR
debugger invoked on a SB-SYS:INTERACTIVE-INTERRUPT in thread
#<THREAD "main thread" RUNNING {10012E0613}>:
Interactive interrupt at #x1001484328.
Run Code Online (Sandbox Code Playgroud)
我假设这与阻塞线程无法正确终止有关。关闭内核之前,应该如何正确终止阻塞线程?(API表示kill-tasks仅应在特殊情况下使用,我认为这种情况不适用于这种“正常”关闭情况。)
在下面的伪代码中,我有一个poll()在主线程中永远调用的函数.如果我在没有sleep()声明的情况下执行此操作,则poll()每分钟只有2-3个项目被另一个线程添加到队列中.这是否意味着轮询会阻止该put()声明?
我怎么解决这个问题?
public class Test extends Thread{
private LinkedBlockingQueue<Object> queue = null;
Test(){
queue = new LinkedBlockingQueue<Object>(10);
}
public void run(){
// Do stuff, get incoming object from network
queue.put(o);
}
public Object poll(){
Object o = queue.poll();
sleep(1000);
return o;
}
}
Run Code Online (Sandbox Code Playgroud) 我试图找出是否有可能有一个多生产者/多个消费者队列,我可以使用notify()而不是notifyAll().例如,在下面的实现中(源代码:这里),你不能只是简单地切换notifyAll()for notify().你不能切换的原因并不是很明显,所以我会把它作为预告片给那些想要帮助我理解这个问题的人.
所以下面的代码被破坏了:
public class BlockingQueue {
private Object lock = new Object();
private List queue = new LinkedList();
private int limit = 10;
public BlockingQueue(int limit){
this.limit = limit;
}
public void enqueue(Object item)
throws InterruptedException {
synchronized(lock) {
while(this.queue.size() == this.limit) {
lock.wait();
}
if(this.queue.size() == 0) {
lock.notify();
}
this.queue.add(item);
}
}
public Object dequeue()
throws InterruptedException{
synchronized(lock) {
while(this.queue.size() == 0){
lock.wait();
}
if(this.queue.size() == this.limit){
lock.notify(); …Run Code Online (Sandbox Code Playgroud) 有时,这种实施和执行BlockingQueue确实有效。有时会出现段错误。知道为什么吗?
#include <thread>
using std::thread;
#include <mutex>
using std::mutex;
#include <iostream>
using std::cout;
using std::endl;
#include <queue>
using std::queue;
#include <string>
using std::string;
using std::to_string;
#include <functional>
using std::ref;
template <typename T>
class BlockingQueue {
private:
mutex mutex_;
queue<T> queue_;
public:
T pop() {
this->mutex_.lock();
T value = this->queue_.front();
this->queue_.pop();
this->mutex_.unlock();
return value;
}
void push(T value) {
this->mutex_.lock();
this->queue_.push(value);
this->mutex_.unlock();
}
bool empty() {
this->mutex_.lock();
bool check = this->queue_.empty();
this->mutex_.unlock();
return check;
}
};
void fillWorkQueue(BlockingQueue<string>& workQueue) …Run Code Online (Sandbox Code Playgroud) 我正在尝试多生产者 - 生产者 - 消费者问题的多个消费者使用案例.我正在使用BlockingQueue在多个生产者/消费者之间共享公共队列.
以下是我的代码.
制片人
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private BlockingQueue inputQueue;
private static volatile int i = 0;
private volatile boolean isRunning = true;
public Producer(BlockingQueue q){
this.inputQueue=q;
}
public synchronized void run() {
//produce messages
for(i=0; i<10; i++)
{
try {
inputQueue.put(new Integer(i));
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Produced "+i);
}
finish();
}
public void finish() {
//you can also clear here if you wanted
isRunning = false;
} …Run Code Online (Sandbox Code Playgroud) 我有一个PriorityBlockingQueue包含元素列表的元素。我已经实现了 Comparable 接口并覆盖了compareTo()以定义哪个元素小于,等于 o 大于其他元素。
所以我想知道优先队列是如何工作的,也就是说,它什么时候对它的元素进行排序?自动处理队列中的任何事件(添加、删除、修改)?
谁能向我解释优先队列是如何工作的?我不清楚。
以下代码永远不会返回。调试表明,即使队列为空, queue.IsCompleted 也会返回 false。我错过了什么吗?
var workers = new Task[1];
using (var queue = new BlockingCollection<QueuePayload>(20))
{
workers[0] = Task.Run(() => Consume(queue));
queue.Add(new QueuePayload{....});
Task.WaitAll(workers);
}
void Consume(BlockingCollection<QueuePayload> queue))
{
while (!queue.IsCompleted)
{
var i = new QueuePayload();
try
{
i = queue.Take();
}
catch (InvalidOperationException)
{
break;
}
......
Run Code Online (Sandbox Code Playgroud) 为什么我们需要queueSizeRejectionThreshold在 Hystrix 之外呢maxQueueSize?
根据定义,queueSizeRejectionThreshold <= maxQueueSize. maxQueueSize但我不明白为什么在线程满时不拒绝线程,为什么要引入这个术语queueSizeRejectionThreshold?
blockingqueue ×10
java ×6
c# ×1
c++ ×1
c++11 ×1
common-lisp ×1
hystrix ×1
queue ×1
threadpool ×1