我对与Java BlockingQueue相同的数据结构感兴趣,但它必须能够批处理队列中的对象.换句话说,我希望生产者能够将对象放入队列,但是使用消费者块,take()直到队列达到一定的大小(批量大小).
然后,一旦队列达到批量大小,生产者必须阻止,put()直到消费者已经消耗了队列中的所有元素(在这种情况下,生产者将再次开始生产并且消费者块直到再次到达批次).
是否存在类似的数据结构?或者我应该写它(我不介意),如果有什么东西,我只是不想浪费我的时间.
UPDATE
也许稍微澄清一下事情:
情况总是如下.可以有多个生产者向队列添加项目,但永远不会有多个消费者从队列中获取项目.
现在,问题是这些设置有多个并行和串行.换句话说,生产者为多个队列生产物品,而消费者本身也可以是生产者.这可以更容易地被视为生产者,消费者 - 生产者和最终消费者的有向图.
生产者应该阻塞直到队列为空(@Peter Lawrey)的原因是因为每个都将在一个线程中运行.如果你让它们只是在空间可用的情况下生成,你最终会遇到太多线程试图同时处理太多东西的情况.
也许将它与执行服务相结合可以解决问题?
在Apple的并发编程指南的迁移远离线程部分中,有
改变生产者 - 消费者实现,声称可以使用GCD简化典型的多步骤pthread互斥+条件变量实现.
使用调度队列,您可以将生产者和消费者实现简化为单个调用:
dispatch_async(queue, ^{
// Process a work item.
});
Run Code Online (Sandbox Code Playgroud)
当您的生产者要完成工作时,它所要做的就是将该工作添加到队列中并让队列处理该项目.
生产者 - 消费者问题也被称为有界缓冲问题,但上面没有提到缓冲区,它的边界或消费者,更不用说阻止生产者和消费者以避免过度/不足的运行.
这怎么可能是一个有效的解决方案?
macos multithreading producer-consumer grand-central-dispatch ios
我有一个应用程序使用RabbitMQ作为消息队列在两个组件之间发送/接收消息:发送方和接收方.发件人以非常快的方式发送消息.接收器接收消息然后执行一些非常耗时的任务(主要是针对非常大的数据大小的数据库写入).由于接收器需要很长时间才能完成任务,然后检索队列中的下一条消息,因此发送方将继续快速填满队列.所以我的问题是:这会导致消息队列溢出吗?
消息使用者如下所示:
public void onMessage() throws IOException, InterruptedException {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
JSONObject json = new JSONObject(message);
String caseID = json.getString("caseID");
//following takes very long time
dao.saveToDB(caseID);
}
}
Run Code Online (Sandbox Code Playgroud)
消费者收到的每条消息都包含一个caseID.对于每个caseID,它会将大量数据保存到数据库中,这需要很长时间.目前只为RabbitMQ设置了一个消费者,因为生产者/消费者使用相同的队列来发布/订阅caseID.那么如何才能加快消费者吞吐量,以便消费者能够赶上生产者并避免队列中的消息溢出?我应该在消费者部分使用多线程来加快消费率吗?或者我应该使用多个消费者同时使用传入消息?或者是否存在任何异步方式让消费者异步使用消息而不等待它完成?欢迎任何建议.
我正在尝试从管道中读取线条并处理它们,但我正在做一些愚蠢的事情,我无法弄清楚是什么.生产者将无限期地继续生产生产线,如下所示:
producer.py
import time
while True:
print 'Data'
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)
消费者只需要定期检查线路:
consumer.py
import sys, time
while True:
line = sys.stdin.readline()
if line:
print 'Got data:', line
else:
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)
当我在Windows shell中运行它时python producer.py | python consumer.py,它只是永远睡觉(似乎永远不会得到数据?)似乎问题是生产者永远不会终止,因为如果我发送有限数量的数据然后它工作正常.
如何获取数据并显示给消费者?在实际应用程序中,生产者是一个我无法控制的C++程序.
所以我已经看到很多方法在Go中实现一个消费者和许多生产者 - 来自Concurrency in Go talk 的经典fanIn函数.
我想要的是fanOut功能.它将从中读取值的通道作为参数,并返回将该值的副本写入的通道切片.
是否有正确/推荐的实现方式?
我正在尝试使用Apache Kafka执行示例生产者使用者应用程序.我从https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka-0.10.0.0-src.tgz下载了它.然后我开始按照http://www.javaworld.com/article/3060078/big-data/big-data-messaging-with-kafka-part-1.html中给出的步骤进行操作.
当我尝试运行bin/zookeeper-server-start.sh config/zookeeper.properties时,我收到错误:无法找到或加载主类config.zookeeper.properties我搜索了该问题但没有得到任何有用的信息就此而言.任何人都可以帮我继续吗?
我正在尝试使用Python 2.7中的Queue.Queue实现多线程生产者 - 消费者模式.我试图找出如何使消费者,即工人线程,一旦完成所有必要的工作,停止.
请参阅Martin James对此答案的第二条评论:https://stackoverflow.com/a/19369877/1175080
发送'我完成'任务,指示池线程终止.任何获得此类任务的线程都会重新排队,然后自杀.
但这对我不起作用.例如,请参阅以下代码.
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
# Requeue the exit indicator.
q.put(-1)
# Commit suicide.
print 'worker', n, 'is …Run Code Online (Sandbox Code Playgroud) python concurrency multithreading producer-consumer python-2.7
如何使用事件和代理在C#中实现Producer/Consumer模式?在使用这些设计模式时,我需要注意什么?我需要注意哪些边缘情况?
如何检查我所在的线程是否是Unity线程?
我尝试在构造函数时捕获threadId,但是在程序生命周期的某个地方,threadId会移动.
在我的项目中,一些辅助线程进程需要访问新创建的对象.
我使用生产者 - 消费者模式,因此可以在Unity线程上创建它们.对象工厂将请求排队,并在Update()我请求的对象上在正确的线程上实例化.在Queued和Instantiated之间,工厂方法使用AutoResetEvent等待ObjectCreated事件.
现在有时这个工厂将从主线程调用,AutoResetEvent将阻塞自己的线程.我也尝试过肮脏的方式
// First try on this thread
try
{
return action();
}
catch (ArgumentException ex)
{
Debug.Log("Tried on same thread, but failed. "+ex.Message);
}
PushToQueueAndWait(action);
Run Code Online (Sandbox Code Playgroud)
但是当团结抛出异常时,无论是否捕获,程序都会停止.
如果我可以检查我是否在正确的线程上,我可以在排队和执行之间切换.
几年前我就认识python和数据库了.
但我想提高我有限的JavaScript知识.对于我的玩具项目,我想在Web浏览器中使用异步队列并使用AngularJS.
在python中有一个很好的类叫做multiprocessing.Queue,我以前用过它.
现在我搜索这样的东西,但是在AngularJS中
第1步:队列中的工作项(粉色圆圈).只是一个视图json字节.
第2步:用户处理数据.
步骤3:out-queue关心将结果发送到服务器.
为什么这个"复杂"的设置?因为我希望应用程序尽可能地响应.队列中应该预先加载一些数据,出队列应该处理响应通信.
另一个好处是,通过此设置,应用程序可以处理服务器或网络中断几分钟.
AngularJS的双向数据绑定立即更新用户编辑的数据并不适合我的问题.或者我错过了什么.我是AngularJS的新手.
图中的粉色圆圈表示JSON数据结构.我想通过一个请求将每个请求推送到浏览器.
例:
用户看到一个问题,然后他需要填写三个字段.例如:
在使用了"提交"后,数据应该被放入队列中.他应该立即得到下一个问题.
题:
是否已经为AngularJS提供了生产者 - 消费者队列?如果没有,如何实施呢?
更新
从客户端发送数据可以使用普通的AJAX实现.预取数据的队列是更复杂的部分.虽然两者都可以使用相同的实现.客户端以超低延迟获取新数据非常重要.队列中每次最多应填充5个项目,以避免客户端等待数据.
在我的情况下,如果浏览器关闭并且队列中的项目丢失则无关紧要.填充队列在服务器部分是只读的.
我没有修复AngularJS.如果有充分的理由,我很乐意改变框架.
保留浏览器重新加载之间的队列可以使用localStorage(html5)完成
c# ×2
java ×2
python ×2
.net ×1
amqp ×1
angularjs ×1
apache-kafka ×1
concurrency ×1
events ×1
go ×1
ios ×1
javascript ×1
macos ×1
pipe ×1
python-2.7 ×1
queue ×1
rabbitmq ×1