我是一个Rx新手,所以我希望你能忍受我.作为我自己的练习,也可能是我可以为同事演示的样本,我为Dns.BeginGetHostEntry()/ EndGetHostEntry()完成了两个包装类:DnsResolver和DnsResolverRx.
每个类都有一个公共静态方法:
void Resolve(string host, Action<IPHostEntry> getResult, Control context = null);
Run Code Online (Sandbox Code Playgroud)
...以及使其变得有趣的一些其他要求:1.如果提供了上下文,则必须在关联的线程2上调用getResult.对于MaxResultAge秒,将缓存同一主机的先前结果.
非Rx版本工作正常,但与此问题并不真正相关.Rx版本如下所示:
class DnsResolverRx
{
static Func<string, IObservable<IPHostEntry>> _resolver = Observable.FromAsyncPattern<string, IPHostEntry>(Dns.BeginGetHostEntry, Dns.EndGetHostEntry);
public static void Resolve(string host, Action<IPHostEntry> setResult, Control context = null)
{
IObservable<IPHostEntry> result;
result = _cache.GetOrCreateValue( // a trivial TryGetValue wrapper
host,
() => _resolver(host)
.Do(e => Debug.WriteLine("resolved"))
.Repeat()
.Do(e => Debug.WriteLine("repeated"))
.Replay(MaxResultAge)
.RefCount()
);
result = result.Take(1); // each request needs only 1 result
if (context != null)
result = result.ObserveOn(context);
result.Subscribe(
entry …Run Code Online (Sandbox Code Playgroud) 对于基于Rx的变更跟踪解决方案,我需要一个可以在可观察序列中获取第一个和最新项目的运算符.
我如何编写一个生成以下大理石图的Rx运算符(注意:括号仅用于排列项目...我不确定如何最好地在文本中表示这一点):
xs:---[a ]---[b ]-----[c ]-----[d ]---------|
desired:---[a,a]---[a,b]-----[a,c]-----[a,d]---------|
Run Code Online (Sandbox Code Playgroud) 好的,试着去理解Rx,有点迷失在这里.
FromAsyncPattern现在已被弃用,所以我从这里拿到了示例(使用Rx轻松完成任务),它可以工作,我只是做了一些更改,而不是等待等待观察和订阅.....
我不明白的是为什么称为SumSquareRoots函数的两倍?
var res = Observable.FromAsync(ct => SumSquareRoots(x, ct))
.Timeout(TimeSpan.FromSeconds(5));
res.Subscribe(y => Console.WriteLine(y));
res.Wait();
class Program
{
static void Main(string[] args)
{
Samples();
}
static void Samples()
{
var x = 100000000;
try
{
var res = Observable.FromAsync(ct => SumSquareRoots(x, ct))
.Timeout(TimeSpan.FromSeconds(5));
res.Subscribe(y => Console.WriteLine(y));
res.Wait();
}
catch (TimeoutException)
{
Console.WriteLine("Timed out :-(");
}
}
static Task<double> SumSquareRoots(long count, CancellationToken ct)
{
return Task.Run(() =>
{
var res = 0.0;
Console.WriteLine("Why I'm called twice");
for (long i …Run Code Online (Sandbox Code Playgroud) 我有一个方法返回IObservable<long>我从async方法调用的方法.我想将此转换为正常List<long>,但如果我CancellationToken发出信号,则取消该操作.
我想做这样的事情:
List<long> result = await Foo().ToList(myCancellationToken);
Run Code Online (Sandbox Code Playgroud)
完成此任务的正确(和最简单)方法是什么?返回an 的ToList()扩展方法并不带参数.IObservable<T>IObservable<List<T>>CancellationToken
我有一个问题,这是对本网站提出的以下问题的扩展.
当返回类型不重要时,是否有更优雅的方法来合并observable?
我有一个IObservable<Unit>(比方说X),一个反应集合(Y)和一个属性(Z).返回类型并不重要.我只想在任何这些改变时订阅.
我知道如何观察所有3并Subscribe使用Observable.Merge如下.
Observable.Merge(X, Y.Changed, ObservableForProperty(Z).Select(_ => Unit.Default)).Subscribe(..)
Run Code Online (Sandbox Code Playgroud)
它有效.
但是,当我尝试使用WhenAny(...,....,....).Subscribe()时,我的X更改时不会触发订阅.做上述使用WhenAny(...)而不是Observable.Merge(..)?? 的语法是什么?
我喜欢使用,WhenAny(....)因为我ReactiveUI在其他地方使用.
示例:假设我有一个派生自ReactiveObject以下属性的类.
public class AnotherVM : ReactiveObject
{
public bool IsTrue
{
get { return this.isTrue; }
set { this.RaiseAndSetIfChanged(x => x.isTrue, ref this.isTrue, value); }
}
public IObservable<Unit> Data
{
get { return this.data; }
}
public ReactiveCollection MyCol
{
get …Run Code Online (Sandbox Code Playgroud) 此问题引用名称空间中的CompositeDisposable类System.Reactive.Disposables。
对其成员进行CompositeDisposable呼叫的顺序是否Dispose确定?
以下实验产生了list ['a', 'b'],但是我在文档中看不到保证一定顺序的任何东西。
var result = new List<char>();
var disposable = new CompositeDisposable(
Disposable.Create(() => result.Add('a')),
Disposable.Create(() => result.Add('b')));
disposable.Dispose();
Run Code Online (Sandbox Code Playgroud) 所以情况就是这样.假设某个地方我正在填写一个集合.每次添加元素时,IObservable都会为其订阅者调用OnNext.
现在,将有一个点将填充集合.(我正在读一些东西,我读完了......无论如何).此时,在订阅者上调用OnComplete().
但是,用户不会遵守此方法.他宁愿打电话给他等待的异步方法......他并不关心他读的东西,他只关心他读完了.
基本上,我想从IObservable中获取一个Task,它在IObservable调用OnComplete()时返回给它的订阅者.我特别希望用户不要使用观察者,但只是要知道在他的等待调用之后发生的任何事情都会在收集完成后发生.
也许ToTask()方法可以解决问题?我无法通过文档说出来.
reactive-programming task-parallel-library system.reactive observable
如何在响应式扩展流中处理单个事件并将其拆分为同一个流中的多个事件?
我有一个序列来检索json数据,这是一个顶层的数组.在json数据被解码的那一点上,我想获取该数组中的每个元素并继续沿着流传递这些元素.
这是一个我希望存在的虚构函数的例子(但名字较短!).它是在Python中,但我认为它很简单,它应该对其他Rx程序员清晰.
# Smallest possible example
from rx import Observable
import requests
stream = Observable.just('https://api.github.com/users')
stream.map(requests.get) \
.map(lambda raw: raw.json()) \
.SPLIT_ARRAY_INTO_SEPARATE_EVENTS() \
.subscribe(print)
Run Code Online (Sandbox Code Playgroud)
换句话说,我想像这样进行转换:
From:
# --[a,b,c]--[d,e]--|->
To:
# --a-b-c-----d-e---|->
Run Code Online (Sandbox Code Playgroud) System.Reactive.Subjects.Subject<T>和System.Reactive.Subjects.ReplaySubject<T>课程有什么区别?
一个不是从另一个派生的,但它们具有相同的描述并在MSDN中实现相同的接口.
我的问题有点像Nagle算法创建的问题,但不完全正确.我想要的是将OnNext通知缓冲IObservable<T>到一系列IObservable<IList<T>>s中,如下所示:
T通知到达时,将其添加到缓冲区并开始倒计时T在倒计时到期之前收到另一个通知,请将其添加到缓冲区并重新开始倒计时T通知转发为单个聚合IList<T>通知.IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler) 看起来很有希望,但它似乎定期发送聚合通知,而不是"在第一个通知到达时启动计时器,当其他通知到达时重启它"我想要的行为,并且它还发送一个空列表在如果没有从下面生成通知,则每个时间窗口的结束.
我不希望放弃任何的T通知; 只是缓冲它们.
有这样的事情存在,还是我需要自己编写?
system.reactive ×10
c# ×4
.net ×2
observable ×2
.net-4.5 ×1
arrays ×1
reactiveui ×1
split ×1
stream ×1
task ×1