标签: blockingcollection

BlockingCollection vs Subject用作消费者

我正在尝试用C#实现一个消费者.有许多发布者可以同时执行.我创建了三个示例,一个使用Rx和subject,一个使用BlockingCollection,第三个使用BlockingCollection中的ToObservable.在这个简单的例子中,它们都做同样的事情,我希望它们与多个生产者一起工作.

每种方法有哪些不同的特质?

我已经在使用Rx,所以我更喜欢这种方法.但我担心OnNext没有线程安全保证,我不知道排队语义是什么主题和默认调度程序.

有线程安全主题吗?

是否要处理所有消息?

当这不起作用时还有其他任何情况吗?是同时处理?

void SubjectOnDefaultScheduler()
{
    var observable = new Subject<long>();
    observable.
        ObserveOn(Scheduler.Default).
        Subscribe(i => { DoWork(i); });
    observable.OnNext(1);
    observable.OnNext(2);
    observable.OnNext(3);
}
Run Code Online (Sandbox Code Playgroud)

不是Rx,但很容易适应使用/订阅它.它需要一个项目然后处理它.这应该是连续发生的.

void BlockingCollectionAndConsumingTask()
{
    var blockingCollection = new BlockingCollection<long>();
    var taskFactory = new TaskFactory();
    taskFactory.StartNew(() =>
    {
        foreach (var i in blockingCollection.GetConsumingEnumerable())
        {
            DoWork(i);
        }
    });
    blockingCollection.Add(1);
    blockingCollection.Add(2);
    blockingCollection.Add(3);
}
Run Code Online (Sandbox Code Playgroud)

使用阻塞集合有点像主题似乎是一个很好的妥协.我猜是隐式地会安排到任务,所以我可以使用async/await,这是正确的吗?

void BlockingCollectionToObservable()
{
    var blockingCollection = new BlockingCollection<long>();
    blockingCollection.
        GetConsumingEnumerable().
        ToObservable(Scheduler.Default).
        Subscribe(i => { DoWork(i); });
    blockingCollection.Add(1);
    blockingCollection.Add(2);
    blockingCollection.Add(3);
}
Run Code Online (Sandbox Code Playgroud)

c# subject task-parallel-library system.reactive blockingcollection

3
推荐指数
1
解决办法
1639
查看次数

C#BlockingCollection生成者使用者而不阻塞使用者线程

我有一种情况,我需要有大量(数百)队列,其中的项目应按顺序处理(需要单线程消费者).我的第一个实现,基于示例,我使用单个长时间运行的Task per BlockingCollection来使用队列项.但是,我最终得到了一个拥有数百个线程的应用程序,这些线程大部分都处于空闲状态,除了占用内存之外什么都不做

我认为只有在队列中有东西需要处理才能运行消费者任务会更好,但是,我无法找到能够提供最佳实践的样本.

我提出了类似于下面的解决方案.但问题是,每个项目都会产生一个新任务(这可能是效率低下的?浪费资源?).但是如果我没有为每个项目创建一个新任务,我不能保证一个项目不会在未处理的队列中.

    private object _processSyncObj = new object();
    private volatile bool _isProcessing;
    private BlockingCollection<string> _queue = new BlockingCollection<string>();

    private void EnqueueItem(string item)
    {
        _queue.Add(item);
        Task.Factory.StartNew(ProcessQueue);
    }

    private void ProcessQueue()
    {
        if (_isProcessing)
            return;

        lock (_processSyncObj)
        {
             string item;
             while (_isProcessing = _queue.TryTake(out item))
             {
                 // process item
             }
        }
    }
Run Code Online (Sandbox Code Playgroud)

针对这种情况的最佳实践/最佳解决方案是什么,并保证项目在队列中没有任何情况,但没有消费者在运行?

.net c# blockingcollection

3
推荐指数
1
解决办法
2545
查看次数

什么更好以及为什么使用List作为线程安全:BlockingCollection或ReaderWriterLockSlim或锁定?

我有System.Collections.Generic.List _myList很多线程可以从中读取或同时添加项目.根据我的阅读,我应该使用'BlockingCollection',这样就行了.我也读到ReaderWriterLockSlimlock,但我不知道如何使用它们替代的BlockingCollection,所以我的问题是,我可以做同样的:

  1. ReaderWriterLockSlim

而不是使用'BlockingCollection'.如果是,可以请你提供简单的例子,用的什么利弊BlockingCollection,ReaderWriterLockSlim,lock

更新的 读者将远远超过作家!

.net c# multithreading locking blockingcollection

2
推荐指数
1
解决办法
2981
查看次数

多线程BlockingCollection的值相同

我在C#应用程序中使用两个线程来访问相同的BlockingCollection.这工作正常,但我想要检索第一个值两次,以便两个线程检索相同的值*.

几秒钟后,我想轮询两个线程的currentIndex并删除每个值<index.因此,例如,线程的最低currentIndex是5,应用程序删除队列中索引0 -5处的项目.另一种解决方案是,如果所有线程都处理了该值,则删除队列中的值.

我怎么能做到这一点?我想我需要另一种类型的缓冲区..?

先感谢您!

*如果.Take()由thread1调用,则该项目将在集合中删除,而thread2无法再次获取相同的项目.


更新:

我想将数据存储在缓冲区中,因此例如thread1将数据保存到HDD,而thread2分析(相同)数据(并发).

c# multithreading blockingcollection

2
推荐指数
1
解决办法
692
查看次数

Parallel.Foreach/For 如何调用BlockingCollection.Take?有或没有 CancellationToken

         try
         {
             ParallelOptions Options = new ParallelOptions();
             Options.CancellationToken = base.DownloadCancellation.Token;
             Parallel.ForEach(base.BlockingCollection1, Options, ActiveSeeder =>
                 {
                     //...
                 });
         }
         catch
         {
             if (base.DownloadCancellation.IsCancellationRequested)
                 return false;
         }
Run Code Online (Sandbox Code Playgroud)

Parallel.Foreach/For 是否使用或不使用我放入 ParallelOptions 的 CancellationToken 调用 BlockingCollection1.Take 函数?

有机会知道吗?

c# multithreading cancellationtokensource parallel.foreach blockingcollection

1
推荐指数
1
解决办法
606
查看次数

具有BlockingCollection的线程池

问题:有多个线程访问资源.我需要将它们的数量限制为常数MaxThreads.无法进入线程池的线程应该收到错误消息.

解决方案:我开始BlockingCollection<string> pool在下面的算法中使用a ,但是我看到BlockingCollection需要调用CompleteAdding,我不能这样做,因为我总是得到传入的线程(我在下面的示例中硬编码为10用于调试目的),想想Web请求.

public class MyTest {

    private const int MaxThreads = 3;

    private BlockingCollection<string> pool;

    public MyTest() { 
        pool = new BlockingCollection<string>(MaxThreads);
    }

    public void Go() {
        var addSuccess = this.pool.TryAdd(string.Format("thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
        if (!addSuccess) Console.WriteLine(string.Format("thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
        Console.WriteLine(string.Format("Adding thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
        Console.WriteLine(string.Format("Pool size: {0}", pool.Count));

        // simulate work
        Thread.Sleep(1000);

        Console.WriteLine("Thread ID#{0} " + Thread.CurrentThread.ManagedThreadId + " is done doing work.");
        string val;
        var takeSuccess = this.pool.TryTake(out val);
        if (!takeSuccess) …
Run Code Online (Sandbox Code Playgroud)

c# multithreading blockingqueue blockingcollection

1
推荐指数
1
解决办法
987
查看次数

.NET BlockingCollection <T> CPU使用率

运行此程序将在四核系统中咀嚼25%的CPU功率.所以基本上一些东西正在全力以赴.我把它缩小到了消费者,然而按下"x"时负载不会停止,这应该终止我的消费者.

我的代码

internal class TestBlockingCollectionConsumerProducer2
{
    private int _itemCount;

    internal void Run()
    {
        BlockingCollection<string> blockingCollection = new BlockingCollection<string>();

        // The token source for issuing the cancelation request.
        CancellationTokenSource cts = new CancellationTokenSource();

        // Simple thread waiting for a Console 'x'
        Task.Factory.StartNew(() =>
        {
            if (Console.ReadKey().KeyChar == 'x')
            {
                cts.Cancel();
            }
        });

        // start producer
        Task.Factory.StartNew(() => Produce(blockingCollection, cts.Token));

        // start multiple consumers
        const int THREAD_COUNT = 5;
        for (int i = 0; i < THREAD_COUNT; i++)
        {
            Task.Factory.StartNew(() => Consume(blockingCollection, …
Run Code Online (Sandbox Code Playgroud)

c# multithreading asynchronous blockingcollection

0
推荐指数
1
解决办法
986
查看次数

为什么没有PriorityBlockingQueue队列根据优先级对元素进行排序

这是我的代码,代码运行结束不是我的例外.

我认为PriorityBlockingQueue按优先顺序排序但结束不是我的预期,谁可以告诉我原因.

 public class TestPriorityQueue {  

    static Random r=new Random(47);  

    public static void main(String args[]) throws InterruptedException{  
        final PriorityBlockingQueue q=new PriorityBlockingQueue();  
        ExecutorService se=Executors.newCachedThreadPool();  
        //execute producer  
        se.execute(new Runnable(){  
            public void run() {  
                int i=0;  
                while(true){  
                    q.put(new PriorityEntity(r.nextInt(10),i++));  
                    try {  
                        TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));  
                    } catch (InterruptedException e) {  
                        // TODO Auto-generated catch block  
                        e.printStackTrace();  
                    }  
                }  
            }  
        }); 

        //execute consumer  
        se.execute(new Runnable(){  
            public void run() {  
                while(true){  
                    try {  
                        System.out.println("take== "+q.take()+" left:== ["+q.toString()+"]");  
                        try {  
                            TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));  
                        } catch (InterruptedException e) {  
                            // TODO Auto-generated …
Run Code Online (Sandbox Code Playgroud)

java concurrency blockingqueue blockingcollection

0
推荐指数
1
解决办法
206
查看次数

如何正确使用 BlockingCollection.GetConsumingEnumerable?

我正在尝试使用生产者/消费者模式来实现,BlockingCollection<T>所以我编写了一个简单的控制台应用程序来测试它。

public class Program
{
    public static void Main(string[] args)
    {
        var workQueue = new WorkQueue();
        workQueue.StartProducingItems();
        workQueue.StartProcessingItems();

        while (true)
        {

        }
    }
}

public class WorkQueue
{
    private BlockingCollection<int> _queue;
    private static Random _random = new Random();

    public WorkQueue()
    {
        _queue = new BlockingCollection<int>();

        // Prefill some items.
        for (int i = 0; i < 100; i++)
        {
            //_queue.Add(_random.Next());
        }
    }

    public void StartProducingItems()
    {
        Task.Run(() =>
        {
            _queue.Add(_random.Next()); // Should be adding items to the queue …
Run Code Online (Sandbox Code Playgroud)

c# producer-consumer task-parallel-library blockingcollection

0
推荐指数
1
解决办法
2376
查看次数