标签: producer-consumer

BlockingCollection - 高同步问题

从多个线程获取消息到队列的最佳方法是什么,并且一次只有一个线程处理此队列的项目?

尝试断开多线程的活动时,我经常使用此模式.

我正在使用BlockingCollection,如下面的代码提取中所示:

// start this task in a static constructor
Task.Factory.StartNew(() => ProcessMultiUseQueueEntries(), TaskCreationOptions.LongRunning);


private static BlockingCollection<Tuple<XClientMsgExt, BOInfo, string, BOStatus>> _q = new BlockingCollection<Tuple<XClientMsgExt, BOInfo, string, BOStatus>>();

    /// <summary>
    /// queued - Simple mechanism that will log the fact that this user is sending an xMsg (FROM a user)
    /// </summary>
    public static void LogXMsgFromUser(XClientMsgExt xMsg)
    {
        _q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(xMsg, null, "", BOStatus.Ignore));
    }

    /// <summary>
    /// queued - Simple mechanism that will log the data being …
Run Code Online (Sandbox Code Playgroud)

c# multithreading producer-consumer

2
推荐指数
1
解决办法
2403
查看次数

在两个进程之间共享资源

我想知道在Python中两个进程之间共享队列(资源)所遵循的最佳实践。这是每个进程正在做的事情:

Process_1:从流式API连续获取数据(JSON格式)

Process_2:是一个守护程序(类似于Sander Marechal的代码),它将数据(一次一个)提交到数据库中

因此,Process_1(或Producer)将一个数据单元放到此共享资源上,而Process_2(或Consumer)将在此共享资源中轮询任何新的数据单元,并将它们存储在DB中(如果有)。

我想到了一些选择:

  • 使用泡菜(缺点:酸洗和去酸洗的额外开销)
  • 通过stdoutProcess_1到stdinProcess_2 传递数据(缺点:无,但不确定如何通过守护程序实现)
  • 使用库中的pool对象multiprocessing(缺点:不确定如何将其编码为一个进程是守护程序)

我想要在这方面实践最佳的解决方案,并附上一些代码:)。谢谢。

python producer-consumer shared-resource

2
推荐指数
1
解决办法
3384
查看次数

C++简单的线程问题

我正在编写一个简单的生产者/消费者程序来更好地理解c ++和多线程.在我运行消费者的线程中,我有前两行:

    pthread_cond_wait(&storageCond, &storageMutex);
    pthread_mutex_lock(&storageMutex);
Run Code Online (Sandbox Code Playgroud)

但程序陷入困境,可能是一个僵局.然后我换了线:

    pthread_mutex_lock(&storageMutex);
    pthread_cond_wait(&storageCond, &storageMutex);
Run Code Online (Sandbox Code Playgroud)

它奏效了.有人可以请帮助我理解为什么这有效,而前者没有?

谢谢.

c++ multithreading pthreads producer-consumer

2
推荐指数
1
解决办法
247
查看次数

调用创建其线程对象后实现runnable的Java类方法

我有一个java类

SomeClass implements Runnable
Run Code Online (Sandbox Code Playgroud)

其中有一个方法display().

当我创建这个类的线程时

Thread thread1 = new Thread(new SomeClass());
Run Code Online (Sandbox Code Playgroud)

现在我如何使用线程实例调用display()方法?

java multithreading producer-consumer runnable

2
推荐指数
2
解决办法
2万
查看次数

C# - 具有单个消费者线程的多个生产者线程

目前正在处理一个 C# 项目 - 我正在尝试做的一般想法是......

用户有一个或多个带有他们感兴趣的交易品种的投资组合,每个投资组合将交易品种的数据下载到 csv,解析它,然后对数据运行一组规则,根据这些规则的结果生成警报。每当经过设定的时间间隔时下载数据等

我计划让每个投资组合在它自己的线程上运行,这样当间隔过去时,每个投资组合可以继续下载数据,同时解析和运行规则,而不是一个一个。然后应将警报推送到包含警报队列的另一个线程 (!)。当它收到警报时,它会将它们发送给客户端。

有点多但是用 C# 解决这个问题的最好方法是什么 - 使用线程,或者像后台工作程序这样的东西,只是让警报队列在一个单独的线程上运行?

非常感谢您的任何建议,其中一些内容是新手,所以如果我完全错了,请随时告诉我:)

c# multithreading producer-consumer thread-safety blocking

2
推荐指数
1
解决办法
1106
查看次数

错误:'O_CREATE'未声明(首次使用此功能)

以下代码应该同步两个进程,一个编写一些整数的生产者,以及一个读取它们的消费者,现在执行它时会给我这个错误:

‘O_CREATE’ undeclared (first use in this function)
Run Code Online (Sandbox Code Playgroud)

但我已经包括了fcntl.h,还有什么可能是问题?

int main(void) 
{
int fd, n, i; 
pid_t pid, ppid; 
char buf[1];
if((fd=open("/tmp/data_file", O_APPEND|O_CREATE, 0640)) <0) exit(1);
sigset(SIGTERM,SIG_IGN);/* signal */   ; sigset(SIGINT,SIG_IGN); /* signal */ 
pid=fork(); 
switch (pid) { 
    case -1: { perror(“FORK”); exit(1); }
    case 0: /* child process - Producer */ 
        sigset(SIGUSR1,wakeup);   
        sighold(SIGUSR1);       /* block / hold signals SIGUSR1 until sigpause*/ 
        for (i=0; i<=100; i++) { 
            /* sleep a random amount of time */ 
            n = (int)(getpid()%256); …
Run Code Online (Sandbox Code Playgroud)

c linux signals producer-consumer

2
推荐指数
1
解决办法
2130
查看次数

C对如何初始化和实现pthread互斥锁和条件变量感到困惑

我对如何初始化和实现pthread互斥和条件变量有点困惑.该程序的目标是让生产者在队列中放置一定数量的整数,并且消费者将整数排除在队列之外.我还必须能够定义创建的生产者和消费者线程的数量.在入门代码中,我给出了这些:

// Locks & Condition Variables
pthread_mutex_t lock; // Lock shared resources among theads
pthread_cond_t full;  // Condition indicating queue is full
pthread_cond_t empty; // Condition indicating queue is empty
Run Code Online (Sandbox Code Playgroud)

作为共享资源.在//TODOmain方法的注释中,其中一个步骤是初始化锁和条件变量.我对pthread mutex和条件的理解非常薄弱,所以我会说:

lock = PTHREAD_MUTEX_INIT;
full = PTHREAD_MUTEX_INIT;
empty = PTHREAD_MUTEX_INIT;
Run Code Online (Sandbox Code Playgroud)

在使用者和制作者方法中,我是否只是通过说:

pthread_mutex_lock(&lock);
Run Code Online (Sandbox Code Playgroud)

pthread_cond_wait(&full, &lock);
Run Code Online (Sandbox Code Playgroud)

我的代码现在非常错误,所以我想在进一步调试之前至少确保我正确使用互斥锁和条件.提前致谢!

c mutex pthreads producer-consumer conditional-statements

2
推荐指数
1
解决办法
1万
查看次数

使用 CancellationToken 仅取消一项任务

假设我们有一个生产者-消费者模式,其中包含一个生产任务和 3 个消费者任务,如下所示:

        Task[] Consumer = new Task[10];
        for (int i = 0; i < 3; i++)
        {

            Consumer[i] = Task.Run(() => DoWork(CancellationToken ct));
        }
Run Code Online (Sandbox Code Playgroud)

问题是我怎么能只取消任务消费者[2]?当发送取消令牌时,所有消费者都会停止!如果需要,我希望能够取消单个消费者。

非常感谢

c# task producer-consumer

2
推荐指数
1
解决办法
865
查看次数

阻止具有多个元素的集合?

生产者-消费者集合 [1] [2] 的所有 C# 实现似乎都有类似于以下内容的接口:

private Queue<T> items;

public void Produce(T item)
public T Consume()
Run Code Online (Sandbox Code Playgroud)

有没有像下面这样的实现?

private Queue<T> items;

public void Produce(T[] item)
public T[] Consume(int count)
Run Code Online (Sandbox Code Playgroud)

希望这能让我一次生产/消费不同数量的项目,而无需过多地锁定每个项目。这对于生产/消费大量项目似乎是必要的,但我没有找到任何实现的运气。

[1] C#生产者/消费者

[2]在 .NET 中创建阻塞 Queue<T>?

.net c# producer-consumer

2
推荐指数
1
解决办法
2521
查看次数

TPL Dataflow - 非常快的生产者,没有那么快的消费者OutOfMemory异常

在将TPL Dataflow移植到我的生产代码之前,我正在尝试使用它.生产代码是传统的生产者/消费者系统 - 生产者生成消息(与金融领域相关),消费者处理这些消息.

我感兴趣的是,如果在某些时候生产者的生产速度比消费者能够处理它的速度快得多(系统会爆炸,或者会发生什么),那么环境将会保持稳定,更重要的是在这些情况下该怎么做.

因此,为了尝试类似的简单应用程序,我想出了以下内容.

    var bufferBlock = new BufferBlock<Item>();

    var executiondataflowBlockOptions = new ExecutionDataflowBlockOptions
                        {
                            MaxDegreeOfParallelism = Environment.ProcessorCount
                            ,
                            BoundedCapacity = 100000
                        };

        var dataFlowLinkOptions = new DataflowLinkOptions
                        {
                            PropagateCompletion = true
                        };

        var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t),
 executiondataflowBlockOptions);

            bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions);
            for (int i = 0; i < int.MaxValue; i++)
            {
                 bufferBlock.SendAsync(GenerateItem());
            }

            bufferBlock.Complete();
            Console.ReadLine();
Run Code Online (Sandbox Code Playgroud)

Item 是一个非常简单的课程

internal class Item
    {
        public Item(string itemId)
        {
            ItemId = itemId;
        }

        public string ItemId { get; }
    } …
Run Code Online (Sandbox Code Playgroud)

.net c# producer-consumer tpl-dataflow

2
推荐指数
1
解决办法
1749
查看次数