标签: system.reactive

使用 RxJS 处理流结束的惯用方法

流结束时我需要执行一些操作。这样做的惯用方法是什么?

现在我使用下面的代码:

source.subscribe(undefined, undefined, function() {
  socket.send({type: 'end'});
});
Run Code Online (Sandbox Code Playgroud)

reactive-programming system.reactive rxjs

2
推荐指数
1
解决办法
1716
查看次数

避免绑定中的转换器

需要否定来自 ViewModel 的值的情况非常常见。我们最终使用了一个类似所谓的“ InverseBoolConverter ”的转换器。

我的问题是:是否有一种方便的方法可以避免使用不涉及更改 ViewModel 的转换器?

注意:我正在使用 ReactiveUI

.net c# wpf system.reactive reactiveui

2
推荐指数
1
解决办法
654
查看次数

如何对事件总线使用反应式扩展

我很难弄清楚如何使用反应式扩展在 C# 中创建事件总线,而无需使用主题类,据我所知,不建议这样做。

大多数 IEvent 是我自己的,但有些像鼠标和键盘事件将由 WPF 提供。

我更喜欢将事件发布到事件总线的想法,而不是使用 Observable.FromEventPattern 随处使用事件处理程序,因为其中一些事件有时仅由订阅者记录而不执行。

这是显示我正在尝试做的事情的片段。

 public interface IEvent { } // marker interface

 public class BarcodeReaderEvent : EventArgs, IEvent
 { }

 public class MouseEvent : EventArgs, IEvent
 { }

 public class MyEventBus
 {
     private static IObservable<IEvent> eventBus = ??

     public void Post<IEvent>(IEvent theEvent)
     {
       // What goes here? 
     }

     public IDisposable Subscribe()
     {
         return ??
     }
 }
Run Code Online (Sandbox Code Playgroud)

c# linq system.reactive

2
推荐指数
1
解决办法
2081
查看次数

无法取消订阅 Rx

背景

我正在编写一些执行以下操作的软件:

  1. 用户单击“开始”。
  2. 启动一个任务来执行一些工作并启动事件来更新 GUI。
  3. 可观察对象使用任务中的事件,并将数据打印到 GUI 中的富文本框。

当我第一次单击“开始”时,一切正常,但之后就不行了。第一次单击开始时,我得到如下输出:

单击开始一次

这看起来不错,没有什么问题。但是,当我第二次单击“开始”时,我得到以下输出。

再次点击开始

现在,我相信我知道为什么会发生这种情况。据我所知,从我第一次单击“开始”开始,我的观察者就从未取消订阅,因此所有内容都会打印两次。单击开始按钮时,会发生以下情况:

    /// <summary>
    /// Starts the test.
    /// </summary>
    /// <param name="sender">The "start" button.</param>
    /// <param name="e">Clicking on the "start" button.</param>
    private void button_go_Click(object sender, RoutedEventArgs e)
    {
        var uiContext = SynchronizationContext.Current;
        var results = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
            handler => (s, a) => handler(s, a),
            handler => this.myTest.Results += handler,
            handler => this.myTest.Results -= handler)
            .ObserveOn(uiContext)
            .Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));

        // start running the test
        this.runningTest = new Task(() => …
Run Code Online (Sandbox Code Playgroud)

c# task reactive-programming winforms system.reactive

2
推荐指数
1
解决办法
1181
查看次数

如何实现 IObservable

我希望能够将 aQueue作为 an使用Observable,但我不知道如何创建它。我希望它能够在有人调用Enqueue.

class Producer:IObservable<int>
{
    private object @lock = new object();
    private Queue<int> queue = new Queue<int>();
    List<IObserver<int>> observers = new List<IObserver<int>>();

    public Producer()
    {
    }
    public IObservable<int> ToObservable()
    {
        return ///
    }
    public bool Enqueue(int sample)
    {
        if (sample == null)
        {
            return false;
        }
        this.queue.Enqueue(sample);
        return true;
    }
    public int Dequeue()
    {
        if(!this.queue.TryDequeue(out Sample rez))
        {
            return 0;
        }
        return rez;
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
    }
}
Run Code Online (Sandbox Code Playgroud)

我可以与处理部分Subscriber …

c# system.reactive observable .net-core

2
推荐指数
1
解决办法
863
查看次数

ReactiveUI 测试中的调度程序

因此,当我为我的系统开发新功能时,我也尝试进行 TDD - 遗憾的是,现在为旧功能执行此操作的代码太大了。

然而,我发现有时我在测试过程中会碰壁——尤其是在使用Delay和时Throttle

我读了很多书,我想我比一周前知道了更多,但我想把所有这些付诸实践。我写了一些实验:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Reactive.Threading.Tasks;
    using System.Text;
    using System.Threading.Tasks;
    using Microsoft.Reactive.Testing;
    using NUnit.Framework;
    using NUnit.Framework.Internal.Commands;
    using ReactiveUI;
    using ReactiveUI.Testing;

    namespace UtilsTests
    {
        [TestFixture]
        public class SchedulersTests
        {
            private int SecondsN = 1;

            [Test]
            public async Task NoScheduler()
            {
                var t = Observable.Return(Unit.Default).Delay(TimeSpan.FromSeconds(SecondsN), RxApp.MainThreadScheduler)
                    .ObserveOn(RxApp.MainThreadScheduler)
                    .ToTask();
                await t;
            }

            [Test]
            public Task ImmediateSchedulerExperiment()
            {
                return Scheduler.Immediate.With(async s =>
                {
                    var t = …
Run Code Online (Sandbox Code Playgroud)

c# unit-testing system.reactive reactiveui

2
推荐指数
1
解决办法
950
查看次数

RxJS:保留输入顺序的 MergeMap

要求:

urls = [url1, url2, url3]

并行触发所有 3 个 url 并在 urls 列表的序列中绘制 Dom

 ex: Finished order of urls = [url3, url1, url2]
     when url1 finishes Immediately render the DOM, without waiting for url2
     If url2, url3 finishes before url1, then store url2, url3 and paint the DOM after url1 arrives
     Paint the DOM with order [url1, url2, url3]
Run Code Online (Sandbox Code Playgroud)

我的工作使用承诺:

// Fired all 3 urls at the same time
p1 = fetch(url1)
p2 = fetch(url2)
p3 = fetch(url3)

p1.then(updateDom)
  .then(() …
Run Code Online (Sandbox Code Playgroud)

system.reactive rxjs

2
推荐指数
1
解决办法
768
查看次数

在 F# 中一次强制执行一个 Async Observable

我有一系列可观察的事物需要映射到 C# 任务。这些 C# 任务不应该同时运行,而是一个接一个地运行。基本上,我需要实现这个 C# 问题的 F# 等价物:

一次强制执行一个异步可观察对象

天真地翻译这个 C# 代码会得到如下内容:

let run (idx:int) (delay:int) =
    async {
        sprintf "start: %i (%i)" idx delay |> System.Diagnostics.Trace.WriteLine
        let! t = System.Threading.Tasks.Task.Delay(delay) |> Async.AwaitTask
        sprintf "finish: %i" idx  |> System.Diagnostics.Trace.WriteLine
        t
    }

    Observable.generate (new Random()) (fun _ -> true) id (fun s -> s.Next(250, 500))
    |> Observable.take 20
    |> Observable.mapi(fun idx delay -> idx, delay)
    |> Observable.bind(fun (idx, delay) -> Observable.ofAsync (run idx delay))
    |> Observable.subscribe ignore
    |> ignore
Run Code Online (Sandbox Code Playgroud)

这不能按预期工作,因为我不会在任何地方等待结果。有没有办法在 …

f# system.reactive async-await

2
推荐指数
1
解决办法
232
查看次数

Reactive Extensions 库是否已停止使用?

浏览器中 Reactive Extensions 文档网站的屏幕截图

Microsoft 网站上大部分 Reactive Extensions 库的文档说“我们不再定期更新此内容。请查看 Microsoft 产品生命周期以了解有关如何支持此产品、服务、技术或 API 的信息。”

此外,在主文档站点中搜索一些基本的 Reactive Extensions 类名称(例如“主题”)不会返回任何匹配项。唯一的点击来自网站的“以前的版本”部分。

在链接的“产品生命周期”页面中没有提到 Reactive Extensions。

这是否意味着 Microsoft 不再支持 Reactive Extensions,或者我错过了什么?

.net c# system.reactive reactive

2
推荐指数
1
解决办法
653
查看次数

如何使用 ReactiveUI 和 DynamicData 将可变模型的 ObservableCollection&lt;T&gt; 绑定到视图模型的 ReadOnlyObservableCollection&lt;T&gt;

我在我的 C# 项目中使用 ReactiveUI 和 DynamicData。但是,域模型类仍然依赖于 C# 事件、INotifyPropertyChanged 和 INotifyCollectionChanged 接口。

有 Model 和 ViewModel 类:

public class Model
{
    public ObservableCollection<int> Collection { get; } = new ObservableCollection<int>();
}

public class ViewModel : ReactiveObject, IDisposable
{
    private readonly CompositeDisposable _cleanUp;
    private readonly SourceList<int> _collectionForCurrentBModel = new SourceList<int>();
    private Model _model = new Model();
    private IDisposable _tempCleanUp = Disposable.Empty;

    public ViewModel()
    {
        _cleanUp = new CompositeDisposable();
        _collectionForCurrentBModel.Connect()
            .Bind(out var aModelsForCurrentBModel)
            .Subscribe(Console.WriteLine)
            .DisposeWith(_cleanUp);
        CollectionForCurrentBModel = aModelsForCurrentBModel;

        this.WhenAnyValue(x => x.Model.Collection) // Every …
Run Code Online (Sandbox Code Playgroud)

inotifycollectionchanged inotifypropertychanged system.reactive reactiveui

2
推荐指数
1
解决办法
989
查看次数