这是一个介绍Reactive Framework的简单程序.但我想通过修改程序来尝试错误处理程序:
var cookiePieces = Observable.Range(1, 10);
cookiePieces.Subscribe(x =>
{
Console.WriteLine("{0}! {0} pieces of cookie!", x);
throw new Exception(); // newly added by myself
},
ex => Console.WriteLine("the exception message..."),
() => Console.WriteLine("Ah! Ah! Ah! Ah!"));
Console.ReadLine();
Run Code Online (Sandbox Code Playgroud)
在此示例中,使用了以下过载.
public static IDisposable Subscribe<TSource>(
this IObservable<TSource> source,
Action<TSource> onNext,
Action<Exception> onError,
Action onCompleted);
Run Code Online (Sandbox Code Playgroud)
我希望我会看到打印的异常消息,但控制台应用程序崩溃了.是什么原因?
我搜索了一个样本,但找不到清楚解释如何使用RX设置的东西:我有这个要求......
听起来非常简单,我在后台线程中完成了第三位(没有RX,但在list <int>上有标准查找)并且很容易添加到列表框中.当没有背景工作者等尝试做同样的事情并且只使用RX时,我被卡住了.
为可能的愚蠢问题道歉(对于那里的RX专家),但请帮助我们如何使用RX完成此WPF.
谢谢.
我究竟做错了什么 ?我刚下载了最新的Rx sdk,已安装.使用vs 2010,.net 4具有所有最新的sp/updates等.下载/安装了linqpad,添加了对被动dll的引用,如附带的屏幕截图所示.添加了一行,如linqpad演示中所示,但在运行时出错.请指教.右键单击图像并查看图像以获得清晰视图.
谢谢

我正在学习RX(Reactive Extensions),并且发现有人在近一年前使用F#和RX发布了一些代码来制作一个简单的webCrawler.我试着看看我是否可以重用代码.我下载了RX,并创建了一个F#windows应用程序,添加了对System.Reactive的引用.我的IDE是VS 2010 Ultimate,RX版本是:1.1.11111.以下是代码:
#light
open System
open System.Linq
open System.Collections.Generic
open System.Net
open System.IO
open System.Threading
open System.Text.RegularExpressions
open System.Reactive
open System.Reactive.Linq
let create f =
Observable.Create<_>(fun x ->
f x
new System.Action((fun () -> ())))
let ofAsync async =
create (fun obs -> Async.StartWithContinuations(async, obs.OnNext,obs.OnError,obs.OnError))
let fromEvent (event:IEvent<_,_>) = create (fun x -> event.Add x.OnNext)
let tickEvent = new Event<unit> ()
let tickEventObs = tickEvent.Publish |> fromEvent
let fetch(url:string) =
async { let req = WebRequest.Create(url)
let! resp …Run Code Online (Sandbox Code Playgroud) 我有一种感觉,这可能是一个非常简单的扩展方法,我已经错过但我看不到它...
我基本上想要一个产生流的流,其中值在每个新值上缓慢递增.我希望节流/采样,而不是时间,而是"容忍".例如
var ob = Enumerable.Range(0, 30).ToObservable(); // 0, 1, 2, 3, 4, 5,....., 30
var largeMovingOb = ob.WhenChangedBy(10); // 0, 10, 20, 30
Run Code Online (Sandbox Code Playgroud)
当我有[1,4,20,33]等序列时,我希望在值变化超过最后一个的15时输出 - 这将导致:[1,20].如果价值变化为12将导致:[1,20,33]
这是否有内置的Rx扩展?理想情况下,它可以在所有数字类型上工作,而无需为每个类型编写重载
嗨Subject<T>,如果您手动调用其Dispose方法,我一直在考虑基于它处理所有订阅.但是我最近发现它不起作用,只是清除它的内部观察者集合并用DisposedObserver帮助器类实例替换它.
我发现自己对行为有点困惑,只是假设"正常"只会传播并处置所有的嫌疑人.后来,试图找出为什么这样设计,我猜想他们设计这种方式的原因有两个.
Subject.Dispose它在语义上等同于Observable.Never观察者一侧的延续.如果想要在处理之前发出错误或完成信号,则Subject.Dispose调用者也可以调用OnComplete或OnError(因为它们在同一范围内).编辑注意:对不明问题抱歉.我已经明白了如何使用它,这更像是一个设计问题.让我更清楚地说明一下.
为什么你认为Rx的设计者以这种方式制作Dispose行为?
(以上两点是我的回答试验)
我最近进入了Rx,我正在使用它来帮助我从数据挖掘应用程序中的几个API中提取数据.
我有一个为每个API实现的接口,它封装了对每个API的公共调用,例如
public interface IMyApi {
IObservable<string> GetApiName(); //Cold feed for getting the API's name.
IObservable<int> GetNumberFeed(); //Hot feed of numbers from the API
}
Run Code Online (Sandbox Code Playgroud)
我的问题是关于冷IObservables vs Tasks.在我看来,冷可观察基本上是一项任务,它们的运作方式大致相同.当你可以争辩说任务就是你所需要的时候,把一个任务"抽象"为一个冷酷的观察者,这让我感到很奇怪.同时使用cold observable来包装Tasks会隐藏活动的性质,因为签名看起来与热的observable相同.
我可以代表上述界面的另一种方式是:
public interface IMyApi {
Task<string> GetApiNameAsync(); //Async method for getting the API's name.
IObservable<int> GetNumberFeed(); //Hot feed of numbers from the API
}
Run Code Online (Sandbox Code Playgroud)
对于为什么我不应该在Tasks和IObservable之间混合和匹配,有一些传统的智慧吗?
编辑:澄清 - 我已经阅读了其他发布的讨论并理解了Rx和TPL之间的关系,但我的担忧主要在于将两者结合在一个应用程序中是否安全,以及它是否会导致不良做法或线程化和调度陷阱?
我正在尝试编写一个典型的股票交易程序,它从netmq接收股票代码/订单/交易,将流转换为IObservable,并在WPF前端显示它们.我尝试使用async/await与NetMQ阻塞ReceiveString(假设我期待一些字符串输入),以便ReceiveString循环不会阻止主(UI)线程.由于我还是C#的新手,我在这篇文章中接受了Dave Sexton的回答:(https://social.msdn.microsoft.com/Forums/en-US/b0cf96b0-d23e-4461-9d2b-ca989be678dc/where -is-iasyncenumens-the-the -stestst-release?forum = rx)并试图写一些这样的例子:
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using NetMQ;
using NetMQ.Sockets;
using System.Reactive;
using System.Reactive.Linq;
namespace App1
{
class MainClass
{
// publisher for testing, should be an external data publisher in real environment
public static Thread StartPublisher(PublisherSocket s)
{
s.Bind("inproc://test");
var thr = new Thread(() => {
Console.WriteLine("Start publishing...");
while (true) {
Thread.Sleep(500);
s.Send("hello");
}
});
thr.Start();
return thr;
}
public static IObservable<string> Receive(SubscriberSocket s)
{
s.Connect("inproc://test");
s.Subscribe("");
return Observable.Create<string>( …Run Code Online (Sandbox Code Playgroud) 我有一个项目,我需要每10秒发送一次状态消息,除非在此期间有更新.意思是,每次有更新时,计时器都会重置.
var res = Observable
.Interval(TimeSpan.FromSeconds(10))
.Where(_ => condition);
res.Subscribe(_ => Console.WriteLine("Status sent."));
Run Code Online (Sandbox Code Playgroud)
现在我知道"Where"只会在计时器结束时应用,所以它没有帮助.但是,我想知道是否有办法重置间隔; 或者使用带有重复的Timer().
在整个Rx.Net文献中,都提到了通常所知的可观察温度.
有冷可观察对象(如由Observable.Interval()类似的工厂方法创建的那些),每次创建新的订阅时都会产生副作用.
在频谱的另一方面,有一些热门的观察者(比如Subject<T>)会随着新的订阅而来.
还有一些温暖的可观察对象,就像RefCount()每次创建一个订阅时返回的那些将执行初始化,但仅在没有其他有效订阅时才会执行.这些温暖观测的行为解释这里由Dave塞克斯顿:
或者,您可以调用Publish然后调用RefCount来获取在多个连续观察者之间共享的IObservable.请注意,这不是一个真正的热门观察 - 它更像是一个温暖的可观察者.RefCount对底层observable进行单个订阅,而至少有一个查询观察者.当您的查询没有更多观察者时,将引用计数更改为0,将处理基础订阅.如果另一个观察者稍后订阅了您的查询,再次将引用计数从0移动到1,则RefCount会对基础observable进行新的订阅,从而导致订阅副作用再次发生.
是否有人应该注意的其他温度?是否有可能以编程方式获得Observable的温度?