我想有一个SynchronousQueue
从一个线程插入元素的位置put()
,因此输入被阻塞,直到元素被另一个线程占用.
在另一个线程中,我执行了大量计算,并且不时想要检查元素是否已经可用,并使用它.但似乎isEmpty()
总是返回true,即使另一个线程正在等待put()
通话.
这怎么可能呢?以下是示例代码:
@Test
public void testQueue() throws InterruptedException {
final BlockingQueue<Integer> queue = new SynchronousQueue<Integer>();
Thread t = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
if (!queue.isEmpty()) {
try {
queue.take();
System.out.println("taken!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// do useful computations here (busy wait)
}
}
});
t.start();
queue.put(1234);
// this point is never reached!
System.out.println("hello");
}
Run Code Online (Sandbox Code Playgroud)
编辑:既不是isEmpty()也不是peek()工作,必须使用poll().谢谢!
我最近遇到了一些线程相关的问题,消费者需要积分.这是原始的,除了占用大量的cpu不断检查队列之外,它工作正常.这个想法是可以随便调用cuePoint,主线程继续运行.
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class PointConsumer implements Runnable {
public static final int MAX_QUEUE_SIZE=500;
BlockingQueue<Point> queue;
public PointConsumer (){
this.queue=new ArrayBlockingQueue<Point>(MAX_QUEUE_SIZE);
}
public void cuePoint(Point p){
try{
this.queue.add(p);
}
catch(java.lang.IllegalStateException i){}
}
public void doFirstPoint(){
if(queue.size()!=0){
Point p=queue.poll();
//operation with p that will take a while
}
}
public void run() {
while(true){
doFirstPoint();
}
}
}
Run Code Online (Sandbox Code Playgroud)
我试图通过每次调用cue函数时添加notify()来修复cpu问题,并将doFirstPoint()重新处理为这样的事情:
public void doFirstPoint(){
if(queue.size()!=0){
//operation with p that will take a while
}
else{
try{
wait(); …
Run Code Online (Sandbox Code Playgroud) 我想到了.愚蠢的错误,我实际上并没有从队列中删除元素,我只是阅读第一个元素.我修改了代码,下面的代码没有用.谢谢大家的帮助.
我正在尝试使用boost实现生产者消费者问题,这实际上是更大项目的一部分.我已经从互联网上的例子中实现了一个程序,甚至我在这里找到了一些帮助.但是目前我的代码只是挂起.基于一些好的建议,我决定使用boost ciruclar缓冲区来保存生产者和消费者之间的数据.那里有很多相似的代码,我能够汇集那些想法并自己写点东西.但是,我似乎仍然遇到与以前相同的问题(这是我的程序只是挂起).我以为我没有像以前那样犯同样的错误..
我的代码在下面给出,我已经取出了我之前的代码,我只是我自己的链接列表.
缓冲区头:
#ifndef PCDBUFFER_H
#define PCDBUFFER_H
#include <pcl/io/pcd_io.h>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/circular_buffer.hpp>
class pcdBuffer
{
public:
pcdBuffer(int buffSize);
void put(int data);
int get();
bool isFull();
bool isEmpty();
int getSize();
int getCapacity();
private:
boost::mutex bmutex;
boost::condition_variable buffEmpty;
boost::condition_variable buffFull;
boost::circular_buffer<int> buffer;
};
#endif
Run Code Online (Sandbox Code Playgroud)
缓冲源(仅相关部分):
#include "pcdBuffer.h"
#include <iostream>
//boost::mutex io_mutex;
pcdBuffer::pcdBuffer(int buffSize)
{
buffer.set_capacity(buffSize);
}
void pcdBuffer::put(int data)
{
{
boost::mutex::scoped_lock buffLock(bmutex);
while(buffer.full())
{
std::cout << "Buffer is full" << std::endl;
buffFull.wait(buffLock);
}
buffer.push_back(data);
}
buffEmpty.notify_one(); …
Run Code Online (Sandbox Code Playgroud) 我正在寻找一个简单的生产者 - Java 中的消费者实现,不想重新发明轮子
我找不到同时使用新并发包和 Piped 类的示例
是否有为此使用PipedInputStream和新的 Java 并发包的示例?
有没有更好的方法而不使用 Piped 类来完成这样的任务?
我写了一个简单的消费者 - 生产者模式来帮助我完成以下任务:
当我运行程序时,即使有5个线程,我的CPU消耗也会猛增到100%.这可能是因为我使用单个制作人来阅读文件吗?
文件示例(制表符分隔)
Column1 Column2 Column3 Column 4 Column5
A 1 * - -
B 1 * - -
C 1 % - -
Run Code Online (Sandbox Code Playgroud)
制片人
public class Producer implements Runnable{
private BlockingQueue<Map<String, Map<String, String>>> m_Queue;
private String m_Directory;
public Producer(BlockingQueue<Map<String, Map<String, String>>> i_Queue, String i_Directory)
{
m_Queue = i_Queue;
m_Directory = i_Directory;
}
@Override
public void run()
{
if (Files.exists(Paths.get(m_Directory)))
{
File[] files = new File(m_Directory).listFiles();
if (files != null)
{
for (File file …
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用生产者/消费者模式来实现,BlockingCollection<T>
所以我编写了一个简单的控制台应用程序来测试它。
public class Program
{
public static void Main(string[] args)
{
var workQueue = new WorkQueue();
workQueue.StartProducingItems();
workQueue.StartProcessingItems();
while (true)
{
}
}
}
public class WorkQueue
{
private BlockingCollection<int> _queue;
private static Random _random = new Random();
public WorkQueue()
{
_queue = new BlockingCollection<int>();
// Prefill some items.
for (int i = 0; i < 100; i++)
{
//_queue.Add(_random.Next());
}
}
public void StartProducingItems()
{
Task.Run(() =>
{
_queue.Add(_random.Next()); // Should be adding items to the queue …
Run Code Online (Sandbox Code Playgroud) c# producer-consumer task-parallel-library blockingcollection
我使用这个答案中的代码,但是asyncio.exceptions.CancelledError
当队列为空时得到。在实际项目中,我将任务添加到消费者的队列中,这就是我使用while True
语句的原因
我压缩该代码以使调试更容易:
import asyncio
import traceback
async def consumer(queue: asyncio.Queue):
try:
while True:
number = await queue.get() # here is exception
queue.task_done()
print(f'consumed {number}')
except BaseException:
traceback.print_exc()
async def main():
queue = asyncio.Queue()
for i in range(3):
await queue.put(i)
consumers = [asyncio.create_task(consumer(queue)) for _ in range(1)]
await queue.join()
for c in consumers:
c.cancel()
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
和错误:
consumed 0
consumed 1
consumed 2
Traceback (most recent call last):
File "/Users/abionics/Downloads/BaseAsyncScraper/ttt.py", line 8, in consumer
number …
Run Code Online (Sandbox Code Playgroud) 我正在阅读 Galvin OS 关于生产者消费者问题的书,并浏览了这段代码。
\n\n全局定义
\n\n#define BUFFER_SIZE 10\ntypedef struct {\n . . .\n} item;\n\nint in = 0;\nint out = 0;\n
Run Code Online (Sandbox Code Playgroud)\n\n制片人
\n\nwhile (((in + 1) % BUFFER_SIZE) == out)\n ; /* do nothing */\nbuffer[in] = next_produced;\nin = (in + 1) % BUFFER_SIZE ;\n
Run Code Online (Sandbox Code Playgroud)\n\n消费者
\n\nwhile (in == out)\n ; /* do nothing */\nnext_consumed = buffer[out];\nout = (out + 1) % BUFFER_SIZE;\n
Run Code Online (Sandbox Code Playgroud)\n\n这就是加尔文书中所说的:
\n\n\n\n\n …此方案最多允许缓冲区中同时存在 BUFFER_SIZE \xe2\x88\x92 1 个项目。我们将其作为练习,让您提供一个解决方案,\n BUFFER_SIZE 项可以同时位于缓冲区中。
\n
我正在编写一个生产者-消费者解决方案,但我不断收到“错误:预期 ']' 之前的 ';' 标记”时声明“buffer_item buffer[BUFFER_SIZE];”。我不太确定该怎么办?
这是我的 buffer.h 文件:
typedef int buffer_item;
#define BUFFER_SIZE 5;
Run Code Online (Sandbox Code Playgroud)
buffer.c 文件
#include <stddef.h>
#include <stdlib.h>
#include <unistd.h>
#include "buffer.h"
buffer_item buffer[BUFFER_SIZE];
void *producer(void *param);
void *consumer(void *param);
//etc..
Run Code Online (Sandbox Code Playgroud)