我的要求:
我有一个到目前为止似乎有效的解决方案,但我对此不太满意,并且认为我可能错过了一些东西。以下是我的测试应用程序中的 void Main:
var source = new CancellationTokenSource();
// Create an observable sequence for the Cancel event.
var cancelObservable = Observable.Create<Int64>(o =>
{
source.Token.Register(() =>
{
Console.WriteLine("Start on canceled handler.");
o.OnNext(1);
Console.WriteLine("End on canceled handler.");
});
return Disposable.Empty;
});
var observable =
// Create observable timer.
Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), Scheduler.Default)
// Merge with the cancel observable so we have a composite that
// …Run Code Online (Sandbox Code Playgroud) 我有一个应用程序与多个观察者一起运行可观察的间隔。每隔 0.5 秒,该时间间隔就会从 Web 服务器加载一些 XML 数据,然后观察者在后台线程上执行一些特定于应用程序的处理。一旦不再需要数据,订阅和可观察间隔就会被释放,因此观察者的 OnNext/OnCompleted/OnError 将不再被调用。到目前为止,一切都很好。
我的问题:在某些罕见的情况下,调用 Dispose 后我的观察者的 OnNext 方法可能仍在运行!在处理后进行进一步操作之前,我想确保OnNext已经完成。
我当前的解决方案:我在观察者类中引入了一个储物柜字段(请参阅代码)。处置后,我尝试获取锁,并仅在获取锁后才继续。虽然这个解决方案有效(?),但在某种程度上我感觉不对。
问题:有没有更优雅、更“Rx Way”来解决这个问题?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RxExperimental
{
internal sealed class MyXmlDataFromWeb
{
public string SomeXmlDataFromWeb { get; set; }
}
internal sealed class MyObserver : IObserver<MyXmlDataFromWeb>
{
private readonly object _locker = new object();
private readonly string _observerName;
public MyObserver(string observerName) {
this._observerName = observerName;
}
public object Locker {
get …Run Code Online (Sandbox Code Playgroud) 如何mousedragstart使用拖放 RxJs 的示例实现Observable 。
mousedragstart应该在第一个mousedrag之后的第一个mousedown之前发出,直到mouseup.
我认为我们必须玩flatMap/ take(1)/takeUntil(mouseup)但我每次都失败了..
更新
这里的困难不是避免mousedrag被发射之前mousedragstart
我在 Rx 3.0 中找不到Interval运算符和Timer运算符。这些被删除了吗?
我在 Visual Studio 2015 的控制台应用程序中安装了 System.Reactive NuGet 包,现在我看不到它们。请看下面的图片。
直到一周前,当我使用 NuGet 包管理器来获取 Rx-Main 时,它们才出现。这让我得到了程序集的 v2.2.5。
但是,现在当我在 NuGet 中搜索 Rx-Main 时,它没有出现在结果中。我推断这是因为 6 月 16 日宣布 Rx 成为 .NET 基金会的一部分,并且新的 NuGet 包被称为 System.Reactive。
这些搬家了吗?
拿这个小脚本(在 LINQPad 中设计,但应该在任何地方运行):
void Main()
{
Task.Run(() => Worker()).Wait();
}
async Task Worker()
{
if (SynchronizationContext.Current != null)
throw new InvalidOperationException("Don't want any synchronization!");
BaseClass provider = new Implementation();
Func<IObserver<TimeSpan>, CancellationToken, Task> subscribeAsync =
provider.CreateValues;
var observable = Observable.Create(subscribeAsync);
var cancellation = new CancellationTokenSource(5500).Token; // gets cancelled after 5.5s
cancellation.Register(() => Console.WriteLine("token is cancelled now"));
await observable
.Do(ts =>
{
Console.WriteLine("Elapsed: {0}; cancelled: {1}",
ts,
cancellation.IsCancellationRequested);
cancellation.ThrowIfCancellationRequested();
})
.ToTask(cancellation)
.ConfigureAwait(false);
}
abstract class BaseClass
{
// allow implementers to use …Run Code Online (Sandbox Code Playgroud) 每次观察到事件时,我都需要对其进行预处理和后处理。这是我想出的东西,它有效:
var subj = new Subject<Event>();
var observable = Observable.Create<Event>(obs =>
{
subj.Finally(obs.OnCompleted);
return subj.Subscribe(e =>
{
try
{
Preprocess(e);
obs.OnNext(e);
Postprocess(e);
}
catch (Exception ex) { obs.OnError(ex); }
});
});
Run Code Online (Sandbox Code Playgroud)
我的问题:这是正确的方法,还是有更好的模板/扩展方法?
我想为如下定义的事件创建一个 Observable:
public event Func<Exception, Task> Closed;
Run Code Online (Sandbox Code Playgroud)
我目前的代码是这样的:
Observable.FromEvent<Func<Exception, Task>, Unit>(h => hub.Closed += h, h=> hub.Closed -= h);
Run Code Online (Sandbox Code Playgroud)
它编译正常,但它抛出这个运行时异常:
System.ArgumentException: '无法绑定到目标方法,因为其签名或安全透明度与委托类型不兼容。'
我觉得我做错了。我不习惯从不遵循 EventArgs 模式的事件中创建 observable
编辑:为了澄清起见,这是经典事件处理的完整代码:
class Program
{
static async Task Main(string[] args)
{
var hub = new HubConnectionBuilder().WithUrl("http://localhost:49791/hubs/status")
.Build();
hub.On<Status>("SendAction", status => Console.WriteLine($"Altitude: {status.Altitude:F} m"));
await hub.StartAsync();
hub.Closed += HubOnClosed;
while (true)
{
}
}
private static Task HubOnClosed(Exception arg)
{
Console.WriteLine("The connection to the hub has been closed");
return Task.CompletedTask;
}
}
Run Code Online (Sandbox Code Playgroud) 在 Rx.NET 中使用 C#。有没有办法延长Observable.Timer启动后的持续时间?有没有办法用Join(...)或Expand(...)?我不喜欢处理旧计时器,也不喜欢创建新计时器。
示例:初始持续时间为 5 分钟。但是在计时器完成前 1 分钟,我喜欢将其重置为 5 分钟,再次测量 5 分钟。
var _timerDisposable = Observable.Timer(duration)
.ObserveOn(Scheduler.Default)
.Subscribe((_) => DoSomething());
Run Code Online (Sandbox Code Playgroud) 我经常遇到这样的情况:我想要一个 UI 元素“观察”一个潜在的值——假设我正在显示一个 int——我想要一个我可以订阅的 IObservable。
我一直在下面使用一个主题,所以我可以设置它。这真的很好用……除了第一次。如果我设置了主题 - 然后稍后通过打开一个新的 UI 元素订阅它 - 它不会触发 onnext 直到下一次更改。
我基本上想要一些像主题一样工作的东西 - 但总是立即对任何新订阅者执行最新值的 onNext 。
我知道我可以自己编写这样的构造 - 但它似乎是一个常见的用例 - 是否有我遗漏的标准?
我有一个流。我想知道将其转换为当前许多内部流的实时计数,即未完成或出错。
我将如何实施 CountLiveStreams?
var source = new Subject<IObservable<Unit>>();
IObservable<int> count = source.CountLiveStreams();
Run Code Online (Sandbox Code Playgroud)
谢谢!