标签: producer-consumer

具有消费者-生产者设计“超出 CPU 时间限制”的 TCP 套接字服务器

这个问题是在运行使用消费者/生产者设计创建的套接字服务器时出现的,程序cpu time limit exceeded因日志错误而崩溃。我还发现cpu使用量比当时更多90%。这是服务器的代码,它可能出了什么问题,我该如何优化它?

我使用这种queue方法来避免threads为每个请求创建如此多的请求。

在主方法中(主线程)

//holds socket instances
ConcurrentLinkedQueue<Socket> queue = new ConcurrentLinkedQueue<>();

//create producer thread
Thread producer = new Thread(new RequestProducer(queue));
//create consumer thread
Thread consumer = new Thread(new RequestConsumer(queue));

producer.start();
consumer.start();
Run Code Online (Sandbox Code Playgroud)

请求生产者线程

//this holds queue instance coming from main thread
ConcurrentLinkedQueue<Socket> queue

//constructor, initiate queue
public RequestProducer(
    ConcurrentLinkedQueue<Socket> queue
) {
    this.queue = queue;
}

public void run() {
    try {
        //create serversocket instance on port 19029 …
Run Code Online (Sandbox Code Playgroud)

java sockets concurrency multithreading producer-consumer

2
推荐指数
1
解决办法
969
查看次数

为什么我的 Disruptor 程序没有充分利用环形缓冲区

Disruptor github地址为:https://github.com/LMAX-Exchange/disruptor

我对其进行了一个简单的测试,如下所示:

public class DisruptorMain {
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static void main(String[] args) throws Exception {
        class Element {

            private int value;

            public int get() {
                return value;
            }

            public void set(int value) {
                this.value = value;
            }

        }

        ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread");
            }
        };

        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };

        EventHandler<Element> handler = …
Run Code Online (Sandbox Code Playgroud)

java producer-consumer disruptor-pattern

2
推荐指数
1
解决办法
1101
查看次数

GCP 日志路由器接收器未将日志路由到主题?

上下文:我正在尝试通过日志接收器将 GCP 审核日志存储在 Pub/Sub 主题中,以便我可以在订阅中提取这些消息(并让第三方订阅该订阅)。

尝试的解决方案:
带有包含过滤器的日志接收器:logName:"cloudaudit.googleapis.com"将日志路由到-> Pub/Sub 主题以存储这些消息->订阅(基于拉取)以查看这些消息。

问题:来自接收器的日志似乎并未出现在主题中。当我尝试查看主题中的消息(通过单击 GCP GUI 中的“pull”)时,我没有收到任何消息,即使我确信审核日志存在(我针对上面的包含过滤器运行了查询)并继续非常频繁地生成。

问题:为什么日志没有路由到主题,或者为什么当我从主题拉取时主题没有显示日志?

logging producer-consumer audit-logging google-cloud-platform google-cloud-pubsub

2
推荐指数
1
解决办法
1407
查看次数

引导多个生产者和消费者

我有以下代码:

var channel = Channel.CreateUnbounded<string>();

var consumers = Enumerable
    .Range(1, 5)   
    .Select(consumerNumber =>
        Task.Run(async () =>
        {
            var rnd = new Random();
            while (await channel.Reader.WaitToReadAsync())
            {
                if (channel.Reader.TryRead(out var item))
                {
                    Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
                }
            }
        }));

var producers = Enumerable
    .Range(1, 5)    
    .Select(producerNumber =>
        Task.Run(async () =>
        {
            var rnd = new Random();
            for (var i = 0; i < 10; i++)
            {
                var t = $"Message {i}";
                Console.WriteLine($"Producing {t} on producer {producerNumber}");

                await channel.Writer.WriteAsync(t);
                await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3))); …
Run Code Online (Sandbox Code Playgroud)

c# channel producer-consumer system.threading.channels

2
推荐指数
1
解决办法
2566
查看次数

Channel/BlockingCollection 分配免费替代方案?

我最近对我的框架进行了基准测试,发现它分配了大量垃圾。

我正在使用 andChannel<T>TryRead操作ReadAsync在每次调用时分配内存。所以我用 a 交换了它,BlockingCollection<T>它也在 期间分配内存TryTake

我使用了一个带有单个写入器/读取器的无界通道。还有一个正常的BlockingCollection<T>

// Each thread runs this, jobmeta is justa struct 
while (!token.IsCancellationRequested)
{
    var jobMeta = await Reader.ReadAsync(token);  // <- allocs here
    jobMeta.Job.Execute();
    jobMeta.JobHandle.Notify();
}
Run Code Online (Sandbox Code Playgroud)

探查器告诉我,所有分配都是由该ChannelReader.ReadAsync方法引起的。不幸的是,我无法显示完整的代码,但是由于我在热路径中使用它们,所以我需要不惜一切代价避免分配。

是否有任何替代方案在读/写/获取期间不分配内存并且行为相同(生产者/消费者多线程的并发类)?我怎样才能自己实现一个?

c# multithreading asynchronous producer-consumer system.threading.channels

2
推荐指数
1
解决办法
1387
查看次数

我的生产者 - 消费者队列设计有什么问题?

我开始用C#代码示例在这里.我试图调整它有几个原因:1)在我的场景中,所有任务将在消费者开始之前预先放入队列中; 2)我想将工作者抽象为一个单独的类而不是班上的原始Thread成员WorkerQueue.

我的队列似乎并没有自行处理,它只是挂起,当我在Visual Studio中打破时,它就停留在#1 _th.Join()线上WorkerThread.另外,有更好的方法来组织这个吗?暴露WaitOne()Join()方法的东西似乎是错误的,但我想不出一个让WorkerThread队列进行交互的合适方法.

另外,如果我q.Start(#)using块的顶部调用,则每次启动时只有一些线程(例如,线程1,2和8处理每个任务).为什么是这样?这是某种竞争条件,还是我做错了什么?

using System;
using System.Collections.Generic;
using System.Text;
using System.Messaging;
using System.Threading;
using System.Linq;

namespace QueueTest
{
    class Program
    {
        static void Main(string[] args)
        {
            using (WorkQueue q = new WorkQueue())
            {
                q.Finished += new Action(delegate { Console.WriteLine("All jobs finished"); });

                Random r = new Random();
                foreach (int i in Enumerable.Range(1, 10))
                    q.Enqueue(r.Next(100, 500));

                Console.WriteLine("All jobs queued"); …
Run Code Online (Sandbox Code Playgroud)

c# queue multithreading producer-consumer

1
推荐指数
1
解决办法
990
查看次数

使用ConcurrentLinkedQueue的Java线程问题

我有以下代码片段的问题.它旨在处理添加到事件队列(ConcurrentLinkedQueue)的事件(通过对processEvent方法的调用提供).事件被添加到事件队列中并在run方法中定期处理.

一切都很好.但有时在调用processEvent方法之后,当一个事件被添加到队列时,运行部件无法看到有新事件.

什么是错的?除了使用String常量作为锁定的明显错误之外?

import java.util.concurrent.ConcurrentLinkedQueue;

public class MyCommunicator implements Runnable {

private ConcurrentLinkedQueue<MyEvent> eventQueue = null;

private boolean stopped = false;

private String lock = "";
private Thread thread = null;

public MyCommunicator() {

    eventQueue = new ConcurrentLinkedQueue<MyEvent>();
}

public void start() {
    thread = new Thread(this, "MyCommunicatorThread");
    thread.start();
}

public void stop() {
    stopped = true;
    synchronized (lock) {
        lock.notifyAll();
    }
    eventQueue.clear();
}

public void run() {
    while (!stopped) {
        try {

            MyEvent event = null;
            while (!stopped && …
Run Code Online (Sandbox Code Playgroud)

java concurrency multithreading producer-consumer

1
推荐指数
2
解决办法
3801
查看次数

POSIX线程和信号量

我无法调试以下程序,因为我家里没有linux.我无法执行该程序,但在实验室会话中编译程序时,我遇到了两个错误.谁能帮我吗?如果有人能够在调试后执行该程序,请发布输出.

代码如下:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#define BUFFER_SIZE 10
sem_t empty;
sem_t full;
pthread_mutex_t mutex;

int buffer[BUFFER_SIZE];

pthread_t ptid,ctid;
pthread_attr_t attr;

void *producer(void *param);
void *consumer(void *param);

int counter;
int main()
{

  init();
  pthread_create(&ptid, &attr, producer, NULL);
  pthread_create(&ctid, &attr, consumer, NULL);
  pthread_join(ptid,NULL);
  pthread_join(ctid,NULL);

  return 0;
}

void init()
{
  pthread_mutex_init(&mutex, NULL);
  pthread_attr_init(&attr);
  sem_init(&full, 0, 0);
  sem_init(&empty, 0, BUFFER_SIZE);
  counter = 0;
  for(int j=0;j<BUFFER_SIZE;j++)
  {  buffer[j] = 0;}
}

void *producer(void *param)
{
int item;
  while(1)
  { …
Run Code Online (Sandbox Code Playgroud)

c semaphore pthreads producer-consumer

1
推荐指数
1
解决办法
5300
查看次数

Kafka使用者无法使用引导服务器名称来使用消息

我在使用bootstrap-server(即Kafka服务器)消费消息时遇到问题.知道为什么没有zookeeper就无法使用消息?

  • 卡夫卡版本:kafka_2.11-1.0.0
  • Zookeeper版本:kafka_2.11-1.0.0
  • Zookeeper主机和端口:zkp02.mp.com:2181
  • Kafka主机和港口:kfk03.mp.com:9092

制作一些消息:

[kfk03.mp.com ~]$ /bnsf/kafka/bin/kafka-console-producer.sh --broker-list kfk03.mp.com:9092 --topic test
>hi
>hi
Run Code Online (Sandbox Code Playgroud)

如果我给出消费者,消费者无法使用消息–-bootstrap-server:

[kfk03.mp.com ~]$
/bnsf/kafka/bin/kafka-console-consumer.sh --bootstrap-server kfk03.mp.com:9092 --topic test --from-beginning
Run Code Online (Sandbox Code Playgroud)

消费者能够在--zookeeper给出服务器而不是--bootstrap-server- 时使用消息:

[kfk03.mp.com ~]$ /bnsf/kafka/bin/kafka-console-consumer.sh --zookeeper zkp02.mp.com:2181 --topic test --from-beginning

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

{"properties": {"messageType": "test", "sentDateTime": "2018-02-25T21:46:00.000+0000"}, "name": "Uttam Anand", "age": 29}
{"properties": {"messageType": …
Run Code Online (Sandbox Code Playgroud)

producer-consumer apache-kafka

1
推荐指数
1
解决办法
3555
查看次数

C++ Single Producer多个消费者程序偶尔崩溃

在下面的代码中,我正在创建一个producer threadn consumer threads从每个专用queue和打印到的读取stdout.这段代码有时会在声明中崩溃consumerQueues[id]->empty().通过调试去我看到consumerQueues[id]0x0当它崩溃.现在在init()函数中,我在创建worker 之前创建了ith使用者.我不确定为什么会留下来.请帮我弄清楚发生了什么.queueiththreadconsumerQueues[id]0x0

#include <thread>
#include <queue>
#include <memory>
#include <iostream>
#include <mutex>
#include <condition_variable>

class Test
{
private:
    void producer()
    {
        while(true)
        {
            std::string s = "abc";
            for(const auto& q : consumerQueues)
            {
                std::unique_lock<std::mutex> lock(mutex);
                q->push(s);
                condition_variable.notify_all();
            }
        }
    }

    void consumer(int id)
    {
        while (true)
        {
            std::string job;
            {
                std::unique_lock<std::mutex> lock(mutex);
                while(consumerQueues[id]->empty())
                { …
Run Code Online (Sandbox Code Playgroud)

c++ multithreading producer-consumer race-condition

1
推荐指数
1
解决办法
433
查看次数