标签: system.reactive

统计某个主题的所有订阅数

我有一个主题,我在其中订阅在游戏中发生确定事件时应该调用的方法。

public Subject<SomeEvent> TestSubject = new Subject<SomeEvent>();
Run Code Online (Sandbox Code Playgroud)

某些实例订阅该主题。

TestSubject.Subscribe(MyMethod);
Run Code Online (Sandbox Code Playgroud)

我的目标是计算有多少方法已订阅该主题。我见过一些使用 Count() 扩展的示例,但我需要一个 int 作为返回值,这样我就可以在其他地方使用它,并且 Count() 返回一个 IObservable。

if (subjectCount > 0)
{
    DoSomething();
}
Run Code Online (Sandbox Code Playgroud)

有没有什么方法可以获取某个主题的订阅数量,或者我是否需要手动跟踪它们(有一个 public int subjectSubcriptions 并在每次订阅方法时添加 1)?

c# unity-game-engine reactive-programming system.reactive observer-pattern

3
推荐指数
1
解决办法
740
查看次数

如何同步 Observables 并卸载 UI 线程

我有两个简单的观察处理程序,它们订阅了同一源。然而,这两种订阅都以不同的类型运行。我希望他们保持可观察源(Subject())的顺序。我尝试使用 Synchronize() 扩展,但没有找到按预期方式完成这项工作的方法。

这是我的单元测试代码:

[Test]
public void TestObserveOn()
{
    Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    var source = new Subject<object>();
    var are = new AutoResetEvent(false);

    using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<int>().Subscribe(
        o =>
            {
                Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                int sleep = 3000 / o; // just to simulate longer processing
                Thread.Sleep(sleep);
                Console.WriteLine("Handled  {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
            },
        () =>
            {
                Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                are.Set();
            }))
    using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<double>().Subscribe(
                    o =>
                    {
                        Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                        Console.WriteLine("Handled  {1} on threadId: {0}", …
Run Code Online (Sandbox Code Playgroud)

c# multithreading system.reactive

3
推荐指数
1
解决办法
927
查看次数

与委托有多个参数且没有 EventArgs 的事件集成

似乎没有FromEvent或方法的重载,仅在有多个参数且没有 的情况下FromEventPattern才会转换为具有该IObservable<T>类型的事件。TDelegateEventArgs

例如,我们似乎无法将以下内容转换为可观察量。

public event Action<int, int> SomethingHappened;

public event Func<string, int> SomethingElseHappened;
Run Code Online (Sandbox Code Playgroud)

我们要么必须EventArgs在某个地方有一个,要么TDelegate在其签名中只有一个参数。因此,以下内容是可转换的,因为它们有一个EventArgs隐式委托。

public event NameChangedHandler NameChanged;
public event EventHandler RollNumberChanged;
public event EventHandler<AgeChangedEventArgs> AgeChanged;


public delegate void NameChangedHandler(
                     object sender, 
                     NameChangedEventArgs e);
Run Code Online (Sandbox Code Playgroud)

这个也可以转换,因为T它的参数只有一个。

public event Action<Tuple<string, string>> ClassChanged;
Run Code Online (Sandbox Code Playgroud)

如果我遇到这样的事件我该怎么办:

public event Action<T1, T2...> ItHappened;
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

3
推荐指数
1
解决办法
740
查看次数

在 Rx.Net 中去抖直到

我有 2 个事件流: 1. 鼠标拖放事件流(拖动开始...拖动结束...拖动开始...拖动结束) 2.按键事件流('a' ... ' b' .... 'c' .... 'd')

我需要合并到一个仅包含第二个流中的事件(因此仅包含按键)的流中,但它需要过滤掉拖动开始和拖动结束之间发生的所有按键(最后一个除外)

所以如果来源是这样的:

... Start ............... End .............. Start .............. End
Run Code Online (Sandbox Code Playgroud)

...........'a'...'b'...'c'.......'d'...'e'..........'f'....'g'.......
Run Code Online (Sandbox Code Playgroud)

结果应该是这样的:

...........................'c'...'d'...'e'..........................'g'
Run Code Online (Sandbox Code Playgroud)

在 C# 中使用 Rx.net 可以实现类似的功能吗?

reactive-programming system.reactive rx.net

3
推荐指数
1
解决办法
1514
查看次数

System.Reactive 在 Scheduler.Default 和 TaskPoolScheduler.Default 之间进行选择

我在Scheduler.Default和之间做出选择非常困难TaskPoolScheduler.Default

我已经看到它表明 TaskPoolScheduler 更高效/优化,并且它当然具有更明确/具体的好处;然而,这并不能帮助我理解真正的差异,因为从功能上来说它们似乎做同样的事情。

什么时候Scheduler.Default更可取TaskPoolScheduler.Default,反之亦然?

system.reactive .net-core

3
推荐指数
1
解决办法
798
查看次数

使用IObserver/IObservable实现观察者和主题

我想创建一个可用于表示动态计算值的类,另一个表示值的类可以是这些动态计算值的源(主题).目标是当主题发生变化时,计算出的值会自动更新.

在我看来,使用IObservable/IObserver是可行的方法.不幸的是我无法使用Reactive Extensions库,因此我不得不从头开始实现主题/观察者模式.

够了blabla,这是我的课程:

public class Notifier<T> : IObservable<T>
{
    public Notifier();
    public IDisposable Subscribe(IObserver<T> observer);
    public void Subscribe(Action<T> action);
    public void Notify(T subject);
    public void EndTransmission();
}

public class Observer<T> : IObserver<T>, IDisposable
{
    public Observer(Action<T> action);
    public void Subscribe(Notifier<T> tracker);
    public void Unsubscribe();
    public void OnCompleted();
    public void OnError(Exception error);
    public void OnNext(T value);
    public void Dispose();
}

public class ObservableValue<T> : Notifier<T>
{
    public T Get();
    public void Set(T x);
}

public class ComputedValue<T>
{
    public T …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive subject-observer

2
推荐指数
1
解决办法
3795
查看次数

Reactive Extensions .NET 3.5中的ConcurrentDictionary在哪里

我的问题很简单.经过大量谷歌搜索后,我了解到我可以在.NET 3.5项目中使用ConcurrentDictionary,使用其安装目录中的Reactive ExtensionsSystem.Threading.dll版本.首先,没有System.Threading.dll,Reactive Extensions .NET 3.5子目录中只有System.Reactive.Windows.Threading.添加对System.Reactive或System.Reactive.Windows.Threading的引用或者从提到的.NET 3.5中的任何其他引用都没有给我ConcurrentDictionary类,也没有给我System.Collections.Concurrent命名空间.我已经下载了旧版本的Reactive Extensions SDK,我找到了我一直在寻找的东西,但我的问题是:有没有人知道Reactive Extensions的实际版本中ConcurrentDictionary backport发生了什么,有人知道它在哪里或为什么它是失踪.我根本无法找到合理的答案或任何答案.

c# .net-3.5 system.reactive concurrentdictionary

2
推荐指数
1
解决办法
1925
查看次数

Reactive Extensions Test Scheduler模拟时间过去

我使用.Schedule(DateTimeOffset,Action>)的东西使用RX调度程序类.基本上我有一个可以再次安排自己的预定动作.

码:

public SomeObject(IScheduler sch, Action variableAmountofTime)
{
    this.sch = sch;
    sch.Schedule(GetNextTime(), (Action<DateTimeOffset> runAgain =>
    {
        //Something that takes an unknown variable amount of time.
        variableAmountofTime();

        runAgain(GetNextTime());
    });
}

public DateTimeOffset GetNextTime()
{
    //Return some time offset based on scheduler's 
    //current time which is irregular based on other inputs that i have left out.
    return this.sch.now.AddMinutes(1);
}
Run Code Online (Sandbox Code Playgroud)

我的问题是关于模拟变量AmountofTime可能采用的时间量,并测试我的代码按预期行为,并且仅触发按预期调用它.

我已经尝试推进测试调度程序在代理中的时间,但这不起作用.我编写的代码示例不起作用.假设GetNextTime()只是安排一分钟.

[Test]
public void TestCallsAppropriateNumberOfTimes()
{
    var sch = new TestScheduler();

    var timesCalled = 0;

    var variableAmountOfTime = () => 
        { 
            sch.AdvanceBy(TimeSpan.FromMinutes(3).Ticks); …
Run Code Online (Sandbox Code Playgroud)

c# unit-testing system.reactive

2
推荐指数
1
解决办法
1594
查看次数

在Windows服务中使用Rx

我试图在Windows服务中使用Rx扩展,但我被卡住了.我找到的样本不起作用.我将用一些代码解释我想要实现的目标.我正在创建的第一个类实际上是完成所有工作的类:

class Worker : IDisposable {
    public Worker() {
    }

    private void Run() {
    }

    public void Dispose() {
    }
}
Run Code Online (Sandbox Code Playgroud)

我想在OnStart中创建一个实例并将其杀死OnStop:

public partial class MyService : ServiceBase {
    private Worker _Worker;

    public MyService () {
        InitializeComponent ();
    }

    protected override void OnStart ( string[] args ) {
        _Worker = new Worker();
    }

    protected override void OnStop () {
        _Worker.Dispose();
        _Worker = null;
    }
}
Run Code Online (Sandbox Code Playgroud)

所以我在Worker的构造函数中执行此操作:

/* _TimesEvents and _Events are class level variables */
_TimedEvents = Observable.Timer ( TimeSpan.FromSeconds …
Run Code Online (Sandbox Code Playgroud)

c# windows-services system.reactive

2
推荐指数
1
解决办法
799
查看次数

使用一个observable作为时钟来测试另一个是否超时

我想做什么的声明如下:

// Checks input source for timeouts, based on the number of elements received 
// from clock since the last one received from source. 
// The two selectors are used to generate output elements.
public static IObservable<R> TimeoutDetector<T1,T2,R>(
        this IObservable<T1> source, 
        IObservable<T2> clock, 
        int countForTimeout,
        Func<R> timedOutSelector, 
        Func<T1, R> okSelector)
Run Code Online (Sandbox Code Playgroud)

ascii中的大理石图很难,但是这里有:

source --o---o--o-o----o-------------------o---
clock  ----x---x---x---x---x---x---x---x---x---
output --^---^--^-^----^-----------!-------^---
Run Code Online (Sandbox Code Playgroud)

我已经尝试寻找Observable可以组合sourceclock以我可以使用的方式的现有函数,但是大多数组合函数依赖于接收"每个"中的一个(And,Zip),或者它们重新返回"之前"的值. "丢失"一个(CombineLatest),或他们是从我需要什么(只是太远Amb,GroupJoin,Join,Merge,SelectMany …

c# system.reactive

2
推荐指数
1
解决办法
273
查看次数