我有一个BlockingCollection.生产者任务向其添加项目,消费者任务删除项目.
现在我想限制集合中的项目数,如果添加更多项目,则自动丢弃旧数据.该集合永远不应包含多于N最近添加的项目.
因此,如果生产者添加新项目的速度快于消费者删除它们,我希望消费者只处理最新的项目.
我可以BlockingCollection在其构造函数中限制a的大小,但当然这只是意味着它在添加更多项时会阻塞,而不是它会删除旧项.
(我不希望生产者端阻塞,只有消费者方在从空集合中检索项目时才会阻塞.)
我目前的解决方案是黑客攻击,只适用于1的大小限制:(
而且我不确定它是否可靠.)
// My consumer task:
foreach (var item in blockingCollection.GetConsumingEnumerable())
{
var lastItem = item;
var lastItemTmp = item;
while (blockingCollection.TryTake(out lastItemTmp))
lastItem = lastItemTmp;
// Now lastItem contains the most recent item in the collection,
// and older items have been discarded.
// Proceed consuming lastItem ...
}
Run Code Online (Sandbox Code Playgroud)
有更清洁的解决方案吗?
c# collections multithreading producer-consumer thread-safety
嗨,我正在考虑使用快速可靠的生产者消费者队列进行线程切换.我正在使用VC++在Windows上工作.
我的设计基于Anthony Williams队列,也就是说,基本上是一个boost :: mutex,带有boost :: condition_variable.现在通常,notify_one()和唤醒之间的时间在10(罕见)和100微秒之间变化,大多数值在50微秒的范围内.但是,1000中大约有1个超过1毫秒,有些时间超过5毫秒.
我只是想知道这些是否是典型值?是否有更快的信号传递方式?是从这里到管理线程优先级吗?我还没有开始优先考虑,但我只是想知道是否有可能将它变成一个大约10微秒的相当稳定的区域?
谢谢
编辑:使用SetPriorityClass(GetCurrentProcess(),REALTIME_PRIORITY_CLASS),平均唤醒时间仍约为50微秒,但异常值较少,现在大多数都在150-200微米左右.除了7毫秒的怪异异常值.嗯...不好.
从我的主要开始,我开始了两个名为生产者和消费者的线程。两者都包含while(true)循环。生产者循环是 UDP 服务器,因此它不需要睡眠。我的问题出在消费者循环中。消费者循环从链接队列中删除对象并将其传递给函数以进行进一步处理。根据研究,在循环中使用线程睡眠不是一个好习惯,因为有时 O/S 不会在设置时间结束时释放。如果我在应用程序理想时删除线程睡眠,它会将 CPU 拖到 20% 到 30%。
class Producer implements Runnable {
private DatagramSocket dsocket;
FError fer = new FError();
int port =1548;
ConcurrentLinkedQueue<String> queue;
Producer(ConcurrentLinkedQueue<String> queue){
this.queue = queue;
}
@Override
public void run() {
try {
// Create a socket to listen on the port.
dsocket = new DatagramSocket(port);
// Create a buffer to read datagrams into.
byte[] buffer = new byte[30000];
// Create a packet to receive data into the buffer
DatagramPacket …Run Code Online (Sandbox Code Playgroud) while (true)
{
BasicDeliverEventArgs e = (BasicDeliverEventArgs)Consumer.Queue.Dequeue();
IBasicProperties properties = e.BasicProperties;
byte[] body = e.Body;
Console.WriteLine("Recieved Message : " + Encoding.UTF8.GetString(body));
ch.BasicAck(e.DeliveryTag, false);
}
Run Code Online (Sandbox Code Playgroud)
这是我们通过订阅检索消息时所做的事情.我们使用While循环,因为我们希望消费者不断地听...如果我想让这个甚至基于......那就是当时新消息到达队列的时候只有消费者应该消费消息..或任何类似的事件..
在上个学期,我在C中学习了OS实习,其中第一个项目涉及制作线程包,然后编写多个生产者 - 消费者程序来演示功能.然而,在获得评分反馈之后,我失去了"信号量的使用是巧妙的错误"和"程序假设抢占(例如使用产量来改变控制)"(我们从一个非抢占式线程包开始然后稍后添加抢占).请注意,注释和示例相互矛盾.我相信它也不会假设,并且可以在两种环境中工作).
这已经困扰了我很长一段时间 - 课程工作人员有点不知所措,所以我不能问他们这个学期有什么问题.我花了很长时间思考这个,我看不出问题.如果有人可以看一看并指出错误,或者向我保证实际上没有问题,我真的很感激.
我相信语法在线程包函数(minithreads和semaphores)方面应该是非常标准的,但是让我知道是否有任何令人困惑的事情.
#include <stdio.h>
#include <stdlib.h>
#include "minithread.h"
#include "synch.h"
#define BUFFER_SIZE 16
#define MAXCOUNT 100
int buffer[BUFFER_SIZE];
int size, head, tail;
int count = 1;
int out = 0;
int toadd = 0;
int toremove = 0;
semaphore_t empty;
semaphore_t full;
semaphore_t count_lock; // Semaphore to keep a lock on the
// global variables for maintaining the counts
/* Method to handle the working of a student
* The ID of a student is …Run Code Online (Sandbox Code Playgroud) 我必须开发一个多线程应用程序,其中将有多个线程,每个线程生成自定义事件日志并需要保存在队列中保存(不是微软MSMQ).
将有另一个线程从队列中读取日志数据并使用某些信息对其进行操作以将日志信息保存到文件中.基本上,我们在这里实现多个生产者单个消费者范例.
任何机构都可以建议我如何在C++或C#中实现它.
谢谢,
我试图使用pthreads和信号量解决生产者 - 消费者问题,但看起来生产者线程没有生成,消费者线程也不消耗.似乎正在创建线程:
/* Do actual work from this point forward */
/* Create the producer threads */
for(c1=1; c1<=argarray[1]; c1++)
{
pthread_create(&tid, &attr, producer, NULL);
printf("Creating producer #%d\n", c1);
}
/* Create the consumer threads */
for(c1=1; c1<=argarray[2]; c1++)
{
pthread_create(&tid, &attr, consumer, NULL);
printf("Creating consumer #%d\n", c1);
}
Run Code Online (Sandbox Code Playgroud)
因为"创建生产者#x"和"创建消费者#x"被打印到屏幕上.但是,它不会从线程内部打印出来:
if(insert_item(item))
{
fprintf(stderr, "Producer error.");
}
else
{
printf("Producer produced %d\n", item);
}
Run Code Online (Sandbox Code Playgroud)
同样用于消费者线程.完整代码:
#include "buffer.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
/* Create Buffer */
buffer_item …Run Code Online (Sandbox Code Playgroud) 我在Albahari的一本Nutshell书中重用了C#中的示例生产者消费者队列(http://www.albahari.com/threading/part5.aspx#_BlockingCollectionT),一位同事评论道:"为什么不调用Dispose关于收集处理中的BlockingCollection?"
我找不到答案,我能想出的唯一理由是不会处理队列剩余工作量的执行.但是,当我处理队列时,为什么不停止处理呢?
除了"为什么你不应该处理BlockingCollection?" 我还有第二个问题"如果你不处理BlockingCollection,会不会受到伤害?".我想当你产生/处理大量的生产者消费者队列时,它会产生问题(不是我想要的,而只是为了知道的原因).
根据BlockingCollection.Dispose实际做了什么?BlockingCollection包含两个等待句柄(显然),所以不调用Dispose会给你一些问题.谢谢ken2k指出这一点.
我正在谈论的代码:
public class PCQueue : IDisposable
{
BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();
public PCQueue (int workerCount)
{
// Create and start a separate Task for each consumer:
for (int i = 0; i < workerCount; i++)
Task.Factory.StartNew (Consume);
}
public void Dispose() { _taskQ.CompleteAdding(); }
public void EnqueueTask (Action action) { _taskQ.Add (action); }
void Consume()
{
// This sequence that we’re enumerating will block when no elements
// are available …Run Code Online (Sandbox Code Playgroud) 我在不同的任务上运行一个非常典型的生产者/消费者模型
Task1:从二进制文件中读取一批byte [],并为每个字节数组集合启动一个新任务.(该操作是为了内存管理目的而批量处理的).
任务2-n:这些是工作任务,每个都在字节数组的传入集合(来自Tasks1)上运行,并对字节数组进行反序列化,按特定条件对它们进行排序,然后存储结果对象的集合(每个字节数组)在"并发字典"中反序列化为此类对象.
任务(n + 1)我选择了并发字典,因为此任务的工作是以与它们来自Task1的顺序相同的顺序合并存储在并发字典中的那些集合.我通过传递一个collectionID(它是int类型并为Task1中的每个新集合递增)从Task1到此任务一直向下实现.此任务基本上检查下一个预期的collectionID是否已存储在并发字典中,如果是,则将其取出,将其添加到Final Queue并检查并发字典中的下一个集合.
现在,从我所看到的和我观看的视频看来,TPL Dataflow可能是这种生产者/消费者模型的完美候选者.我似乎无法设计并因此开始,因为我从未使用过TPL Dataflow.在吞吐量和延迟方面,这个库甚至可以完成任务吗?我目前处理250万字节数组,因此在生成的集合中每秒处理对象.TPL Dataflow可以帮助简化吗?我对以下问题的答案特别感兴趣:TPL Dataflow可以在产生工作任务时保留Task1中的集合批次的顺序,并在工作任务完成后重新合并它们吗?它是否优化了什么?在对整个结构进行分析之后,我觉得由于旋转和涉及太多并发集合而浪费了相当多的时间.
任何想法,想法?
c# concurrency producer-consumer task-parallel-library tpl-dataflow
我是多线程编程的新手,我只知道最常见的Producer-Consumer-Queue.我正在使用boost c ++库,我不知道是否更好地使用boost :: lockfree :: queue或使用`mutex`和`condition_variable`的std :: queue周围的包装类.
哪里更好地使用无锁数据结构哪里更好是使用基于`mutex`和`condition_variables`的简单实现?
c# ×5
c++ ×3
c ×2
concurrency ×2
api ×1
boost ×1
collections ×1
dispose ×1
java ×1
lock-free ×1
performance ×1
pthreads ×1
rabbitmq ×1
semaphore ×1
tpl-dataflow ×1