joe*_*dev 6 .net c# sockets system.reactive
我正在从一个泵送给我的消息IObservable<byte[]>和反序列化为字符串,然后通过一个字符串泵出IObservable<string>.A 通过转换Socket填充IObservable<byte[]>消息FromEventPattern.来自Socket换行符分隔字符串的反序列化消息.由于从中接收的单个消息Socket不需要是单个分隔的字符串(它可以是任意数量的消息的任何部分,并且部分消息是可能的).想到解决这个问题的第一种方法是使用a Subject和一个闭包这样:
private IObservable<string> GetCompleteMessage(IObservable<byte[]> bytes)
{
const byte byteLineFeed = 10;
var subject = new Subject<string>();
byte[] leftovers = null;
bytes.Subscribe(current =>
{
var lastPositionOfLineFeed = -1;
for (var i = 0; i < current.Length; i++)
{
if (current[i] == byteLineFeed)
{
if (leftovers != null)
{
subject.OnNext(
Encoding.ASCII.GetString(
leftovers.Union(current.Slice(lastPositionOfLineFeed + 1,
i - lastPositionOfLineFeed))
.ToArray()));
leftovers = null;
}
else
{
subject.OnNext(
Encoding.ASCII.GetString(
current.Slice(lastPositionOfLineFeed + 1,
i - lastPositionOfLineFeed)));
}
lastPositionOfLineFeed = i;
}
}
if (lastPositionOfLineFeed != current.Length - 1)
{
if (leftovers != null)
{
leftovers = leftovers.Union(current.Slice(lastPositionOfLineFeed + 1,
current.Length - lastPositionOfLineFeed - 1))
.ToArray();
}
else
{
leftovers = current.Slice(lastPositionOfLineFeed + 1,
current.Length - lastPositionOfLineFeed - 1);
}
}
});
return subject.AsObservable();
}
Run Code Online (Sandbox Code Playgroud)
这很好用,但我知道由于Subjects各种原因而不赞成使用,其中一些原因在这段代码中有所体现.我觉得我可能会在这里重新发明轮子,因为我并不完全熟悉Rx中的所有方法.我可以在没有封闭的情况下这样做Subject吗?如果是这样,我该怎么办呢?或者使用Subject这里有意义吗?
我将使用 SelectMany 和返回 IEnumerable<string> 的选择器。
例如:
public static IObservable<string> GetCompleteMessage(this IObservable<byte[]> source)
{
const byte byteLineFeed = 10;
IEnumerable<byte> remanider = Enumerable.Empty<byte>();
Func<byte[], IEnumerable<string>> selector = data =>
{
var result = new List<string>();
var current = new ArraySegment<byte>(data);
while (true)
{
var dividerOffset = ((IList<byte>)current).IndexOf(byteLineFeed);
if (dividerOffset == -1) // No newline found
{
remanider = remanider.Concat(current);
break;
}
var segment = new ArraySegment<byte>(current.Array, current.Offset, dividerOffset);
var lineBytes = remanider.Concat(segment).ToArray();
result.Add(Encoding.ASCII.GetString(lineBytes));
remanider = Enumerable.Empty<byte>();
current = new ArraySegment<byte>(current.Array, current.Offset + dividerOffset + 1, current.Count - 1 - dividerOffset);
}
return result;
};
return source.SelectMany(selector);
}
Run Code Online (Sandbox Code Playgroud)
或者,您可以使用 NetworkStream 和 StreamReader 来实现相同的结果:
public static IObservable<string> ReadLineObservable(this TextReader reader)
{
return Observable.FromAsync(() => reader.ReadLineAsync())
.Repeat()
.TakeWhile(x => x != null);
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1050 次 |
| 最近记录: |