请注意以下单元测试:
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
namespace UnitTests
{
[TestClass]
public class TestRx
{
public const int UNIT_TEST_TIMEOUT = 5000;
private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
{
return Observable.Create<int>(async (obs, cancellationToken) =>
{
for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
{
int value = i;
obs.OnNext(await Task.Factory.StartNew(() =>
{
Thread.Sleep(msWait);
return value;
}));
}
});
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void Subscribe()
{
var tcs = …Run Code Online (Sandbox Code Playgroud) 有没有办法在序列完成之前将聚合函数(Max、Count、....)与 Buffer 一起使用。完成后这将产生结果,但继续流它不会给出任何结果?
我期待有什么方法可以使这个工作与缓冲区一起工作?
IObservable<long> source;
IObservable<IGroupedObservable<long, long>> group = source
.Buffer(TimeSpan.FromSeconds(5))
.GroupBy(i => i % 3);
IObservable<long> sub = group.SelectMany(grp => grp.Max());
sub.Subscribe(l =>
{
Console.WriteLine("working");
});
Run Code Online (Sandbox Code Playgroud) 我想要一个简单的演示工作.
我有一个字符串集合,我想看它添加,而不使用任何控制事件代码.不知怎的,我得到的印象,或许是错误的,Rx或.Net的另一部分支持这一点,而不是诉诸所有各种事件(可能会或可能不会)将成员添加到集合中.
如果我source用一个间隔替换我,就像在注释掉的代码中一样,委托被调用(ala,.var source = Observable.Interval(TimeSpan.FromSeconds(1));这让我希望我可以做我想做的事,也许是错误的.
基本示例来自创建和订阅简单可观察序列
在下面的代码中,我想要做的是source直接观察集合(而不是通过控制事件),并在项目添加到集合时调用委托.
我宁愿不绕过LINQ或RX,如果它们确实支持我的论文,他们支持这个功能.
using Microsoft.VisualBasic;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Collections.ObjectModel;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
public class frmRx
{
ObservableCollection<string> ObservableCollection = new ObservableCollection<string>();
Dim source = Observable.ToObservable(ObservableCollection).ObserveOn(SynchronizationContext.Current)
//this code worked, but it's not a collection I made
//source = Observable.Interval(TimeSpan.FromSeconds(1)).Select(x => x.ToString).ObserveOn(SynchronizationContext.Current);
private void frmRx_Load(System.Object sender, System.EventArgs e)
{
IConnectableObservable<string> Publication = Observable.Publish<string>(source);
Publication.Subscribe(x => { AddToTreeView(x); }, …Run Code Online (Sandbox Code Playgroud) 我想创建一个Observable,每秒都会持续推送一个值列表t.
例如,鉴于{1,2,3,4}订阅者应该得到这个:
1,2,3,4,1,2,3,4,1,2,3,4,1,2 ......
class Program
{
static void Main()
{
var observable = Observable
.Interval(TimeSpan.FromSeconds(3))
.Zip(Observable.Range(1, 4)
.Repeat(), (_, count) => count);
observable.Subscribe(Console.WriteLine);
Console.WriteLine("Finished!");
}
}
Run Code Online (Sandbox Code Playgroud)
我已经研究过这个例子,它似乎有效,但是有一个非常讨厌的问题:Main方法永远不会结束它的执行!为什么?:(
更糟糕的是,几分钟后,这个控制台应用程序抛出一个OutOfMemoryException!
在我开始写一个之前,我认为值得一问:RX 有一个油门扩展方法,如果事件发生得太快,它会丢弃事件。
因此,如果您要求它将事件限制为每 5 秒 1 个,如果您在 0.1 秒后收到一个事件,然后在 1 秒后收到第二个事件,您将收到一个事件,然后是静音。
我想要的是它在 0.1 秒后引发第一个事件,但在 4.9 秒后引发另一个事件。
此外,如果我在 0.1、1 和 2 秒收到事件,我希望它在 0.1 秒、5 秒时引发事件,然后什么都不发生,所以我不希望它捕获 n 个事件并且每个周期只释放一个 n 个周期.
Buffer 做相反的事情,因为它将所有内容保存 5 秒,然后引发事件,所以既不是油门也不是缓冲区,而是介于两者之间的东西。
有没有办法用现有的框架来做到这一点,还是我需要写一个?
我有以下虚拟视图模型:
public class DummyViewModel : ReactiveObject
{
internal DummyViewModel()
{
ItemChanged.Subscribe(_ => Console.WriteLine());
}
public IObservable<string> ItemChanged
{
get { return this.WhenAnyValue(x => x.Item).Select(s => s); }
}
private string _item;
public string Item
{
get { return _item; }
set { this.RaiseAndSetIfChanged(ref _item, value); }
}
}
Run Code Online (Sandbox Code Playgroud)
当我创建这个类的一个新实例时,observable 立即在订阅时触发,返回 null(没有任何东西绑定到 Item)。这在我更高级的视图模型中导致了一个问题,我有多个 observable 需要以不同的方式链接在一起。我一直在使用 Skip 和 StartWith 的组合,但它变得非常复杂。有人可以建议为什么会发生这种情况以及我是否应该考虑采用不同的方法?
关于 ReactiveUi 的另一个问题。我有一个用于编辑表单的 ViewModel。模型是 ReactiveObject。我只想在对象发生更改时启用 savecommand。我的尝试:
var canSaveCommand =
this.WhenAnyValue(vm => vm.CurrentClient)
.Where(client => client != null)
.Select(client =>
client.Changed
)
.Any();
Run Code Online (Sandbox Code Playgroud)
但是当表单出现时SaveCommand已经启用了。我的错在哪里?
我想使用 Rx 缓冲区功能:
var source = new Subject<Price>();
var buffer = source
.Buffer(TimeSpan.FromSeconds(30), 5)
.Where(p => p.Any());
Run Code Online (Sandbox Code Playgroud)
这意味着当缓冲区达到 5 或 30 秒后,自上次发出以来已经过去时,就会发出(发布给订阅者)。
但是我需要能够按需发送 - 例如当我收到高优先级序列项目时。然后我想将它添加到 observable ( source.OnNext()) 并以某种方式强制它发出(这意味着返回缓冲区中的所有元素并清除它)。
我知道我可以添加以下代码:
var flusher = new Subject<Price>();
var closing = flusher.Select(x => new List<Price> {x});
var query = buffer.Merge(closing).Subscribe(something);
Run Code Online (Sandbox Code Playgroud)
并调用flusher.OnNext(highPriorityItem),我会发出它。
但在这种情况下,我有两个独立的序列,有两个不同的发射。当缓冲区已满或特定项目按顺序出现时,我需要发出一次。
Force flush count-type Observable.Buffer c#和Force flush to Observable.Buffer c#似乎不适合我
我有这个可观察的:
public class NetworkToolEngine
{
public NetworkToolEngine()
{
this.connectionAvailable = Observable
.Interval(TimeSpan.FromSeconds(5))
.Select(_ => true);
}
}
static class Program
{
static void Main()
{
NetWorkToolEngine networkToolEngine = new NetworkToolEngine();
this.networkToolEngine.ConnectionAvailable
.Do(_ => this.ConnectionAvailable())
.Catch(ex => //<<<<<1>>>>>
{
this.ConnectionUnavailable();
return Observable.Empty<bool>();
});
}
Run Code Online (Sandbox Code Playgroud)
目前; 我在以下位置收到此编译错误<<<<<1>>>>>:
无法将 lambda 表达式转换为类型“IObservable”,因为它不是委托类型
有任何想法吗?
预计都在 .NET Core 2.0 控制台应用程序的主线程上执行,所以输出被阻塞了 10 秒:
static void Main(string[] args)
{
WriteLine($"We are on {Thread.CurrentThread.ManagedThreadId}");
var subject = new Subject<long>();
var subscription = subject.Subscribe(
i => WriteLine($"tick on {Thread.CurrentThread.ManagedThreadId}"));
var timer = Observable.Interval(TimeSpan.FromSeconds(1))
.SubscribeOn(Scheduler.CurrentThread)
.Subscribe(i => subject.OnNext(i));
Thread.Sleep(10000);
}
Run Code Online (Sandbox Code Playgroud)
但事实并非如此——随机线程每隔一秒就会有一条新行进入控制台:
Run Code Online (Sandbox Code Playgroud)We are on 1 tick on 4 tick on 5 tick on 4 tick on 4 tick on 4 tick on 4 tick on 4 tick on 4 tick on 5
我做错了什么?
system.reactive ×10
c# ×8
.net ×2
reactiveui ×2
.net-core ×1
asynchronous ×1
linq ×1
mvvm ×1
observable ×1
rx.net ×1