.NET 4中是否有Threadsafe Observable集合?

Vai*_*hav 21 wpf .net-4.0 observablecollection task-parallel-library c#-4.0

平台: WPF, .NET 4.0, C# 4.0

问题:在Mainwindow.xaml中,我有一个ListBox绑定到Customer集合,该集合当前是一个ObservableCollection <Customer>.

ObservableCollection<Customer> c = new ObservableCollection<Customer>();

此集合可以通过多个源进行更新,如FileSystem,WebService等.

为了允许并行加载Customers,我创建了一个帮助类

public class CustomerManager(ref ObsevableCollection<Customer> cust)

内部为每个客户源生成一个新任务(来自并行扩展库),并将新的Customer实例添加到客户集合对象(由ref传递给它的ctor).

问题是ObservableCollection <T>(或任何集合)不能在UI线程以外的调用中使用并遇到异常:

"NotSupportedException - 这种类型的CollectionView不支持从与Dispatcher线程不同的线程更改其SourceCollection."

我试过用了

System.Collections.Concurrent.ConcurrentBag<Customer>

集合但它没有实现INotifyCollectionChanged接口.因此我的WPF UI不会自动更新.

那么,是否有一个集合类可以实现属性/集合更改通知,还允许来自其他非UI线程的调用?

通过我最初的bing /谷歌搜索,没有提供开箱即用.

编辑:我创建了自己的集合,它继承自ConcurrentBag <Customer>,并且还实现了INotifyCollectionChanged接口.但令我惊讶的是,即使在单独的任务中调用它之后,WPF UI也会挂起,直到任务完成.是不应该并行执行任务而不阻止UI线程

提前感谢您的任何建议.

Ste*_*ett 6

有两种可能的方法.第一种方法是从并发集合继承并添加INotifyCollectionChanged功能,第二种方法是从实现INotifyCollectionChanged的集合继承并添加并发支持.我认为将INotifyCollectionChanged支持添加到并发集合中要容易得多,也更安全.我的建议如下.

它看起来很长,但大多数方法只调用内部并发集合,就像调用者直接使用它一样.从集合中添加或删除的少数几个方法注入一个私有方法的调用,该方法在构造时提供的调度程序上引发通知事件,从而允许该类是线程安全的,但确保在同一个线程上引发通知所有时间.

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Runtime.InteropServices;
using System.Security.Permissions;
using System.Windows.Threading;

namespace Collections
{
    /// <summary>
    /// Concurrent collection that emits change notifications on a dispatcher thread
    /// </summary>
    /// <typeparam name="T">The type of objects in the collection</typeparam>
    [Serializable]
    [ComVisible(false)]
    [HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
    public class ObservableConcurrentBag<T> : IProducerConsumerCollection<T>,
        IEnumerable<T>, ICollection, IEnumerable
    {
        /// <summary>
        /// The dispatcher on which event notifications will be raised
        /// </summary>
        private readonly Dispatcher dispatcher;

        /// <summary>
        /// The internal concurrent bag used for the 'heavy lifting' of the collection implementation
        /// </summary>
        private readonly ConcurrentBag<T> internalBag;

        /// <summary>
        /// Initializes a new instance of the ConcurrentBag<T> class that will raise <see cref="INotifyCollectionChanged"/> events
        /// on the specified dispatcher
        /// </summary>
        public ObservableConcurrentBag(Dispatcher dispatcher)
        {
            this.dispatcher = dispatcher;
            this.internalBag = new ConcurrentBag<T>();
        }

        /// <summary>
        /// Initializes a new instance of the ConcurrentBag<T> class that contains elements copied from the specified collection 
        /// that will raise <see cref="INotifyCollectionChanged"/> events on the specified dispatcher
        /// </summary>
        public ObservableConcurrentBag(Dispatcher dispatcher, IEnumerable<T> collection)
        {
            this.dispatcher = dispatcher;
            this.internalBag = new ConcurrentBag<T>(collection);
        }

        /// <summary>
        /// Occurs when the collection changes
        /// </summary>
        public event NotifyCollectionChangedEventHandler CollectionChanged;

        /// <summary>
        /// Raises the <see cref="CollectionChanged"/> event on the <see cref="dispatcher"/>
        /// </summary>
        private void RaiseCollectionChangedEventOnDispatcher(NotifyCollectionChangedEventArgs e)
        {
            this.dispatcher.BeginInvoke(new Action<NotifyCollectionChangedEventArgs>(this.RaiseCollectionChangedEvent), e);
        }

        /// <summary>
        /// Raises the <see cref="CollectionChanged"/> event
        /// </summary>
        /// <remarks>
        /// This method must only be raised on the dispatcher - use <see cref="RaiseCollectionChangedEventOnDispatcher" />
        /// to do this.
        /// </remarks>
        private void RaiseCollectionChangedEvent(NotifyCollectionChangedEventArgs e)
        {
            this.CollectionChanged(this, e);
        }

        #region Members that pass through to the internal concurrent bag but also raise change notifications

        bool IProducerConsumerCollection<T>.TryAdd(T item)
        {
            bool result = ((IProducerConsumerCollection<T>)this.internalBag).TryAdd(item);
            if (result)
            {
                this.RaiseCollectionChangedEventOnDispatcher(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item));
            }
            return result;
        }

        public void Add(T item)
        {
            this.internalBag.Add(item);
            this.RaiseCollectionChangedEventOnDispatcher(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item));
        }

        public bool TryTake(out T item)
        {
            bool result = this.TryTake(out item);
            if (result)
            {
                this.RaiseCollectionChangedEventOnDispatcher(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Remove, item));
            }
            return result;
        }

        #endregion

        #region Members that pass through directly to the internal concurrent bag

        public int Count
        {
            get
            {
                return this.internalBag.Count;
            }
        }

        public bool IsEmpty
        {
            get
            {
                return this.internalBag.IsEmpty;
            }
        }

        bool ICollection.IsSynchronized
        {
            get
            {
                return ((ICollection)this.internalBag).IsSynchronized;
            }
        }

        object ICollection.SyncRoot
        {
            get
            {
                return ((ICollection)this.internalBag).SyncRoot;
            }
        }

        IEnumerator<T> IEnumerable<T>.GetEnumerator()
        {
            return ((IEnumerable<T>)this.internalBag).GetEnumerator();
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable)this.internalBag).GetEnumerator();
        }

        public T[] ToArray()
        {
            return this.internalBag.ToArray();
        }

        void IProducerConsumerCollection<T>.CopyTo(T[] array, int index)
        {
            ((IProducerConsumerCollection<T>)this.internalBag).CopyTo(array, index);
        }

        void ICollection.CopyTo(Array array, int index)
        {
            ((ICollection)this.internalBag).CopyTo(array, index);
        }

        #endregion
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 在此发现一个错误:当调用 ``Take()`` 时,它会调用自己。结果是溢出异常 (2认同)

Sev*_*ate 3

请看一下Caliburn.MicroBindableCollection<T>库:

/// <summary>
/// A base collection class that supports automatic UI thread marshalling.
/// </summary>
/// <typeparam name="T">The type of elements contained in the collection.</typeparam>
#if !SILVERLIGHT && !WinRT
[Serializable]
#endif
public class BindableCollection<T> : ObservableCollection<T>, IObservableCollection<T> {

    /// <summary>
    ///   Initializes a new instance of the <see cref = "Caliburn.Micro.BindableCollection{T}" /> class.
    /// </summary>
    public BindableCollection() {
        IsNotifying = true;
    }

    /// <summary>
    ///   Initializes a new instance of the <see cref = "Caliburn.Micro.BindableCollection{T}" /> class.
    /// </summary>
    /// <param name = "collection">The collection from which the elements are copied.</param>
    /// <exception cref = "T:System.ArgumentNullException">
    ///   The <paramref name = "collection" /> parameter cannot be null.
    /// </exception>
    public BindableCollection(IEnumerable<T> collection) : base(collection) {
        IsNotifying = true;
    }

#if !SILVERLIGHT && !WinRT
    [field: NonSerialized]
#endif
    bool isNotifying; //serializator try to serialize even autogenerated fields

    /// <summary>
    ///   Enables/Disables property change notification.
    /// </summary>
#if !WinRT
    [Browsable(false)]
#endif
    public bool IsNotifying {
        get { return isNotifying; }
        set { isNotifying = value; }
    }

    /// <summary>
    ///   Notifies subscribers of the property change.
    /// </summary>
    /// <param name = "propertyName">Name of the property.</param>
#if WinRT || NET45
    public virtual void NotifyOfPropertyChange([CallerMemberName]string propertyName = "") {
#else
    public virtual void NotifyOfPropertyChange(string propertyName) {
#endif
        if(IsNotifying)
            Execute.OnUIThread(() => OnPropertyChanged(new PropertyChangedEventArgs(propertyName)));
    }

    /// <summary>
    ///   Raises a change notification indicating that all bindings should be refreshed.
    /// </summary>
    public void Refresh() {
        Execute.OnUIThread(() => {
            OnPropertyChanged(new PropertyChangedEventArgs("Count"));
            OnPropertyChanged(new PropertyChangedEventArgs("Item[]"));
            OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset));
        });
    }

    /// <summary>
    ///   Inserts the item to the specified position.
    /// </summary>
    /// <param name = "index">The index to insert at.</param>
    /// <param name = "item">The item to be inserted.</param>
    protected override sealed void InsertItem(int index, T item) {
        Execute.OnUIThread(() => InsertItemBase(index, item));
    }

    /// <summary>
    ///   Exposes the base implementation of the <see cref = "InsertItem" /> function.
    /// </summary>
    /// <param name = "index">The index.</param>
    /// <param name = "item">The item.</param>
    /// <remarks>
    ///   Used to avoid compiler warning regarding unverifiable code.
    /// </remarks>
    protected virtual void InsertItemBase(int index, T item) {
        base.InsertItem(index, item);
    }

#if NET || WP8 || WinRT
/// <summary>
/// Moves the item within the collection.
/// </summary>
/// <param name="oldIndex">The old position of the item.</param>
/// <param name="newIndex">The new position of the item.</param>
    protected sealed override void MoveItem(int oldIndex, int newIndex) {
        Execute.OnUIThread(() => MoveItemBase(oldIndex, newIndex));
    }

    /// <summary>
    /// Exposes the base implementation fo the <see cref="MoveItem"/> function.
    /// </summary>
    /// <param name="oldIndex">The old index.</param>
    /// <param name="newIndex">The new index.</param>
    /// <remarks>Used to avoid compiler warning regarding unverificable code.</remarks>
    protected virtual void MoveItemBase(int oldIndex, int newIndex) {
        base.MoveItem(oldIndex, newIndex);
    }
#endif

    /// <summary>
    ///   Sets the item at the specified position.
    /// </summary>
    /// <param name = "index">The index to set the item at.</param>
    /// <param name = "item">The item to set.</param>
    protected override sealed void SetItem(int index, T item) {
        Execute.OnUIThread(() => SetItemBase(index, item));
    }

    /// <summary>
    ///   Exposes the base implementation of the <see cref = "SetItem" /> function.
    /// </summary>
    /// <param name = "index">The index.</param>
    /// <param name = "item">The item.</param>
    /// <remarks>
    ///   Used to avoid compiler warning regarding unverifiable code.
    /// </remarks>
    protected virtual void SetItemBase(int index, T item) {
        base.SetItem(index, item);
    }

    /// <summary>
    ///   Removes the item at the specified position.
    /// </summary>
    /// <param name = "index">The position used to identify the item to remove.</param>
    protected override sealed void RemoveItem(int index) {
        Execute.OnUIThread(() => RemoveItemBase(index));
    }

    /// <summary>
    ///   Exposes the base implementation of the <see cref = "RemoveItem" /> function.
    /// </summary>
    /// <param name = "index">The index.</param>
    /// <remarks>
    ///   Used to avoid compiler warning regarding unverifiable code.
    /// </remarks>
    protected virtual void RemoveItemBase(int index) {
        base.RemoveItem(index);
    }

    /// <summary>
    ///   Clears the items contained by the collection.
    /// </summary>
    protected override sealed void ClearItems() {
        Execute.OnUIThread(ClearItemsBase);
    }

    /// <summary>
    ///   Exposes the base implementation of the <see cref = "ClearItems" /> function.
    /// </summary>
    /// <remarks>
    ///   Used to avoid compiler warning regarding unverifiable code.
    /// </remarks>
    protected virtual void ClearItemsBase() {
        base.ClearItems();
    }

    /// <summary>
    ///   Raises the <see cref = "E:System.Collections.ObjectModel.ObservableCollection`1.CollectionChanged" /> event with the provided arguments.
    /// </summary>
    /// <param name = "e">Arguments of the event being raised.</param>
    protected override void OnCollectionChanged(NotifyCollectionChangedEventArgs e) {
        if (IsNotifying) {
            base.OnCollectionChanged(e);
        }
    }

    /// <summary>
    ///   Raises the PropertyChanged event with the provided arguments.
    /// </summary>
    /// <param name = "e">The event data to report in the event.</param>
    protected override void OnPropertyChanged(PropertyChangedEventArgs e) {
        if (IsNotifying) {
            base.OnPropertyChanged(e);
        }
    }

    /// <summary>
    ///   Adds the range.
    /// </summary>
    /// <param name = "items">The items.</param>
    public virtual void AddRange(IEnumerable<T> items) {
        Execute.OnUIThread(() => {
            var previousNotificationSetting = IsNotifying;
            IsNotifying = false;
            var index = Count;
            foreach(var item in items) {
                InsertItemBase(index, item);
                index++;
            }
            IsNotifying = previousNotificationSetting;

            OnPropertyChanged(new PropertyChangedEventArgs("Count"));
            OnPropertyChanged(new PropertyChangedEventArgs("Item[]"));
            OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset));
        });
    }

    /// <summary>
    ///   Removes the range.
    /// </summary>
    /// <param name = "items">The items.</param>
    public virtual void RemoveRange(IEnumerable<T> items) {
        Execute.OnUIThread(() => {
            var previousNotificationSetting = IsNotifying;
            IsNotifying = false;
            foreach(var item in items) {
                var index = IndexOf(item);
                if (index >= 0) {
                    RemoveItemBase(index);
                }
            }
            IsNotifying = previousNotificationSetting;

            OnPropertyChanged(new PropertyChangedEventArgs("Count"));
            OnPropertyChanged(new PropertyChangedEventArgs("Item[]"));
            OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset));
        });
    }

    /// <summary>
    /// Called when the object is deserialized.
    /// </summary>
    /// <param name="c">The streaming context.</param>
    [OnDeserialized]
    public void OnDeserialized(StreamingContext c) {
        IsNotifying = true;
    }

    /// <summary>
    /// Used to indicate whether or not the IsNotifying property is serialized to Xml.
    /// </summary>
    /// <returns>Whether or not to serialize the IsNotifying property. The default is false.</returns>
    public virtual bool ShouldSerializeIsNotifying() {
        return false;
    }
}
Run Code Online (Sandbox Code Playgroud)

来源

附言。请记住,此类使用 Caliburn.Micro 中的一些其他类,以便您可以自行复制/粘贴所有依赖项 - 或者 - 如果您不使用任何其他应用程序框架 - 只需引用库二进制文件并提供它一个机会。

  • `BindableCollection` 不是线程安全的。它会重写“InsertItem”和“RemoveItem”方法,但公共方法在调用这些虚拟方法之前在调用线程上使用其他方法,例如“IndexOf”或“Count”。 (2认同)