标签: system.reactive

反应式框架Hello World

这是一个介绍Reactive Framework的简单程序.但我想通过修改程序来尝试错误处理程序:

var cookiePieces = Observable.Range(1, 10);
cookiePieces.Subscribe(x =>
   {
      Console.WriteLine("{0}! {0} pieces of cookie!", x);
      throw new Exception();  // newly added by myself
   },
      ex => Console.WriteLine("the exception message..."),
      () => Console.WriteLine("Ah! Ah! Ah! Ah!"));
Console.ReadLine();
Run Code Online (Sandbox Code Playgroud)

在此示例中,使用了以下过载.

public static IDisposable Subscribe<TSource>(
     this IObservable<TSource> source, 
     Action<TSource> onNext, 
     Action<Exception> onError, 
     Action onCompleted);
Run Code Online (Sandbox Code Playgroud)

我希望我会看到打印的异常消息,但控制台应用程序崩溃了.是什么原因?

.net c# reactive-programming system.reactive

3
推荐指数
1
解决办法
883
查看次数

为此提供RX简单示例帮助

我搜索了一个样本,但找不到清楚解释如何使用RX设置的东西:我有这个要求......

  1. 在WPF应用程序中,我有一个列表框
  2. 调度程序计时器例程每隔2秒向本地列表添加一些随机数
  3. 现在我想设置一个observable/observer来观察这个List <int>,因为它不断构建,并将最新添加的数字添加到列表框的items集合中.

听起来非常简单,我在后台线程中完成了第三位(没有RX,但在list <int>上有标准查找)并且很容易添加到列表框中.当没有背景工作者等尝试做同样的事情并且只使用RX时,我被卡住了.

为可能的愚蠢问题道歉(对于那里的RX专家),但请帮助我们如何使用RX完成此WPF.

谢谢.

c# system.reactive

3
推荐指数
1
解决办法
1598
查看次数

在c#中使用Rx Reactive扩展的linqpad错误

我究竟做错了什么 ?我刚下载了最新的Rx sdk,已安装.使用vs 2010,.net 4具有所有最新的sp/updates等.下载/安装了linqpad,添加了对被动dll的引用,如附带的屏幕截图所示.添加了一行,如linqpad演示中所示,但在运行时出错.请指教.右键单击图像并查看图像以获得清晰视图.

谢谢

在此输入图像描述

c# linq linqpad system.reactive

3
推荐指数
1
解决办法
2356
查看次数

字段,构造函数或成员'AsyncReadToEnd'未定义错误

我正在学习RX(Reactive Extensions),并且发现有人在近一年前使用F#和RX发布了一些代码来制作一个简单的webCrawler.我试着看看我是否可以重用代码.我下载了RX,并创建了一个F#windows应用程序,添加了对System.Reactive的引用.我的IDE是VS 2010 Ultimate,RX版本是:1.1.11111.以下是代码:

#light
open System
open System.Linq
open System.Collections.Generic
open System.Net
open System.IO
open System.Threading
open System.Text.RegularExpressions
open System.Reactive
open System.Reactive.Linq

let create f =
    Observable.Create<_>(fun x ->
        f x
        new System.Action((fun () -> ())))

let ofAsync async =
    create (fun obs -> Async.StartWithContinuations(async, obs.OnNext,obs.OnError,obs.OnError))

let fromEvent (event:IEvent<_,_>) = create (fun x -> event.Add x.OnNext)

let tickEvent = new Event<unit> ()
let tickEventObs = tickEvent.Publish |> fromEvent

let fetch(url:string) =
    async { let req = WebRequest.Create(url)
            let! resp …
Run Code Online (Sandbox Code Playgroud)

f# system.reactive

3
推荐指数
1
解决办法
1132
查看次数

只有当值已经改变了一定的余量时,Rx IObservable才会产生值

我有一种感觉,这可能是一个非常简单的扩展方法,我已经错过但我看不到它...

我基本上想要一个产生流的流,其中值在每个新值上缓慢递增.我希望节流/采样,而不是时间,而是"容忍".例如

var ob = Enumerable.Range(0, 30).ToObservable(); // 0, 1, 2, 3, 4, 5,....., 30
var largeMovingOb = ob.WhenChangedBy(10); // 0, 10, 20, 30
Run Code Online (Sandbox Code Playgroud)

当我有[1,4,20,33]等序列时,我希望在值变化超过最后一个的15时输出 - 这将导致:[1,20].如果价值变化为12将导致:[1,20,33]

这是否有内置的Rx扩展?理想情况下,它可以在所有数字类型上工作,而无需为每个类型编写重载

.net c# system.reactive c#-4.0

3
推荐指数
1
解决办法
1724
查看次数

为什么Subject <T> .Dispose不会处理当前订阅?

Subject<T>,如果您手动调用其Dispose方法,我一直在考虑基于它处理所有订阅.但是我最近发现它不起作用,只是清除它的内部观察者集合并用DisposedObserver帮助器类实例替换它.

我发现自己对行为有点困惑,只是假设"正常"只会传播并处置所有的嫌疑人.后来,试图找出为什么这样设计,我猜想他们设计这种方式的原因有两个.

  • 怀疑者可能是部分依赖于主题的组合物,因此处置的完全传播没有意义.即.合并并不是因为其中一个消息来源被处理掉,正如大家所期望的那样.
  • Subject.Dispose它在语义上等同于Observable.Never观察者一侧的延续.如果想要在处理之前发出错误或完成信号,则Subject.Dispose调用者也可以调用OnComplete或OnError(因为它们在同一范围内).

编辑注意:对不明问题抱歉.我已经明白了如何使用它,这更像是一个设计问题.让我更清楚地说明一下.

为什么你认为Rx的设计者以这种方式制作Dispose行为?

(以上两点是我的回答试验)

c# reactive-programming system.reactive

3
推荐指数
1
解决办法
1031
查看次数

在我的C#应用​​程序中结合使用Task和IObservable是不好的做法?

我最近进入了Rx,我正在使用它来帮助我从数据挖掘应用程序中的几个API中提取数据.

我有一个为每个API实现的接口,它封装了对每个API的公共调用,例如

public interface IMyApi {

    IObservable<string> GetApiName(); //Cold feed for getting the API's name.

    IObservable<int> GetNumberFeed(); //Hot feed of numbers from the API

}
Run Code Online (Sandbox Code Playgroud)

我的问题是关于冷IObservables vs Tasks.在我看来,冷可观察基本上是一项任务,它们的运作方式大致相同.当你可以争辩说任务就是你所需要的时候,把一个任务"抽象"为一个冷酷的观察者,这让我感到很奇怪.同时使用cold observable来包装Tasks会隐藏活动的性质,因为签名看起来与热的observable相同.

我可以代表上述界面的另一种方式是:

public interface IMyApi {

    Task<string> GetApiNameAsync(); //Async method for getting the API's name.

    IObservable<int> GetNumberFeed(); //Hot feed of numbers from the API

}
Run Code Online (Sandbox Code Playgroud)

对于为什么我不应该在Tasks和IObservable之间混合和匹配,有一些传统的智慧吗?

编辑:澄清 - 我已经阅读了其他发布的讨论并理解了Rx和TPL之间的关系,但我的担忧主要在于将两者结合在一个应用程序中是否安全,以及它是否会导致不良做法或线程化和调度陷阱?

c# task-parallel-library system.reactive

3
推荐指数
1
解决办法
903
查看次数

使用NetMQ接收IObservable

我正在尝试编写一个典型的股票交易程序,它从netmq接收股票代码/订单/交易,将流转换为IObservable,并在WPF前端显示它们.我尝试使用async/await与NetMQ阻塞ReceiveString(假设我期待一些字符串输入),以便ReceiveString循环不会阻止主(UI)线程.由于我还是C#的新手,我在这篇文章中接受了Dave Sexton的回答:(https://social.msdn.microsoft.com/Forums/en-US/b0cf96b0-d23e-4461-9d2b-ca989be678dc/where -is-iasyncenumens-the-the -stestst-release?forum = rx)并试图写一些这样的例子:

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using NetMQ;
using NetMQ.Sockets;
using System.Reactive;
using System.Reactive.Linq;

namespace App1
{
    class MainClass
    {
        // publisher for testing, should be an external data publisher in real environment
        public static Thread StartPublisher(PublisherSocket s)
        {
            s.Bind("inproc://test");
            var thr = new Thread(() => {
                Console.WriteLine("Start publishing...");
                while (true) {
                    Thread.Sleep(500);
                    s.Send("hello");
                }
            });
            thr.Start();
            return thr;
        }

        public static IObservable<string> Receive(SubscriberSocket s)
        {
            s.Connect("inproc://test");
            s.Subscribe("");
            return Observable.Create<string>( …
Run Code Online (Sandbox Code Playgroud)

c# zeromq system.reactive async-await netmq

3
推荐指数
1
解决办法
1189
查看次数

无功扩展定时器/间隔复位

我有一个项目,我需要每10秒发送一次状态消息,除非在此期间有更新.意思是,每次有更新时,计时器都会重置.

var res = Observable
  .Interval(TimeSpan.FromSeconds(10))
  .Where(_ => condition);

res.Subscribe(_ => Console.WriteLine("Status sent."));
Run Code Online (Sandbox Code Playgroud)

现在我知道"Where"只会在计时器结束时应用,所以它没有帮助.但是,我想知道是否有办法重置间隔; 或者使用带有重复的Timer().

c# system.reactive

3
推荐指数
1
解决办法
1674
查看次数

Rx Observable有多少"温度"?

在整个Rx.Net文献中,都提到了通常所知的可观察温度.

冷可观察对象(如由Observable.Interval()类似的工厂方法创建的那些),每次创建新的订阅时都会产生副作用.

在频谱的另一方面,有一些热门的观察者(比如Subject<T>)会随着新的订阅而来.

还有一些温暖的可观察对象,就像RefCount()每次创建一个订阅时返回的那些将执行初始化,但仅在没有其他有效订阅时才会执行.这些温暖观测的行为解释这里由Dave塞克斯顿:

或者,您可以调用Publish然后调用RefCount来获取在多个连续观察者之间共享的IObservable.请注意,这不是一个真正的热门观察 - 它更像是一个温暖的可观察者.RefCount对底层observable进行单个订阅,而至少有一个查询观察者.当您的查询没有更多观察者时,将引用计数更改为0,将处理基础订阅.如果另一个观察者稍后订阅了您的查询,再次将引用计数从0移动到1,则RefCount会对基础observable进行新的订阅,从而导致订阅副作用再次发生.

是否有人应该注意的其他温度?是否有可能以编程方式获得Observable的温度?

system.reactive rxjs

3
推荐指数
1
解决办法
127
查看次数