这是一个经典的 c/p 问题,其中一些线程生成数据而其他线程读取数据。生产者和消费者都共享一个常量大小的缓冲区。如果缓冲区为空,则消费者必须等待,如果缓冲区已满,则生产者必须等待。我正在使用信号量来跟踪已满或空的队列。生产者将减少空闲位置信号量,增加值,并增加填充槽信号量。所以我试图实现一个程序,从生成器函数中获取一些数字,然后打印出这些数字的平均值。通过将此视为生产者-消费者问题,我试图在程序执行中节省一些时间。generateNumber 函数会导致过程中出现一些延迟,因此我想创建多个生成数字的线程,并将它们放入队列中。然后是“主线程” 正在运行的主函数必须从队列中读取并找到总和,然后求平均值。所以这是我到目前为止所拥有的:
#include <cstdio>
#include <cstdlib>
#include <time.h>
#include "Thread.h"
#include <queue>
int generateNumber() {
int delayms = rand() / (float) RAND_MAX * 400.f + 200;
int result = rand() / (float) RAND_MAX * 20;
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = delayms * 1000000;
nanosleep(&ts, NULL);
return result; }
struct threadarg {
Semaphore filled(0);
Semaphore empty(n);
std::queue<int> q; };
void* threadfunc(void *arg) {
threadarg *targp = (threadarg *) arg;
threadarg &targ = *targp;
while (targ.empty.value() …
Run Code Online (Sandbox Code Playgroud) 这个问题似乎很简单,但我想发送一个事件来通知我的用户空间程序模块缓冲区已准备好被读取.
例如,我的内核模块中有一个缓冲区,其数据将由用户空间程序使用.如果消耗了所有数据,则内核模块必须在新数据到达时通知我的程序.
这是生产者/消费者的典型问题.生产者是内核模块,消费者是用户空间程序.
今天,我向我的程序(事件)发送一个信号,并使用ioctl函数访问数据缓冲区.但我不知道这种方法是否足以解决这类问题.我害怕不必要地使用netlink或内存映射来解决这个问题.
我需要实现一个可以从多个线程填充的请求队列。当此队列大于 1000 个已完成的请求时,应将此请求存储到数据库中。这是我的实现:
public class RequestQueue
{
private static BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>();
private static ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>();
private static volatile bool isLoading = false;
private static object _lock = new object();
public static void Launch()
{
Task.Factory.StartNew(execute);
}
public static void Add(VerificationRequest request)
{
_queue.Add(request);
}
public static void AddRange(List<VerificationRequest> requests)
{
Parallel.ForEach(requests, new ParallelOptions() {MaxDegreeOfParallelism = 3},
(request) => { _queue.Add(request); });
}
private static void execute()
{
Parallel.ForEach(_queue.GetConsumingEnumerable(), new ParallelOptions {MaxDegreeOfParallelism = 5}, EnqueueSaveRequest …
Run Code Online (Sandbox Code Playgroud) c# multithreading producer-consumer task-parallel-library tpl-dataflow
我想这是一种代码审查,但这是我对生产者/消费者模式的实现。我想知道的是,是否会出现ReceivingThread()
orSendingThread()
方法中的 while 循环停止执行的情况。请注意,它EnqueueSend(DataSendEnqeueInfo info)
是从多个不同线程调用的,我可能无法在这里使用任务,因为我肯定必须在单独的线程中使用命令。
private Thread mReceivingThread;
private Thread mSendingThread;
private Queue<DataRecievedEnqeueInfo> mReceivingThreadQueue;
private Queue<DataSendEnqeueInfo> mSendingThreadQueue;
private readonly object mReceivingQueueLock = new object();
private readonly object mSendingQueueLock = new object();
private bool mIsRunning;
EventWaitHandle mRcWaitHandle;
EventWaitHandle mSeWaitHandle;
private void ReceivingThread()
{
while (mIsRunning)
{
mRcWaitHandle.WaitOne();
DataRecievedEnqeueInfo item = null;
while (mReceivingThreadQueue.Count > 0)
{
lock (mReceivingQueueLock)
{
item = mReceivingThreadQueue.Dequeue();
}
ProcessReceivingItem(item);
}
mRcWaitHandle.Reset();
}
}
private void SendingThread()
{
while (mIsRunning)
{
mSeWaitHandle.WaitOne(); …
Run Code Online (Sandbox Code Playgroud) c# multithreading producer-consumer blockingcollection tpl-dataflow
我们有一个运行/生产系统,其中int32
protobuf 字段在组件之间来回通信。我们需要int64
在不关闭当前系统的情况下动态更新它。和部署也将占用大量的时间,所以一个组件可以得到更新,int64
而其他仍处于int32
。我们需要在没有任何故障的情况下进行更新。
如何才能做到这一点?
我在 MySQL 数据库中有一个非常大的表,表中有 2 亿条记录Users
。
我使用 JDBC 进行查询:
public List<Pair<Long, String>> getUsersAll() throws SQLException {
Connection cnn = null;
CallableStatement cs = null;
ResultSet rs = null;
final List<Pair<Long, String>> res = new ArrayList<>();
try {
cnn = dataSource.getConnection();
cs = cnn.prepareCall("select UserPropertyKindId, login from TEST.users;");
rs = cs.executeQuery();
while (rs.next()) {
res.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));
}
return res;
} catch (SQLException ex) {
throw ex;
} finally {
DbUtils.closeQuietly(cnn, cs, rs);
}
}
Run Code Online (Sandbox Code Playgroud)
接下来,我处理结果:
List<Pair<Long, String>> users= dao.getUsersAll(); …
Run Code Online (Sandbox Code Playgroud) 我正在研究多线程并编写了一个基本的生产者/消费者。我对下面写的生产者/消费者有两个问题。1)即使将消费者睡眠时间设置为低于生产者睡眠时间,生产者似乎仍然执行得更快。2)在消费者中,我复制了生产者完成向队列添加但队列中仍有元素的情况下的代码。对于构建代码的更好方法有什么建议吗?
#include <iostream>
#include <queue>
#include <mutex>
class App {
private:
std::queue<int> m_data;
bool m_bFinished;
std::mutex m_Mutex;
int m_ConsumerSleep;
int m_ProducerSleep;
int m_QueueSize;
public:
App(int &MaxQueue) :m_bFinished(false), m_ConsumerSleep(1), m_ProducerSleep(5), m_QueueSize(MaxQueue){}
void Producer() {
for (int i = 0; i < m_QueueSize; ++i) {
std::lock_guard<std::mutex> guard(m_Mutex);
m_data.push(i);
std::cout << "Producer Thread, queue size: " << m_data.size() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(m_ProducerSleep));
}
m_bFinished = true;
}
void Consumer() {
while (!m_bFinished) {
if (m_data.size() > 0) {
std::lock_guard<std::mutex> guard(m_Mutex);
std::cout << "Consumer Thread, …
Run Code Online (Sandbox Code Playgroud) 假设我有很多生产者,1个消费者未绑定 Channel,有一个消费者:
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
await consume(message);
}
Run Code Online (Sandbox Code Playgroud)
问题在于该consume
函数会进行一些 IO 访问,并且可能还会进行一些网络访问,因此在消耗 1 条消息之前可能会产生更多消息。但由于IO资源不能并发访问,所以我不能有很多消费者,也不能把函数扔到consume
一个Task中然后忘记它。
该consume
功能可以轻松修改以获取多条消息并批量处理它们。所以我的问题是,是否有一种方法可以让消费者在尝试访问通道队列时获取通道队列中的所有消息,如下所示:
while (true) {
Message[] messages = await channel.Reader.TakeAll();
await consumeAll(messages);
}
Run Code Online (Sandbox Code Playgroud)
编辑:我能想到的 1 个选项是:
List<Message> messages = new();
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
await consume(message);
Message msg;
while (channel.Reader.TryRead(out msg))
messages.Add(msg);
if (messages.Count > 0)
{
await consumeAll(messages);
messages.Clear();
}
}
Run Code Online (Sandbox Code Playgroud)
但我觉得这应该是一个更好的方法来做到这一点。
我遇到一种情况,我需要从多个 IAsyncEnumerable 源接收数据。为了提高性能,应该以并行方式执行。
我已经使用AsyncAwaitBestPractices、System.Threading.Tasks.Dataflow和System.Linq.Async nuget 包编写了这样的代码来实现此目标:
public static async IAsyncEnumerable<T> ExecuteSimultaneouslyAsync<T>(
this IEnumerable<IAsyncEnumerable<T>> sources,
int outputQueueCapacity = 1,
TaskScheduler scheduler = null)
{
var sourcesCount = sources.Count();
var channel = outputQueueCapacity > 0
? Channel.CreateBounded<T>(sourcesCount)
: Channel.CreateUnbounded<T>();
sources.AsyncParallelForEach(
async body =>
{
await foreach (var item in body)
{
await channel.Writer.WaitToWriteAsync();
await channel.Writer.WriteAsync(item);
}
},
maxDegreeOfParallelism: sourcesCount,
scheduler: scheduler)
.ContinueWith(_ => channel.Writer.Complete())
.SafeFireAndForget();
while (await channel.Reader.WaitToReadAsync())
yield return await channel.Reader.ReadAsync();
}
public static async Task AsyncParallelForEach<T>( …
Run Code Online (Sandbox Code Playgroud) c# producer-consumer task-parallel-library iasyncenumerable system.threading.channels
c# ×5
c ×2
concurrency ×2
java ×2
tpl-dataflow ×2
.net ×1
c++ ×1
c++11 ×1
channel ×1
jdbc ×1
linux ×1
linux-kernel ×1
mutex ×1
mysql ×1