标签: system.reactive

如何修复 Publish().RefCount() 行为的不一致?

最近,我偶然发现Enigmativity 关于和运算符的一个有趣的声明PublishRefCount

您正在使用危险的 .Publish().RefCount() 运算符对,它会创建一个在完成后无法订阅的序列。

这一说法似乎与李·坎贝尔对这些经营者的评价相悖。引用他的书Intro to Rx

Publish/RefCount 对对于获取冷可观察值并将其作为热可观察序列共享给后续观察者非常有用。

起初我不相信Enigmativity的说法是正确的,所以我试图反驳它。我的实验表明,这Publish().RefCount()确实可能不一致。第二次订阅已发布的序列可能会导致对源序列的新订阅,也可能不会,具体取决于连接时源序列是否已完成。如果已完成,则不会重新订阅。如果未完成,则将重新订阅。以下是此行为的演示:

var observable = Observable
    .Create<int>(o =>
    {
        o.OnNext(13);
        o.OnCompleted(); // Commenting this line alters the observed behavior
        return Disposable.Empty;
    })
    .Do(x => Console.WriteLine($"Producer generated: {x}"))
    .Finally(() => Console.WriteLine($"Producer finished"))
    .Publish()
    .RefCount()
    .Do(x => Console.WriteLine($"Consumer received #{x}"))
    .Finally(() => Console.WriteLine($"Consumer finished"));

observable.Subscribe().Dispose();
observable.Subscribe().Dispose();
Run Code Online (Sandbox Code Playgroud)

在此示例中,observable由三部分组成。首先是生成部分,生成单个值然后完成。然后是发布机制(Publish+ RefCount)。最后是消费部分,观察生产者发出的值。已observable订阅两次。预期的行为是每个订阅都会收到一个值。但事实并非如此!这是输出:

var observable = Observable
    .Create<int>(o =>
    {
        o.OnNext(13);
        o.OnCompleted(); // …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive rx.net

2
推荐指数
1
解决办法
1121
查看次数

使用Reactive Extensions创建可观察的鼠标拖动

我有以下内容

var leftMouseDown = Observable.FromEvent<MouseButtonEventArgs>(displayCanvas, "MouseLeftButtonDown");
var leftMouseUp = Observable.FromEvent<MouseButtonEventArgs>(displayCanvas, "MouseLeftButtonUp");
var mouseMove = Observable.FromEvent<MouseEventArgs>(displayCanvas, "MouseMove");

var leftMouseDragging = from down in leftMouseDown
                        let startPoint = down.EventArgs.GetPosition(displayCanvas)
                        from move in mouseMove.TakeUntil(leftMouseUp)
                        let endPoint = move.EventArgs.GetPosition(displayCanvas)
                        select new { Start = startPoint, End = endPoint };
Run Code Online (Sandbox Code Playgroud)

当我订阅它时,它会给我拖动的起点和当前的终点.现在我需要在拖动完成后做一些事情.我试图用RX完全做到这一点并没有成功,最终做到了

leftMouseDragging.Subscribe(value=>
    {
        dragging = true;
        //Some other code
    });

leftMouseUp.Subscribe(e=>
    {
        if(dragging)
        {
            MessageBox.Show("Just finished dragging");
            dragging = false;
        }
    });
Run Code Online (Sandbox Code Playgroud)

这工作正常,直到我拖动鼠标右键.然后,当我单击鼠标左键时,我会看到消息框.如果我只做一个左键拖动我得到消息框,然后单击鼠标左键不会产生该框.我想在没有外部状态的情况下这样做,但如果没有别的,我至少希望它能正常工作.

仅供参考:我尝试拖动易失性并使用锁定,但这不起作用.

编辑

事实证明我的问题是右键单击上下文菜单.一旦我摆脱了我的上述代码工作.所以,现在我的问题是如何获得上下文菜单并仍然使我的代码工作.我假设上下文菜单处理鼠标左键并且不知何故导致我的代码不起作用,但我仍然困惑不解.

c# system.reactive

1
推荐指数
1
解决办法
2175
查看次数

是.net中的扩展方法是Action <T>的反应扩展

我有两个采用Action回调的异步方法.我想知道Rx是否有针对行动的扩展?

我的目标是等到两个回调被调用然后进行一些处理?

action system.reactive

1
推荐指数
1
解决办法
272
查看次数

如何使用Rx通过异步WCF服务轮询图像

我有一个异步WCF服务,它采用"URI"并返回一个图像(作为流).

我想要做的是:

  • 如果没有创建,则确保存在有效的WCF通道
  • 进行异步服务调用
  • 成功时将图像保存到成员变量
  • 如果我收到异常,请关闭频道
  • 无论是失败还是成功,等待200ms然后重新开始(永久循环或直到取消)

到目前为止,我已经想出了这个怪物:

    private void PollImage(string imageUri)
    {
        const int pollingHertz = 1;
        const int millisecondsTimeout = 1000 / pollingHertz;
        Thread.Sleep(millisecondsTimeout);

        if (_channel == null)
        {
            _channel = _channelFactory.CreateChannel();
        }

        var getImageFunc = Observable.FromAsyncPattern<string, Stream>
                                  (_channel.BeginGetImage, _channel.EndGetImage);

        getImageFunc(imageUri)
            .Finally(() => PollImage(imageUri))
            .Subscribe(
                stream => UpdateImageStream(imageUri, stream),
                ex =>
                    {
                        Trace.TraceError(ex.ToString());
                        ((ICommunicationObject) _channel).CloseOrAbort();
                        _channel = null;
                    });
    }
Run Code Online (Sandbox Code Playgroud)

我真的很想学习Rx但是每次尝试都会让我不知所措.

有人愿意给我一些指示吗?谢谢

.net system.reactive

1
推荐指数
1
解决办法
800
查看次数

当鼠标不超过两个元素时,Wpf Reactive Extensions会做出反应

我正在尝试更多地了解反应性扩展,但我发现很难找到一个真实世界的例子,所以我可以训练自己.

我几天前发现自己写了一些ToggleButtonMouse Enter,Leave Checked Unchecked事件,现在我想知道我是否可以使用被动扩展来简化它.

这是目标:

给定a ToggleButton,当悬停并且未检查时,弹出窗口应该显示,如果鼠标没有超过按钮或弹出窗口,弹出窗口应该关闭

如果我按下切换按钮(选中),弹出窗口应保持打开状态,直到取消选中该按钮(忽略鼠标输入离开事件),之后鼠标悬停行为应再次启动.

如果弹出窗口外部关闭,则应自动取消选中切换按钮.(我知道这可以使用一些绑定和数据触发器实现,但我想练习我的反应式扩展逻辑)

现在我有以下内容:

    private void ToggleButton_MouseEnter(object sender, System.Windows.Input.MouseEventArgs e)
    {
        if (!ToggleButton.IsChecked ?? false)
            Popup.IsOpen = true;
    }

    private void ToggleButton_MouseLeave(object sender, System.Windows.Input.MouseEventArgs e)
    {
        if (!Popup.Child.IsMouseOver && !(TaskManagerTab.IsChecked ?? false))
        {
            Popup.IsOpen = false;
            return;
        }

        popup.Child.MouseLeave +=  Popup_MouseLeave;
    }

    void Popup_MouseLeave(object sender, System.Windows.Input.MouseEventArgs e)
    {
        Popup.Child.MouseLeave -=  Popup_MouseLeave;

        if (!ToggleButton.IsMouseOver && !(ToggleButton.IsChecked ?? false))
        {
            Popup.IsOpen = false;
            return;
        }
    }

    private void ToggleButton_CheckedChanged(object sender, System.Windows.RoutedEventArgs e) …
Run Code Online (Sandbox Code Playgroud)

wpf mouseevent system.reactive

1
推荐指数
1
解决办法
659
查看次数

订阅者缺少消息; 这是Rx的错误还是我做错了?

我有一个Window WPF应用程序与以下构造函数

        numbers = Observable.Generate(DateTime.Now,
                                         time => true,
                                         time => DateTime.Now,
                                         time => { return new     Random(DateTime.Now.Millisecond).NextDouble() * 99 + 2; },
                                         time => TimeSpan.FromSeconds(1.0));


        numbers.ObserveOnDispatcher()
            .Subscribe(s => list1.Items.Add(s.ToString("##.00")));

        numbers.Where(n => n < 10).ObserveOnDispatcher().
            Subscribe(s => list2.Items.Add(s.ToString("##.00")));
Run Code Online (Sandbox Code Playgroud)

现在这里是列表的屏幕截图 - 左侧列表中缺少通知3.76 ...此行为是间歇性的.

图片

c# wpf reactive-programming system.reactive

1
推荐指数
1
解决办法
153
查看次数

coroutine/reactive扩展 - 写行

我正在使用这两个函数来读写大文件(写入多个文件).我想将文件操作保留在函数中,因为这些行可能是从其他源读取/写入的.

更新: C#并没有真正的协程.它是Reactive扩展的一个很好的用例吗?

foreach (var line in ReadFrom("filename"))
{
    try 
    {
        .... // Some actions based on the line
        var l = ..... 
        WriteTo("generatedFile1", l);
    }
    catch (Exception e)
    {
        var l = ..... // get some data from line, e and other objects etc.
        WriteTo("generatedFile2", l);
    }
}
Run Code Online (Sandbox Code Playgroud)

以下函数打开文件一次,直到读取所有行,然后关闭并释放资源.

    private static IEnumerable<string> ReadFrom(string file)
    {
        string line;
        using (var reader = File.OpenText(file))
        {
            while ((line = reader.ReadLine()) != null)
                yield return line;
        }
    }
Run Code Online (Sandbox Code Playgroud)

但是,以下函数会写入行而不是读取行,为其写入的每一行打开和关闭文件.是否有可能以某种方式实现它,因此它只打开文件一次并继续写入文件,直到发送EOF为止?

    private static void WriteTo(string …
Run Code Online (Sandbox Code Playgroud)

.net c# coroutine system.reactive

1
推荐指数
1
解决办法
233
查看次数

如何在ReplaySubject下传递多个错误?

我怎样才能传递多个错误ReplaySubject

当我调用时,OnError只传递第一个异常.我需要多次打电话才能 errors/exceptions通过.

我看到内部RX创建了一个AnonymousSafeObserver并且OnError正在调用Dispose.

我可以创建自己的AnonymousSafeObserver一些版本如何更改功能?

非常感谢.

c# error-handling system.reactive

1
推荐指数
1
解决办法
161
查看次数

订阅在Reactive Extensions中结束时关闭非托管资源

我正在从Rx向网络写入数据.当然,我Finally在订阅结束时用来关闭我的流.这在干净的OnError()OnComplete().RX将运行OnNext() ... OnNext(),OnComplete(),Finally()按顺序排列.

但是,有时我想提前终止序列,我这样做是为了使用Dispose().不知怎的Finally(),现在与最后一次OnNext()调用并行运行,导致仍然写入流中的异常OnNext(),以及不完整的写入.

我的订阅看起来大致如下:

NetworkStream stm = client.GetStream();
IDisposable disp = obs
    .Finally(() => {
        client.Close();
    })
    .Subscribe(d => {
        client.GetStream().Write(d.a, 0, d.a.Lenght);
        client.GetStream().Write(d.b, 0, d.b.Lenght);
    } () => {
        client.GetStream().Write(something(), 0, 1);
    });
Thread.sleep(1000);
disp.Dispose();
Run Code Online (Sandbox Code Playgroud)

我也试过了替代方案CancellationToken.

如何正确取消订阅?我不介意它跳过OnComplete(),只要Finally()还在运行.但是,Finally()并行运行存在问题.

我也觉得应该有一个更好的方法来管理资源,通过将声明移动到序列中,这将是一个更好的解决方案.

编辑:以下代码更清楚地显示问题.我希望它总是打印出来,相反,它会经常给出错误,表示在最后一次之前Dispose结束OnNext.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

1
推荐指数
1
解决办法
279
查看次数

如何将回调转换为Rx.Observable?

如果外部库只提供注册回调而不是事件,那么创建Observable它的最佳方法是什么?

如果它是我可以使用的事件,Observable.FromEventPattern 但在这种情况下,我唯一的想法是Subject在每个回调中使用a 和队列事件.

有没有更好的方法来做到这一点?

system.reactive

1
推荐指数
1
解决办法
1173
查看次数