标签: system.reactive

如何创建一个 Observable Timer 来调用一个方法,并且如果该方法正在运行直到完成则在取消时阻塞?

我的要求:

  1. 按指定的时间间隔运行方法 DoWork。
  2. 如果在调用 DoWork 之间调用 stop,则只需停止计时器即可。
  3. 如果在 DoWork 运行时调用 stop,则阻塞直到 DoWork 完成。
  4. 如果调用 stop 后 DoWork 花费太长时间才能完成,则超时。

我有一个到目前为止似乎有效的解决方案,但我对此不太满意,并且认为我可能错过了一些东西。以下是我的测试应用程序中的 void Main:

var source = new CancellationTokenSource();

// Create an observable sequence for the Cancel event.
var cancelObservable = Observable.Create<Int64>(o =>
{
    source.Token.Register(() =>
    {
        Console.WriteLine("Start on canceled handler.");
        o.OnNext(1);
        Console.WriteLine("End on canceled handler.");
    });

    return Disposable.Empty;
});

var observable =
    // Create observable timer.
    Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), Scheduler.Default)
        // Merge with the cancel observable so we have a composite that 
        // …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

3
推荐指数
1
解决办法
7268
查看次数

等待 Rx 观察者完成而不使用锁

我有一个应用程序与多个观察者一起运行可观察的间隔。每隔 0.5 秒,该时间间隔就会从 Web 服务器加载一些 XML 数据,然后观察者在后台线程上执行一些特定于应用程序的处理。一旦不再需要数据,订阅和可观察间隔就会被释放,因此观察者的 OnNext/OnCompleted/OnError 将不再被调用。到目前为止,一切都很好。

我的问题:在某些罕见的情况下,调用 Dispose 后我的观察者的 OnNext 方法可能仍在运行!在处理后进行进一步操作之前,我想确保OnNext已经完成。

我当前的解决方案:我在观察者类中引入了一个储物柜字段(请参阅代码)。处置后,我尝试获取锁,并仅在获取锁后才继续。虽然这个解决方案有效(?),但在某种程度上我感觉不对。

问题:有没有更优雅、更“Rx Way”来解决这个问题?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RxExperimental
{
    internal sealed class MyXmlDataFromWeb
    {
        public string SomeXmlDataFromWeb { get; set; }
    }

    internal sealed class MyObserver : IObserver<MyXmlDataFromWeb>
    {
        private readonly object _locker = new object();
        private readonly string _observerName;

        public MyObserver(string observerName) {
            this._observerName = observerName;
        }

        public object Locker {
            get …
Run Code Online (Sandbox Code Playgroud)

c# multithreading system.reactive

3
推荐指数
1
解决办法
3628
查看次数

RxJs:拖放示例:添加 mousedragstart

如何mousedragstart使用拖放 RxJs 的示例实现Observable 。

mousedragstart应该在第一个mousedrag之后的第一个mousedown之前发出,直到mouseup.

我认为我们必须玩flatMap/ take(1)/takeUntil(mouseup)但我每次都失败了..


更新

这里的困难不是避免mousedrag被发射之前mousedragstart

javascript system.reactive rxjs

3
推荐指数
1
解决办法
2807
查看次数

他们是否删除了 Rx 3.0 中的 Interval 和 Timer 运算符?

我在 Rx 3.0 中找不到Interval运算符和Timer运算符。这些被删除了吗?

我在 Visual Studio 2015 的控制台应用程序中安装了 System.Reactive NuGet 包,现在我看不到它们。请看下面的图片。

在此处输入图片说明

直到一周前,当我使用 NuGet 包管理器来获取 Rx-Main 时,它们才出现。这让我得到了程序集的 v2.2.5。

但是,现在当我在 NuGet 中搜索 Rx-Main 时,它没有出现在结果中。我推断这是因为 6 月 16 日宣布 Rx 成为 .NET 基金会的一部分,并且新的 NuGet 包被称为 System.Reactive。

这些搬家了吗?

.net system.reactive

3
推荐指数
1
解决办法
87
查看次数

Observable.Create:CancellationToken 不会转换为 IsCancellationRequested

拿这个小脚本(在 LINQPad 中设计,但应该在任何地方运行):

void Main()
{
    Task.Run(() => Worker()).Wait();
}

async Task Worker()
{
    if (SynchronizationContext.Current != null)
        throw new InvalidOperationException("Don't want any synchronization!");

    BaseClass provider = new Implementation();
    Func<IObserver<TimeSpan>, CancellationToken, Task> subscribeAsync =
        provider.CreateValues;
    var observable = Observable.Create(subscribeAsync);

    var cancellation = new CancellationTokenSource(5500).Token; // gets cancelled after 5.5s
    cancellation.Register(() => Console.WriteLine("token is cancelled now"));
    await observable
        .Do(ts =>
        {
            Console.WriteLine("Elapsed: {0}; cancelled: {1}",
                ts,
                cancellation.IsCancellationRequested);
            cancellation.ThrowIfCancellationRequested();
        })
        .ToTask(cancellation)
        .ConfigureAwait(false);
}

abstract class BaseClass
{
    // allow implementers to use …
Run Code Online (Sandbox Code Playgroud)

.net c# system.reactive

3
推荐指数
1
解决办法
1047
查看次数

在将 Rx 事件提供给订阅者之前和之后修改 C# 中的 Rx 事件

每次观察到事件时,我都需要对其进行预处理和后处理。这是我想出的东西,它有效:

var subj = new Subject<Event>();
var observable = Observable.Create<Event>(obs =>
{
    subj.Finally(obs.OnCompleted); 
    return subj.Subscribe(e =>
    {
        try
        {
            Preprocess(e);
            obs.OnNext(e);
            Postprocess(e);
        }
        catch (Exception ex) { obs.OnError(ex); }
    });
});
Run Code Online (Sandbox Code Playgroud)

我的问题:这是正确的方法,还是有更好的模板/扩展方法?

c# system.reactive

3
推荐指数
1
解决办法
191
查看次数

从非标准事件创建 Observable(无 EventArgs / EventHandler)

我想为如下定义的事件创建一个 Observable:

public event Func<Exception, Task> Closed;
Run Code Online (Sandbox Code Playgroud)

我目前的代码是这样的:

Observable.FromEvent<Func<Exception, Task>, Unit>(h => hub.Closed += h, h=> hub.Closed -= h); 
Run Code Online (Sandbox Code Playgroud)

它编译正常,但它抛出这个运行时异常:

System.ArgumentException: '无法绑定到目标方法,因为其签名或安全透明度与委托类型不兼容。'

我觉得我做错了。我不习惯从不遵循 EventArgs 模式的事件中创建 observable

编辑:为了澄清起见,这是经典事件处理的完整代码:

class Program
{
    static async Task Main(string[] args)
    {
        var hub = new HubConnectionBuilder().WithUrl("http://localhost:49791/hubs/status")
            .Build();

        hub.On<Status>("SendAction", status => Console.WriteLine($"Altitude: {status.Altitude:F} m"));
        await hub.StartAsync();

        hub.Closed += HubOnClosed;

        while (true)
        {
        }
    }

    private static Task HubOnClosed(Exception arg)
    {
        Console.WriteLine("The connection to the hub has been closed");
        return Task.CompletedTask;
    }
}
Run Code Online (Sandbox Code Playgroud)

.net c# events system.reactive observable

3
推荐指数
1
解决办法
457
查看次数

如何在 Rx.NET 中延长 Observable Timer 的持续时间?

在 Rx.NET 中使用 C#。有没有办法延长Observable.Timer启动后的持续时间?有没有办法用Join(...)Expand(...)?我不喜欢处理旧计时器,也不喜欢创建新计时器。

示例:初始持续时间为 5 分钟。但是在计时器完成前 1 分钟,我喜欢将其重置为 5 分钟,再次测量 5 分钟。

var _timerDisposable = Observable.Timer(duration)
.ObserveOn(Scheduler.Default)
.Subscribe((_) => DoSomething());
Run Code Online (Sandbox Code Playgroud)

c# system.reactive rx.net

3
推荐指数
1
解决办法
939
查看次数

时间:2019-05-16 标签:c#Observable subject hot andcold

我经常遇到这样的情况:我想要一个 UI 元素“观察”一个潜在的值——假设我正在显示一个 int——我想要一个我可以订阅的 IObservable。

我一直在下面使用一个主题,所以我可以设置它。这真的很好用……除了第一次。如果我设置了主题 - 然后稍后通过打开一个新的 UI 元素订阅它 - 它不会触发 onnext 直到下一次更改。

我基本上想要一些像主题一样工作的东西 - 但总是立即对任何新订阅者执行最新值的 onNext 。

我知道我可以自己编写这样的构造 - 但它似乎是一个常见的用例 - 是否有我遗漏的标准?

c# system.reactive

3
推荐指数
1
解决办法
416
查看次数

计算实时并发流的数量

我有一个流。我想知道将其转换为当前许多内部流的实时计数,即未完成或出错。

我将如何实施 CountLiveStreams?

var source = new Subject<IObservable<Unit>>();
IObservable<int> count = source.CountLiveStreams();
Run Code Online (Sandbox Code Playgroud)

谢谢!

c# system.reactive

3
推荐指数
1
解决办法
72
查看次数