我熟悉RX并且作为我的实验项目,我正在尝试创建一个简单的命令总线,在概念上类似于:
class Bus
{
Subject<Command> commands;
IObservable<Invocation> invocations;
public Bus()
{
this.commands = new Subject<Command>();
this.invocations = commands.Select(x => new Invocation { Command = x }).Publish();
}
public IObserver<Command> Commands
{
get { return this.commands; }
}
public IObservable<Invocation> Invocations
{
get { return this.invocations; }
}
}
class Invocation
{
public Command Command { get; set; }
public bool Handled { get; set; }
}
Run Code Online (Sandbox Code Playgroud)
这个想法是模块可以在启动时使用Invocations属性安装命令处理程序,并可以将他们希望的任何过滤应用于他们的订阅.另一方面,客户端可以通过调用来触发命令执行Commands.OnNext(command).
但是,我希望总线能够保证提交的每个命令都只由一个处理程序处理.也就是说,OnNext一旦第一个处理程序将Invocation.Handled设置为true,理想情况下应该终止处理,如果在结束时仍然为假OnNext(),Invocation.Handled则应该抛出异常.
我玩了创建自己的ISubject,IObservable和IObserver实现,但这感觉"脏又便宜";)
我正在努力探索RX提供的组合能力.在组合方式中,我如何提供"一次性"保证? …
我对使用Observable.Publish进行多播处理的生命周期感到有些困惑.如何使用正确连接?反对直觉我发现我不需要为多播观察者调用connect来启动他们的订阅.
var multicast = source.Publish();
var field0 = multicast.Select(record => record.field0);
var field1 = multicast.Select(record => record.field1);
// Do I need t*emphasized text*o call here?
var disposable = multicast.connect()
// Does calling
disposable.Dispose();
// unsubscribe field0 and field1?
Run Code Online (Sandbox Code Playgroud)
编辑
我的难题是为什么当我没有在IConnectableObservable显式上调用Connect时我成功订阅了.但是我在IConnectableObservable上调用Await,后者隐式调用Connect
Public Async Function MonitorMeasurements() As Task
Dim cts = New CancellationTokenSource
Try
Using dialog = New TaskDialog(Of Unit)(cts)
Dim measurementPoints =
MeasurementPointObserver(timeout:=TimeSpan.FromSeconds(2)).
TakeUntil(dialog.CancelObserved).Publish()
Dim viewModel = New MeasurementViewModel(measurementPoints)
dialog.Content = New MeasurementControl(viewModel)
dialog.Show()
Await measurementPoints
End Using
Catch ex As …Run Code Online (Sandbox Code Playgroud) IScheduler接口提供
public static IDisposable Schedule(this IScheduler scheduler, Action action)
Run Code Online (Sandbox Code Playgroud)
和
public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, System.Threading.Tasks.Task<IDisposable>> action)
Run Code Online (Sandbox Code Playgroud)
ScheduleAsync的方法说明:
// Summary:
// Schedules work using an asynchronous method, allowing for cooperative scheduling
// in an imperative coding style.
//
// Parameters:
// scheduler:
// Scheduler to schedule work on.
//
// action:
// Asynchronous method to run the work, using Yield and Sleep operations for
// cooperative scheduling and injection of cancellation points.
//
// Returns:
// Disposable …Run Code Online (Sandbox Code Playgroud) (我见过这个问题,但......)
我正在尝试研究如何为我的Observable扩展方法选择默认调度程序.
第一个例子:如果我不通过使用Scheduler.Default以下方式离开当前线程,则会在"生产"代码中"卡住" :
public static IObservable<T> ResponsiveSample<T>(this IObservable<T> src,
TimeSpan interval, IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return src.Publish(xs => xs.Take(1).Concat(xs.Sample(interval, scheduler)));
}
Run Code Online (Sandbox Code Playgroud)
public static IObservable<T> RetryAfterDelay<T>(this IObservable<T> source,
TimeSpan dueTime, IScheduler scheduler = null)
{
return RepeateInfinite(source, dueTime, scheduler).Catch();
}
private static IEnumerable<IObservable<T>> RepeateInfinite<T>(IObservable<T> src,
TimeSpan dueTime, IScheduler scheduler = null)
{
yield return source; // Don't delay the first time
scheduler = scheduler ?? …Run Code Online (Sandbox Code Playgroud) 我需要删除一个文件和应用程序中的其他一些进程阻止它.作为一种解决方法,我决定间隔几次尝试.这是正确的方法:
Observable.Start(() => File.Delete(path)).Retry(2)
.Delay(TimeSpan.FromMilliseconds(500)).Wait();
Run Code Online (Sandbox Code Playgroud) 我希望以下LINQPad代码能够在1秒内完成执行.但是在我的机器上一直需要15秒才能完成.我在这里缺少什么 - 这只是RX开销吗?
int count = 0;
ManualResetEvent done = new ManualResetEvent(false);
Observable.Interval(TimeSpan.FromMilliseconds(1))
.Take(1000)
.Subscribe((next) => count++, () => done.Set());
done.WaitOne();
count.Dump("Items observed");
Run Code Online (Sandbox Code Playgroud) 我试图得到一个非常简单的示例,将订阅序列输出到文本框中,因为您可以预期在控制台应用程序中执行它是没有问题的.
我已经尝试了十几种不同的invoke变体,所有这些都会使相同的交叉线程符合他们的要求.
我读过ObserveOnDispatcher可能会做这个工作吗?但我找不到任何地方.我已经尝试过ObserveOn,你可以在下面的代码中看到,但同样的问题.
private void button1_Click(object sender, EventArgs e)
{
var source = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)).Timestamp();
source.ObserveOn(Scheduler.CurrentThread).Subscribe(x => textBox1.AppendText(x.Value.ToString()));
}
Run Code Online (Sandbox Code Playgroud)
我不明白为什么以下工作正常;
source.Subscribe(x => MessageBox.Show(x.Value + " - " + x.Timestamp));
Run Code Online (Sandbox Code Playgroud)
然而,将这些值写入文本框需要发脾气.
有任何想法吗?
c# multithreading observablecollection winforms system.reactive
我想使用takeUntil运算符来完成无法自然完成的内部流,如下所示:
outerObservable
.mergeMap(() => {
innerObservable
.takeUntil(stopObservable$)
});
Run Code Online (Sandbox Code Playgroud)
这可以正常工作,内部流按预期完成,但是我希望外部流在停止信号之后返回最后一个值。即使经过大量谷歌搜索,我仍然不知道如何做。
编辑:
我写了一个似乎可以解决问题的运算符,但是让问题悬而未决,因为我知道有更好的方法或完全被我误解的东西。
outerObservable
.mergeMap(() => {
innerObservable
.takeUntil(stopObservable$)
});
Run Code Online (Sandbox Code Playgroud)
最近正在与Rx合作,我在合并两个可观察对象并试图从它们发出命令时遇到了麻烦.
我有两个observable,所以我想从它们发出一个命令,只有当那些observable为真时,命令才能执行.这是我的代码:
BuyCommand = playerData.Gold.Select(x => x >= boosterStoreItem.price)
.Merge(inventoryItem.CanAddItem.Select(x => x))
.ToReactiveCommand();
Run Code Online (Sandbox Code Playgroud)
那么这段代码有点工作,但问题是,我有多个BuyCommands(它们之间没有共享任何可观察对象),如果任何CanAddItem改变状态,所有BuyCommands CanExecute都变为true.
我相信我在合并时犯了一个错误,应该以其他方式完成.那我该怎么办呢?
请注意,它是UniRx(统一的Rx),但它们几乎相同.
我是RXSwift的初学者,我的代码有问题
我有代码:
let dartScore = PublishSubject<Int>()
dartScore.asObservable()
.scan(501) { intermediate, newValue in
let result = intermediate - newValue
return result >= 0 ? result : intermediate
}
.do(onNext: {
if $0 == 0 {
dartScore.onCompleted()
}
})
.subscribe({
print($0.isStopEvent ? $0 : $0.element!)
})
.disposed(by: disposeBag)
dartScore.onNext(13)
dartScore.onNext(50)
dartScore.onNext(60)
dartScore.onNext(378)
Run Code Online (Sandbox Code Playgroud)
我得到错误:
⚠️检测到可重入异常.⚠️
调试:要调试此问题,您可以在/****RxSwift/RxSwift/Rx.swift:97中设置断点并观察调用堆栈.
问题:此行为违反了可观察的序列语法.
next (error | completed)?此行为会破坏语法,因为序列事件之间存在重叠.可观察序列试图在发送先前事件之前发送事件.
为什么我不能在.do(onNext)里面做".onCompleted()",我应该怎么做以避免警告?
我正在使用XCode 9.0,swift 4,RXSwift 4.0.0
谢谢
最好的祝福
system.reactive ×10
c# ×6
.net ×2
javascript ×1
rx-swift ×1
rxjs ×1
scheduler ×1
swift ×1
winforms ×1
xcode ×1