while (true)
{
BasicDeliverEventArgs e = (BasicDeliverEventArgs)Consumer.Queue.Dequeue();
IBasicProperties properties = e.BasicProperties;
byte[] body = e.Body;
Console.WriteLine("Recieved Message : " + Encoding.UTF8.GetString(body));
ch.BasicAck(e.DeliveryTag, false);
}
Run Code Online (Sandbox Code Playgroud)
这是我们通过订阅检索消息时所做的事情.我们使用While循环,因为我们希望消费者不断地听...如果我想让这个甚至基于......那就是当时新消息到达队列的时候只有消费者应该消费消息..或任何类似的事件..
在上个学期,我在C中学习了OS实习,其中第一个项目涉及制作线程包,然后编写多个生产者 - 消费者程序来演示功能.然而,在获得评分反馈之后,我失去了"信号量的使用是巧妙的错误"和"程序假设抢占(例如使用产量来改变控制)"(我们从一个非抢占式线程包开始然后稍后添加抢占).请注意,注释和示例相互矛盾.我相信它也不会假设,并且可以在两种环境中工作).
这已经困扰了我很长一段时间 - 课程工作人员有点不知所措,所以我不能问他们这个学期有什么问题.我花了很长时间思考这个,我看不出问题.如果有人可以看一看并指出错误,或者向我保证实际上没有问题,我真的很感激.
我相信语法在线程包函数(minithreads和semaphores)方面应该是非常标准的,但是让我知道是否有任何令人困惑的事情.
#include <stdio.h>
#include <stdlib.h>
#include "minithread.h"
#include "synch.h"
#define BUFFER_SIZE 16
#define MAXCOUNT 100
int buffer[BUFFER_SIZE];
int size, head, tail;
int count = 1;
int out = 0;
int toadd = 0;
int toremove = 0;
semaphore_t empty;
semaphore_t full;
semaphore_t count_lock; // Semaphore to keep a lock on the
// global variables for maintaining the counts
/* Method to handle the working of a student
* The ID of a student is …
Run Code Online (Sandbox Code Playgroud) 我必须开发一个多线程应用程序,其中将有多个线程,每个线程生成自定义事件日志并需要保存在队列中保存(不是微软MSMQ).
将有另一个线程从队列中读取日志数据并使用某些信息对其进行操作以将日志信息保存到文件中.基本上,我们在这里实现多个生产者单个消费者范例.
任何机构都可以建议我如何在C++或C#中实现它.
谢谢,
我试图使用pthreads和信号量解决生产者 - 消费者问题,但看起来生产者线程没有生成,消费者线程也不消耗.似乎正在创建线程:
/* Do actual work from this point forward */
/* Create the producer threads */
for(c1=1; c1<=argarray[1]; c1++)
{
pthread_create(&tid, &attr, producer, NULL);
printf("Creating producer #%d\n", c1);
}
/* Create the consumer threads */
for(c1=1; c1<=argarray[2]; c1++)
{
pthread_create(&tid, &attr, consumer, NULL);
printf("Creating consumer #%d\n", c1);
}
Run Code Online (Sandbox Code Playgroud)
因为"创建生产者#x"和"创建消费者#x"被打印到屏幕上.但是,它不会从线程内部打印出来:
if(insert_item(item))
{
fprintf(stderr, "Producer error.");
}
else
{
printf("Producer produced %d\n", item);
}
Run Code Online (Sandbox Code Playgroud)
同样用于消费者线程.完整代码:
#include "buffer.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
/* Create Buffer */
buffer_item …
Run Code Online (Sandbox Code Playgroud) 我正在寻找一种方法来使用pthread rwlock结构和C++中的条件例程.
我有两个问题:
第一:如何可能,如果我们不能,为什么?
第二:为什么当前的POSIX pthread没有实现这种行为?
为了理解我的目的,我解释了我的用途:我有一个处理一个共享数组的生产者 - 消费者模型.消费者将在数组为空时cond_wait,但在读取某些元素时将使用rdlock.当添加(+信号)或从阵列中移除elems时,生产者将发生扭曲.
使用rdlock而不是mutex_lock的好处是提高性能:当使用mutex_lock时,几个读取器会阻塞,而使用rdlock时,几个读取器不会阻塞.
这些实现之间有什么区别,哪些情况应该使用一个或另一个队列?
在rxjs5
,我有一个AsyncSubject,并希望多次订阅它,但只有一个订阅者应该收到该next()
事件.所有其他人(如果他们还没有取消订阅)应立即获得该complete()
活动next()
.
例:
let fired = false;
let as = new AsyncSubject();
const setFired = () => {
if (fired == true) throw new Error("Multiple subscriptions executed");
fired = true;
}
let subscription1 = as.subscribe(setFired);
let subscription2 = as.subscribe(setFired);
// note that subscription1/2 could be unsubscribed from in the future
// and still only a single subscriber should be triggered
setTimeout(() => {
as.next(undefined);
as.complete();
}, 500);
Run Code Online (Sandbox Code Playgroud) 据说消费者只能阅读整个主题。没有运气对经纪人进行评估以过滤消息。
这意味着我们必须消费/接收来自主题的所有消息并在客户端过滤它们。
这太多了。我想知道我们是否可以根据已经传递给代理的东西(例如 msg 键或其他东西)过滤和接收特定类型的消息。
从方法Consumer.poll(timeout)来看,我们似乎没有多余的事情可以做。
假设有1个生产者P和2个消费者C1和C2.并且有2个队列Q1和Q2,都具有特定容量.
P将生产物品并交替进入Q1和Q2.物品是为特定消费者生产的,不能被其他消费者消费.我如何在Java中实现以下内容:在我启动3个线程后,如果Q1为空,则线程C1被阻塞,直到在Q1中存在某些内容时通知它.Q2也是如此.当Q1和Q2都满时,P将被阻止,直到Q1或Q2未满时通知.
我正在考虑使用BlockingQueue,它会在队列为空时阻止使用者.但问题是当其中一个队列已满时,生产者将被阻止.我们可以使用Java中的任何数据结构来解决这个问题吗?
更新
我自己有一个解决方案,但我不确定它是否有效.我们仍然可以有2个BlockingQueues.当消费者从其队列中获取项目时,它会使用BlockingQueue.take()
,因此当队列中没有项目时它将被阻止.当生产者将项目添加到任一队列时,它使用BlockingQueue.offer()
.因此,它永远不会被此操作阻止,并且如果队列已满,它将变为"false".另外,我们保留一个AtomicInteger来指示未满的队列数.每当生产者P想要将一个项目放入队列时,如果它得到错误的返回,我们将AtomicInteger减少1.当它达到0时,生成器调用AtomicInteger.wait()
.每当消费者从其队列中获取项目时,它也会检查AtomicInteger.当它为0时,消费者将其增加1并进行呼叫AtomicInteger.notify()
.
请让我知道这个解决方案是否有意义.
非常感谢!
java concurrency messaging producer-consumer java.util.concurrent
是否有针对消费者的内置或强大的第三方抽象在Java 8+中返回了价值?
PS对于延迟执行,它也可能返回Future。
更新。 函数接口在语法上有完美的匹配,但是在语义方面存在一些考虑。在这种情况下使用Function显然会破坏不更改外部状态的合同。怎么处理呢?
java ×4
c ×3
concurrency ×3
c# ×2
c++ ×2
pthreads ×2
abstraction ×1
apache-kafka ×1
api ×1
ecmascript-6 ×1
filtering ×1
javascript ×1
messaging ×1
posix ×1
queue ×1
rabbitmq ×1
rxjs5 ×1
semaphore ×1
typescript ×1