我正在实现IObservable<T>某些类的接口.我使用Reflector来弄清楚这是如何在Rx中完成的.关于观察者如何跟踪其订阅者并通过他们的OnNext方法通知他们,我偶然发现了与此类似的代码:
private List<Observer<T>> observers;
// subscribe a new observer:
public IDisposable Subscribe(IObserver<T> observer)
{
observers.Add(observer);
...
}
// trigger all observers' OnNext method:
...
foreach (IObserver<T> observer in observers)
{
observer.OnNext(value);
}
Run Code Online (Sandbox Code Playgroud)
由于所有代表都是多播的,因此不能简化为:
Action<T> observers;
// subscribe observer:
public IDisposable Subscribe(IObserver<T> observer)
{
observers += observer.OnNext;
...
}
// trigger observers' OnNext:
...
observers(value);
Run Code Online (Sandbox Code Playgroud)
或者第一种方法(性能,线程/并发问题......)有特定的优势吗?
我有一个方法返回接口IObservable<A>(在silverlight中)并希望将其转换为另一个IObservable<B>?
那么我需要代替'CONVERT_SOME_HOW'
public IObservable<Bar> Get(IEnumerable<object> @params)
{
IObservable<Foo> fooObservable = _resources.Get(@params);
IObservable<Bar> barObservable = CONVERT_SOME_HOW(fooObservable);
return barObservable;
}
Run Code Online (Sandbox Code Playgroud) 我有一个IObservable<byte[]>我转换成IObservable<XDocument>使用一些中间步骤:
var observedXDocuments =
from b in observedBytes
// Lot of intermediate steps to transform byte arrays into XDocuments
select xDoc;
Run Code Online (Sandbox Code Playgroud)
在某个时间点,我对观察到的XDocuments 感兴趣所以我订阅了IObserver<XDocument>.在稍后的时间点,我想订阅另一个IObserver<XDocument>并处理旧的.
如何在一次原子操作中完成此操作,而不会丢失任何观察到的XDocument?我可以这样做:
oldObserver.Dispose();
observedXDocuments.Subscribe(newObserver);
Run Code Online (Sandbox Code Playgroud)
我很担心,在这两个电话之间,我可以放松一下XDocument.如果我切换两个电话,我可能会收到XDocument两次相同的电话.
我的VS2012与Net4.5没有ISubject,Subject类.他们成了obselete?我应该只使用IObservable和IObserver吗?如果不是我怎么能得到它们?
所以我的情况似乎无法解决.
我有一种情况,我希望并行运行两个网络请求,然后在每个网络请求结束时运行一些代码,然后在每个网络请求的处理结束时运行额外的.
模仿这样
GET -> /users (run unique code to this request independently once the request is done)
GET -> /groups (run some unique code to this request independently once the request is done)
Both requests are done, now run some unique code independent of the request processing.
Run Code Online (Sandbox Code Playgroud)
我一直试图做一个Observable.merge,但这似乎有希望,因为它不允许我将订阅代码与一个大型处理程序分开.有没有人有建议?
我试图按固定的时间间隔合并两个传感器数据流,但是在Rx中无法正确执行此操作。我想出的最好的是下面的示例,但是我怀疑这是对Rx的最佳使用。
有没有更好的办法?
我已经尝试过Sample(),但是传感器以不规则的时间间隔,慢速(> 1秒)和快速(<1秒)产生值。Sample()似乎只处理快速数据。
Observable<SensorA> sensorA = ... /* hot */
Observable<SensorB> sensorB = ... /* hot */
SensorA lastKnownSensorA;
SensorB lastKnownSensorB;
sensorA.Subscribe(s => lastKnownSensorA = s);
sensorB.Subscribe(s => lastKnownSensorB = s);
var combined = Observable.Interval(TimeSpan.FromSeconds(1))
.Where(t => _lastKnownSensorA != null)
.Select(t => new SensorAB(lastKnownSensorA, lastKnownSensorB)
Run Code Online (Sandbox Code Playgroud) Rx很棒,但有时很难找到优雅的做事方式.这个想法很简单.我收到一个带有byte []的事件,这个数组可能包含一行,多行或一行.我想要的是找到一种方法来生成一个IObservable of Line IObservable<String>,其中序列的每个元素都是一条线.
几小时后,我发现最接近的解决方案非常难看,当然不能正常工作,因为扫描触发每个字符上的OnNext:
//Intermediate subject use to transform byte[] into char
var outputStream = new Subject<char>();
_reactiveSubcription = outputStream
//Scan doesn't work it trigger OnNext on every char
//Aggregate doesn't work neither as it doesn't return intermediate result
.Scan(new StringBuilder(), (builder, c) => c == '\r' ? new StringBuilder() : builder.Append((char)c))
.Subscribe(this);
Observable.FromEventPattern<ShellDataEventArgs>(shell, "DataReceived")
//Data is a byte[]
.Select(_ => _.EventArgs.Data)
.Subscribe(array => array.ToObservable()
//Convert into char
.ForEach(c => outputStream.OnNext((char)c)));
Run Code Online (Sandbox Code Playgroud)
注意:_reactiveSubcription应该是IObservable<String> …
如果问题措辞不当,我会道歉,我会尽我所能.
如果我有一系列值的时间,Observable[(U,T)]其中U是一个值,而T是一个类似时间的类型(或者我认为有任何差异),我怎么能写一个自动重置一触式屏障的运算符,这是沉默的abs(u_n - u_reset) < barrier,但是t_n - t_reset如果触摸屏障会吐出,此时它也会重置u_reset = u_n.
也就是说,此运算符接收的第一个值成为基线,并且它不会发出任何结果.此后,它监视流的值,并且只要其中一个超出基线值(高于或低于),它就会发出经过的时间(通过事件的时间戳测量),并重置基线.然后将处理这些时间以形成波动率的高频估计.
作为参考,我试图写一个在http://www.amazon.com/Volatility-Trading-CD-ROM-Wiley/dp/0470181990中概述的波动率估算器,而不是测量标准偏差(在常规均匀时间的偏差) ),你反复测量一些固定障碍量突破障碍所需的时间.
具体来说,这可以使用现有的运营商编写吗?我有点不知道状态将如何被重置,尽管我可能需要制作两个嵌套的运算符,一个是一次性的,另一个是不断创建一次性的...我知道它可以通过写入来完成一个手工,但后来我需要写自己的出版商等.
谢谢!
我正在尝试实现以下方案:
interval通过调用GetInterval()方法计算An .DoSomething()立即调用该方法.DoSomething()每interval毫秒重复调用该方法,直到释放按钮.它可以实现如下:
Timer timer = new Timer();
timer.Tick += DoSomething();
//keyDown and keyUp are IObservables created from the button's events
keyDown.Do(_ =>
{
timer.Interval = GetInterval();
timer.Start();
DoSomething();
});
keyUp.Do(_ =>
{
timer.Stop();
});
Run Code Online (Sandbox Code Playgroud)
我知道这段代码有一些问题,但没关系.我想要实现的是使用纯Reactive Extensions实现这个场景,没有任何外部定时器和副作用(除了调用DoSomething()方法).
我知道Observable.Timer,但我不知道如何在这种情况下使用它.
编辑:我可以创建一个IObservable<int>包含当前值的interval.它可能会有所帮助.
有没有办法计算RxAndroid中已经在流中处理过的元素数量?
我有类似的东西:
Observable.fromArray(new String[]{"these", "are", "my", "values", "."})
.map(s -> doSomeCoolStuff(s))
// ...
.subscribe(amountOfProcessedItems -> Log.d("test", "" + amountOfProcessedItems));
Run Code Online (Sandbox Code Playgroud)
我正在寻找一些东西,以便我的输出看起来像1 2 3 4 5,基本上每个项目后计算已经发出的项目数量.
system.reactive ×10
c# ×6
.net ×4
rx-java ×3
delegates ×1
java ×1
rx-android ×1
rx-scala ×1
scala ×1
silverlight ×1
timer ×1
volatility ×1