我有一个主题,我在其中订阅在游戏中发生确定事件时应该调用的方法。
public Subject<SomeEvent> TestSubject = new Subject<SomeEvent>();
Run Code Online (Sandbox Code Playgroud)
某些实例订阅该主题。
TestSubject.Subscribe(MyMethod);
Run Code Online (Sandbox Code Playgroud)
我的目标是计算有多少方法已订阅该主题。我见过一些使用 Count() 扩展的示例,但我需要一个 int 作为返回值,这样我就可以在其他地方使用它,并且 Count() 返回一个 IObservable。
if (subjectCount > 0)
{
DoSomething();
}
Run Code Online (Sandbox Code Playgroud)
有没有什么方法可以获取某个主题的订阅数量,或者我是否需要手动跟踪它们(有一个 public int subjectSubcriptions 并在每次订阅方法时添加 1)?
c# unity-game-engine reactive-programming system.reactive observer-pattern
我有两个简单的观察处理程序,它们订阅了同一源。然而,这两种订阅都以不同的类型运行。我希望他们保持可观察源(Subject())的顺序。我尝试使用 Synchronize() 扩展,但没有找到按预期方式完成这项工作的方法。
这是我的单元测试代码:
[Test]
public void TestObserveOn()
{
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var source = new Subject<object>();
var are = new AutoResetEvent(false);
using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<int>().Subscribe(
o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
int sleep = 3000 / o; // just to simulate longer processing
Thread.Sleep(sleep);
Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
},
() =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.Set();
}))
using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<double>().Subscribe(
o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
Console.WriteLine("Handled {1} on threadId: {0}", …Run Code Online (Sandbox Code Playgroud) 似乎没有FromEvent或方法的重载,仅在有多个参数且没有 的情况下FromEventPattern才会转换为具有该IObservable<T>类型的事件。TDelegateEventArgs
例如,我们似乎无法将以下内容转换为可观察量。
public event Action<int, int> SomethingHappened;
public event Func<string, int> SomethingElseHappened;
Run Code Online (Sandbox Code Playgroud)
我们要么必须EventArgs在某个地方有一个,要么TDelegate在其签名中只有一个参数。因此,以下内容是可转换的,因为它们有一个EventArgs隐式委托。
public event NameChangedHandler NameChanged;
public event EventHandler RollNumberChanged;
public event EventHandler<AgeChangedEventArgs> AgeChanged;
public delegate void NameChangedHandler(
object sender,
NameChangedEventArgs e);
Run Code Online (Sandbox Code Playgroud)
这个也可以转换,因为T它的参数只有一个。
public event Action<Tuple<string, string>> ClassChanged;
Run Code Online (Sandbox Code Playgroud)
如果我遇到这样的事件我该怎么办:
public event Action<T1, T2...> ItHappened;
Run Code Online (Sandbox Code Playgroud) 我有 2 个事件流: 1. 鼠标拖放事件流(拖动开始...拖动结束...拖动开始...拖动结束) 2.按键事件流('a' ... ' b' .... 'c' .... 'd')
我需要合并到一个仅包含第二个流中的事件(因此仅包含按键)的流中,但它需要过滤掉拖动开始和拖动结束之间发生的所有按键(最后一个除外)。
所以如果来源是这样的:
... Start ............... End .............. Start .............. End
Run Code Online (Sandbox Code Playgroud)
和
...........'a'...'b'...'c'.......'d'...'e'..........'f'....'g'.......
Run Code Online (Sandbox Code Playgroud)
结果应该是这样的:
...........................'c'...'d'...'e'..........................'g'
Run Code Online (Sandbox Code Playgroud)
在 C# 中使用 Rx.net 可以实现类似的功能吗?
我在Scheduler.Default和之间做出选择非常困难TaskPoolScheduler.Default。
我已经看到它表明 TaskPoolScheduler 更高效/优化,并且它当然具有更明确/具体的好处;然而,这并不能帮助我理解真正的差异,因为从功能上来说它们似乎做同样的事情。
什么时候Scheduler.Default更可取TaskPoolScheduler.Default,反之亦然?
我想创建一个可用于表示动态计算值的类,另一个表示值的类可以是这些动态计算值的源(主题).目标是当主题发生变化时,计算出的值会自动更新.
在我看来,使用IObservable/IObserver是可行的方法.不幸的是我无法使用Reactive Extensions库,因此我不得不从头开始实现主题/观察者模式.
够了blabla,这是我的课程:
public class Notifier<T> : IObservable<T>
{
public Notifier();
public IDisposable Subscribe(IObserver<T> observer);
public void Subscribe(Action<T> action);
public void Notify(T subject);
public void EndTransmission();
}
public class Observer<T> : IObserver<T>, IDisposable
{
public Observer(Action<T> action);
public void Subscribe(Notifier<T> tracker);
public void Unsubscribe();
public void OnCompleted();
public void OnError(Exception error);
public void OnNext(T value);
public void Dispose();
}
public class ObservableValue<T> : Notifier<T>
{
public T Get();
public void Set(T x);
}
public class ComputedValue<T>
{
public T …Run Code Online (Sandbox Code Playgroud) 我的问题很简单.经过大量谷歌搜索后,我了解到我可以在.NET 3.5项目中使用ConcurrentDictionary,使用其安装目录中的Reactive Extensions和System.Threading.dll版本.首先,没有System.Threading.dll,Reactive Extensions .NET 3.5子目录中只有System.Reactive.Windows.Threading.添加对System.Reactive或System.Reactive.Windows.Threading的引用或者从提到的.NET 3.5中的任何其他引用都没有给我ConcurrentDictionary类,也没有给我System.Collections.Concurrent命名空间.我已经下载了旧版本的Reactive Extensions SDK,我找到了我一直在寻找的东西,但我的问题是:有没有人知道Reactive Extensions的实际版本中ConcurrentDictionary backport发生了什么,有人知道它在哪里或为什么它是失踪.我根本无法找到合理的答案或任何答案.
我使用.Schedule(DateTimeOffset,Action>)的东西使用RX调度程序类.基本上我有一个可以再次安排自己的预定动作.
码:
public SomeObject(IScheduler sch, Action variableAmountofTime)
{
this.sch = sch;
sch.Schedule(GetNextTime(), (Action<DateTimeOffset> runAgain =>
{
//Something that takes an unknown variable amount of time.
variableAmountofTime();
runAgain(GetNextTime());
});
}
public DateTimeOffset GetNextTime()
{
//Return some time offset based on scheduler's
//current time which is irregular based on other inputs that i have left out.
return this.sch.now.AddMinutes(1);
}
Run Code Online (Sandbox Code Playgroud)
我的问题是关于模拟变量AmountofTime可能采用的时间量,并测试我的代码按预期行为,并且仅触发按预期调用它.
我已经尝试推进测试调度程序在代理中的时间,但这不起作用.我编写的代码示例不起作用.假设GetNextTime()只是安排一分钟.
[Test]
public void TestCallsAppropriateNumberOfTimes()
{
var sch = new TestScheduler();
var timesCalled = 0;
var variableAmountOfTime = () =>
{
sch.AdvanceBy(TimeSpan.FromMinutes(3).Ticks); …Run Code Online (Sandbox Code Playgroud) 我试图在Windows服务中使用Rx扩展,但我被卡住了.我找到的样本不起作用.我将用一些代码解释我想要实现的目标.我正在创建的第一个类实际上是完成所有工作的类:
class Worker : IDisposable {
public Worker() {
}
private void Run() {
}
public void Dispose() {
}
}
Run Code Online (Sandbox Code Playgroud)
我想在OnStart中创建一个实例并将其杀死OnStop:
public partial class MyService : ServiceBase {
private Worker _Worker;
public MyService () {
InitializeComponent ();
}
protected override void OnStart ( string[] args ) {
_Worker = new Worker();
}
protected override void OnStop () {
_Worker.Dispose();
_Worker = null;
}
}
Run Code Online (Sandbox Code Playgroud)
所以我在Worker的构造函数中执行此操作:
/* _TimesEvents and _Events are class level variables */
_TimedEvents = Observable.Timer ( TimeSpan.FromSeconds …Run Code Online (Sandbox Code Playgroud) 我想做什么的声明如下:
// Checks input source for timeouts, based on the number of elements received
// from clock since the last one received from source.
// The two selectors are used to generate output elements.
public static IObservable<R> TimeoutDetector<T1,T2,R>(
this IObservable<T1> source,
IObservable<T2> clock,
int countForTimeout,
Func<R> timedOutSelector,
Func<T1, R> okSelector)
Run Code Online (Sandbox Code Playgroud)
ascii中的大理石图很难,但是这里有:
source --o---o--o-o----o-------------------o---
clock ----x---x---x---x---x---x---x---x---x---
output --^---^--^-^----^-----------!-------^---
Run Code Online (Sandbox Code Playgroud)
我已经尝试寻找Observable可以组合source并clock以我可以使用的方式的现有函数,但是大多数组合函数依赖于接收"每个"中的一个(And,Zip),或者它们重新返回"之前"的值. "丢失"一个(CombineLatest),或他们是从我需要什么(只是太远Amb,GroupJoin,Join,Merge,SelectMany …