标签: producer-consumer

Java BlockingQueue与批处理?

我对与Java BlockingQueue相同的数据结构感兴趣,但它必须能够批处理队列中的对象.换句话说,我希望生产者能够将对象放入队列,但是使用消费者块,take()直到队列达到一定的大小(批量大小).

然后,一旦队列达到批量大小,生产者必须阻止,put()直到消费者已经消耗了队列中的所有元素(在这种情况下,生产者将再次开始生产并且消费者块直到再次到达批次).

是否存在类似的数据结构?或者我应该写它(我不介意),如果有什么东西,我只是不想浪费我的时间.


UPDATE

也许稍微澄清一下事情:

情况总是如下.可以有多个生产者向队列添加项目,但永远不会有多个消费者从队列中获取项目.

现在,问题是这些设置有多个并行和串行.换句话说,生产者为多个队列生产物品,而消费者本身也可以是生产者.这可以更容易地被视为生产者,消费者 - 生产者和最终消费者的有向图.

生产者应该阻塞直到队列为空(@Peter Lawrey)的原因是因为每个都将在一个线程中运行.如果你让它们只是在空间可用的情况下生成,你最终会遇到太多线程试图同时处理太多东西的情况.

也许将它与执行服务相结合可以解决问题?

java queue producer-consumer blockingqueue

17
推荐指数
1
解决办法
1万
查看次数

Apple doc的GCD Producer-Consumer解决方案错了吗?

在Apple的并发编程指南的迁移远离线程部分中,有
改变生产者 - 消费者实现,声称可以使用GCD简化典型的多步骤pthread互斥+条件变量实现.

使用调度队列,您可以将生产者和消费者实现简化为单个调用:

dispatch_async(queue, ^{
  // Process a work item.
});
Run Code Online (Sandbox Code Playgroud)

当您的生产者要完成工作时,它所要做的就是将该工作添加到队列中并让队列处理该项目.

生产者 - 消费者问题也被称为有界缓冲问题,但上面没有提到缓冲区,它的边界或消费者,更不用说阻止生产者和消费者以避免过度/不足的运行.

这怎么可能是一个有效的解决方案?

macos multithreading producer-consumer grand-central-dispatch ios

16
推荐指数
1
解决办法
4454
查看次数

RabbitMQ:快速生产者和缓慢的消费者

我有一个应用程序使用RabbitMQ作为消息队列在两个组件之间发送/接收消息:发送方和接收方.发件人以非常快的方式发送消息.接收器接收消息然后执行一些非常耗时的任务(主要是针对非常大的数据大小的数据库写入).由于接收器需要很长时间才能完成任务,然后检索队列中的下一条消息,因此发送方将继续快速填满队列.所以我的问题是:这会导致消息队列溢出吗?

消息使用者如下所示:

public void onMessage() throws IOException, InterruptedException {
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");

        JSONObject json = new JSONObject(message);
        String caseID = json.getString("caseID");
        //following takes very long time            
        dao.saveToDB(caseID);
    }
}
Run Code Online (Sandbox Code Playgroud)

消费者收到的每条消息都包含一个caseID.对于每个caseID,它会将大量数据保存到数据库中,这需要很长时间.目前只为RabbitMQ设置了一个消费者,因为生产者/消费者使用相同的队列来发布/订阅caseID.那么如何才能加快消费者吞吐量,以便消费者能够赶上生产者并避免队列中的消息溢出?我应该在消费者部分使用多线程来加快消费率吗?或者我应该使用多个消费者同时使用传入消息?或者是否存在任何异步方式让消费者异步使用消息而不等待它完成?欢迎任何建议.

java multithreading producer-consumer amqp rabbitmq

16
推荐指数
2
解决办法
1万
查看次数

Python - 来自管道的简单读取线

我正在尝试从管道中读取线条并处理它们,但我正在做一些愚蠢的事情,我无法弄清楚是什么.生产者将无限期地继续生产生产线,如下所示:

producer.py

import time

while True:
    print 'Data'
    time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

消费者只需要定期检查线路:

consumer.py

import sys, time
while True:
    line = sys.stdin.readline()
    if line:
        print 'Got data:', line
    else:
        time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

当我在Windows shell中运行它时python producer.py | python consumer.py,它只是永远睡觉(似乎永远不会得到数据?)似乎问题是生产者永远不会终止,因为如果我发送有限数量的数据然后它工作正常.

如何获取数据并显示给消费者?在实际应用程序中,生产者是一个我无法控制的C++程序.

python pipe producer-consumer

15
推荐指数
2
解决办法
2万
查看次数

去:一个生产者很多消费者

所以我已经看到很多方法在Go中实现一个消费者和许多生产者 - 来自Concurrency in Go talk 的经典fanIn函数.

我想要的是fanOut功能.它将从中读取值的通道作为参数,并返回将该值的副本写入的通道切片.

是否有正确/推荐的实现方式?

producer-consumer go

14
推荐指数
1
解决办法
8962
查看次数

错误:无法找到或加载主类config.zookeeper.properties

我正在尝试使用Apache Kafka执行示例生产者使用者应用程序.我从https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka-0.10.0.0-src.tgz下载了它.然后我开始按照http://www.javaworld.com/article/3060078/big-data/big-data-messaging-with-kafka-part-1.html中给出的步骤进行操作.

当我尝试运行bin/zookeeper-server-start.sh config/zookeeper.properties时,我收到错误:无法找到或加载主类config.zookeeper.properties我搜索了该问题但没有得到任何有用的信息就此而言.任何人都可以帮我继续吗?

producer-consumer apache-kafka

14
推荐指数
1
解决办法
2万
查看次数

如何在多线程生产者 - 消费者模式中完成工作后退出工作线程?

我正在尝试使用Python 2.7中的Queue.Queue实现多线程生产者 - 消费者模式.我试图找出如何使消费者,即工人线程,一旦完成所有必要的工作,停止.

请参阅Martin James对此答案的第二条评论:https://stackoverflow.com/a/19369877/1175080

发送'我完成'任务,指示池线程终止.任何获得此类任务的线程都会重新排队,然后自杀.

但这对我不起作用.例如,请参阅以下代码.

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is …
Run Code Online (Sandbox Code Playgroud)

python concurrency multithreading producer-consumer python-2.7

14
推荐指数
2
解决办法
2509
查看次数

在C#中实现生产者/消费者模式

如何使用事件和代理在C#中实现Producer/Consumer模式?在使用这些设计模式时,我需要注意什么?我需要注意哪些边缘情况?

.net c# events design-patterns producer-consumer

13
推荐指数
1
解决办法
2万
查看次数

看看我是否在团结线上

如何检查我所在的线程是否是Unity线程?

我尝试在构造函数时捕获threadId,但是在程序生命周期的某个地方,threadId会移动.

在我的项目中,一些辅助线程进程需要访问新创建的对象.
我使用生产者 - 消费者模式,因此可以在Unity线程上创建它们.对象工厂将请求排队,并在Update()我请求的对象上在正确的线程上实例化.在Queued和Instantiated之间,工厂方法使用AutoResetEvent等待ObjectCreated事件.

现在有时这个工厂将从主线程调用,AutoResetEvent将阻塞自己的线程.我也尝试过肮脏的方式

// First try on this thread
try
{
    return action();
}
catch (ArgumentException ex)
{
    Debug.Log("Tried on same thread, but failed. "+ex.Message);
}
PushToQueueAndWait(action);
Run Code Online (Sandbox Code Playgroud)

但是当团结抛出异常时,无论是否捕获,程序都会停止.

如果我可以检查我是否在正确的线程上,我可以在排队和执行之间切换.

c# multithreading producer-consumer unity-game-engine

13
推荐指数
1
解决办法
7013
查看次数

AngularJS中的生产者 - 消费者队列

几年前我就认识python和数据库了.

但我想提高我有限的JavaScript知识.对于我的玩具项目,我想在Web浏览器中使用异步队列并使用AngularJS.

在python中有一个很好的类叫做multiprocessing.Queue,我以前用过它.

现在我搜索这样的东西,但是在AngularJS中

异步队列-JS

  • 第1步:队列中的工作项(粉色圆圈).只是一个视图json字节.

  • 第2步:用户处理数据.

  • 步骤3:out-queue关心将结果发送到服务器.

为什么这个"复杂"的设置?因为我希望应用程序尽可能地响应.队列中应该预先加载一些数据,出队列应该处理响应通信.

另一个好处是,通过此设置,应用程序可以处理服务器或网络中断几分钟.

AngularJS的双向数据绑定立即更新用户编辑的数据并不适合我的问题.或者我错过了什么.我是AngularJS的新手.

图中的粉色圆圈表示JSON数据结构.我想通过一个请求将每个请求推送到浏览器.

例:

用户看到一个问题,然后他需要填写三个字段.例如:

  • 回答:输入文字
  • 喜欢这个问题:1..5的整数
  • 难度:1..5的整数

在使用了"提交"后,数据应该被放入队列中.他应该立即得到下一个问题.

题:

是否已经为AngularJS提供了生产者 - 消费者队列?如果没有,如何实施呢?

更新

从客户端发送数据可以使用普通的AJAX实现.预取数据的队列是更复杂的部分.虽然两者都可以使用相同的实现.客户端以超低延迟获取新数据非常重要.队列中每次最多应填充5个项目,以避免客户端等待数据.

在我的情况下,如果浏览器关闭并且队列中的项目丢失则无关紧要.填充队列在服务器部分是只读的.

我没有修复AngularJS.如果有充分的理由,我很乐意改变框架.

保留浏览器重新加载之间的队列可以使用localStorage(html5)完成

javascript producer-consumer angularjs

13
推荐指数
1
解决办法
1672
查看次数