Reactive Extensions - 将来自IObservable <byte []>的流反序列化为单独的分隔消息,而不使用Subject

joe*_*dev 6 .net c# sockets system.reactive

我正在从一个泵送给我的消息IObservable<byte[]>和反序列化为字符串,然后通过一个字符串泵出IObservable<string>.A 通过转换Socket填充IObservable<byte[]>消息FromEventPattern.来自Socket换行符分隔字符串的反序列化消息.由于从中接收的单个消息Socket不需要是单个分隔的字符串(它可以是任意数量的消息的任何部分,并且部分消息是可能的).想到解决这个问题的第一种方法是使用a Subject和一个闭包这样:

private IObservable<string> GetCompleteMessage(IObservable<byte[]> bytes)
{
    const byte byteLineFeed = 10;
    var subject = new Subject<string>();
    byte[] leftovers = null;

    bytes.Subscribe(current =>
    {
        var lastPositionOfLineFeed = -1;
        for (var i = 0; i < current.Length; i++)
        {
            if (current[i] == byteLineFeed)
            {
                if (leftovers != null)
                {
                    subject.OnNext(
                        Encoding.ASCII.GetString(
                            leftovers.Union(current.Slice(lastPositionOfLineFeed + 1,
                                i - lastPositionOfLineFeed))
                                        .ToArray()));
                    leftovers = null;
                }
                else
                {
                    subject.OnNext(
                        Encoding.ASCII.GetString(
                            current.Slice(lastPositionOfLineFeed + 1,
                                i - lastPositionOfLineFeed)));
                }
                lastPositionOfLineFeed = i;
            }
        }
        if (lastPositionOfLineFeed != current.Length - 1)
        {
            if (leftovers != null)
            {
                leftovers = leftovers.Union(current.Slice(lastPositionOfLineFeed + 1,
                    current.Length - lastPositionOfLineFeed - 1))
                                        .ToArray();
            }
            else
            {
                leftovers = current.Slice(lastPositionOfLineFeed + 1,
                    current.Length - lastPositionOfLineFeed - 1);
            }
        }
    });

    return subject.AsObservable();
}
Run Code Online (Sandbox Code Playgroud)

这很好用,但我知道由于Subjects各种原因而不赞成使用,其中一些原因在这段代码中有所体现.我觉得我可能会在这里重新发明轮子,因为我并不完全熟悉Rx中的所有方法.我可以在没有封闭的情况下这样做Subject吗?如果是这样,我该怎么办呢?或者使用Subject这里有意义吗?

Foo*_*ole 4

我将使用 SelectMany 和返回 IEnumerable<string> 的选择器。

例如:

    public static IObservable<string> GetCompleteMessage(this IObservable<byte[]> source)
    {
        const byte byteLineFeed = 10;
        IEnumerable<byte> remanider = Enumerable.Empty<byte>();

        Func<byte[], IEnumerable<string>> selector = data =>
        {
            var result = new List<string>();
            var current = new ArraySegment<byte>(data);

            while (true)
            {
                var dividerOffset = ((IList<byte>)current).IndexOf(byteLineFeed);

                if (dividerOffset == -1) // No newline found
                {
                    remanider = remanider.Concat(current);
                    break;
                }

                var segment = new ArraySegment<byte>(current.Array, current.Offset, dividerOffset);
                var lineBytes = remanider.Concat(segment).ToArray();
                result.Add(Encoding.ASCII.GetString(lineBytes));

                remanider = Enumerable.Empty<byte>();
                current = new ArraySegment<byte>(current.Array, current.Offset + dividerOffset + 1, current.Count - 1 - dividerOffset);
            }

            return result;
        };

        return source.SelectMany(selector);
    }
Run Code Online (Sandbox Code Playgroud)

或者,您可以使用 NetworkStream 和 StreamReader 来实现相同的结果:

    public static IObservable<string> ReadLineObservable(this TextReader reader)
    {
        return Observable.FromAsync(() => reader.ReadLineAsync())
            .Repeat()
            .TakeWhile(x => x != null);
    }
Run Code Online (Sandbox Code Playgroud)