标签: producer-consumer

BlockingQueue:put()和isEmpty()不能一起工作?

我想有一个SynchronousQueue从一个线程插入元素的位置put(),因此输入被阻塞,直到元素被另一个线程占用.

在另一个线程中,我执行了大量计算,并且不时想要检查元素是否已经可用,并使用它.但似乎isEmpty()总是返回true,即使另一个线程正在等待put()通话.

这怎么可能呢?以下是示例代码:

@Test
public void testQueue() throws InterruptedException {
    final BlockingQueue<Integer> queue = new SynchronousQueue<Integer>();

    Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                if (!queue.isEmpty()) {
                    try {
                        queue.take();
                        System.out.println("taken!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // do useful computations here (busy wait)
            }
        }
    });
    t.start();

    queue.put(1234);
    // this point is never reached!
    System.out.println("hello");
}
Run Code Online (Sandbox Code Playgroud)

编辑:既不是isEmpty()也不是peek()工作,必须使用poll().谢谢!

java collections multithreading producer-consumer

0
推荐指数
1
解决办法
1499
查看次数

Java:使用BlockingQueue的生产者/消费者:使用消费者线程wait()直到另一个对象排队

我最近遇到了一些线程相关的问题,消费者需要积分.这是原始的,除了占用大量的cpu不断检查队列之外,它工作正常.这个想法是可以随便调用cuePoint,主线程继续运行.

import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class PointConsumer implements Runnable {
    public static final int MAX_QUEUE_SIZE=500;

    BlockingQueue<Point> queue;

    public PointConsumer (){
        this.queue=new ArrayBlockingQueue<Point>(MAX_QUEUE_SIZE);
    }

     public void cuePoint(Point p){
        try{
            this.queue.add(p);
        }
        catch(java.lang.IllegalStateException i){}
    }
     public void doFirstPoint(){
        if(queue.size()!=0){
            Point p=queue.poll();
            //operation with p that will take a while
        }
    }

    public void run() {
        while(true){
                  doFirstPoint();
        }
    }

}
Run Code Online (Sandbox Code Playgroud)

我试图通过每次调用cue函数时添加notify()来修复cpu问题,并将doFirstPoint()重新处理为这样的事情:

public void doFirstPoint(){

    if(queue.size()!=0){
            //operation with p that will take a while
    }
    else{
        try{
            wait(); …
Run Code Online (Sandbox Code Playgroud)

java multithreading producer-consumer blockingqueue

0
推荐指数
1
解决办法
4912
查看次数

使用boost线程和循环缓冲区挂起的生产者/消费者

我想到了.愚蠢的错误,我实际上并没有从队列中删除元素,我只是阅读第一个元素.我修改了代码,下面的代码没有用.谢谢大家的帮助.

我正在尝试使用boost实现生产者消费者问题,这实际上是更大项目的一部分.我已经从互联网上的例子中实现了一个程序,甚至我在这里找到了一些帮助.但是目前我的代码只是挂起.基于一些好的建议,我决定使用boost ciruclar缓冲区来保存生产者和消费者之间的数据.那里有很多相似的代码,我能够汇集那些想法并自己写点东西.但是,我似乎仍然遇到与以前相同的问题(这是我的程序只是挂起).我以为我没有像以前那样犯同样的错误..

我的代码在下面给出,我已经取出了我之前的代码,我只是我自己的链接列表.

缓冲区头:

#ifndef PCDBUFFER_H
#define PCDBUFFER_H

#include <pcl/io/pcd_io.h>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/circular_buffer.hpp>

class pcdBuffer
{
    public:
        pcdBuffer(int buffSize);
        void put(int data);
        int get();
        bool isFull();
        bool isEmpty();
        int getSize();
        int getCapacity();
    private:
        boost::mutex bmutex;
        boost::condition_variable buffEmpty;
        boost::condition_variable buffFull;
        boost::circular_buffer<int> buffer;
};


#endif
Run Code Online (Sandbox Code Playgroud)

缓冲源(仅相关部分):

#include "pcdBuffer.h"
#include <iostream>

//boost::mutex io_mutex;

pcdBuffer::pcdBuffer(int buffSize)
{
    buffer.set_capacity(buffSize);
}

void pcdBuffer::put(int data)
{
    {
        boost::mutex::scoped_lock buffLock(bmutex);
        while(buffer.full())
        {
            std::cout << "Buffer is full" << std::endl;
            buffFull.wait(buffLock);
        }
        buffer.push_back(data);
    }
    buffEmpty.notify_one(); …
Run Code Online (Sandbox Code Playgroud)

c++ boost producer-consumer

0
推荐指数
1
解决办法
8094
查看次数

为消费者-生产者使用 ExecutorService 和 PipedReader/PipedWriter(或 PipedInputStream/PipedOutputStream)的 Java 示例

我正在寻找一个简单的生产者 - Java 中的消费者实现,不想重新发明轮子

我找不到同时使用新并发包和 Piped 类的示例

是否有为此使用PipedInputStream和新的 Java 并发包的示例?

有没有更好的方法而不使用 Piped 类来完成这样的任务?

java producer-consumer executorservice java.util.concurrent

0
推荐指数
1
解决办法
4348
查看次数

使用生产者读取大量文件会导致CPU使用率达到100%

我写了一个简单的消费者 - 生产者模式来帮助我完成以下任务:

  1. 从包含~500,000个TSV(制表符分隔)文件的目录中读取文件.
  2. 将每个文件操作为数据结构并将其放入阻塞队列.
  3. 使用使用者和查询DB消耗队列.
  4. 比较两个哈希映射,如果存在差异,则将差异打印到文件.

当我运行程序时,即使有5个线程,我的CPU消耗也会猛增到100%.这可能是因为我使用单个制作人来阅读文件吗?

文件示例(制表符分隔)

Column1   Column2   Column3   Column 4   Column5
A         1         *         -          -
B         1         *         -          -
C         1         %         -          -
Run Code Online (Sandbox Code Playgroud)

制片人

public class Producer implements Runnable{
private BlockingQueue<Map<String, Map<String, String>>> m_Queue;
private String m_Directory;

public Producer(BlockingQueue<Map<String, Map<String, String>>> i_Queue, String i_Directory)
{
    m_Queue = i_Queue;
    m_Directory = i_Directory;
}

@Override
public void run()
{
    if (Files.exists(Paths.get(m_Directory)))
    {
        File[] files = new File(m_Directory).listFiles();

        if (files != null)
        {
            for (File file …
Run Code Online (Sandbox Code Playgroud)

java multithreading file cpu-usage producer-consumer

0
推荐指数
1
解决办法
158
查看次数

如何正确使用 BlockingCollection.GetConsumingEnumerable?

我正在尝试使用生产者/消费者模式来实现,BlockingCollection<T>所以我编写了一个简单的控制台应用程序来测试它。

public class Program
{
    public static void Main(string[] args)
    {
        var workQueue = new WorkQueue();
        workQueue.StartProducingItems();
        workQueue.StartProcessingItems();

        while (true)
        {

        }
    }
}

public class WorkQueue
{
    private BlockingCollection<int> _queue;
    private static Random _random = new Random();

    public WorkQueue()
    {
        _queue = new BlockingCollection<int>();

        // Prefill some items.
        for (int i = 0; i < 100; i++)
        {
            //_queue.Add(_random.Next());
        }
    }

    public void StartProducingItems()
    {
        Task.Run(() =>
        {
            _queue.Add(_random.Next()); // Should be adding items to the queue …
Run Code Online (Sandbox Code Playgroud)

c# producer-consumer task-parallel-library blockingcollection

0
推荐指数
1
解决办法
2376
查看次数

Python CancelledError 与 asyncio 队列

我使用这个答案中的代码,但是asyncio.exceptions.CancelledError当队列为空时得到。在实际项目中,我将任务添加到消费者的队列中,这就是我使用while True语句的原因

我压缩该代码以使调试更容易:

import asyncio
import traceback


async def consumer(queue: asyncio.Queue):
    try:
        while True:
            number = await queue.get()  # here is exception
            queue.task_done()
            print(f'consumed {number}')
    except BaseException:
        traceback.print_exc()


async def main():
    queue = asyncio.Queue()
    for i in range(3):
        await queue.put(i)
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(1)]
    await queue.join()
    for c in consumers:
        c.cancel()


asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

和错误:

consumed 0
consumed 1
consumed 2
Traceback (most recent call last):
  File "/Users/abionics/Downloads/BaseAsyncScraper/ttt.py", line 8, in consumer
    number …
Run Code Online (Sandbox Code Playgroud)

python queue producer-consumer python-asyncio

0
推荐指数
1
解决办法
3925
查看次数

使用完整缓冲区的生产者-消费者算法

我正在阅读 Galvin OS 关于生产者消费者问题的书,并浏览了这段代码。

\n\n

全局定义

\n\n
#define BUFFER_SIZE 10\ntypedef struct {\n    . . .\n} item;\n\nint in  = 0;\nint out = 0;\n
Run Code Online (Sandbox Code Playgroud)\n\n

制片人

\n\n
while (((in + 1) % BUFFER_SIZE) == out)\n    ; /* do nothing */\nbuffer[in] = next_produced;\nin = (in + 1) % BUFFER_SIZE ;\n
Run Code Online (Sandbox Code Playgroud)\n\n

消费者

\n\n
while (in == out)\n    ; /* do nothing */\nnext_consumed = buffer[out];\nout = (out + 1) % BUFFER_SIZE;\n
Run Code Online (Sandbox Code Playgroud)\n\n

这就是加尔文书中所说的:

\n\n
\n

此方案最多允许缓冲区中同时存在 BUFFER_SIZE \xe2\x88\x92 1 个项目。我们将其作为练习,让您提供一个解决方案,\n BUFFER_SIZE 项可以同时位于缓冲区中。

\n
\n\n …

c algorithm producer-consumer

-1
推荐指数
1
解决办法
1439
查看次数

得到“错误:在';'之前预期']' 标记”在 C 中声明数组时

我正在编写一个生产者-消费者解决方案,但我不断收到“错误:预期 ']' 之前的 ';' 标记”时声明“buffer_item buffer[BUFFER_SIZE];”。我不太确定该怎么办?

这是我的 buffer.h 文件:

typedef int buffer_item;

#define BUFFER_SIZE 5;
Run Code Online (Sandbox Code Playgroud)

buffer.c 文件

#include <stddef.h>
#include <stdlib.h>
#include <unistd.h>
#include "buffer.h"

buffer_item buffer[BUFFER_SIZE];

void *producer(void *param);
void *consumer(void *param);
//etc..
Run Code Online (Sandbox Code Playgroud)

c producer-consumer

-1
推荐指数
1
解决办法
47
查看次数