Observable不响应阻塞集合在不同的线程上发生了变化

Amc*_*tty 1 c# multithreading system.reactive observer-pattern

我有以下代码:

class Program
{
    static void Main(string[] args)
    {
        var watcher = new SNotifier(DumpToConsole);
        watcher.StartQueue();

        Console.ReadLine();
    }

    private static void DumpToConsole(IList<Timestamped<int>> currentCol)
    {
        Console.WriteLine("buffer time elapsed, current collection contents is: {0} items.", currentCol.Count);
        Console.WriteLine("holder has: {0}", currentCol.Count);
    }
}
Run Code Online (Sandbox Code Playgroud)

SNotifier:

public class SNotifier
{
    private BlockingCollection<int> _holderQueue;
    private readonly Action<IList<Timestamped<int>>> _dumpAction;

    public SNotifier(Action<IList<Timestamped<int>>> dumpAction)
    {
        PopulateListWithStartValues();
        _dumpAction = dumpAction;
    }

    public void StartQueue()
    {
        PopulateQueueOnDiffThread();

        var observableCollection = _holderQueue.ToObservable();

        var myCollectionTimestamped = observableCollection.Timestamp();
        var bufferedTimestampedCollection = myCollectionTimestamped.Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3));

        using (bufferedTimestampedCollection.Subscribe(_dumpAction))
        {
            Console.WriteLine("started observing collection");
        }
    }

    private void PopulateQueueOnDiffThread()
    {
        Action addToCollectionAction = AddToCollection;
        var t = new TaskFactory();
        t.StartNew(addToCollectionAction);

    }

    private static IEnumerable<int> GetInitialElements()
    {
        var random = new Random();
        var items = new List<int>();
        for (int i = 0; i < 10; i++)
            items.Add(random.Next(1, 10));

        return items;
    }

    private void AddToCollection()
    {
        while (true)
        {
            var newElement = new Random().Next(1, 10);
            _holderQueue.Add(newElement);
            Console.WriteLine("added {0}", newElement);
            Console.WriteLine("holder has: {0}", _holderQueue.Count);
            Thread.Sleep(1000);
        }
    }

    private void PopulateListWithStartValues()
    {
        _holderQueue = new BlockingCollection<int>();
        var elements = GetInitialElements();
        foreach (var i in elements)
            _holderQueue.Add(i);
    }
}
Run Code Online (Sandbox Code Playgroud)

我需要运行DumpToConsole()方法以每3秒显示一次集合计数,而此集合的内容在另一个线程上更改.我的问题是DumpToConsole()只被调用一次.这是为什么?!我已经花了整整一天的时间.由于我已经使用我的dump方法订阅了observable,它应该"观察"集合更改并每隔3秒重新调用DumpToConsole()方法; 这就是我需要的.

想法?谢谢

(PS传递给SNotifier类的动作是我在SNotifier中删除控制台相关内容的方法,我需要更好地重构,它可以被忽略,因为它与问题本身无关)

Jam*_*rld 5

你在呼唤ToObservable()你的BlockingCollection<int>.此扩展方法只接受IEnumerable<int>集合上的接口并将其转换为IObservable<int>.这具有在订阅点获取集合的内容列表并通过Observable流转储它们的效果.

它们不会继续枚举添加的项目.

GetConsumingEnumerable()在前面使用ToObservable()会解决这个问题.

但是,需要注意,因为这也会从集合中删除项目,这可能是不可取的.

如果这是可以接受的,您可能希望在多个订阅者的情况下发布生成的observable,以避免造成严重破坏.

如果您只是添加,您可以考虑改变整个过程 - 使用Subject来支持"Add"方法并让一个订阅者填充List(或者如果您需要的话可以使用BlockingCollection)来跟踪集合,第二个订阅者可以然后报告进展情况.

另一种方法是使用ObservableCollection并订阅其事件.

在最后两个建议中,您需要使"添加"线程安全,因为它们本身既不是线程安全的Subject<T>也不ObservableCollection<T>是.

附录

布兰登评论说你正在处理订阅StartQueue让我意识到另一个问题 - StartQueue永远不会回来!这是因为在通话Subscribe上做出ToObservable()的转换IEnumerable将不会返回,直到枚举已完成-它也因此拥有了处理(因为IDisposable是的返回值Subscribe),这就是为什么我没有注意到using@Brandon指出,无论是!

有了以上两点,您需要进行以下额外更改.首先,删除using订阅周围的语句,隐式处理将取消订阅.当我们解决阻塞订阅调用时,这将导致订阅立即被取消.IDisposable如果您确实需要在某些时候明确取消订阅,则应保留句柄.

其次,在SubscribeOn(Scheduler.Default)之后立即添加呼叫ToObservable()以防止Subscribe呼叫阻塞.