我正在尝试用C#实现一个消费者.有许多发布者可以同时执行.我创建了三个示例,一个使用Rx和subject,一个使用BlockingCollection,第三个使用BlockingCollection中的ToObservable.在这个简单的例子中,它们都做同样的事情,我希望它们与多个生产者一起工作.
每种方法有哪些不同的特质?
我已经在使用Rx,所以我更喜欢这种方法.但我担心OnNext没有线程安全保证,我不知道排队语义是什么主题和默认调度程序.
有线程安全主题吗?
是否要处理所有消息?
当这不起作用时还有其他任何情况吗?是同时处理?
void SubjectOnDefaultScheduler()
{
var observable = new Subject<long>();
observable.
ObserveOn(Scheduler.Default).
Subscribe(i => { DoWork(i); });
observable.OnNext(1);
observable.OnNext(2);
observable.OnNext(3);
}
Run Code Online (Sandbox Code Playgroud)
不是Rx,但很容易适应使用/订阅它.它需要一个项目然后处理它.这应该是连续发生的.
void BlockingCollectionAndConsumingTask()
{
var blockingCollection = new BlockingCollection<long>();
var taskFactory = new TaskFactory();
taskFactory.StartNew(() =>
{
foreach (var i in blockingCollection.GetConsumingEnumerable())
{
DoWork(i);
}
});
blockingCollection.Add(1);
blockingCollection.Add(2);
blockingCollection.Add(3);
}
Run Code Online (Sandbox Code Playgroud)
使用阻塞集合有点像主题似乎是一个很好的妥协.我猜是隐式地会安排到任务,所以我可以使用async/await,这是正确的吗?
void BlockingCollectionToObservable()
{
var blockingCollection = new BlockingCollection<long>();
blockingCollection.
GetConsumingEnumerable().
ToObservable(Scheduler.Default).
Subscribe(i => { DoWork(i); });
blockingCollection.Add(1);
blockingCollection.Add(2);
blockingCollection.Add(3);
}
Run Code Online (Sandbox Code Playgroud) c# subject task-parallel-library system.reactive blockingcollection
我有一种情况,我需要有大量(数百)队列,其中的项目应按顺序处理(需要单线程消费者).我的第一个实现,基于示例,我使用单个长时间运行的Task per BlockingCollection来使用队列项.但是,我最终得到了一个拥有数百个线程的应用程序,这些线程大部分都处于空闲状态,除了占用内存之外什么都不做
我认为只有在队列中有东西需要处理才能运行消费者任务会更好,但是,我无法找到能够提供最佳实践的样本.
我提出了类似于下面的解决方案.但问题是,每个项目都会产生一个新任务(这可能是效率低下的?浪费资源?).但是如果我没有为每个项目创建一个新任务,我不能保证一个项目不会在未处理的队列中.
private object _processSyncObj = new object();
private volatile bool _isProcessing;
private BlockingCollection<string> _queue = new BlockingCollection<string>();
private void EnqueueItem(string item)
{
_queue.Add(item);
Task.Factory.StartNew(ProcessQueue);
}
private void ProcessQueue()
{
if (_isProcessing)
return;
lock (_processSyncObj)
{
string item;
while (_isProcessing = _queue.TryTake(out item))
{
// process item
}
}
}
Run Code Online (Sandbox Code Playgroud)
针对这种情况的最佳实践/最佳解决方案是什么,并保证项目在队列中没有任何情况,但没有消费者在运行?
我有System.Collections.Generic.List _myList
很多线程可以从中读取或同时添加项目.根据我的阅读,我应该使用'BlockingCollection',这样就行了.我也读到ReaderWriterLockSlim
和lock
,但我不知道如何使用它们替代的BlockingCollection
,所以我的问题是,我可以做同样的:
而不是使用'BlockingCollection'.如果是,可以请你提供简单的例子,用的什么利弊BlockingCollection
,ReaderWriterLockSlim
,lock
?
更新的 读者将远远超过作家!
我在C#应用程序中使用两个线程来访问相同的BlockingCollection.这工作正常,但我想要检索第一个值两次,以便两个线程检索相同的值*.
几秒钟后,我想轮询两个线程的currentIndex并删除每个值<index.因此,例如,线程的最低currentIndex是5,应用程序删除队列中索引0 -5处的项目.另一种解决方案是,如果所有线程都处理了该值,则删除队列中的值.
我怎么能做到这一点?我想我需要另一种类型的缓冲区..?
先感谢您!
*如果.Take()由thread1调用,则该项目将在集合中删除,而thread2无法再次获取相同的项目.
更新:
我想将数据存储在缓冲区中,因此例如thread1将数据保存到HDD,而thread2分析(相同)数据(并发).
try
{
ParallelOptions Options = new ParallelOptions();
Options.CancellationToken = base.DownloadCancellation.Token;
Parallel.ForEach(base.BlockingCollection1, Options, ActiveSeeder =>
{
//...
});
}
catch
{
if (base.DownloadCancellation.IsCancellationRequested)
return false;
}
Run Code Online (Sandbox Code Playgroud)
Parallel.Foreach/For 是否使用或不使用我放入 ParallelOptions 的 CancellationToken 调用 BlockingCollection1.Take 函数?
有机会知道吗?
c# multithreading cancellationtokensource parallel.foreach blockingcollection
问题:有多个线程访问资源.我需要将它们的数量限制为常数MaxThreads
.无法进入线程池的线程应该收到错误消息.
解决方案:我开始BlockingCollection<string> pool
在下面的算法中使用a ,但是我看到BlockingCollection
需要调用CompleteAdding
,我不能这样做,因为我总是得到传入的线程(我在下面的示例中硬编码为10用于调试目的),想想Web请求.
public class MyTest {
private const int MaxThreads = 3;
private BlockingCollection<string> pool;
public MyTest() {
pool = new BlockingCollection<string>(MaxThreads);
}
public void Go() {
var addSuccess = this.pool.TryAdd(string.Format("thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
if (!addSuccess) Console.WriteLine(string.Format("thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
Console.WriteLine(string.Format("Adding thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
Console.WriteLine(string.Format("Pool size: {0}", pool.Count));
// simulate work
Thread.Sleep(1000);
Console.WriteLine("Thread ID#{0} " + Thread.CurrentThread.ManagedThreadId + " is done doing work.");
string val;
var takeSuccess = this.pool.TryTake(out val);
if (!takeSuccess) …
Run Code Online (Sandbox Code Playgroud) 运行此程序将在四核系统中咀嚼25%的CPU功率.所以基本上一些东西正在全力以赴.我把它缩小到了消费者,然而按下"x"时负载不会停止,这应该终止我的消费者.
我的代码
internal class TestBlockingCollectionConsumerProducer2
{
private int _itemCount;
internal void Run()
{
BlockingCollection<string> blockingCollection = new BlockingCollection<string>();
// The token source for issuing the cancelation request.
CancellationTokenSource cts = new CancellationTokenSource();
// Simple thread waiting for a Console 'x'
Task.Factory.StartNew(() =>
{
if (Console.ReadKey().KeyChar == 'x')
{
cts.Cancel();
}
});
// start producer
Task.Factory.StartNew(() => Produce(blockingCollection, cts.Token));
// start multiple consumers
const int THREAD_COUNT = 5;
for (int i = 0; i < THREAD_COUNT; i++)
{
Task.Factory.StartNew(() => Consume(blockingCollection, …
Run Code Online (Sandbox Code Playgroud) 我认为PriorityBlockingQueue按优先顺序排序但结束不是我的预期,谁可以告诉我原因.
public class TestPriorityQueue {
static Random r=new Random(47);
public static void main(String args[]) throws InterruptedException{
final PriorityBlockingQueue q=new PriorityBlockingQueue();
ExecutorService se=Executors.newCachedThreadPool();
//execute producer
se.execute(new Runnable(){
public void run() {
int i=0;
while(true){
q.put(new PriorityEntity(r.nextInt(10),i++));
try {
TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
//execute consumer
se.execute(new Runnable(){
public void run() {
while(true){
try {
System.out.println("take== "+q.take()+" left:== ["+q.toString()+"]");
try {
TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
// TODO Auto-generated …
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用生产者/消费者模式来实现,BlockingCollection<T>
所以我编写了一个简单的控制台应用程序来测试它。
public class Program
{
public static void Main(string[] args)
{
var workQueue = new WorkQueue();
workQueue.StartProducingItems();
workQueue.StartProcessingItems();
while (true)
{
}
}
}
public class WorkQueue
{
private BlockingCollection<int> _queue;
private static Random _random = new Random();
public WorkQueue()
{
_queue = new BlockingCollection<int>();
// Prefill some items.
for (int i = 0; i < 100; i++)
{
//_queue.Add(_random.Next());
}
}
public void StartProducingItems()
{
Task.Run(() =>
{
_queue.Add(_random.Next()); // Should be adding items to the queue …
Run Code Online (Sandbox Code Playgroud) c# producer-consumer task-parallel-library blockingcollection