标签: producer-consumer

创建具有相同基类型的不同对象。工厂模式?

我必须为大学课程实现一个多生产者/多消费者示例应用程序,并且很难找到以下问题的解决方案,这并没有让我觉得我做错了什么;)

我必须实现 aProducer产生给定类型的Component( CPUComponent, MainboardComponent. 公共Component类的所有子类)。a 的每个实例Producer只会产生给定数量的一种类型的组件(例如只有主板),然后退出。

Components为都或多或少不变的对象(仅final字段)和所有逻辑都在共同的基类实现Component(下面简化的)

public abstract class Component implements Serializable
{    
    private final long id;

    public Component(int id) { ... }

    public long getId()
    {
        return id;
    }
}
Run Code Online (Sandbox Code Playgroud)

的子类Component只是原始的,比如

public class CPUComponent extends Component
{
    public CPUComponent(long id) { ... }
}
Run Code Online (Sandbox Code Playgroud)

由于语言是 Java,我无法使用泛型轻松解决此对象生成问题(就像在 C# 中一样,因为我无法在 Java 中实例化泛型类型参数的新对象)。所以我开始实现一个工厂

public interface ComponentFactory {
    Component createComponent(Producer …
Run Code Online (Sandbox Code Playgroud)

java factory producer-consumer

5
推荐指数
1
解决办法
2755
查看次数

如何使用条件变量?

关于 Ruby 中的条件变量的资源并不多,但大多数都是错误的。就像ruby​​-doc一样,教程在这里在这里发布- 他们都可能遇到死锁。

sleep我们可以通过按给定顺序启动线程并可能在中间放置一些线程来强制同步来解决问题。但这只是推迟了真正的问题。

我将代码重写为经典的生产者-消费者问题

require 'thread'
queue = []
mutex = Mutex.new
resource = ConditionVariable.new
threads = []

threads << Thread.new do
  5.times do |i|
    mutex.synchronize do 
      resource.wait(mutex)
      value = queue.pop
      print "consumed #{value}\n"
    end
  end
end

threads << Thread.new do
  5.times do |i|
    mutex.synchronize do
      queue << i
      print "#{i} produced\n"
      resource.signal
    end
    sleep(1) #simulate expense
  end
end

threads.each(&:join)
Run Code Online (Sandbox Code Playgroud)

有时你会得到这个(但并非总是如此):

0 produced
1 produced
consumed 0
2 produced …
Run Code Online (Sandbox Code Playgroud)

ruby mutex deadlock producer-consumer race-condition

5
推荐指数
1
解决办法
4873
查看次数

Java 中的生产者-消费者多线程

我想用Java中的多线程等待和通知方法编写程序。
该程序有一个堆栈(最大长度 = 5)。生产者永远生成数字并将其放入堆栈中,消费者从堆栈中选取它。

当堆栈已满时,生产者必须等待,当堆栈为空时,消费者必须等待。
问题是它只运行一次,我的意思是一旦它产生 5 个数字,它就会停止,但我将 run 方法放在 while(true) 块中以不间断运行,但它没有。
这是我到目前为止所尝试的。
生产者类别:

package trail;
import java.util.Random;
import java.util.Stack;

public class Thread1 implements Runnable {
    int result;
    Random rand = new Random();
    Stack<Integer> A = new Stack<>();

    public Thread1(Stack<Integer> A) {
        this.A = A;
    }

    public synchronized void produce()
    {
        while (A.size() >= 5) {
            System.out.println("List is Full");
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        result = rand.nextInt(10);

        System.out.println(result + " produced ");
        A.push(result);
        System.out.println(A); …
Run Code Online (Sandbox Code Playgroud)

java multithreading producer-consumer notify wait

5
推荐指数
1
解决办法
3万
查看次数

高吞吐量非阻塞服务器设计:繁忙等待的替代方案

我一直在为多媒体消息构建一个高吞吐量的服务器应用程序,实现语言是C++。每台服务器可以独立使用,也可以将多台服务器连接在一起,创建一个基于DHT的覆盖网络;服务器就像Skype一样的超级对等点。

工作正在进行中。目前,服务器每秒可以处理大约 200,000 条消息(256 字节消息),并且在我的机器(Intel i3 Mobile 2 GHz、Fedora Core 18(64 位)、4 GB RAM)上的最大吞吐量约为 256 MB/s长度为 4096 字节的消息。服务器有两个线程,一个线程用于处理所有 IO(基于 epoll,边缘触发),另一个线程用于处理传入的消息。覆盖管理还有另一个线程,但在当前的讨论中无关紧要。

讨论中的两个线程使用两个循环缓冲区共享数据。线程编号 1 使用一个循环缓冲区为线程编号 2 的新消息入队,而线程编号 2 通过另一个循环缓冲区返回已处理的消息。服务器是完全无锁的。我没有使用任何同步原语,甚至没有使用原子操作。

循环缓冲区永远不会溢出,因为消息是池化的(在启动时预先分配)。事实上,所有重要/经常使用的数据结构都被集中起来以减少内存碎片并提高缓存效率,因此我们知道服务器启动时我们将创建的最大消息数,因此我们可以预先计算最大缓冲区的大小,然后相应地初始化循环缓冲区。

现在我的问题是:线程 #1 将序列化消息一次一条消息(实际上是指向消息对象的指针)排入队列,而线程 #2 以块(32/64/128 的块)从队列中取出消息,然后返回通过第二个循环缓冲区以块的形式处理消息。如果没有新消息,线程#2 会一直忙着等待,因此让 CPU 核心之一一直忙着。我怎样才能进一步改进设计?忙等待策略的替代方案是什么?我想优雅而有效地做到这一点。我考虑过使用信号量,但我担心这不是最好的解决方案,原因很简单,每次我在线程 #1 中排队消息时都必须调用“sem_post”,这可能会大大降低吞吐量,第二个线程必须调用“sem_post”等于防止信号量溢出的次数。另外我担心信号量实现可能在内部使用互斥锁。

第二个不错的选择可能是使用信号,如果我能发现一种算法,仅当第二个线程“清空队列并且正在调用 sigwait”或“已经在等待 sigwait”,简而言之信号必须提高最少次数,尽管如果信号比需要的次数多几次也不会受到伤害。是的,我确实使用过谷歌搜索,但我在互联网上找到的解决方案都不是令人满意的。以下是一些注意事项:

A. 服务器在进行系统调用时必须浪费最少的 CPU 周期,并且必须使用最少的系统调用次数。

B. 必须有非常低的开销并且算法必须是高效的。

C. 没有任何锁定。

我希望所有选项都摆在桌面上。

这是我共享服务器信息的站点的链接,以便更好地了解其功能和目的:www.wanhive.com

c++ algorithm multithreading producer-consumer busy-waiting

5
推荐指数
1
解决办法
1815
查看次数

Kafka突然重置消费者Offset

我正在使用 Kafka 0.8 和 zookeeper 3.3.5。实际上,我们有十几个主题正在消费,没有任何问题。

最近,我们开始喂食和消费一个行为怪异的新话题。消耗的偏移量突然重置。它尊重我们设置的 auto.offset.reset 策略(实际上是最小的),但我不明白为什么这个话题突然重置了它的偏移量。

我正在使用高级消费者。

这是我发现的一些错误日志: 我们有一堆这样的错误日志:

[2015-03-26 05:21:17,789] INFO 从代理 id:1,host:172.16.23.1,port:9092 获取元数据,1 个主题的相关 id 为 47 Set(MyTopic) (kafka.cl)
ient.ClientUtils$)
[2015-03-26 05:21:17,789] ERROR Producer 连接到 172.16.23.1:9092 不成功(kafka.producer.SyncProducer)
java.nio.channels.ClosedByInterruptException
        在 java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        在 sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:681)
        在 kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        在 kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        在 kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        在 kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        在 kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        在 kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        在 kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
        在 kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        在 kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

每次发生此问题时,我都会看到 WARN 日志:

[2015-03-26 05:21:30,596] 警告由于套接字错误而重新连接:空(kafka.consumer.SimpleConsumer)

然后真正的问题发生了:

[2015-03-26 05:21:47,551] INFO 连接到 172.16.23.5:9092 进行生产(kafka.producer.SyncProducer)
[2015-03-26 05:21:47,552] INFO 从 172.16.23.5:9092 断开连接 (kafka.producer.SyncProducer)
[2015-03-26 05:21:47,553] 信息 [ConsumerFetcherManager-1427047649942] 为分区 …

java producer-consumer apache-kafka kafka-consumer-api apache-zookeeper

5
推荐指数
1
解决办法
1万
查看次数

将列表值按顺序传递给单值消费者的最佳方法?

我正在玩弄 Java8 的流和CompletableFutures。我预先存在的代码有一个类,它接受一个 URL 并下载它:

public class FileDownloader implements Runnable {
    private URL target;
    public FileDownloader(String target) {
        this.target = new URL(target);
    }
    public void run() { /* do it */ }
}
Run Code Online (Sandbox Code Playgroud)

现在,这个类从另一个发出的部分List<String>(单个主机上的多个目标)获取它的信息。

我已将周围的代码切换为CompletableFuture

public class Downloader {
    public static void main(String[] args) {
        List<String> hosts = fetchTargetHosts();
        for (String host : hosts) {
            HostDownloader worker = new HostDownloader(host);
            CompletableFuture<List<String>> future = 
                CompletableFuture.supplyAsync(worker);
            future.thenAcceptAsync((files) -> {
                for (String target : files) {
                    new …
Run Code Online (Sandbox Code Playgroud)

java producer-consumer java-8 completable-future

5
推荐指数
1
解决办法
1335
查看次数

在同一个互斥锁上使用两个 std::unique_lock 会导致死锁吗?

我找到了这个代码,它实现了生产者-消费者问题。我在这里发布一段代码。

在给定的代码中,让我们考虑这样一个场景:生产者通过调用生成一个值void add(int num),它获取互斥体上的锁mu,并且 buffer.size()==size_这使得生产者由于条件变量 而进入等待队列cond

与此同时,发生上下文切换,消费者调用函数int remove()来消费 value ,它尝试获取 mutex 上的锁mu,但是生产者之前已经获取了锁,因此它失败并且永远不会消费该值,从而导致僵局。

我这里哪里出错了?因为代码在我运行时似乎工作正常,所以调试它对我没有帮助。

谢谢

void add(int num) {
        while (true) {
            std::unique_lock<std::mutex> locker(mu);
            cond.wait(locker, [this](){return buffer_.size() < size_;});
            buffer_.push_back(num);
            locker.unlock();
            cond.notify_all();
            return;
        }
    }
    int remove() {
        while (true)
        {
            std::unique_lock<std::mutex> locker(mu);
            cond.wait(locker, [this](){return buffer_.size() > 0;});
            int back = buffer_.back();
            buffer_.pop_back(); 
            locker.unlock();
            cond.notify_all();
            return back;
        }
    }
Run Code Online (Sandbox Code Playgroud)

c++ mutex deadlock producer-consumer unique-lock

5
推荐指数
1
解决办法
2313
查看次数

SQL Server 序列线程安全吗?

标题太宽泛了,但我找不到更具体的,请随时更换更好的。

我有一个使用序列而不是身份的表。我有三个同时插入表的生产者应用程序,一个消费者应用程序从未处理状态的表中选择,然后处理它们,最后更新已处理的行。

消费者应用程序有一个规则,它不处理 id(标识列值)小于最后自己处理的 id 的行。

问题是,虽然我从未假设会发生,但我的消费者应用程序在运行时会落入此规则。(为其他目的制定的规则)。形象化;

问题

每个应用程序都会向数据库发送新数据,在正常情况下,每个应用程序都应该由消费者选择并处理(轮询),但是有时(在工作期间内)我的表中总是有未处理的数据。

这里我的insert sp长什么样子,是生产者常用的;

CREATE PROCEDURE spInsData
    @Data VARCHAR(MAX)
AS
BEGIN
    SET @Id = NEXT VALUE FOR dbo.sequenceId

    INSERT INTO dbo.MyTable(Id, Data, Status)
    VALUES (@Id, @Data, 0)
END
Run Code Online (Sandbox Code Playgroud)

所以我在想,生产者2和生产者3调用存储过程的时候,他们首先拿到的是序列值。然后,当涉及到插入时,不知何故,生产者 3 的插入发生得比生产者 2 快。然后消费者在较小的 id 之前处理较大的 id,因此永远不会处理 id 26。

我希望我清楚这个问题。可能是我解释的问题还是其他问题?如果是关于序列,我可以为每个消费者锁定整个过程 - 获取序列并插入吗?

sql-server sequence producer-consumer

5
推荐指数
1
解决办法
1896
查看次数

TPL Dataflow BufferBlock 线程安全吗?

我有一个相当简单的生产者-消费者模式,其中(简化)我有两个生产者,他们生产由一个消费者消费的输出。

为此,我使用 System.Threading.Tasks.Dataflow.BufferBlock<T>

一个BufferBlock对象被创建。一个Consumer是听这个BufferBlock,并处理任何接收到的输入。

send data to the同时有两个“生产者BufferBlock”

简化:

BufferBlock<int> bufferBlock = new BufferBlock<int>();

async Task Consume()
{
    while(await bufferBlock.OutputAvailable())
    {
         int dataToProcess = await outputAvailable.ReceiveAsync();
         Process(dataToProcess);
    }
}

async Task Produce1()
{
    IEnumerable<int> numbersToProcess = ...;
    foreach (int numberToProcess in numbersToProcess)
    {
         await bufferBlock.SendAsync(numberToProcess);
         // ignore result for this example
    }
}

async Task Produce2()
{
    IEnumerable<int> numbersToProcess = ...;
    foreach (int numberToProcess in numbersToProcess)
    {
         await bufferBlock.SendAsync(numberToProcess);
         // ignore result for …
Run Code Online (Sandbox Code Playgroud)

c# dataflow producer-consumer task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
2198
查看次数

ActiveMQ 消费者级别超时

我试图在 Active MQ(版本 5.15.0)中创建消费者级别超时。考虑消费者选择了一条消息但无法确认,因此在这种情况下,我希望消费者超时,以便其他消费者可以选择收听经纪人的消息。

我的生产者代码在其中设置了两个消费者侦听器:

public class JmsMessageListenerAckExample {
  public static void main(String[] args) throws URISyntaxException, Exception {
    Connection connection = null;
    try {
      // Producer
      ConnectionFactory factory = createActiveMQConnectionFactory();
      connection = factory.createConnection();
      Session session = connection.createSession(false,
          Session.CLIENT_ACKNOWLEDGE);
      Queue queue = session.createQueue("customerQueue");
      String payload = "Important Task";
      Message msg = session.createTextMessage(payload);
      MessageProducer producer = session.createProducer(queue);

      System.out.println("Sending text '" + payload + "'");
      producer.send(msg);

      // Consumer
      MessageConsumer consumer1 = session.createConsumer(queue);
      consumer1.setMessageListener(
          new AckMessageListener(false, "consumer1"));
      Thread.sleep(1000);
      System.out.println("Creating new message listener to acknowledge"); …
Run Code Online (Sandbox Code Playgroud)

java activemq-classic jms producer-consumer

5
推荐指数
1
解决办法
1814
查看次数