标签: system.reactive

加入Rx Streams

我正在尝试建模一个不简单的Rx查询(对我而言):

  • 在一个房间里有男人和女人.
  • 他们进入和离开房间,而在房间里他们有时会改变他们的位置.
  • 每个男人都可以在特定的时间看一个(或零)女人.
  • 每个人都有以下属性:

    class Man
    {
      public const int LookingAtNobody = 0;
      public int Id { get; set; }
      public double Location { get; set; }
      public int LookingAt { get; set; }
    }
    
    Run Code Online (Sandbox Code Playgroud)
  • 每个女人都有以下特性:

    class Woman
    {
      public int Id { get; set; }
      public double Location { get; set; }
    }
    
    Run Code Online (Sandbox Code Playgroud)
  • 代表我们拥有的男人IObservable<IObservable<Man>>,代表我们拥有的女人IObservable<IObservable<Woman>>.

你如何使用Rx生成从男人到他们正在看的女人的载体:IObservable<IObservable<Tuple<double,double>>>

为了帮助,这里有一些针对一些简单情况的单元测试:

public class Tests : ReactiveTest
{
    [Test]
    public void Puzzle1()
    {
        var scheduler = new TestScheduler(); …
Run Code Online (Sandbox Code Playgroud)

c# linq system.reactive

25
推荐指数
1
解决办法
1681
查看次数

RX受试者 - 他们应该避免吗?

我在另一个主题对这个主题进行了一次小型讨论,并希望人们对主题的"坏"方面提出意见.

频繁参与RX论坛的人都知道E.Meijer不喜欢受试者.虽然我对RX创作者的观点有最深刻的敬意,但我已经在多个项目中广泛使用了Subjects几年,并且因为它们没有任何架构问题或错误.

我可以命名的主题的唯一"陷阱"是它们不是"可重用的" - 在完成主题上的可观察对象之后,您需要在新订阅者可以从其接收事件之前重新实例化它.

"代码味道"和"不喜欢它们"需要"实用"示例支持 - 您是否可以引起我们注意使用主题可能导致错误或问题的可能情况?或许你认为它们完全是容易和无害的 - 然后尝试定义它们将被使用的区域.

system.reactive

24
推荐指数
2
解决办法
3980
查看次数

错误无法在订阅IObservable <Point>时转换lambda表达式

我正在尝试使用Rx在wpf中实现标准的拖放图像.

var mouseDown = from evt in Observable.FromEventPattern<MouseButtonEventArgs>(image, "MouseLeftButtonDown")
                          select evt.EventArgs.GetPosition(image);

            var mouseUp = Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseLeftButtonUp");

            var mouseMove = from evt in Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove")
                            select evt.EventArgs.GetPosition(this);

            var q = from startLocation in mouseDown
                    from endLocation in mouseMove.TakeUntil(mouseUp)
                    select new Point 
                    {
                        X = endLocation.X - startLocation.X,
                        Y = endLocation.Y - startLocation.Y
                    };

            q.ObserveOn(SynchronizationContext.Current).Subscribe(point =>
            {
                Canvas.SetLeft(image, point.X);
                Canvas.SetTop(image, point.Y);
            });
Run Code Online (Sandbox Code Playgroud)

我收到错误错误 Cannot convert lambda expression to type 'System.IObserver<System.Windows.Point>' because it is not a delegate type

我错过了什么?

wpf lambda system.reactive

24
推荐指数
2
解决办法
6564
查看次数

RX vs消息队列如rabbitmq或zeromq?

我对这些高级并发范例很陌生,我已经开始使用scala RX绑定了.所以我试图理解RX与RabbitMQ或ZeroMQ等消息队列的区别?

他们似乎都使用订阅/发布范例.在某个地方,我看到一条关于RX在RabbitMQ上运行的推文.

有人可以解释RX和消息队列之间的差异吗?为什么我会选择一个而不是另一个?可以用一个替代另一个,还是互相排斥?它们在哪些区域重叠?

rabbitmq zeromq reactive-programming system.reactive

24
推荐指数
2
解决办法
9810
查看次数

带有TimeSpan选择器的Observable.Generate似乎泄漏内存[使用TimeSpan> 15ms时]

我正在研究使用Observable.Generate来创建一系列结果,这些结果是以msdn网站上的示例为起点间隔采样的.

以下代码没有TimeSpan选择器不会出现内存泄漏:

IObservable<string> obs = Observable.Generate(initialState: 1,
                                              condition: x => x < 1000,
                                              iterate: x => x + 1,
                                              resultSelector: x => x.ToString());
obs.Subscribe(x => Console.WriteLine(x));
Run Code Online (Sandbox Code Playgroud)

但是,以下代码与TimeSpan选择器显示内存泄漏:

TimeSpan timeSpan = TimeSpan.FromSeconds(1);
IObservable<string> obs = Observable.Generate(initialState: 1,
                                              condition: x => x < 1000,
                                              iterate: x => x + 1,
                                              resultSelector: x => x.ToString(),
                                              timeSelector: x => timeSpan);
obs.Subscribe(x => Console.WriteLine(x));
Run Code Online (Sandbox Code Playgroud)

例如,这个玩具应用程序将使用VS 2015社区附带的Memory Profiler快速显示内存泄漏:

using System;
using System.Reactive.Linq;

namespace Sample
{
    public class Program
    {
        static void Main()
        {
            IObservable<string> obs = …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

24
推荐指数
1
解决办法
409
查看次数

Rx的真实世界的例子

可能重复:
使用Reactive Extensions的好例子

我一直在玩Reactive Extension一段时间,但主要限于处理/编写WPF前端内的用户驱动事件.

这是一种强大的,新的异步编程方式,我很好奇其他人正在做什么,你认为它可以改善我们目前做事的方式?

.net c# asynchronous system.reactive

23
推荐指数
2
解决办法
8961
查看次数

反应式扩展是否支持滚动缓冲?

我正在使用反应式扩展将数据整理到100毫秒的缓冲区:

this.subscription = this.dataService
    .Where(x => !string.Equals("FOO", x.Key.Source))
    .Buffer(TimeSpan.FromMilliseconds(100))
    .ObserveOn(this.dispatcherService)
    .Where(x => x.Count != 0)
    .Subscribe(this.OnBufferReceived);
Run Code Online (Sandbox Code Playgroud)

这很好用.但是,我想要的行为与Buffer操作提供的行为略有不同.基本上,如果收到另一个数据项,我想重置计时器.只有当整个100毫秒没有收到数据时我才能处理它.这开启了永不处理数据的可能性,因此我还应该能够指定最大计数.我会想象一下:

.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)
Run Code Online (Sandbox Code Playgroud)

我已经环顾四周,在Rx中找不到这样的东西?任何人都可以确认/否认这个吗?

.net c# buffer sliding-window system.reactive

23
推荐指数
2
解决办法
4132
查看次数

写一个Rx"RetryAfter"扩展方法

IntroToRx书中,作者建议为I/O编写一个"智能"重试,在一段时间后重试I/O请求,如网络请求.

这是确切的段落:

添加到您自己的库的有用扩展方法可能是"Back Off and Retry"方法.我与之合作过的团队在执行I/O时发现了这样一个特性,尤其是网络请求.这个概念是尝试,并在失败时等待一段时间,然后再试一次.您的此方法版本可能会考虑您要重试的异常类型,以及重试的最大次数.您甚至可能希望延长等待时间,以便在每次后续重试时不那么激进.

不幸的是,我无法弄清楚如何编写这种方法.:(

c# system.reactive

22
推荐指数
2
解决办法
4808
查看次数

创建对IObservable的弱订阅

我想要做的是确保如果对我的观察者的唯一引用是可观察的,它会被垃圾收集并停止接收消息.

假设我有一个带有列表框的控件,名为Messages,后面是这个代码:

//Short lived display of messages (only while the user's viewing incoming messages)
public partial class MessageDisplay : UserControl
{
    public MessageDisplay()
    {
        InitializeComponent();
        MySource.IncomingMessages.Subscribe(m => Messages.Items.Add(m));
    }
}
Run Code Online (Sandbox Code Playgroud)

哪个连接到此来源:

//Long lived location for message store
static class MySource
{
    public readonly static IObservable<string> IncomingMessages = new ReplaySubject<string>;
}
Run Code Online (Sandbox Code Playgroud)

我不想要的是让消息显示器在不再可见后很长时间内保存在内存中.理想情况下,我想要一点延伸,所以我可以写:

MySource.IncomingMessages.ToWeakObservable().Subscribe(m => Messages.Items.Add(m));
Run Code Online (Sandbox Code Playgroud)

我也不想依赖MessageDisplay是一个用户控件的事实,因为我稍后想要使用MessageDisplayViewModel进行MVVM设置,这不是用户控件.

c# garbage-collection weak-references system.reactive

21
推荐指数
2
解决办法
3508
查看次数

等待观察

因此,在C#4.0的悲伤时期,我创建了以下"WorkflowExecutor"类,它允许在GUI线程中通过入侵IEnumerable的"yield return"延续来等待可观察的异步工作流.因此,以下代码将在button1Click处启动一个简单的工作流程来更新文本,等待您单击button2,并在1秒后循环.

public sealed partial class Form1 : Form {
    readonly Subject<Unit> _button2Subject = new Subject<Unit>();
    readonly WorkflowExecutor _workflowExecutor = new WorkflowExecutor();

    public Form1() {
        InitializeComponent();
    }

    IEnumerable<IObservable<Unit>> CreateAsyncHandler() {
        Text = "Initializing";
        var scheduler = new ControlScheduler(this);
        while (true) {
            yield return scheduler.WaitTimer(1000);
            Text = "Waiting for Click";
            yield return _button2Subject;
            Text = "Click Detected!";
            yield return scheduler.WaitTimer(1000);
            Text = "Restarting";
        }
    }

    void button1_Click(object sender, EventArgs e) {
        _workflowExecutor.Run(CreateAsyncHandler());
    }

    void button2_Click(object sender, EventArgs e) {
        _button2Subject.OnNext(Unit.Default);
    } …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive async-await c#-5.0 .net-4.5

21
推荐指数
2
解决办法
2万
查看次数