标签: producer-consumer

消费者池的自动缩放

我们有一个Web应用程序,允许用户触发对外部资源的请求.外部资源花费了不确定的时间来收集结果,因此我们必须轮询它以获取更新,并在完成后收集最终结果.

我们希望这样做,以便当用户触发请求时,它会被添加到队列中,然后许多工作线程将获取每个请求并进行轮询,同时用户执行其他操作.

由于请求的数量在白天变化很大,我们认为在很慢的情况下让很多工人无所事事会浪费资源,但与此同时我们需要有足够的工人来处理高峰负荷.系统.

当有很多请求在等待时,我们希望能够增加更多工作人员,但在没什么可做的时候就会杀掉工人.

可以使用EJB执行此操作,但我们不想使用它.我们也不想使用JMS或其他大型框架来处理这个问题,除非我们已经在使用它(Spring,Quartz,很多Apache的东西).

由于EJB支持这一点,并且它是那里发现的更有用的功能之一,我们想象有人已经为我们解决了这个问题.建议?

java multithreading producer-consumer

1
推荐指数
1
解决办法
165
查看次数

使用多个互斥锁和条件变量

是否存在使条件变量使用多个互斥锁的机制?我在Linux和pthreads在C++中.

在一个应用程序中,我需要两个互斥量(而不是一个)由pthread_cond_wait()以原子方式获取和释放,但该函数只接受一个.

我有一个名为BlockingManager的类,它有以下方法:

blockMeFor( pthread_cond_t* my_cond, pthread_mutex_t* my_lock, set<int> waitees);
Run Code Online (Sandbox Code Playgroud)

我正在使用它,假设它像pthread_cond_wait一样获取/释放互斥锁.

问题是,为了实现blockingManager,我也需要一个内部互斥锁,这两个互斥锁应该被原子地获取和释放.

这里有一个相关的讨论,它说等待多个互斥锁产生未定义的行为. http://sourceware.org/ml/libc-help/2011-04/msg00011.html

我面临的问题的生产者/消费者模型如下:

我们有多个客户.每个客户都有一些任务.每个任务可能有多个先决条件(在同一客户端或其他客户端的任务中).每个客户端都有一个消费者线程 任务从一个生产者线程分配给客户端.新分配的任务可能有资格在先前任务之前完成.在某些时刻可能没有任务要完成,但如果要完成任务,至少应该完成一项任务.(它应该是节省工作的)

我每个消费者线程使用一个condvar,一旦没有任务要为该线程完成,它就会阻塞.condvar可以通过其中任何一个发出信号

  • 生产者线程分配新任务.

  • 另一个消费者线程完成任务.

我每个消费者使用一个互斥锁来保护生产者和消费者之间的共享数据结构.和一个互斥(内部互斥)来保护多个用户之间的共享数据结构.

c++ mutex pthreads producer-consumer race-condition

1
推荐指数
1
解决办法
3032
查看次数

Java:信号量:生产者消费者:线程和线程组

我为生产者和消费者实现了一个信号量类.

它工作正常,但我现在感觉我们使用notifyAll来通知唤醒或通知所有线程的线程.

我想知道是否有任何方式我们只能通知一组特定的组:生成器只使用组或其他任何方式通知消费者线程,反之亦然.

PS:我是java和Threading的新手.

java multithreading semaphore producer-consumer

1
推荐指数
1
解决办法
924
查看次数

为什么GC不收集未使用的对象?

我为我的实验用BlockingCollection实现了Producer/Consumer模式.

PerformanceCounter c = null;
void Main()
{
    var p  =System.Diagnostics.Process.GetCurrentProcess();
    c = new PerformanceCounter("Process", "Working Set - Private", p.ProcessName);

    (c.RawValue/1024).Dump("start");
    var blocking = new BlockingCollection<Hede>();
    var t = Task.Factory.StartNew(()=>{
        for (int i = 0; i < 10000; i++)
        {
            blocking.Add(new Hede{
            Field = string.Join("",Enumerable.Range(0,100).Select (e => Path.GetRandomFileName()))
            });
        }
        blocking.CompleteAdding();
    });

    var t2 = Task.Factory.StartNew(()=>{
        int x=0;
        foreach (var element in blocking.GetConsumingEnumerable())
        {
            if(x % 1000==0)
            {
                (c.RawValue/1024).Dump("now");
            }
            x+=1;   
        }
    });
    t.Wait();
    t2.Wait();
    (c.RawValue/1024).Dump("end");
}
Run Code Online (Sandbox Code Playgroud)

运行后我的内存转储:

start
211908 …
Run Code Online (Sandbox Code Playgroud)

c# memory-management producer-consumer

1
推荐指数
1
解决办法
1187
查看次数

消费者和生产者之间的Qt线程发布?Qmutex?QSemaphore?什么?

我有一个相当简单的情况:我在一个线程中有一个高速数据生成器,它生成一个带有[变长]元素的缓冲区.填充此缓冲区后,我有一个将其写入磁盘的使用者.

现在,生成器线程需要返回生成ASAP,如果缓冲区尚未被消费者线程写入,则会旋转到位:

int volatile datatogo=0; // global with starting condition

while(datatogo != 0) // spin, buffer not yet written out
{
    if (recflag == 0) return; // recording is terminated
}
// clipped code fills buffer, then:
datatogo = lengthoffill;
Run Code Online (Sandbox Code Playgroud)

...在另一个线程中,缓冲区编写器执行此操作:

    while(recflag)
    {
        if (datatogo)
        {
            if (m_File.write(sbuffer,datatogo) == -1)
            {
                recflag=0; // bail NOW
            }
            datatogo = 0; // buffer transferred
        }
        usleep(100); // 100 uS
    }
Run Code Online (Sandbox Code Playgroud)

这样做的最终结果是写入磁盘消费者在完成写入时放弃了CPU,知道生产者必须花一些时间来实际填充缓冲区.当消费者在没有数据时睡着了,CPU可供生产者使用.消费者睡眠时间为100 uS,检查数据,如果没有,则返回睡眠状态,因此除了睡眠状态之外,它在该状态下没有做太多事情.

但是,因为睡眠时间是任意的,所以这不可能是最佳的; 即使我仔细调整它以在我的机器上工作(8核,3 GHz),它在另一台机器上的行为也会不同.有时会有数据准备好被写入,消费者刚刚进入睡眠状态,将整个100美元扔出窗外,可以这么说.我也对这种小型计时窗口的分辨率和可靠性有疑虑 - 而较大的窗口将无法工作.

所以.Qt中有多种机制来控制对事物的访问,这基本上就是我想做的事情.但我不明白哪一个(如果有的话)会做我想做的事,这是:

1)让消费者睡眠直到缓冲区已满,然后写入并返回睡眠状态,或直到作业停止,这样它就可以关闭文件(它可以检查唤醒时哪个)

2)当生成器只在缓冲区被写出时才会休眠,否则填充缓冲区并返回生成其内容.

我需要尽可能多的CPU可用 - …

qt multithreading producer-consumer

1
推荐指数
1
解决办法
1691
查看次数

抽象类和不可空值类型

我正在尝试编译大师“Joe Duffy”为类生产者/消费者提出的代码(更多迭代器乐趣与生产者/消费者模式),但发生了此错误:

(我使用的是 Visual Studio 2010 和 Net 4.0.3)

Program.cs(37,34):错误CS0453:类型“T”必须是不可空值类型才能将其用作泛型类型或方法“System.Nullable”中的参数“T”
Program.cs(11,40):(相关位置)
Program.cs(37,61):错误CS0453:类型“T”必须是不可空值类型才能将其用作泛型类型或方法“System.Nullable”中的参数“T”
Program.cs(11,40):(相关位置)
Program.cs(44,53):错误CS0453:类型“T”必须是不可空值类型才能将其用作泛型类型或方法“System.Nullable”中的参数“T”
Program.cs(11,40):(相关位置)

对于我浅薄的知识来说实在是太多了!有人可以提出解决方案吗?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ProducerConsumerClass
{
    class Program
    {
        public abstract class Producer<T>
        {
            public Producer()
            {
                worker = new Thread(new ThreadStart(this.ProductionCycle));
            }
            private Queue<T> buffer = new Queue<T>();
            public Thread worker;
            private bool done;
            public bool Done
            {
                get
                {
                    return done;
                }
            }

            public IEnumerable<T> ConsumerChannel
            {
                get
                {
                    if (done)
                        throw new InvalidOperationException("Production …
Run Code Online (Sandbox Code Playgroud)

c# multithreading class producer-consumer non-nullable

1
推荐指数
1
解决办法
2591
查看次数

并发优先收集

我有许多线程从服务器列表中检索数据。每 5 分钟从服务器解析器下载一次服务器列表。我用于处理数据的线程应该仅使用响应时间最短的服务器。每个服务器的响应时间可能因请求而异。因此,在更新服务器列表之间的时间范围内,我应该验证每个服务器的响应时间。

我最初的方法是创建两个额外的线程:第一个线程更新服务器列表,第二个线程验证每个服务器的响应时间并根据服务器的响应时间对服务器列表进行排序。

我尝试使用BlockingCollection<T>它来连接生产者和消费者,但在我的任务中,我有两个并发消费者,并且也BlockingCollection<T>没有插入项目来创建服务器优先列表的本机能力。

ConcurrentStack<T>或者ConcurrentQueue<T>也不能按原样使用,因为它们像 as 一样是非阻塞的,BlockingCollection<T>并且它们需要额外的机制来阻塞需要队列中的项目的线程。

请帮我解决这个任务。

c# collections concurrency multithreading producer-consumer

1
推荐指数
1
解决办法
2299
查看次数

以FIFO顺序C#的TPL生产者消费者

我仅限于.NET 3.5,并且正在使用TPL。该方案是生产者-消费者,但是没有阻塞的问题。由于种种限制,不能在这种情况下使用PLINQ,而我们想要实现的是生产许多物品的最快方法(每次生产都是长期运行的,并且物品数量超过100,000),但是每个物品必须以FIFO顺序消费(这意味着,我要求生产的第一个商品必须首先被消费,即使它是在其他商品之后创建的)也必须尽快消费。

对于这个问题,我尝试使用任务列表,等待列表中的第一项完成(taskList.First()。IsCompleted()),然后在其上使用消耗函数,但是由于某些原因,我似乎用光了内存(可能由于等待启动的任务而导致任务列表中的项目过多?)还有更好的方法吗?(我正在努力实现最快的速度)

非常感谢!

c# multithreading producer-consumer task-parallel-library

1
推荐指数
1
解决办法
787
查看次数

在 Confluence kafka go 中读取来自 kafka 主题的消息时如何使用确认?

我正在开发一个推送通知,向客户端发送许多消息。消息被发布到主题中,订阅者从同一主题中读取消息。如果在从主题偏移量读取消息后立即出现错误,即使我无法发送消息,我的订阅者也需要读取下一条消息并发送它。我所说的错误是指服务器停机或出现严重问题。

如何阅读带有确认信息的消息?

producer-consumer go apache-kafka

1
推荐指数
1
解决办法
4395
查看次数

为什么消费者不是此计划中的consumin数据

我是java的新手,我正在尝试实现简单的生产者消费者问题.下面是我为测试它而编写的代码.我有3个类,主类,生产者类和消费者类.现在的问题是我的生产者正在生产数据,但我的消费者并没有消费它.有人可以解释一下为什么会发生这种情况.提前致谢.

public class ProducerConsumerWithQueue {
    /**
     * @param args
     */
    public static void main(String[] args) {

        ArrayList<String > queue = new ArrayList<String>();         
        Producer producer = new Producer( queue);           
        Consumer consumer = new Consumer( queue);

        consumer.start();
        producer.start();   
    }
}


public class Producer extends Thread{
    ArrayList<String> queue;
        public Producer(ArrayList<String> queue) {
        this.queue = queue;
    }

    public void run(){
        System.out.println("Producer Started");
        System.out.println("Producer size "+queue.size());
        for(int i=0;i<50;i++){
            try {
                    synchronized (this) {               
                    if(queue.size()>10){
                        System.out.println("Producer Waiting");
                        wait();
                    }else{
                        System.out.println("producing "+i);
                        queue.add("This is "+i);
                        notifyAll();
                    } …
Run Code Online (Sandbox Code Playgroud)

java producer-consumer

0
推荐指数
1
解决办法
63
查看次数