标签: producer-consumer

这个Python生产者 - 消费者无锁方法是否是线程安全的?

我最近编写了一个使用简单生产者/消费者模式的程序.它最初有一个与不正确使用线程有关的错误.我最终解决了这个问题.但它让我想到是否有可能以无锁的方式实现生产者/消费者模式.

我的要求很简单:

  • 一个生产者线程.
  • 一个消费者线程.
  • 队列只有一个项目.
  • 生产者可以在消耗当前物品之前生成下一个物品.因此当前项目丢失了,但这对我来说没问题.
  • 消费者可以在生成下一个项目之前使用当前项目.因此,当前项目被消耗两次(或更多),但这对我来说没问题.

所以我写了这个:

QUEUE_ITEM = None

# this is executed in one threading.Thread object
def producer():
    global QUEUE_ITEM
    while True:
        i = produce_item()
        QUEUE_ITEM = i

# this is executed in another threading.Thread object
def consumer():
    global QUEUE_ITEM
    while True:
        i = QUEUE_ITEM
        consume_item(i)
Run Code Online (Sandbox Code Playgroud)

我的问题是:这段代码是否是线程安全的?

直接评论:这段代码不是真的无锁 - 我使用CPython并且它有GIL.

我测试了一点代码,它似乎工作.它转换为一些由于GIL而成为原子的LOAD和STORE操作.但是我也知道del x当x实现__del__方法时,操作不是原子的.因此,如果我的项目有__del__方法并且发生了一些讨厌的调度,那么事情可能会破裂.或不?

另一个问题是:为了使上述代码正常工作,我必须施加什么样的限制(例如生成的项目类型)?

我的问题只是关于利用CPython和GIL的怪癖的理论可能性,以便提出无锁(即没有像代码中的threadading.Lock那样的锁)解决方案.

python locking producer-consumer thread-safety

6
推荐指数
2
解决办法
3457
查看次数

Java:高性能消息传递(单一生产者/单一消费者)

我最初在这里问过这个问题,但我意识到我的问题不是关于一个真正的循环.我想知道的是,在Java中进行高性能异步消息传递的正确方法是什么?

我想做什么......

我有大约10,000名消费者,每个消费者都从他们的私人队列中消费消息.我有一个线程一个接一个地生成消息并将它们放在正确的消费者队列中.每个消费者无限循环,检查消息是否出现在队列中并进行处理.

我认为这个术语是"单一生产者/单一消费者",因为有一个生产者,每个消费者只能在他们的私人队列上工作(多个消费者从不从同一个队列中读取).

在Consumer.java里面:

@Override
public void run() {
    while (true) {
        Message msg = messageQueue.poll();
        if (msg != null) {
            ... // do something with the message
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Producer正在快速地将消息放入消费者消息队列中(每秒数百万条消息).消费者应该尽快处理这些消息!

注意:while (true) { ... }由Producer作为最后一条消息发送的KILL消息终止.

但是,我的问题是关于设计此消息传递的正确方法.我应该为messageQueue使用什么样的队列?应该是同步还是异步?如何设计消息?我应该使用while-true循环吗?消费者应该是一个线程,还是其他什么?10,000个线程会慢慢爬行吗?什么是线程的替代品?

那么,在Java中进行高性能消息传递的正确方法是什么?

java asynchronous message-passing producer-consumer

6
推荐指数
1
解决办法
6335
查看次数

C#producer/consumer/observer?

我有一个生产者/消费者队列,除了有特定类型的对象.因此,不只是任何消费者都可以使用添加的对象.我不想为每种类型创建一个特定的队列,因为有太多.(它有点延伸了生产者/消费者的定义,但我不确定正确的术语是什么.)

是否有EventWaitHandle这样的东西允许带参数的脉冲?例如myHandle.Set(AddedType = "foo").现在我正在使用Monitor.Wait,然后每个消费者都会检查脉冲是否真的是针对他们的,但这似乎毫无意义.

我现在拥有的pseduocode版本:

class MyWorker {
    public string MyType {get; set;}
    public static Dictionary<string, MyInfo> data;

    public static void DoWork(){
        while(true){
             if(Monitor.Wait(data, timeout)){
                   if (data.ContainsKey(MyType)){
                        // OK, do work
                   }
             }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

正如你所看到的,当其他东西被添加到字典中时,我可能会得到脉冲.我只关心将MyType添加到dict中.有没有办法做到这一点?这不是一个大问题,但是,例如,我现在必须手动处理超时,因为每次锁定都可以在超时内成功,但MyType永远不会添加到dict中timeout.

c# asp.net multithreading event-handling producer-consumer

6
推荐指数
1
解决办法
1390
查看次数

Java:Producer = Consumer,如何知道何时停止?

我有几个工人,使用ArrayBlockingQueue.

每个worker从队列中获取一个对象,对其进行处理,结果可以得到几个对象,这些对象将被放入队列中进行进一步处理.所以,工人=生产者+消费者.

工人:

public class Worker implements Runnable
{
    private BlockingQueue<String> processQueue = null;

    public Worker(BlockingQueue<String> processQueue)
    {
        this.processQueue = processQueue;
    }

    public void run()
    {
        try
        {
            do
            {
                String item = this.processQueue.take();
                ArrayList<String> resultItems = this.processItem(item);

                for(String resultItem : resultItems)
                {
                    this.processQueue.put(resultItem);
                }
            }
            while(true);
        }
        catch(Exception)
        {
            ...
        }
    }

    private ArrayList<String> processItem(String item) throws Exception
    {
        ...
    }
}
Run Code Online (Sandbox Code Playgroud)

主要:

public class Test
{
    public static void main(String[] args) throws Exception
    {
        new Test().run();
    }

    private …
Run Code Online (Sandbox Code Playgroud)

java producer-consumer blockingqueue

6
推荐指数
1
解决办法
2557
查看次数

用bash异步消耗管道

我有这样的bash脚本

data_generator_that_never_guits | while read data 
do
 an_expensive_process_with data
done
Run Code Online (Sandbox Code Playgroud)

第一个过程连续生成事件(以不规则的间隔),需要在它们可用时进行处理.这个脚本的一个问题是read on消耗了一行输出; 由于处理非常昂贵,我希望它能够使用当前可用的所有数据.另一方面,如果有新数据可用,则必须立即开始处理.简而言之,我想做这样的事情

data_generator_that_never_guits | while read_all_available data 
do
 an_expensive_process_with data
done
Run Code Online (Sandbox Code Playgroud)

如果没有数据可供使用,命令read_all_available将等待,或者将所有当前可用数据复制到变量.如果数据不是由实线组成,那就完全没了问题.基本上,我正在寻找一个读取模拟,它将读取整个管道缓冲区而不是从管道中读取一行.

对于你们之间的好奇,我有一个需要触发源文件重建的构建脚本的问题的背景会发生变化.我想避免经常触发重建.请不要建议我使用grunt,gulp或其他可用的构建系统,它们不能很好地用于我的目的.

谢谢!

shell pipe producer-consumer

6
推荐指数
1
解决办法
1274
查看次数

在scala中实现Producer Consumer的正确方法是什么

我尝试在scala中实现Producer Consumer程序而不使用Queue.因为我认为Actor已经实现了"邮件队列"或其他东西,所以再次编写代码将是多余的.

我试图纯粹在Actor中编写程序.以下是多生产者多个消费者计划.制片人睡了一会儿,模拟做某事.消费者根本不睡觉.

但是我不知道如果我没有添加监督者演员来监视消费者,如何关闭程序,以及使用"Await"(代码中的监督者类)的Promise对象

反正有没有摆脱他们?

import akka.actor.Actor.Receive
import akka.actor._
import akka.routing._;
import akka.util._

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

class Producer(val pool:ActorRef)(val name:String) extends Actor {

  def receive = {
    case _ =>
      while (true) {
        val sleepTime = scala.util.Random.nextInt(1000)
        Thread.sleep(sleepTime)
        println("Producer %s send food" format name)
        pool ! name
      }
  }
}

class Consumer(supervisor : ActorRef)(val name:String) extends Actor {

  var counter = 0

  def receive = {
    case s => 
      counter += 1
      println("%s eat food produced by %s" …
Run Code Online (Sandbox Code Playgroud)

scala producer-consumer actor akka

6
推荐指数
1
解决办法
3260
查看次数

LMAX Disruptor - 决定批量大小的因素是什么?

我最近一直在学习LMAX Disruptor并且正在做一些实验.令我困惑的一件事是处理程序方法的endOfBatch参数.请考虑以下代码.首先,我调用的虚拟消息和消费者类,以及:onEventEventHandlerTest1Test1Worker

public class Test1 {

}

public class Test1Worker implements EventHandler<Test1>{
    public void onEvent(Test1 event, long sequence, boolean endOfBatch) {
        try{
            Thread.sleep(500);
        }
        catch(Exception e){
            e.printStackTrace();
        }
        System.out.println("Received message with sequence " + sequence + ". "
                + "EndOfBatch = " + endOfBatch);
    }
}
Run Code Online (Sandbox Code Playgroud)

请注意,我已经延迟了500毫秒,以替代一些现实世界的工作.我也在控制台中打印了序列号

然后我的驱动程序类(作为生产者)调用DisruptorTest:

public class DisruptorTest {

    private static Disruptor<Test1> bus1;

    private static ExecutorService test1Workers;

    public static void main(String[] args){             
        test1Workers = Executors.newFixedThreadPool(1);

        bus1 = new Disruptor<Test1>(new …
Run Code Online (Sandbox Code Playgroud)

java multithreading producer-consumer disruptor-pattern lmax

6
推荐指数
1
解决办法
2512
查看次数

Kafka使用者:从代理[ArrayBuffer(id:0,host:user-Desktop,port:9092)]获取主题的主题元数据失败

试图在一台机器上运行Kafka生产商,在另一台机器上运行消费者.设置以下属性:advertised.host.name advertised.port

但是在控制台消费者上获得以下错误:

bin/kafka-console-consumer.sh --zookeeper ip:2181 - topic topic --from-beginning

[2016-01-18 16:38:00,939] WARN Fetching topic metadata with correlation id 2112 for topics [Set(topic)] from broker [id:0,host:user-Desktop,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2016-01-18 16:38:00,939] WARN [console-consumer-82496_gopikrishnan-B85M-D3H-A-1453114849146-e6661d41-leader-finder-thread], Failed to find leader for Set([topic,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(topic)] from broker [ArrayBuffer(id:0,host:user-Desktop,port:9092)] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) …
Run Code Online (Sandbox Code Playgroud)

producer-consumer apache-kafka apache-zookeeper

6
推荐指数
1
解决办法
4312
查看次数

C#等待生产者/消费者中的多个事件

我正在使用生产者/消费者模式实现数据链路层.数据链路层有自己的线程和状态机,通过线路传输数据链路协议(以太网,RS-232 ......).物理层的接口表示为System.IO.Stream.另一个线程将消息写入数据链接对象并从中读取消息.

数据链接对象具有空闲状态,必须等待以下四种情况之一:

  1. 收到一个字节
  2. 网络线程提供了一条消息
  3. 保持活动计时器已过期
  4. 所有通信都被网络层取消

我很难找到最好的方法来实现这一点,而无需将通信分成读/写线程(从而大大增加了复杂性).以下是我如何获得4分中的3分:

// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.
stream.ReadTimeout = 10000;
await stream.ReadAsync(buf, 0, 1, cts.Token);
Run Code Online (Sandbox Code Playgroud)

要么

BlockingCollection<byte[]> SendQueue = new ...;
...
// Check for a message from network layer. Timeout after 10 seconds.
// Monitor cancellation token.
SendQueue.TryTake(out msg, 10000, cts.Token);
Run Code Online (Sandbox Code Playgroud)

我该怎么做才能阻止线程,等待所有四个条件?欢迎所有建议.我没有设置任何架构或数据结构.

编辑:********感谢大家的帮助.这是我的解决方案********

首先,我认为没有生产者/消费者队列的异步实现.所以我实现了类似于这个stackoverflow帖子的东西.

我需要一个外部和内部取消源来分别停止使用者线程并取消中间任务,类似于本文.

byte[] buf = new byte[1];
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource())
{
    CancellationToken …
Run Code Online (Sandbox Code Playgroud)

c# asynchronous producer-consumer

6
推荐指数
1
解决办法
629
查看次数

如何从任何两个BlockingCollections中获取优先于第一个集合的项目?

我有两个BlockingCollection<T>对象,collection1collection2.我想要使​​用这些集合中的项目优先考虑项目collection1.也就是说,如果两个集合都有项目,我想先从中获取项目collection1.如果他们都没有物品,我想等待物品可用.

我有以下代码:

public static T Take<T>(
    BlockingCollection<T> collection1,
    BlockingCollection<T> collection2) where T:class
{
    if (collection1.TryTake(out var item1))
    {
        return item1;
    }

    T item2;

    try
    {
        BlockingCollection<T>.TakeFromAny(
            new[] { collection1, collection2 },
            out item2);
    }
    catch (ArgumentException)
    {
        return null;
    }

    return item2;
}
Run Code Online (Sandbox Code Playgroud)

在两个集合上调用此代码nullCompleteAdding,它们都会返回,并且它们都是空的.

我对此代码的主要问题是该TakeFromAny方法的文档指定TakeFromAny将在"集合"上调用ArgumentExceptionif CompleteAdding:

ArgumentException的

collections参数是一个0长度数组或包含null元素,或者已在集合上调用CompleteAdding().

如果CompleteAdding在任何集合上调用它会抛出吗?或两个集合?

如果CompleteAdding被调用并且集合仍然有一些项目,它会抛出吗?

我需要一种可靠的方法来做到这一点.

在这段代码中,我试图从collection1第一个开始,因为TakeFromAny …

c# producer-consumer

6
推荐指数
1
解决办法
423
查看次数