我正在考虑在C#中实现通用的生产者/消费者对+处理队列的想法.我们的想法是你可以创建实现适当的IProducer和IConsumer接口(提供默认实现)的对象,它们将主要由委托组成,将它们传递给QueueProcessor
类实例,告诉它你想要多少个消费者,然后去.
但我对自己说,"自我,这肯定已经做过了."
因此,没有人知道的好,一般实施在C#中的生产者/消费者模式(VB.Net是好的,太)?我正在寻找的基本要求:
或者如果没有,哪些陷阱阻止了它,你对如何实现它有任何想法吗?
我编写了包装其他流的后台InputStream
(和OutputStream
)实现,并在后台线程上预读,主要允许解压缩/压缩在解压缩流处理的不同线程中发生.
这是一个相当标准的生产者/消费者模型.
这似乎是一种简单的方法,可以通过简单的读取,处理和写入数据的流程来充分利用多核CPU,从而更有效地利用CPU和磁盘资源.也许"高效"并不是最好的词,但它提供了更高的利用率,对我来说更感兴趣,减少了运行时间,而不是直接从a读取ZipInputStream
并直接写入ZipOutputStream
.
我很高兴发布代码,但我的问题是我是否正在重新发明现有(和更多运行)库中现有的东西?
编辑 - 发布代码......
我的代码BackgroundInputStream
是在下面(BackgroundOutputStream
非常相似),但有一些方面我想改进.
BackgroundInputStream
,那么backgroundReaderThread
它将永远存在.eof
需要改进.Executor
.close()
方法应该通知后台线程,并且不应该关闭包装的流,因为包装的流应该由从其读取的后台线程拥有.package nz.co.datacute.io;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
public class BackgroundInputStream extends InputStream {
private static final int DEFAULT_QUEUE_SIZE = 1;
private static final int DEFAULT_BUFFER_SIZE = 64*1024;
private final int queueSize;
private final int bufferSize;
private volatile boolean eof …
Run Code Online (Sandbox Code Playgroud) java compression multithreading inputstream producer-consumer
我试图通过跳过昂贵的事件操作来提高生产者/消费者线程的情况,如果有必要,可以使用以下方法:
//cas(variable, compare, set) is atomic compare and swap
//queue is already lock free
running = false
// dd item to queue – producer thread(s)
if(cas(running, false, true))
{
// We effectively obtained a lock on signalling the event
add_to_queue()
signal_event()
}
else
{
// Most of the time if things are busy we should not be signalling the event
add_to_queue()
if(cas(running, false, true))
signal_event()
}
...
// Process queue, single consumer thread
reset_event()
while(1)
{
wait_for_auto_reset_event() // Preferably IOCP …
Run Code Online (Sandbox Code Playgroud) 我已经构建了一个应用程序,通过Amazon SES为网站发送电子邮件邮件.它用C#编码.
每封电子邮件需要0.3秒才能通过Amazon SES API发送.这意味着,使用单线程应用程序,我每秒只能发送3封电子邮件.
我已经实现了一个生产者/消费者,多线程应用程序,其中1个生产者用于查询为每个客户定制电子邮件,25个消费者从队列中提取并发送电子邮件.
我的多线程应用程序每秒发送12封电子邮件(速度提高四倍).我原本预计25线程应用程序的速度会有更大的提升.
我的问题是:我能在单处理器机器上真正加快发送邮件的速度吗?我的收益是否合理,或者我的速度问题更可能是由于编码而不是计算机无法快速处理电子邮件?
提前致谢!
更新:如果其他人面临同样的问题....连接到AWS以发送电子邮件需要花费大量时间.AWS Developer论坛上的以下主题提供了一些见解(您可能需要向下滚动才能获得更有用的帖子).
c# performance multithreading producer-consumer amazon-web-services
我正在使用C#TPL并且我遇到了生产者/消费者代码的问题...出于某种原因,TPL不重用线程并不断创建新线程
我做了一个简单的例子来演示这种行为:
class Program
{
static BlockingCollection<int> m_Buffer = new BlockingCollection<int>(1);
static CancellationTokenSource m_Cts = new CancellationTokenSource();
static void Producer()
{
try
{
while (!m_Cts.IsCancellationRequested)
{
Console.WriteLine("Enqueuing job");
m_Buffer.Add(0);
Thread.Sleep(1000);
}
}
finally
{
m_Buffer.CompleteAdding();
}
}
static void Consumer()
{
Parallel.ForEach(m_Buffer.GetConsumingEnumerable(), Run);
}
static void Run(int i)
{
Console.WriteLine
("Job Processed\tThread: {0}\tProcess Thread Count: {1}",
Thread.CurrentThread.ManagedThreadId,
Process.GetCurrentProcess().Threads.Count);
}
static void Main(string[] args)
{
Task producer = new Task(Producer);
Task consumer = new Task(Consumer);
producer.Start();
consumer.Start();
Console.ReadKey();
m_Cts.Cancel();
Task.WaitAll(producer, consumer); …
Run Code Online (Sandbox Code Playgroud) c# parallel-processing multithreading producer-consumer task-parallel-library
假设有1个生产者P和2个消费者C1和C2.并且有2个队列Q1和Q2,都具有特定容量.
P将生产物品并交替进入Q1和Q2.物品是为特定消费者生产的,不能被其他消费者消费.我如何在Java中实现以下内容:在我启动3个线程后,如果Q1为空,则线程C1被阻塞,直到在Q1中存在某些内容时通知它.Q2也是如此.当Q1和Q2都满时,P将被阻止,直到Q1或Q2未满时通知.
我正在考虑使用BlockingQueue,它会在队列为空时阻止使用者.但问题是当其中一个队列已满时,生产者将被阻止.我们可以使用Java中的任何数据结构来解决这个问题吗?
更新
我自己有一个解决方案,但我不确定它是否有效.我们仍然可以有2个BlockingQueues.当消费者从其队列中获取项目时,它会使用BlockingQueue.take()
,因此当队列中没有项目时它将被阻止.当生产者将项目添加到任一队列时,它使用BlockingQueue.offer()
.因此,它永远不会被此操作阻止,并且如果队列已满,它将变为"false".另外,我们保留一个AtomicInteger来指示未满的队列数.每当生产者P想要将一个项目放入队列时,如果它得到错误的返回,我们将AtomicInteger减少1.当它达到0时,生成器调用AtomicInteger.wait()
.每当消费者从其队列中获取项目时,它也会检查AtomicInteger.当它为0时,消费者将其增加1并进行呼叫AtomicInteger.notify()
.
请让我知道这个解决方案是否有意义.
非常感谢!
java concurrency messaging producer-consumer java.util.concurrent
我有一个C++ 11应用程序,它具有生成数据的高优先级线程,以及一个消耗它的低优先级线程(在我的情况下,将其写入磁盘).我想确保高优先级的生产者线程永远不会被阻止,即它只使用无锁算法.
使用无锁队列,我可以从生产者线程将数据推送到队列,并从消费者线程中轮询它,从而实现上述目标.我想修改我的程序,以便消费者线程在非活动状态而不是轮询时阻塞.
似乎C++ 11条件变量可能对阻塞使用者线程很有用.有人能告诉我一个如何使用它的例子,同时避免消费者睡觉时数据仍在队列中吗?更具体地说,我想确保在生产者将最后一个项目推入队列后,消费者总是在某个有限的时间内被唤醒.生产者保持非阻塞也很重要.
我有一个生产者,我想通过一致的散列在消费者中一致地分配工作.例如,对于消费者节点X和Y,任务A,B,C应始终转到消费者X,而D,E,F应转到消费者Y.但如果Z加入消费者池,则可能会转移一点.
我不想处理编写自己的逻辑来连接到消费者节点,特别是不管理节点加入和离开池,所以我走了使用RabbitMQ的路径,每个消费者节点都有一个独占队列.
我遇到的一个问题是列出这些队列,因为生产者需要在分配工作之前知道所有可用的队列.AMQP甚至不支持列表队列,这使我不确定我的整个方法.RabbitMQ和Alice(目前已经破碎)添加了这个功能:是否有用于在RabbitMQ上列出队列和交换的API?
这是一个明智的使用兔子?我应该使用消息队列吗?有没有更好的设计,所以队列可以在消费者之间不断分配我的工作,而不是我需要这样做?
我是HornetQ的新手所以请耐心等待.我先告诉你我的要求:
我需要一个消息排队中间件,它可以在不同进程之间传递大小约1k的消息,具有低延迟和持久性(即它应该能够在系统崩溃中存活).我将有多个进程写入相同的队列,同样多个进程从同一队列中读取.
为此,我选择了HornetQ,因为它具有持久性消息传递的最佳评级.
我目前使用Hornetq v2.2.2Final作为独立服务器.
我能够使用核心api (ClientSession)成功创建持久/非持久队列,并成功将消息发布到队列(ClientProducer).
同样,我可以使用核心API (ClientConsumer)从队列中读取消息.
在此之后问题出现时,客户端已读取消息,消息仍保留在队列中,即队列中的消息数保持不变.也许我弄错了但是我的印象是,一旦消息被消耗 (读取+确认),它就会从队列中删除.但这种情况在我的情况下并没有发生,并且正在反复阅读相同的消息再次.
此外,我想告诉我,我已经尝试使用非持久性队列和非持久性消息.但问题仍然存在.
我正在使用的生产者代码:
public class HQProducer implements Runnable {
private ClientProducer producer;
private boolean killme;
private ClientSession session;
private boolean durableMsg;
public HQProducer(String host, int port, String address, String queueName,
boolean deleteQ, boolean durable, boolean durableMsg, int pRate) {
this.durableMsg = durableMsg;
try {
HashMap map = new HashMap();
map.put("host", host);
map.put("port", port);
TransportConfiguration config = new …
Run Code Online (Sandbox Code Playgroud) 我正在学习C#中的异步/等待模式.目前我正在尝试解决这样的问题:
有一个生产者(硬件设备)每秒生成1000个数据包.我需要将此数据记录到文件中.
该设备仅具有一次ReadAsync()
报告单个数据包的方法.
我需要缓冲数据包并按照它们生成的顺序将它们写入文件,每秒只执行一次.
如果写入过程没有在下一批数据包准备好写入时及时完成,则写操作应该失败.
到目前为止,我写了类似下面的内容.它有效,但我不确定这是否是解决问题的最佳方法.有任何意见或建议吗?在消费者需要汇总从生产者处收到的数据时,采用这种生产者/消费者问题的最佳做法是什么?
static async Task TestLogger(Device device, int seconds)
{
const int bufLength = 1000;
bool firstIteration = true;
Task writerTask = null;
using (var writer = new StreamWriter("test.log")))
{
do
{
var buffer = new byte[bufLength][];
for (int i = 0; i < bufLength; i++)
{
buffer[i] = await device.ReadAsync();
}
if (!firstIteration)
{
if (!writerTask.IsCompleted)
throw new Exception("Write Time Out!");
}
writerTask = Task.Run(() =>
{
foreach (var b in buffer)
writer.WriteLine(ToHexString(b)); …
Run Code Online (Sandbox Code Playgroud) c# ×4
java ×3
.net ×2
amqp ×1
async-await ×1
c++11 ×1
compression ×1
concurrency ×1
hornetq ×1
inputstream ×1
messaging ×1
performance ×1
queuing ×1
rabbitmq ×1