Wou*_*ter 5 c# sorting reactive-programming system.reactive
假设我有一个类型 T：
/// <summary>
/// Demo item: an identifier together with the character it carries.
/// </summary>
class T
{
    /// <summary>Arbitrary but unique for each character (Guids in real life).</summary>
    public int identifier;

    /// <summary>In real life this is not a char; a char is used here for easy demo purposes.</summary>
    public char character;
}
Run Code Online (Sandbox Code Playgroud)
我有一个预定义的有序标识符序列:
// The predefined order in which items must be emitted; identifier 4 appears twice.
int[] identifierSequence = { 9, 3, 4, 4, 7 };
Run Code Online (Sandbox Code Playgroud)
我现在需要对一个产生以下对象序列的 IObservable<T> 进行排序：
{identifier: 3, character 'e'},
{identifier: 9, character 'h'},
{identifier: 4, character 'l'},
{identifier: 4, character 'l'},
{identifier: 7, character 'o'}
Run Code Online (Sandbox Code Playgroud)
使排序后的 IObservable 产生 hello。我不想使用 ToArray，因为我希望对象一到达就立即接收它们，而不是等到所有对象都被观察到之后。更具体地说，我希望像这样接收它们：
Input: e h l l o
Output: he l l o
Run Code Online (Sandbox Code Playgroud)
用响应式（Rx）的方式，正确的做法是什么？我能想到的最好的办法是：
// Items that arrived before their turn, keyed by identifier.
// NOTE: a second item with the same identifier overwrites the first one still waiting here.
Dictionary<int, T> buffer = new Dictionary<int, T>();
// Position in identifierSequence of the item we are currently waiting for.
int curIndex = 0;
inputObserable.SelectMany(item =>
{
    buffer[item.identifier] = item;

    // Yields, in sequence order, every buffered item that is now due.
    IEnumerable<T> GetReadyElements()
    {
        // Stop once the whole sequence has been emitted: indexing past its end would throw
        // IndexOutOfRangeException on the next incoming item.
        while (curIndex < identifierSequence.Length)
        {
            int nextItemIdentifier = identifierSequence[curIndex];
            T nextItem;
            if (!buffer.TryGetValue(nextItemIdentifier, out nextItem))
            {
                // The next item in the sequence has not arrived yet.
                yield break;
            }
            buffer.Remove(nextItem.identifier);
            curIndex++;
            yield return nextItem;
        }
    }

    return GetReadyElements();
});
Run Code Online (Sandbox Code Playgroud)
编辑:
Shlomo 对我的代码提出了一些非常中肯的问题，因此我将他的答案标记为正确答案。我对他的代码做了一些修改，使其可以直接使用：
这给了我以下代码:
/// <summary>
/// Reorders the items of <paramref name="source"/> so that they are emitted in the order given by
/// <paramref name="identifierSequence"/>. Each item is emitted as soon as every item before it in the
/// sequence has been emitted. Once every entry of the sequence has been matched, OnCompleted is emitted.
/// Items sharing an identifier are emitted in the order in which they arrived.
/// Items whose identifier is not in <paramref name="identifierSequence"/> stay in the buffer and are never emitted.
/// </summary>
/// <typeparam name="T">The type that is produced by the source observable.</typeparam>
/// <typeparam name="TId">The type of the identifiers used to identify a T.</typeparam>
/// <param name="source">The source observable.</param>
/// <param name="identifierSequence">The identifiers in the order in which their items must be emitted.</param>
/// <param name="identifierFunc">A function that takes a T and returns its identifier.</param>
/// <returns>An observable emitting the source items in the order given by identifierSequence.</returns>
/// <exception cref="ArgumentNullException">source, identifierSequence or identifierFunc is null.</exception>
public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }
    if (identifierSequence == null)
    {
        throw new ArgumentNullException(nameof(identifierSequence));
    }
    if (identifierFunc == null)
    {
        throw new ArgumentNullException(nameof(identifierFunc));
    }
    // Nothing to wait for: complete right away instead of only after the first source item arrives.
    if (identifierSequence.Count == 0)
    {
        return Observable.Empty<T>();
    }

    // Moves every buffered item that is now next in line into Output, advancing Index past each one.
    // If the pending item has not arrived yet, the state comes back with an empty Output.
    OrderByIdentifierSequenceState<T, TId> GetOutput(OrderByIdentifierSequenceState<T, TId> state)
    {
        int index = state.Index;
        ImmutableDictionary<TId, ImmutableList<T>> buffer = state.Buffer;
        List<T> output = new List<T>();
        while (index < identifierSequence.Count)
        {
            TId key = identifierSequence[index];
            ImmutableList<T> nextValues;
            if (!buffer.TryGetValue(key, out nextValues) || nextValues.IsEmpty)
            {
                // No values available yet.
                break;
            }
            // Take the oldest buffered item so items sharing an identifier keep their arrival order.
            output.Add(nextValues[0]);
            buffer = buffer.SetItem(key, nextValues.RemoveAt(0));
            index++;
        }
        return new OrderByIdentifierSequenceState<T, TId>(index, buffer, output);
    }

    var initialState = new OrderByIdentifierSequenceState<T, TId>(0, ImmutableDictionary<TId, ImmutableList<T>>.Empty, Enumerable.Empty<T>());
    return source.Scan(initialState, (oldState, item) =>
    {
        // Add the new item to the buffer before looking for items that are now due.
        TId itemIdentifier = identifierFunc(item);
        ImmutableList<T> valuesList;
        if (!oldState.Buffer.TryGetValue(itemIdentifier, out valuesList))
        {
            valuesList = ImmutableList<T>.Empty;
        }
        var remodifiedBuffer = oldState.Buffer.SetItem(itemIdentifier, valuesList.Add(item));
        return GetOutput(new OrderByIdentifierSequenceState<T, TId>(oldState.Index, remodifiedBuffer, Enumerable.Empty<T>()));
    })
    // Use Dematerialize/Notifications to detect and emit end of stream.
    .SelectMany(output =>
    {
        var notifications = output.Output
            .Select(item => Notification.CreateOnNext(item))
            .ToList();
        if (output.Index == identifierSequence.Count)
        {
            notifications.Add(Notification.CreateOnCompleted<T>());
        }
        return notifications;
    })
    .Dematerialize();
}
/// <summary>
/// Immutable snapshot of the reordering state that OrderByIdentifierSequence threads through Scan.
/// </summary>
class OrderByIdentifierSequenceState<T, TId>
{
    /// <summary>Position in the identifier sequence of the item currently being waited for.</summary>
    public int Index { get; }

    /// <summary>Items that have arrived but are not due yet, grouped by identifier.</summary>
    public ImmutableDictionary<TId, ImmutableList<T>> Buffer { get; }

    /// <summary>Items that can be emitted right now.</summary>
    public IEnumerable<T> Output { get; }

    public OrderByIdentifierSequenceState(int index, ImmutableDictionary<TId, ImmutableList<T>> buffer, IEnumerable<T> output)
    {
        (Index, Buffer, Output) = (index, buffer, output);
    }
}
Run Code Online (Sandbox Code Playgroud)
但是，这段代码仍然有几个问题：
- 状态会被不断复制（主要是 ImmutableDictionary），这可能非常昂贵。可能的解决方案：为每个观察者维护一份单独的状态，而不是为每个接收到的项目创建一份新状态。
- 当 identifierSequence 中的一个或多个元素在源 observable 中不存在时，就会出现问题：这会阻塞排序后的 observable，使它永远不会完成。可能的解决方案：超时；在源 observable 完成时抛出异常；在源 observable 完成时返回所有可用的项目；……
- 当源 observable 包含的元素多于 identifierSequence 时，就会出现内存泄漏：源 observable 中存在、但 identifierSequence 中不存在的项目会被添加到字典中，并且在源 observable 完成之前不会被删除。这是一个潜在的内存泄漏。可能的解决方案：在将项目添加到字典之前，先检查它是否在 identifierSequence 中；或者绕过这段逻辑，立即输出该项目；……

我的解决方案：
/// <summary>
/// Takes the items from the source observable, and returns them in the order specified in identifierSequence.
/// If an item is missing from the source observable, the returned observable emits items up until the missing item
/// and then holds back the remaining items until the source observable completes. All available items are then
/// emitted in order. Note that this means that while a correct order is guaranteed, there might be missing items
/// in the result observable.
/// If there are items in the source observable that are not in identifierSequence, these items will be ignored.
/// Items that share an identifier are emitted in the order in which they arrived. Surplus items (an identifier that
/// arrives more often than it occurs in identifierSequence) stay buffered until completion and are then discarded.
/// Every subscription gets its own buffer, and the result sends OnCompleted (or OnError) at most once.
/// </summary>
/// <typeparam name="T">The type that is produced by the source observable</typeparam>
/// <typeparam name="TId">The type of the identifiers used to uniquely identify a T</typeparam>
/// <param name="source">The source observable</param>
/// <param name="identifierSequence">A list of identifiers that defines the sequence in which the source observable is to be ordered</param>
/// <param name="identifierFunc">A function that takes a T and outputs its unique identifier</param>
/// <returns>An observable with the same elements as the source, but ordered by the sequence of items in identifierSequence</returns>
/// <exception cref="ArgumentNullException">source, identifierSequence or identifierFunc is null.</exception>
public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }
    if (identifierSequence == null)
    {
        throw new ArgumentNullException(nameof(identifierSequence));
    }
    if (identifierFunc == null)
    {
        throw new ArgumentNullException(nameof(identifierFunc));
    }
    if (identifierSequence.Count == 0)
    {
        return Observable.Empty<T>();
    }
    HashSet<TId> identifiersInSequence = new HashSet<TId>(identifierSequence);
    return Observable.Create<T>(observer =>
    {
        // All state below is created per subscription, so several subscribers never share it.
        // Current index of the pending item in identifierSequence.
        int index = 0;
        // Set once a terminal notification has been sent, so none is ever sent twice
        // and items arriving afterwards are no longer buffered.
        bool completed = false;
        // Items we have received but are not ready for yet, oldest first per identifier.
        Dictionary<TId, Queue<T>> buffer = new Dictionary<TId, Queue<T>>();

        // Dequeues the oldest buffered item for the pending identifier, if one has arrived.
        // Callers must ensure index < identifierSequence.Count.
        bool TryTakePending(out T value)
        {
            Queue<T> pending;
            if (buffer.TryGetValue(identifierSequence[index], out pending) && pending.Count > 0)
            {
                value = pending.Dequeue();
                return true;
            }
            value = default(T);
            return false;
        }

        // Sends the single OnCompleted and releases anything that is still buffered.
        void Complete()
        {
            completed = true;
            buffer.Clear();
            observer.OnCompleted();
        }

        return source.Select(
            item =>
            {
                // Collected eagerly, so the buffer and index are fully updated before anything is emitted.
                List<T> ready = new List<T>();
                if (completed)
                {
                    return ready;
                }
                TId itemIdentifier = identifierFunc(item);
                // If this item is not in identifiersInSequence, we ignore it.
                if (!identifiersInSequence.Contains(itemIdentifier))
                {
                    return ready;
                }
                Queue<T> valuesQueue;
                if (!buffer.TryGetValue(itemIdentifier, out valuesQueue))
                {
                    valuesQueue = new Queue<T>();
                    buffer[itemIdentifier] = valuesQueue;
                }
                valuesQueue.Enqueue(item);
                // Collect every item that is now due, in sequence order.
                T next;
                while (index < identifierSequence.Count && TryTakePending(out next))
                {
                    ready.Add(next);
                    index++;
                }
                return ready;
            })
            .Subscribe(
                ready =>
                {
                    if (completed)
                    {
                        return;
                    }
                    foreach (T cur in ready)
                    {
                        observer.OnNext(cur);
                    }
                    if (index == identifierSequence.Count)
                    {
                        Complete();
                    }
                },
                ex =>
                {
                    if (!completed)
                    {
                        completed = true;
                        observer.OnError(ex);
                    }
                },
                () =>
                {
                    if (completed)
                    {
                        return;
                    }
                    // Source is done: emit the remaining available items in order, skipping missing ones.
                    while (index < identifierSequence.Count)
                    {
                        T next;
                        if (TryTakePending(out next))
                        {
                            observer.OnNext(next);
                        }
                        index++;
                    }
                    Complete();
                });
    });
}
Run Code Online (Sandbox Code Playgroud)
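请注意，你的实现存在几个问题：
- 如果两个 'l' 提前到达，其中一个会在字典中被覆盖而丢失，从而阻塞整个序列。你的字典应该把标识符映射到一个集合，而不是单个项目。
- 没有 OnCompleted 消息。
- 多个订阅者会共享并破坏同一份状态。试试下面的代码（其中 GetPatternMatchOriginal 是你的代码）：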
// GetPatternMatchOriginal is the implementation from the question.
var stateMachine = src.GetPatternMatchOriginal(new int[] { 9, 3, 4, 4, 7 });
// Subscribe twice to the same observable; each subscription should see the same first three items.
for (int subscription = 0; subscription < 2; subscription++)
{
    stateMachine.Take(3).Dump(); //Linqpad
}
Run Code Online (Sandbox Code Playgroud)
第一次输出是 h e l，第二次输出是 l o。两次都应该输出 h e l。
下面的实现解决了这些问题，并且由于使用了不可变数据结构，也没有副作用：
/// <summary>
/// Rx operator that reorders a stream of <c>T</c> items into a predefined identifier order.
/// </summary>
public static class X
{
    /// <summary>
    /// Emits the items of <paramref name="source"/> in the order given by <paramref name="identifierSequence"/>.
    /// Each item is emitted as soon as all items before it in the sequence have been emitted, and the result
    /// completes once the whole sequence has been emitted. Items sharing an identifier are emitted in arrival order.
    /// All state is threaded through Scan, so every subscription gets its own state.
    /// </summary>
    /// <param name="source">The items to reorder.</param>
    /// <param name="identifierSequence">The identifiers in the order in which their items must be emitted.</param>
    /// <exception cref="ArgumentNullException"><paramref name="identifierSequence"/> is null.</exception>
    public static IObservable<T> GetStateMachine(this IObservable<T> source, IReadOnlyList<int> identifierSequence)
    {
        if (identifierSequence == null)
        {
            throw new ArgumentNullException(nameof(identifierSequence));
        }

        // Moves every buffered item that is now due into the output, advancing the index past each one.
        // Iterative rather than recursive, so a long run of ready items cannot overflow the stack
        // or build a deeply nested Concat chain.
        (int Index, ImmutableDictionary<int, ImmutableList<T>> Buffer, IEnumerable<T> Output) GetOutput(int index, ImmutableDictionary<int, ImmutableList<T>> buffer)
        {
            var results = new List<T>();
            while (index < identifierSequence.Count)
            {
                var key = identifierSequence[index];
                ImmutableList<T> waiting;
                if (!buffer.TryGetValue(key, out waiting) || waiting.IsEmpty)
                {
                    break;
                }
                // Oldest first, so items that share an identifier keep their arrival order.
                results.Add(waiting[0]);
                buffer = buffer.SetItem(key, waiting.RemoveAt(0));
                index++;
            }
            return (index, buffer, results);
        }

        // State is held in an anonymous type:
        //   Index shows what identifier we're waiting on,
        //   Buffer holds items that have arrived but that we aren't ready for yet,
        //   Output holds items that can be safely emitted.
        return source
            .Scan(new { Index = 0, Buffer = ImmutableDictionary<int, ImmutableList<T>>.Empty, Output = Enumerable.Empty<T>() },
                (state, item) =>
                {
                    // Add the new item to the buffer before looking for items that are now due.
                    ImmutableList<T> existing;
                    if (!state.Buffer.TryGetValue(item.Identifier, out existing))
                    {
                        existing = ImmutableList<T>.Empty;
                    }
                    var output = GetOutput(state.Index, state.Buffer.SetItem(item.Identifier, existing.Add(item)));
                    return new { Index = output.Index, Buffer = output.Buffer, Output = output.Output };
                })
            // Use Dematerialize/Notifications to detect and emit end of stream.
            .SelectMany(output =>
            {
                var notifications = output.Output
                    .Select(item => Notification.CreateOnNext(item))
                    .ToList();
                if (output.Index == identifierSequence.Count)
                {
                    notifications.Add(Notification.CreateOnCompleted<T>());
                }
                return notifications;
            })
            .Dematerialize();
    }

    /// <summary>
    /// Backward-compatible overload: each character of <paramref name="identifierSequence"/> is used as an
    /// integer identifier (its UTF-16 code), exactly as the original string-based signature did.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="identifierSequence"/> is null.</exception>
    public static IObservable<T> GetStateMachine(this IObservable<T> source, string identifierSequence)
    {
        if (identifierSequence == null)
        {
            throw new ArgumentNullException(nameof(identifierSequence));
        }
        return source.GetStateMachine(identifierSequence.Select(c => (int)c).ToArray());
    }
}
Run Code Online (Sandbox Code Playgroud)
然后可以这样调用它：
var stateMachine = src.GetStateMachine(new int[] { 9, 3, 4, 4, 7 });
stateMachine.Dump(); //LinqPad
// Push the items out of order; the state machine should still print h e l l o.
var arrivals = new[] { (4, 'l'), (4, 'l'), (7, 'o'), (3, 'e'), (9, 'h') };
foreach (var (identifier, character) in arrivals)
{
    src.OnNext(new T { Identifier = identifier, Character = character });
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1135 次 |
| 最近记录: |