在Rx.NET库中提供这三种方法
public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync) {...}
public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync) {...}
public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync) {...}
Run Code Online (Sandbox Code Playgroud)
我在MSVS 2013中编写以下示例代码:
var sequence =
Observable.Create<int>( async ( observer, token ) =>
{
while ( true )
{
token.ThrowIfCancellationRequested();
await Task.Delay( 100, token );
observer.OnNext( 0 );
}
} );
Run Code Online (Sandbox Code Playgroud)
由于模糊的重载,这不会编译.编译器的确切输出是:
Error 1 The call is ambiguous between the following methods or properties:
'System.Reactive.Linq.Observable.Create<int>(System.Func<System.IObserver<int>,System.Threading.CancellationToken,System.Threading.Tasks.Task<System.Action>>)'
and
'System.Reactive.Linq.Observable.Create<int>(System.Func<System.IObserver<int>,System.Threading.CancellationToken,System.Threading.Tasks.Task>)'
Run Code Online (Sandbox Code Playgroud)
但是只要我while( true )
用while( false …
我最近开始研究Reactive Extensions,主要是在客户端使用Angular 2进行观察.可观察的Rx和async的概念 - 等待dotnet似乎非常相似.是否有任何具体的例子,其中一个适用,另一个不适用.如果没有,微软推出Rx.Net是否还有其他原因,因为可观察量是Reactive Extensions的核心.任何链接或实时示例都足够了.我正在寻找差异线程/性能明智.
我想列举一个大IEnumerable
过一次,并观察附有各运营商(枚举Count
,Sum
,Average
等)。显而易见的方法是IObservable
使用 method将其转换为 an ToObservable
,然后为它订阅观察者。我注意到这比其他方法慢得多,比如做一个简单的循环并在每次迭代时通知观察者,或者使用Observable.Create
方法而不是ToObservable
. 差异很大:它慢了 20-30 倍。就是这样,还是我做错了什么?
using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
public static class Program
{
static void Main(string[] args)
{
const int COUNT = 10_000_000;
Method1(COUNT);
Method2(COUNT);
Method3(COUNT);
}
static void Method1(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static …
Run Code Online (Sandbox Code Playgroud) 我正在尝试解析表示消息的传入字节流.我需要拆分流并为每个部分创建一个消息结构.
消息始终以0x81(BOM)开头,以0x82(EOM)结束.
start: 0x81
header: 3 bytes
data: arbitrary length
stop: 0x82
Run Code Online (Sandbox Code Playgroud)
数据部分使用转义字节0x1B(ESC)进行转义:只要数据部分中的一个字节包含控制字节{ESC,BOM,EOM}之一,就会以ESC为前缀.
标题部分不会被转义,并且可能包含控制字节.
我想使用Rx.Net以功能性的反应式编码,通过消费IObservable<byte>
并将其转换为IObservable<Message>
.
最常用的方法是什么?
一些例子:
[81 01 02 03 82] single message
[81 82 81 82 82] single message, header = [82 81 82]
[81 01 02 1B 82] single message, header = [01 02 1B].
[81 01 02 03 1B 82 82] single message, header = [01 02 03], (unescaped) data = [82]
[81 01 02 03 1B 1B 82 82] single message …
Run Code Online (Sandbox Code Playgroud) 我有一个异步方法,说:
public async Task<T> GetAsync()
{
}
Run Code Online (Sandbox Code Playgroud)
并将从以下位置调用:
public async Task<IEnumerable<T>> GetAllAsync()
{
foreach (var item in something)
{
var result = await GetAsync();
yield return result;
}
}
Run Code Online (Sandbox Code Playgroud)
上面的语法无效,但基本上我是在异步生成器之后。我知道它可以通过 Observables 处理。我对 Rx.NET 进行了实验,并且在一定程度上奏效了。但我试图避免它给代码库带来的复杂性,更重要的是,上述需求本质上仍然不是一个反应式系统(我们的仍然是基于拉取的)。例如,我只会在一段时间内收听传入的异步流,并且我必须从消费者端停止生产者(而不仅仅是取消订阅消费者)。
我可以像这样反转方法签名:
public IEnumerable<Task<T>> GetAllAsync()
Run Code Online (Sandbox Code Playgroud)
但这使得在不阻塞的情况下进行 LINQ 操作有点棘手。我希望它是非阻塞的,并且不将整个内容加载到内存中。这个库:AsyncEnumerable正是我正在寻找的,但如何用Ix.NET做到这一点?它们的用途与我相信的相同。
换句话说,我如何利用 Ix.NETIAsyncEnumerable
在处理await
? 喜欢,
public async IAsyncEnumerable GetAllAsync()
{
foreach (var item in something)
{
var result = await GetAsync();
return // what?
}
}
Run Code Online (Sandbox Code Playgroud) 我想要一种将异步方法转换为 observable 的通用方法。就我而言,我正在处理用于HttpClient
从 API 获取数据的方法。
假设我们有一个方法Task<string> GetSomeData()
需要成为一个单一的方法,Observable<string>
其中的值是作为以下组合生成的:
GetSomeData()
(例如每 x 秒)GetSomeData()
在任何给定时间手动触发调用(例如当用户点击刷新时)。由于有两种方法可以触发GetSomeData()
并发执行可能是一个问题。为了避免要求GetSomeData()
线程安全,我想限制并发性,以便只有一个线程同时执行该方法。因此,我需要使用某种策略来处理重叠的请求。我做了一个(某种)大理石图来描述问题和想要的结果
我的直觉告诉我有一个简单的方法可以实现这一点,所以请给我一些见解:)
这是我到目前为止的解决方案。不幸的是,它并没有解决并发问题。
public class ObservableCreationWrapper<T>
{
private Subject<Unit> _manualCallsSubject = new Subject<Unit>();
private Func<Task<T>> _methodToCall;
private IObservable<T> _manualCalls;
public IObservable<T> Stream { get; private set; }
public ObservableCreationWrapper(Func<Task<T>> methodToCall, TimeSpan period)
{
_methodToCall = methodToCall;
_manualCalls = _manualCallsSubject.AsObservable()
.Select(x => Observable.FromAsync(x => methodToCall()))
.Merge(1);
Stream = Observable.FromAsync(() => _methodToCall())
.DelayRepeat(period)
.Merge(_manualCalls);
}
public void TriggerAdditionalCall() …
Run Code Online (Sandbox Code Playgroud) 我有一个类接收事件流,并推出另一个事件流.
所有事件都使用Reactive Extensions(RX).传入的事件流从外部源推送到IObserver<T>
使用中.OnNext
,并使用IObservable<T>
和推出传出的事件流.Subscribe
.我Subject<T>
在幕后用来管理这个.
我想知道RX中有什么技术暂时暂停输出.这意味着传入事件将在内部队列中累积,当它们取消暂停时,事件将再次流出.
我已经开始考虑在 EventStore 中使用 Reactive Extensions。作为概念证明,我想看看我是否可以让 Rx 使用事件流并输出按类型分组的事件计数一秒钟的窗口。
因此,假设我正在使用名为“orders”的流,我希望在控制台中看到如下所示的内容:
OrderCreated 201
OrderUpdated 111
Run Code Online (Sandbox Code Playgroud)
(一秒钟过去了..)
OrderCreated 123
OrderUpdated 132
Run Code Online (Sandbox Code Playgroud)
等等。
到目前为止,我已经能够获得每秒所有事件计数的输出。但似乎无法按事件类型对它们进行分组。
我使用的代码基于James Nugent 的一个要点:
internal class EventStoreRxSubscription
{
public Subject<ResolvedEvent> ResolvedEvents { get; }
public Subject<SubscriptionDropReason> DroppedReasons { get; }
public EventStoreSubscription Subscription { get; }
public EventStoreRxSubscription(EventStoreSubscription subscription, Subject<ResolvedEvent> resolvedEvent, Subject<SubscriptionDropReason> droppedReasons)
{
Subscription = subscription;
ResolvedEvents = resolvedEvent;
DroppedReasons = droppedReasons;
}
}
static class EventStoreConnectionExtensions
{
public static Task<EventStoreRxSubscription> SubscribeTo(this IEventStoreConnection connection, string streamName, bool …
Run Code Online (Sandbox Code Playgroud) 我有一个IObservable<Item>
内部类,我想公开一个只读属性,该属性提供在给定时间推送到 observable 的最后一个项目。因此它将提供 的单个值Item
。
如果没有推送任何值,那么它将必须返回一个默认值。
如何在不必订阅可观察对象并拥有“支持字段”的情况下执行此操作?
我对反应式扩展相当陌生,想要根据时间缓冲流,或者根据不超过阈值的运行总和(每个项目的大小由 lambda 指定)以先发生者为准,就像按计数或时间现有的Buffer
那样。
目前,我已经编写了自己的方法实现Buffer
,该方法按预期工作,使用IScheduler
for 触发超时,然后管理内存中自己的缓冲区,并在累积总和超过阈值时发出它们,但这感觉有点低水平,我认为必须有一个更优雅的解决方案来以某种方式使用现有的反应式操作来表达它,并且可能使用重载TBufferClosing
来Buffer
代替。
到目前为止,我想出的最佳解决方案如下,但它的缺点是包含最后一项,导致阈值导致总和大于请求的最大总和:
public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan)
{
var shared = source.Publish().RefCount();
return shared.Buffer(() => Observable.Amb(
Observable.Timer(timeSpan)
.Select(_ => Unit.Default),
shared.Select(sizeSelector)
.Scan((a, b) => a + b)
.SkipWhile(accumulated => accumulated < maxSize)
.Select(_ => Unit.Default))
);
}
Run Code Online (Sandbox Code Playgroud)
这是否可以与现有的操作员一起工作(通过调整我的上述版本或完全以其他方式),或者我是否被迫继续使用我的自定义Buffer
实现处理计时器并自己缓冲?