我有一个IObservable; 其中属性更改具有实体ID和PropertyName.我想用它来更新数据库,但是如果多个属性几乎同时发生变化,我只想对同一个实体的所有属性进行一次更新.
如果这是一个静态IEnumerable并且我使用LINQ我可以简单地使用:
MyList.GroupBy(C=>C.EntityID);
Run Code Online (Sandbox Code Playgroud)
但是,列表永远不会终止(从不调用IObserver.OnComplete).我想要做的是等待一段时间,比如1秒钟,将所有呼叫分组适当的一秒钟.
理想情况下,我会为每个EntityID设置单独的计数器,只要找到该EntityID的新属性更改,它们就会重置.
我不能使用像Throttle这样的东西,因为我想处理所有的属性更改,我只是想一次性处理它们.
我是Rx的新手.我想知道是否可以向不同的订阅者发送消息,以便它们在不同的线程上运行?IObserable如何控制呢?简单的Subject实现,据我所知,它在一个线程上一个接一个地调用订阅者.
public class Subsciber : IObserver<int>
{
public void OnNext(int a)
{
// Do something
}
public void OnError(Exception e)
{
// Do something
}
public void OnCompeleted()
{
}
}
public static class Program
{
public void static Main()
{
var observable = new <....SomeClass....>();
var sub1 = new Subscriber();
var sub2 = new Subscriber();
observable.Subscribe(sub1);
observable.Subscribe(sub2);
// some waiting function
}
}
Run Code Online (Sandbox Code Playgroud)
如果我使用Subject作为'SomeClass',那么在sub1的OnNext()完成之前,不会调用sub2的OnNext().如果sub1花了很多时间,我不希望它延迟sub2的接收.有人能告诉我Rx如何为SomeClass提供这种实现.
试图了解如何Subject<T>,ReplaySubject<T>和其他工作.这是一个例子:
(主题是观察者和观察者)
public IObservable<int> CreateObservable()
{
Subject<int> subj = new Subject<int>(); // case 1
ReplaySubject<int> subj = new ReplaySubject<int>(); // case 2
Random rnd = new Random();
int maxValue = rnd.Next(20);
Trace.TraceInformation("Max value is: " + maxValue.ToString());
subj.OnNext(-1); // specific value
for(int iCounter = 0; iCounter < maxValue; iCounter++)
{
Trace.TraceInformation("Value: " + iCounter.ToString() + " is about to publish");
subj.OnNext(iCounter);
}
Trace.TraceInformation("Publish complete");
subj.OnComplete();
return subj;
}
public void Main()
{
//
// …Run Code Online (Sandbox Code Playgroud) 我有以下代码:
Observable.FromEvent<ModelEventArgs>(
h => ValuesController.ModelAdded += h,
h => ValuesController.ModelAdded -= h)
.Subscribe(m => context.Connection.Broadcast(m));
Observable.FromEvent<ModelEventArgs>(
h => ValuesController.ModelDeleted += h,
h => ValuesController.ModelDeleted -= h)
.Subscribe(m => context.Connection.Broadcast(m));
Run Code Online (Sandbox Code Playgroud)
什么会更清洁:
Observable.[SOMETHING](
Observable.FromEvent<ModelEventArgs>(
h => ValuesController.ModelAdded += h,
h => ValuesController.ModelAdded -= h),
Observable.FromEvent<ModelEventArgs>(
h => ValuesController.ModelDeleted += h,
h => ValuesController.ModelDeleted -= h))
.Subscribe(m => context.Connection.Broadcast(m));
Run Code Online (Sandbox Code Playgroud)
我似乎无法弄清楚[SOMETHING]需要什么,我不想等待ModelAdded完成,我只是希望它们都为每个事件发出(m).
对我来说很容易,我是一个Rx newb
是否可以为响应式扩展实现基于自定义硬件定时器的调度程序?我怎么开始,有什么好的例子吗?
我有一个硬件可以每毫秒发送一个准确的中断.我想利用它来创建更精确的RX调度程序.
UPDATE
感谢Asti回答的关键字,我发现了这篇博文,这让我发现我可以实现VirtualTimeScheduler <TAbsolute,TRelative>,因为我的硬件设备为我提供了绝对的时间戳.
Observable.FromAsyncPattern可用于从BeginX EndX样式的异步方法中创建一个observable.
也许我误解了一些事情,但是有一个类似的功能来从新的异步样式方法创建一个observable - 即.. Stream.ReadAsync?
我正在尝试用C#实现一个消费者.有许多发布者可以同时执行.我创建了三个示例,一个使用Rx和subject,一个使用BlockingCollection,第三个使用BlockingCollection中的ToObservable.在这个简单的例子中,它们都做同样的事情,我希望它们与多个生产者一起工作.
每种方法有哪些不同的特质?
我已经在使用Rx,所以我更喜欢这种方法.但我担心OnNext没有线程安全保证,我不知道排队语义是什么主题和默认调度程序.
有线程安全主题吗?
是否要处理所有消息?
当这不起作用时还有其他任何情况吗?是同时处理?
void SubjectOnDefaultScheduler()
{
var observable = new Subject<long>();
observable.
ObserveOn(Scheduler.Default).
Subscribe(i => { DoWork(i); });
observable.OnNext(1);
observable.OnNext(2);
observable.OnNext(3);
}
Run Code Online (Sandbox Code Playgroud)
不是Rx,但很容易适应使用/订阅它.它需要一个项目然后处理它.这应该是连续发生的.
void BlockingCollectionAndConsumingTask()
{
var blockingCollection = new BlockingCollection<long>();
var taskFactory = new TaskFactory();
taskFactory.StartNew(() =>
{
foreach (var i in blockingCollection.GetConsumingEnumerable())
{
DoWork(i);
}
});
blockingCollection.Add(1);
blockingCollection.Add(2);
blockingCollection.Add(3);
}
Run Code Online (Sandbox Code Playgroud)
使用阻塞集合有点像主题似乎是一个很好的妥协.我猜是隐式地会安排到任务,所以我可以使用async/await,这是正确的吗?
void BlockingCollectionToObservable()
{
var blockingCollection = new BlockingCollection<long>();
blockingCollection.
GetConsumingEnumerable().
ToObservable(Scheduler.Default).
Subscribe(i => { DoWork(i); });
blockingCollection.Add(1);
blockingCollection.Add(2);
blockingCollection.Add(3);
}
Run Code Online (Sandbox Code Playgroud) c# subject task-parallel-library system.reactive blockingcollection
所以我有一个BehaviorSubject<string>我希望用于房产的.我一直在使用阻塞First()方法来获取BehaviorSubject属性getter中保持的当前值.
该First()操作现在已经过时,那么实现这一目标的新方法是什么?
我正在尝试将observable分隔成窗口(或者为了我的目的,Buffers也很好),同时能够关闭自定义位置的窗口/缓冲区.
例如,我有一个observable,它从1开始生成整数并向上移动.我想在每个可被7整除的数字处关闭一个窗口.在这种情况下,我的结束函数需要将该项作为参数.
方法有一个重载Window:
Window<TSource, TWindowClosing>(IObservable<TSource>, Func<IObservable<TWindowClosing>>)
Run Code Online (Sandbox Code Playgroud)
要么不能使用这个重载,要么我无法绕过它.文档描述它完全符合我的要求,但没有显示示例.此外,它还显示了非确定性结束的示例,它取决于关闭可观察集合发出项目时的时间.
Window运算符将可观察序列分解为连续的非重叠窗口.当前窗口的结束和下一个窗口的开始由可观察序列控制,该序列是windowClosingSelect函数的结果,该函数作为输入参数传递给操作符.运算符可用于将一组事件分组到窗口中.例如,交易的状态可以是被观察的主要序列.这些州可能包括:准备,准备,活动和承诺/中止.主序列可以包括它们按顺序出现的所有状态.windowClosingSelect函数可以返回一个可观察的序列,该序列仅在Committed或Abort状态下生成一个值.这将关闭表示特定事务的事务事件的窗口.
我认为像下面这样的人会做这个工作,但我必须自己实施:
Window<TSource, TWindowClosing>(IObservable<TSource>, Func<TSource, bool>)
Run Code Online (Sandbox Code Playgroud)
我有IObservable一个库提供,它从外部服务侦听事件:
let startObservable () : IObservable<'a> = failwith "Given"
Run Code Online (Sandbox Code Playgroud)
对于每个收到的事件,我想执行一个返回的动作Async:
let action (item: 'a) : Async<unit> = failwith "Given"
Run Code Online (Sandbox Code Playgroud)
我正在尝试实现一个处理器
let processor () : Async<unit> =
startObservable()
|> Observable.mapAsync action
|> Async.AwaitObservable
Run Code Online (Sandbox Code Playgroud)
我已经弥补mapAsync并且AwaitObservable:理想情况下它们将由一些图书馆提供,到目前为止我找不到它.
额外要求:
应该按顺序执行操作,以便在处理上一个事件时缓冲后续事件.
如果某个操作引发错误,我希望我的处理器完成.否则,它永远不会完成.
Async.Start应该尊重通过的取消令牌.
关于我应该使用的图书馆的任何提示?