这个问题是在运行使用消费者/生产者设计创建的套接字服务器时出现的,程序cpu time limit exceeded因日志错误而崩溃。我还发现cpu使用量比当时更多90%。这是服务器的代码,它可能出了什么问题,我该如何优化它?
我使用这种queue方法来避免threads为每个请求创建如此多的请求。
在主方法中(主线程)
//holds socket instances
ConcurrentLinkedQueue<Socket> queue = new ConcurrentLinkedQueue<>();
//create producer thread
Thread producer = new Thread(new RequestProducer(queue));
//create consumer thread
Thread consumer = new Thread(new RequestConsumer(queue));
producer.start();
consumer.start();
Run Code Online (Sandbox Code Playgroud)
请求生产者线程
//this holds queue instance coming from main thread
ConcurrentLinkedQueue<Socket> queue
//constructor, initiate queue
public RequestProducer(
ConcurrentLinkedQueue<Socket> queue
) {
this.queue = queue;
}
public void run() {
try {
//create serversocket instance on port 19029 …Run Code Online (Sandbox Code Playgroud) Disruptor github地址为:https://github.com/LMAX-Exchange/disruptor
我对其进行了一个简单的测试,如下所示:
public class DisruptorMain {
@SuppressWarnings({ "rawtypes", "unchecked" })
public static void main(String[] args) throws Exception {
class Element {
private int value;
public int get() {
return value;
}
public void set(int value) {
this.value = value;
}
}
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "simpleThread");
}
};
EventFactory<Element> factory = new EventFactory<Element>() {
@Override
public Element newInstance() {
return new Element();
}
};
EventHandler<Element> handler = …Run Code Online (Sandbox Code Playgroud) 上下文:我正在尝试通过日志接收器将 GCP 审核日志存储在 Pub/Sub 主题中,以便我可以在订阅中提取这些消息(并让第三方订阅该订阅)。
尝试的解决方案:
带有包含过滤器的日志接收器:logName:"cloudaudit.googleapis.com"将日志路由到-> Pub/Sub 主题以存储这些消息->订阅(基于拉取)以查看这些消息。
问题:来自接收器的日志似乎并未出现在主题中。当我尝试查看主题中的消息(通过单击 GCP GUI 中的“pull”)时,我没有收到任何消息,即使我确信审核日志存在(我针对上面的包含过滤器运行了查询)并继续非常频繁地生成。
问题:为什么日志没有路由到主题,或者为什么当我从主题拉取时主题没有显示日志?
logging producer-consumer audit-logging google-cloud-platform google-cloud-pubsub
我有以下代码:
var channel = Channel.CreateUnbounded<string>();
var consumers = Enumerable
.Range(1, 5)
.Select(consumerNumber =>
Task.Run(async () =>
{
var rnd = new Random();
while (await channel.Reader.WaitToReadAsync())
{
if (channel.Reader.TryRead(out var item))
{
Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
}
}
}));
var producers = Enumerable
.Range(1, 5)
.Select(producerNumber =>
Task.Run(async () =>
{
var rnd = new Random();
for (var i = 0; i < 10; i++)
{
var t = $"Message {i}";
Console.WriteLine($"Producing {t} on producer {producerNumber}");
await channel.Writer.WriteAsync(t);
await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3))); …Run Code Online (Sandbox Code Playgroud) 我最近对我的框架进行了基准测试,发现它分配了大量垃圾。
我正在使用 andChannel<T>或TryRead操作ReadAsync在每次调用时分配内存。所以我用 a 交换了它,BlockingCollection<T>它也在 期间分配内存TryTake。
我使用了一个带有单个写入器/读取器的无界通道。还有一个正常的BlockingCollection<T>。
// Each thread runs this, jobmeta is justa struct
while (!token.IsCancellationRequested)
{
var jobMeta = await Reader.ReadAsync(token); // <- allocs here
jobMeta.Job.Execute();
jobMeta.JobHandle.Notify();
}
Run Code Online (Sandbox Code Playgroud)
探查器告诉我,所有分配都是由该ChannelReader.ReadAsync方法引起的。不幸的是,我无法显示完整的代码,但是由于我在热路径中使用它们,所以我需要不惜一切代价避免分配。
是否有任何替代方案在读/写/获取期间不分配内存并且行为相同(生产者/消费者多线程的并发类)?我怎样才能自己实现一个?
c# multithreading asynchronous producer-consumer system.threading.channels
我开始用C#代码示例在这里.我试图调整它有几个原因:1)在我的场景中,所有任务将在消费者开始之前预先放入队列中; 2)我想将工作者抽象为一个单独的类而不是班上的原始Thread成员WorkerQueue.
我的队列似乎并没有自行处理,它只是挂起,当我在Visual Studio中打破时,它就停留在#1 _th.Join()线上WorkerThread.另外,有更好的方法来组织这个吗?暴露WaitOne()和Join()方法的东西似乎是错误的,但我想不出一个让WorkerThread队列进行交互的合适方法.
另外,如果我q.Start(#)在using块的顶部调用,则每次启动时只有一些线程(例如,线程1,2和8处理每个任务).为什么是这样?这是某种竞争条件,还是我做错了什么?
using System;
using System.Collections.Generic;
using System.Text;
using System.Messaging;
using System.Threading;
using System.Linq;
namespace QueueTest
{
class Program
{
static void Main(string[] args)
{
using (WorkQueue q = new WorkQueue())
{
q.Finished += new Action(delegate { Console.WriteLine("All jobs finished"); });
Random r = new Random();
foreach (int i in Enumerable.Range(1, 10))
q.Enqueue(r.Next(100, 500));
Console.WriteLine("All jobs queued"); …Run Code Online (Sandbox Code Playgroud) 我有以下代码片段的问题.它旨在处理添加到事件队列(ConcurrentLinkedQueue)的事件(通过对processEvent方法的调用提供).事件被添加到事件队列中并在run方法中定期处理.
一切都很好.但有时在调用processEvent方法之后,当一个事件被添加到队列时,运行部件无法看到有新事件.
什么是错的?除了使用String常量作为锁定的明显错误之外?
import java.util.concurrent.ConcurrentLinkedQueue;
public class MyCommunicator implements Runnable {
private ConcurrentLinkedQueue<MyEvent> eventQueue = null;
private boolean stopped = false;
private String lock = "";
private Thread thread = null;
public MyCommunicator() {
eventQueue = new ConcurrentLinkedQueue<MyEvent>();
}
public void start() {
thread = new Thread(this, "MyCommunicatorThread");
thread.start();
}
public void stop() {
stopped = true;
synchronized (lock) {
lock.notifyAll();
}
eventQueue.clear();
}
public void run() {
while (!stopped) {
try {
MyEvent event = null;
while (!stopped && …Run Code Online (Sandbox Code Playgroud) 我无法调试以下程序,因为我家里没有linux.我无法执行该程序,但在实验室会话中编译程序时,我遇到了两个错误.谁能帮我吗?如果有人能够在调试后执行该程序,请发布输出.
代码如下:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#define BUFFER_SIZE 10
sem_t empty;
sem_t full;
pthread_mutex_t mutex;
int buffer[BUFFER_SIZE];
pthread_t ptid,ctid;
pthread_attr_t attr;
void *producer(void *param);
void *consumer(void *param);
int counter;
int main()
{
init();
pthread_create(&ptid, &attr, producer, NULL);
pthread_create(&ctid, &attr, consumer, NULL);
pthread_join(ptid,NULL);
pthread_join(ctid,NULL);
return 0;
}
void init()
{
pthread_mutex_init(&mutex, NULL);
pthread_attr_init(&attr);
sem_init(&full, 0, 0);
sem_init(&empty, 0, BUFFER_SIZE);
counter = 0;
for(int j=0;j<BUFFER_SIZE;j++)
{ buffer[j] = 0;}
}
void *producer(void *param)
{
int item;
while(1)
{ …Run Code Online (Sandbox Code Playgroud) 我在使用bootstrap-server(即Kafka服务器)消费消息时遇到问题.知道为什么没有zookeeper就无法使用消息?
制作一些消息:
[kfk03.mp.com ~]$ /bnsf/kafka/bin/kafka-console-producer.sh --broker-list kfk03.mp.com:9092 --topic test
>hi
>hi
Run Code Online (Sandbox Code Playgroud)
如果我给出消费者,消费者无法使用消息–-bootstrap-server:
[kfk03.mp.com ~]$
/bnsf/kafka/bin/kafka-console-consumer.sh --bootstrap-server kfk03.mp.com:9092 --topic test --from-beginning
Run Code Online (Sandbox Code Playgroud)
消费者能够在--zookeeper给出服务器而不是--bootstrap-server- 时使用消息:
[kfk03.mp.com ~]$ /bnsf/kafka/bin/kafka-console-consumer.sh --zookeeper zkp02.mp.com:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"properties": {"messageType": "test", "sentDateTime": "2018-02-25T21:46:00.000+0000"}, "name": "Uttam Anand", "age": 29}
{"properties": {"messageType": …Run Code Online (Sandbox Code Playgroud) 在下面的代码中,我正在创建一个producer thread并n consumer threads从每个专用queue和打印到的读取stdout.这段代码有时会在声明中崩溃consumerQueues[id]->empty().通过调试去我看到consumerQueues[id]是0x0当它崩溃.现在在init()函数中,我在创建worker 之前创建了ith使用者.我不确定为什么会留下来.请帮我弄清楚发生了什么.queueiththreadconsumerQueues[id]0x0
#include <thread>
#include <queue>
#include <memory>
#include <iostream>
#include <mutex>
#include <condition_variable>
class Test
{
private:
void producer()
{
while(true)
{
std::string s = "abc";
for(const auto& q : consumerQueues)
{
std::unique_lock<std::mutex> lock(mutex);
q->push(s);
condition_variable.notify_all();
}
}
}
void consumer(int id)
{
while (true)
{
std::string job;
{
std::unique_lock<std::mutex> lock(mutex);
while(consumerQueues[id]->empty())
{ …Run Code Online (Sandbox Code Playgroud) c# ×3
java ×3
concurrency ×2
apache-kafka ×1
asynchronous ×1
c ×1
c++ ×1
channel ×1
logging ×1
pthreads ×1
queue ×1
semaphore ×1
sockets ×1