在 Reactive Extensions 中按预定义的顺序对 Observable 进行排序

Wou*_*ter 5 c# sorting reactive-programming system.reactive

假设我有一个类型 T:

// Demo element type: pairs an identifier (used to decide ordering) with a payload.
class T {
    public int identifier; //Arbitrary but unique for each character (Guids in real-life)
    public char character; //In real life not a char, but I chose char here for easy demo purposes
}
Run Code Online (Sandbox Code Playgroud)

我有一个预定义的有序标识符序列:

// Identifiers in the order in which their items must be emitted (4 is expected twice).
int[] identifierSequence = { 9, 3, 4, 4, 7 };
Run Code Online (Sandbox Code Playgroud)

我现在需要对一个会产生以下对象序列的 IObservable<T> 进行排序:

{identifier: 3, character 'e'},
{identifier: 9, character 'h'},
{identifier: 4, character 'l'},
{identifier: 4, character 'l'},
{identifier: 7, character 'o'}
Run Code Online (Sandbox Code Playgroud)

使生成的 IObservable 产生hello. 我不想使用 ToArray,因为我想在对象到达时立即接收它们,而不是等到所有内容都被观察到。更具体地说,我希望像这样接收它们:

 Input: e  h  l  l  o
Output:    he l  l  o
Run Code Online (Sandbox Code Playgroud)

这样做的正确反应方式是什么?我能想到的最好的办法是:

// Items that arrived before their turn, keyed by identifier.
// Only one item is kept per identifier: a second early arrival with the same identifier overwrites the first.
// This state is declared outside the query, so it is shared by every subscription to it.
Dictionary<int, T> buffer = new Dictionary<int, T>();
// Position in identifierSequence of the identifier that has to be emitted next.
int curIndex = 0;

inputObserable.SelectMany(item =>
{
    buffer[item.identifier] = item;

    // Yields every buffered item that is next in line, stopping at the first identifier
    // that has not arrived yet or once the whole sequence has been emitted.
    IEnumerable<T> GetReadyElements()
    {
        // Bounds check: without it the lookup below runs past the end of identifierSequence
        // right after the last item has been emitted.
        while (curIndex < identifierSequence.Length)
        {
            int nextItemIdentifier = identifierSequence[curIndex];
            T nextItem;
            if (buffer.TryGetValue(nextItemIdentifier, out nextItem))
            {
                buffer.Remove(nextItem.identifier);
                curIndex++;
                yield return nextItem;
            }
            else
            {
                break;
            }
        }
    }

    return GetReadyElements();
});
Run Code Online (Sandbox Code Playgroud)

编辑:

Schlomo 对我的代码提出了一些非常有效的问题,这就是为什么我将他的答案标记为正确的原因。我对他的代码进行了一些修改以使其可用:

  • 通用标识符和对象类型
  • 使用迭代而不是递归,以防止在非常大的 observable 上发生潜在的堆栈溢出
  • 将匿名类型转换为真实类以提高可读性
  • 在可能的情况下,只在字典中查找一次值并将其存储为变量而不是多次查找
  • 修正了拼写错误

这给了我以下代码:

/// <summary>
/// Re-emits the items of <paramref name="source"/> in the order given by <paramref name="identifierSequence"/>.
/// An item is emitted as soon as every identifier that precedes it in the sequence has been emitted;
/// items that arrive early are buffered. Items sharing an identifier are emitted in arrival order.
/// The result completes once the last identifier of the sequence has been emitted, or when the source completes.
/// </summary>
/// <typeparam name="T">The type produced by the source observable</typeparam>
/// <typeparam name="TId">The type of the identifiers</typeparam>
/// <param name="source">The source observable</param>
/// <param name="identifierSequence">The identifiers, in the order in which their items must be emitted</param>
/// <param name="identifierFunc">Extracts the identifier of an item</param>
/// <returns>An observable emitting the matching source items in sequence order</returns>
/// <exception cref="ArgumentNullException">Any argument is null</exception>
public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (identifierSequence == null)
        {
            throw new ArgumentNullException(nameof(identifierSequence));
        }
        if (identifierFunc == null)
        {
            throw new ArgumentNullException(nameof(identifierFunc));
        }

        //Nothing can ever be emitted for an empty sequence, so complete without waiting for the source
        if (identifierSequence.Count == 0)
        {
            return Observable.Empty<T>();
        }

        var initialState = new OrderByIdentifierSequenceState<T, TId>(0, ImmutableDictionary<TId, ImmutableList<T>>.Empty, Enumerable.Empty<T>());
        return source.Scan(initialState, (oldState, item) =>
            {
                //Moves every buffered item that is next in line from the buffer into the output.
                //Stops at the first identifier for which nothing has arrived yet, or at the end of the sequence.
                OrderByIdentifierSequenceState<T, TId> GetOutput(OrderByIdentifierSequenceState<T, TId> state)
                {
                    int index = state.Index;
                    ImmutableDictionary<TId, ImmutableList<T>> buffer = state.Buffer;
                    IList<T> output = new List<T>();

                    while (index < identifierSequence.Count)
                    {
                        TId key = identifierSequence[index];
                        ImmutableList<T> nextValues;
                        if (!buffer.TryGetValue(key, out nextValues) || nextValues.IsEmpty)
                        {
                            //No values available yet
                            break;
                        }

                        //Take the oldest buffered item so that items sharing an identifier keep their arrival order
                        output.Add(nextValues[0]);

                        buffer = buffer.SetItem(key, nextValues.RemoveAt(0));
                        index++;
                    }

                    return new OrderByIdentifierSequenceState<T, TId>(index, buffer, output);
                }

                //Before collecting the output, add the new item to the buffer
                TId itemIdentifier = identifierFunc(item);

                ImmutableList<T> valuesList;
                if (!oldState.Buffer.TryGetValue(itemIdentifier, out valuesList))
                {
                    valuesList = ImmutableList<T>.Empty;
                }
                var updatedBuffer = oldState.Buffer.SetItem(itemIdentifier, valuesList.Add(item));

                return GetOutput(new OrderByIdentifierSequenceState<T, TId>(oldState.Index, updatedBuffer, Enumerable.Empty<T>()));
            })
            // Use Dematerialize/Notifications to detect and emit end of stream.
            .SelectMany(output =>
            {
                var notifications = output.Output
                    .Select(item => Notification.CreateOnNext(item))
                    .ToList();

                //Once the whole sequence has been emitted, append a completion notification
                if (output.Index == identifierSequence.Count)
                {
                    notifications.Add(Notification.CreateOnCompleted<T>());
                }

                return notifications;
            })
            .Dematerialize();
    }

    /// <summary>
    /// Immutable snapshot of the ordering progress: the position reached in the identifier sequence,
    /// the items still waiting for their turn, and the items released by the latest step.
    /// </summary>
    class OrderByIdentifierSequenceState<T, TId>
    {
        public OrderByIdentifierSequenceState(int index, ImmutableDictionary<TId, ImmutableList<T>> buffer, IEnumerable<T> output)
        {
            (Index, Buffer, Output) = (index, buffer, output);
        }

        /// <summary>Position in the identifier sequence of the item we are waiting on.</summary>
        public int Index { get; }

        /// <summary>Items that have arrived but cannot be emitted yet, grouped by identifier.</summary>
        public ImmutableDictionary<TId, ImmutableList<T>> Buffer { get; }

        /// <summary>Items that can be safely emitted.</summary>
        public IEnumerable<T> Output { get; }
    }
Run Code Online (Sandbox Code Playgroud)

但是,这段代码仍然有几个问题:

  • 不断复制状态(主要是ImmutableDictionary),这可能非常昂贵。可能的解决方案:为每个观察者维护一个单独的状态,而不是每个接收到的项目。
  • 当 identifierSequence 中的一个或多个元素在源 observable 中不存在时,就会出现问题。这目前会阻塞排序后的 observable,使其永远不会完成。可能的解决方案:超时,在源 observable 完成时抛出异常,在源 observable 完成时返回所有可用项,...
  • 当源 observable 包含的元素多于 identifierSequence 时,就会出现内存泄漏。存在于源 observable 中但不在 identifierSequence 中的项目会被添加到字典中,并且在源 observable 完成之前不会被删除。这是潜在的内存泄漏。可能的解决方案:在将项目添加到字典之前检查它是否在 identifierSequence 中,绕过代码并立即输出该项目,...

我的解决方案:

    /// <summary>
    /// Takes the items from the source observable, and returns them in the order specified in identifierSequence.
    /// An item is emitted as soon as every identifier that precedes it in identifierSequence has been emitted; items that arrive early are buffered.
    /// If an item is missing from the source observable, the returned observable returns items up until the missing item and then waits until the source observable is completed.
    /// The missing identifiers are then skipped and all remaining buffered items are returned in order. Note that this means that while a correct order is guaranteed, there might be missing items in the result observable.
    /// If there are items in the source observable that are not in identifierSequence, these items will be ignored.
    /// Items that share an identifier are returned in the order in which they arrived.
    /// The returned observable completes as soon as the last identifier has been emitted, or when the source observable completes.
    /// </summary>
    /// <typeparam name="T">The type that is produced by the source observable</typeparam>
    /// <typeparam name="TId">The type of the identifiers used to uniquely identify a T</typeparam>
    /// <param name="source">The source observable</param>
    /// <param name="identifierSequence">A list of identifiers that defines the sequence in which the source observable is to be ordered. The list is copied when this method is called.</param>
    /// <param name="identifierFunc">A function that takes a T and outputs its unique identifier</param>
    /// <returns>An observable with the elements of the source whose identifiers occur in identifierSequence, ordered by the sequence of items in identifierSequence</returns>
    /// <exception cref="ArgumentNullException">source, identifierSequence or identifierFunc is null</exception>
    public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (identifierSequence == null)
        {
            throw new ArgumentNullException(nameof(identifierSequence));
        }
        if (identifierFunc == null)
        {
            throw new ArgumentNullException(nameof(identifierFunc));
        }

        //Work on a private copy, so that later changes to the caller's list cannot get out of sync with the lookup set below
        List<TId> sequence = new List<TId>(identifierSequence);

        if (sequence.Count == 0)
        {
            return Observable.Empty<T>();
        }

        HashSet<TId> identifiersInSequence = new HashSet<TId>(sequence);

        return Observable.Create<T>(observer =>
        {
            //current index of pending item in the sequence
            int index = 0;
            //buffer of items we have received but are not ready for yet, oldest first per identifier
            Dictionary<TId, Queue<T>> buffer = new Dictionary<TId, Queue<T>>();
            //set once a terminal notification has been sent; a synchronous source may keep pushing items after that
            bool finished = false;

            //Emits every buffered item that is next in line.
            //When skipMissing is false, stops at the first identifier for which nothing has arrived yet.
            //When skipMissing is true, identifiers without a buffered item are skipped instead.
            //Completes the observer once the end of the sequence has been reached.
            void Drain(bool skipMissing)
            {
                while (index < sequence.Count)
                {
                    Queue<T> pending;
                    if (buffer.TryGetValue(sequence[index], out pending) && pending.Count > 0)
                    {
                        //Advance the state before calling out to the observer
                        T next = pending.Dequeue();
                        index++;
                        observer.OnNext(next);
                    }
                    else if (skipMissing)
                    {
                        index++;
                    }
                    else
                    {
                        //No values available yet
                        return;
                    }
                }

                finished = true;
                buffer.Clear();
                observer.OnCompleted();
            }

            return source.Subscribe(
                item =>
                {
                    if (finished)
                    {
                        return;
                    }

                    //Get the identifier for this item; a failing identifierFunc terminates the result with that error
                    TId itemIdentifier;
                    try
                    {
                        itemIdentifier = identifierFunc(item);
                    }
                    catch (Exception ex)
                    {
                        finished = true;
                        observer.OnError(ex);
                        return;
                    }

                    //If this item is not in identifiersInSequence, we ignore it.
                    if (!identifiersInSequence.Contains(itemIdentifier))
                    {
                        return;
                    }

                    //Add the new item to the buffer
                    Queue<T> valuesQueue;
                    if (!buffer.TryGetValue(itemIdentifier, out valuesQueue))
                    {
                        valuesQueue = new Queue<T>();
                        buffer[itemIdentifier] = valuesQueue;
                    }
                    valuesQueue.Enqueue(item);

                    //Emit all items that are available now
                    Drain(false);
                },
                ex =>
                {
                    finished = true;
                    observer.OnError(ex);
                },
                () =>
                {
                    //When source observable is completed, return the remaining available items and complete
                    if (!finished)
                    {
                        Drain(true);
                    }
                });
        });
    }
Run Code Online (Sandbox Code Playgroud)

Shl*_*omo 4

请注意,您的实施存在一些问题:

  1. 如果两个相同的元素提前到达,其中一个会被吞掉,进而阻塞整个序列。您的字典应该映射到一个集合,而不是单个项目。
  2. 没有OnCompleted消息。
  3. 多个订阅者可能会搞乱状态。试试这个(其中 GetPatternMatchOriginal 是你的代码):

-

// Builds the ordering query once, then subscribes to it twice.
// NOTE(review): src and GetPatternMatchOriginal are not declared in this snippet; the surrounding text
// says GetPatternMatchOriginal wraps the question's code — confirm.
var stateMachine = src.GetPatternMatchOriginal(new int[] { 9, 3, 4, 4, 7 });

// Each subscription takes the first three ordered items and prints them.
stateMachine.Take(3).Dump(); //Linqpad
stateMachine.Take(3).Dump(); //Linqpad
Run Code Online (Sandbox Code Playgroud)

第一个输出是 h e l,第二个输出是 l o。它们都应该输出 h e l。

此实现解决了这些问题,并且使用不可变数据结构也没有副作用:

public static class X
{
    /// <summary>
    /// Re-emits the items of <paramref name="source"/> in the order given by <paramref name="identifierSequence"/>.
    /// Items that arrive before their turn are buffered; items sharing an identifier are emitted in arrival order.
    /// All state is carried through Scan in immutable structures, so it is not shared between subscriptions.
    /// </summary>
    /// <param name="source">The items to reorder</param>
    /// <param name="identifierSequence">The identifiers, in the order in which their items must be emitted</param>
    /// <returns>The source items in sequence order, completing once the last identifier has been emitted</returns>
    public static IObservable<T> GetStateMachine(this IObservable<T> source, int[] identifierSequence)
    {
        //State is held in an anonymous type: 
        //  Index shows what character we're waiting on, 
        //  Buffer holds characters that have arrived that we aren't ready yet for
        //  Output holds characters that can be safely emitted.
        return source
            .Scan(new { Index = 0, Buffer = ImmutableDictionary<int, ImmutableList<T>>.Empty, Output = Enumerable.Empty<T>() },
            (state, item) =>
            {
                //Function to be called recursively upon receiving new item
                //If we can pattern match the first item, then it is moved into Output, and concatted recursively with the next possible item
                //Otherwise just return the inputs
                (int Index, ImmutableDictionary<int, ImmutableList<T>> Buffer, IEnumerable<T> Output) GetOutput(int index, ImmutableDictionary<int, ImmutableList<T>> buffer, IEnumerable<T> results)
                {
                    if (index == identifierSequence.Length)
                        return (index, buffer, results);

                    var key = identifierSequence[index];
                    if (buffer.ContainsKey(key) && buffer[key].Any())
                    {
                        //Take the oldest buffered item so that items sharing an identifier keep their arrival order
                        var toOutput = buffer[key][0];
                        return GetOutput(index + 1, buffer.SetItem(key, buffer[key].RemoveAt(0)), results.Concat(new[] { toOutput }));
                    }
                    else
                        return (index, buffer, results);
                }

                //Before calling the recursive function, add the new item to the buffer
                var modifiedBuffer = state.Buffer.ContainsKey(item.Identifier)
                   ? state.Buffer
                   : state.Buffer.Add(item.Identifier, ImmutableList<T>.Empty);

                var remodifiedBuffer = modifiedBuffer.SetItem(item.Identifier, modifiedBuffer[item.Identifier].Add(item));

                var output = GetOutput(state.Index, remodifiedBuffer, Enumerable.Empty<T>());
                return new { Index = output.Index, Buffer = output.Buffer, Output = output.Output };
            })
            // Use Dematerialize/Notifications to detect and emit end of stream.
            .SelectMany(output =>
            {
                var notifications = output.Output
                    .Select(item => Notification.CreateOnNext(item))
                    .ToList();
                //Once the whole sequence has been emitted, append a completion notification
                if (output.Index == identifierSequence.Length)
                    notifications.Add(Notification.CreateOnCompleted<T>());
                return notifications;
            })
            .Dematerialize();
    }
}
Run Code Online (Sandbox Code Playgroud)

那么你可以这样称呼它:

// Builds the ordering query for the identifier sequence 9, 3, 4, 4, 7 and prints its output.
// NOTE(review): src is not declared in this snippet; since OnNext is called on it, presumably a Subject<T> — confirm.
var stateMachine = src.GetStateMachine(new int[] { 9, 3, 4, 4, 7 });
stateMachine.Dump(); //LinqPad

// The items are pushed out of order; the first identifier of the sequence (9) is pushed last.
src.OnNext(new T { Identifier = 4, Character = 'l' });
src.OnNext(new T { Identifier = 4, Character = 'l' });
src.OnNext(new T { Identifier = 7, Character = 'o' });
src.OnNext(new T { Identifier = 3, Character = 'e' });
src.OnNext(new T { Identifier = 9, Character = 'h' });
Run Code Online (Sandbox Code Playgroud)