标签: system.reactive

使用RX组成命令总线

我熟悉RX并且作为我的实验项目,我正在尝试创建一个简单的命令总线,在概念上类似于:

class Bus
{
    Subject<Command> commands;
    IObservable<Invocation> invocations;

    public Bus()
    {
        this.commands = new Subject<Command>();
        this.invocations = commands.Select(x => new Invocation { Command = x }).Publish();
    }

    public IObserver<Command> Commands
    {
        get { return this.commands; }
    }

    public IObservable<Invocation> Invocations
    {
        get { return this.invocations; }
    }
}

class Invocation
{
    public Command Command { get; set; }
    public bool Handled { get; set; }
}
Run Code Online (Sandbox Code Playgroud)

这个想法是模块可以在启动时使用Invocations属性安装命令处理程序,并可以将他们希望的任何过滤应用于他们的订阅.另一方面,客户端可以通过调用来触发命令执行Commands.OnNext(command).

但是,我希望总线能够保证提交的每个命令都只由一个处理程序处理.也就是说,OnNext一旦第一个处理程序将Invocation.Handled设置为true,理想情况下应该终止处理,如果在结束时仍然为假OnNext(),Invocation.Handled则应该抛出异常.

我玩了创建自己的ISubject,IObservable和IObserver实现,但这感觉"脏又便宜";)

我正在努力探索RX提供的组合能力.在组合方式中,我如何提供"一次性"保证? …

.net c# system.reactive

1
推荐指数
1
解决办法
1888
查看次数

使用带有被动扩展的Observable.Publish

我对使用Observable.Publish进行多播处理的生命周期感到有些困惑.如何使用正确连接?反对直觉我发现我不需要为多播观察者调用connect来启动他们的订阅.

var multicast = source.Publish();
var field0 = multicast.Select(record => record.field0);
var field1 = multicast.Select(record => record.field1);

// Do I need t*emphasized text*o call here?
var disposable = multicast.connect()

// Does calling 
disposable.Dispose();
// unsubscribe field0 and field1?
Run Code Online (Sandbox Code Playgroud)

编辑

我的难题是为什么当我没有在IConnectableObservable显式上调用Connect时我成功订阅了.但是我在IConnectableObservable上调用Await,后者隐式调用Connect

Public Async Function MonitorMeasurements() As Task


    Dim cts = New CancellationTokenSource

    Try
        Using dialog = New TaskDialog(Of Unit)(cts)

            Dim measurementPoints = 
                MeasurementPointObserver(timeout:=TimeSpan.FromSeconds(2)).
                TakeUntil(dialog.CancelObserved).Publish()

            Dim viewModel = New MeasurementViewModel(measurementPoints)
            dialog.Content = New MeasurementControl(viewModel)
            dialog.Show()

            Await measurementPoints
        End Using
    Catch ex As …
Run Code Online (Sandbox Code Playgroud)

.net system.reactive

1
推荐指数
2
解决办法
4545
查看次数

IScheduler.Schedule vs IScheduler.ScheduleAsync?

IScheduler接口提供

public static IDisposable Schedule(this IScheduler scheduler, Action action)
Run Code Online (Sandbox Code Playgroud)

public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, System.Threading.Tasks.Task<IDisposable>> action)
Run Code Online (Sandbox Code Playgroud)

ScheduleAsync的方法说明:

    // Summary:
    //     Schedules work using an asynchronous method, allowing for cooperative scheduling
    //     in an imperative coding style.
    //
    // Parameters:
    //   scheduler:
    //     Scheduler to schedule work on.
    //
    //   action:
    //     Asynchronous method to run the work, using Yield and Sleep operations for
    //     cooperative scheduling and injection of cancellation points.
    //
    // Returns:
    //     Disposable …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

1
推荐指数
1
解决办法
2932
查看次数

为我的Observable扩展方法选择"默认"调度程序

(我见过这个问题,但......)

我正在尝试研究如何为我的Observable扩展方法选择默认调度程序.

第一个例子:如果我不通过使用Scheduler.Default以下方式离开当前线程,则会在"生产"代码中"卡住" :

public static IObservable<T> ResponsiveSample<T>(this IObservable<T> src, 
                                TimeSpan interval, IScheduler scheduler = null)
{
    scheduler = scheduler ?? Scheduler.Default;
    return src.Publish(xs => xs.Take(1).Concat(xs.Sample(interval, scheduler)));
}
Run Code Online (Sandbox Code Playgroud)

第二个例子(从这里).这个在当前线程上是可以的.

public static IObservable<T> RetryAfterDelay<T>(this IObservable<T> source, 
                       TimeSpan dueTime, IScheduler scheduler = null)
{
    return RepeateInfinite(source, dueTime, scheduler).Catch();
}

private static IEnumerable<IObservable<T>> RepeateInfinite<T>(IObservable<T> src, 
                               TimeSpan dueTime, IScheduler scheduler = null)
{
    yield return source; // Don't delay the first time

    scheduler = scheduler ?? …
Run Code Online (Sandbox Code Playgroud)

c# scheduler system.reactive

1
推荐指数
1
解决办法
315
查看次数

使用Rx Start,Retry,Delay,等待同步文件删除重试

我需要删除一个文件和应用程序中的其他一些进程阻止它.作为一种解决方法,我决定间隔几次尝试.这是正确的方法:

Observable.Start(() => File.Delete(path)).Retry(2)
    .Delay(TimeSpan.FromMilliseconds(500)).Wait();
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

1
推荐指数
1
解决办法
707
查看次数

反应性扩展开销?

我希望以下LINQPad代码能够在1秒内完成执行.但是在我的机器上一直需要15秒才能完成.我在这里缺少什么 - 这只是RX开销吗?

int count = 0;
ManualResetEvent done = new ManualResetEvent(false);
Observable.Interval(TimeSpan.FromMilliseconds(1))
    .Take(1000)
    .Subscribe((next) => count++, () => done.Set());
done.WaitOne();
count.Dump("Items observed");
Run Code Online (Sandbox Code Playgroud)

system.reactive

1
推荐指数
1
解决办法
283
查看次数

System.reactive跨线程操作荒谬

我试图得到一个非常简单的示例,将订阅序列输出到文本框中,因为您可以预期在控制台应用程序中执行它是没有问题的.

我已经尝试了十几种不同的invoke变体,所有这些都会使相同的交叉线程符合他们的要求.

我读过ObserveOnDispatcher可能会做这个工作吗?但我找不到任何地方.我已经尝试过ObserveOn,你可以在下面的代码中看到,但同样的问题.

private void button1_Click(object sender, EventArgs e)
{
    var source = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)).Timestamp();

    source.ObserveOn(Scheduler.CurrentThread).Subscribe(x => textBox1.AppendText(x.Value.ToString()));
}
Run Code Online (Sandbox Code Playgroud)

我不明白为什么以下工作正常;

source.Subscribe(x => MessageBox.Show(x.Value + " - " + x.Timestamp));
Run Code Online (Sandbox Code Playgroud)

然而,将这些值写入文本框需要发脾气.

有任何想法吗?

c# multithreading observablecollection winforms system.reactive

1
推荐指数
1
解决办法
205
查看次数

使用takeUntil完成RxJS流

我想使用takeUntil运算符来完成无法自然完成的内部流,如下所示:

  outerObservable
    .mergeMap(() => {
        innerObservable
            .takeUntil(stopObservable$)
    });
Run Code Online (Sandbox Code Playgroud)

这可以正常工作,内部流按预期完成,但是我希望外部流在停止信号之后返回最后一个值。即使经过大量谷歌搜索,我仍然不知道如何做。

编辑:

我写了一个似乎可以解决问题的运算符,但是让问题悬而未决,因为我知道有更好的方法或完全被我误解的东西。

  outerObservable
    .mergeMap(() => {
        innerObservable
            .takeUntil(stopObservable$)
    });
Run Code Online (Sandbox Code Playgroud)

javascript system.reactive rxjs redux-observable

1
推荐指数
1
解决办法
1142
查看次数

如何在ReactiveX中合并两个布尔可观察对象

最近正在与Rx合作,我在合并两个可观察对象并试图从它们发出命令时遇到了麻烦.

我有两个observable,所以我想从它们发出一个命令,只有当那些observable为真时,命令才能执行.这是我的代码:

BuyCommand = playerData.Gold.Select(x => x >= boosterStoreItem.price)
            .Merge(inventoryItem.CanAddItem.Select(x => x))
            .ToReactiveCommand();
Run Code Online (Sandbox Code Playgroud)

那么这段代码有点工作,但问题是,我有多个BuyCommands(它们之间没有共享任何可观察对象),如果任何CanAddItem改变状态,所有BuyCommands CanExecute都变为true.

我相信我在合并时犯了一个错误,应该以其他方式完成.那我该怎么办呢?

请注意,它是UniRx(统一的Rx),但它们几乎相同.

c# unity-game-engine system.reactive

1
推荐指数
1
解决办法
223
查看次数

检测到RXSwift,Reentrancy异常

我是RXSwift的初学者,我的代码有问题

我有代码:

let dartScore = PublishSubject<Int>()
            dartScore.asObservable()
                .scan(501) { intermediate, newValue in
                    let result = intermediate - newValue
                    return result >= 0 ? result : intermediate
                }
                .do(onNext: {
                    if $0 == 0 {
                        dartScore.onCompleted()
                    }
                })
                .subscribe({
                    print($0.isStopEvent ? $0 : $0.element!)
                })
                .disposed(by: disposeBag)

            dartScore.onNext(13)
            dartScore.onNext(50)
            dartScore.onNext(60)
            dartScore.onNext(378)
Run Code Online (Sandbox Code Playgroud)

我得到错误:

⚠️检测到可重入异常.⚠️

调试:要调试此问题,您可以在/****RxSwift/RxSwift/Rx.swift:97中设置断点并观察调用堆栈.

问题:此行为违反了可观察的序列语法.next (error | completed)? 此行为会破坏语法,因为序列事件之间存在重叠.可观察序列试图在发送先前事件之前发送事件.

为什么我不能在.do(onNext)里面做".onCompleted()",我应该怎么做以避免警告?

我正在使用XCode 9.0,swift 4,RXSwift 4.0.0

谢谢

最好的祝福

xcode system.reactive swift rx-swift

1
推荐指数
1
解决办法
3576
查看次数