以下示例是我尝试这样做的:
var source
= Observable.Sample(
Observable.Range(1, int.MaxValue), TimeSpan.FromSeconds(2));
Run Code Online (Sandbox Code Playgroud)
但当我.subscribe()到Observable并将其输出到控制台时,它会显示一个这样的序列,每2秒输出一行:
OnNext: 312969
OnNext: 584486
OnNext: 862009
Run Code Online (Sandbox Code Playgroud)
显然.Range()observable正在运行,而.Sample()observable在每个输出之间等待2秒.我想知道如何创建一个observable但是不允许跳过值,所以显然看起来像这样:
OnNext: 1
OnNext: 2
OnNext: 3
Run Code Online (Sandbox Code Playgroud)
使用.Range()中的一个值每2秒输出一次.如何在.NET的Reactive Extensions中实现这一目标?
我已经开始研究Reactive扩展,我想知道如何执行以下操作(生病并尝试保持简单):
有一个字符串列表(或任何其他类型)
将项目添加到所述列表时,请对其执行某些操作.
我正在使用最新的Reactive Extensions并遇到了一个设计问题:
如果我从委托中抛出异常,我会进入Subscribe会发生什么?
通过源步骤,我发现:
Where)在异常通过时处理订阅.所以当然我发现在通过标准RX运算符传递一个observable的任何地方,任何异常都会导致我的事件因处置而停在那里.至少,除非我重新订阅.
这让我质疑我的设计.从我的代表那里抛出异常是一个坏主意吗?显然,RX团队是这么认为的.(虽然我怀疑是否正确地处理"坏"订阅是正确的方法.)
但是,看看我的设计,我不明白为什么这是一个问题.我有些保护行动的时候,我开始烧一些OnNext的通知听众(我们已经完全切换从旧派.NET事件的可观),如果有什么错在那里,它会抛出的堆栈,直到遇到一个处理程序.在我的情况下,处理程序回滚事务它的工作,这也通知回滚的听众.这一切都是异常安全的,工作正常.至少,它没有在Where运算符的Producer基础上进行Dispose时工作正常.
更进一步..主题和同行不做这种行为是不一致的吗?而对于我们自己的ISubjects和我们已经写在这里观察到的运营商,我们应该做这同一处置上的异常行为?
我期待着任何见解!
经过对StackOverflow的大量努力和研究 - 其中大部分已经过时,因为Reactive Extensions代码最近发生了变化 - 我终于能够消除此Observable方法从套接字读取数据的所有编译错误,我理解这一点代码比我最初做的要好得多.但还不完全.有人可以用英语回复给我,回答两三个问题吗?
是从这种方法中提取的缓冲数据(或者如果我有错误,应该怎么做?)?是否有不再需要它的部分?虽然我真的喜欢与业务代码分离,并且只用一两种方法保留所有套接字代码,但有没有更好的方法(解耦和可读)?
public static IObservable<int> WhenDataReceived(this Socket socket, int byteCount, SocketFlags flags = SocketFlags.None)
{
Contract.Requires(byteCount > 0);
return Observable.Create<int>(
observer =>
{
byte[] buffer = new byte[byteCount];
int remainder = byteCount;
bool shutdown = false;
return Observable.Defer<int>(() =>
Task.Factory.FromAsync<int>(socket.BeginReceive(buffer, buffer.Length - remainder, remainder, flags,
(result) =>
{
var read = (int)result.AsyncState;
remainder -= read;
if (read == 0)
shutdown = true;
},
null), socket.EndReceive).ToObservable())
.TakeWhile(_ => remainder > 0 && !shutdown)
.TakeLast(1)
.Subscribe(
observer.OnNext, …Run Code Online (Sandbox Code Playgroud) 我通过IObservable属性暴露了一个温和的"昂贵"计算.
如果有多个订阅者,我想保护它不被多次运行,所以我在它后面放了一个Publish().RefCount(),但是当我坚持使用断点时,我仍然看到它被调用了两次.
public IObservable<int> Property
{
get { return _Source.Select(Expensive).Publish().RefCount(); }
}
Run Code Online (Sandbox Code Playgroud) 我的xaml中有几个Tiles(TileLayoutControl类)(本例中只显示了2个),其可见性绑定到布尔属性并通过BooleanToVisibilityConverter转换.
这很好用.我的问题是
我可以将可见性绑定到Command,以便我可以删除那些几个布尔属性的需要吗?
类似于将Visibility绑定到Command.CanExecute的东西
如果是,我该如何实现?任何帮助将非常感谢!谢谢.
<dxlc:Tile Command="{Binding Tile1Command}"
Visibility="{Binding Path=IsTile1Visible , Converter={StaticResource BooleanToVisibilityConverter}}"/>
<dxlc:Tile Command="{Binding Tile2Command}"
Visibility="{Binding Path=IsTile2Visible , Converter={StaticResource BooleanToVisibilityConverter}}"/>
Run Code Online (Sandbox Code Playgroud)
视图模型
private bool _isTile1Visible;
public bool IsTile1Visible
{
get { return _isTile1Visible; }
set { this.RaiseAndSetIfChanged(ref _isTile1Visible, value); }
}
public ReactiveCommand Tile1Command { get; private set; }
Tile1Command = new ReactiveCommand();
Tile1Command.Subscribe(p => PerformTile1Operation());
Run Code Online (Sandbox Code Playgroud) 我有一个在播放/暂停之间交替的按钮.我想看看是否有一个使用反应式扩展的好解决方案,而不是翻转布尔值.这是对我的想法的粗略估计.
var whenPlayClicked = Observable.FromEventPattern<EventArgs>(_playPauseButton, "Click")
.Take(1)
.Skip(1)
.Repeat();
var whenPauseclicked = Observable.FromEventPattern<EventArgs>(_playPauseButton, "Click")
.Skip(1)
.Take(1)
.Repeat();
Run Code Online (Sandbox Code Playgroud) 如何使用Rx.Net生成一个数字列表,如0-100,其中每个数字是随机生成的?
编辑:
看起来像是这样的
public void NonBlocking_event_driven()
{
var random = new Random();
var ob = Observable.Create<int>(
observer =>
{
timer.Interval = 1000;
timer.Elapsed += (s, e) => observer.OnNext(random.Next(0, 3));
timer.Elapsed += OnTimerElapsed;
timer.Start();
return Disposable.Empty;
});
var subscription = ob.Subscribe(UserAction);
Console.ReadLine();
subscription.Dispose();
}
private void OnTimerElapsed(object sender, ElapsedEventArgs e)
{
var random = new Random();
timer.Interval = random.Next(1, 10)*1000;
Console.WriteLine(e.SignalTime);
}
Run Code Online (Sandbox Code Playgroud) 我有以下代码工作代码,我从Mark Seeman的Pluralsight视频中获得.我不明白最后一行是如何工作的.
let sharpObjectCollection = ConcurrentBag<Envelope<SharpObject>>()
let sharpObjectSubject = new Subjects.Subject<Envelope<SharpObject>>()
sharpObjectSubject.Subscribe sharpObjectCollection.Add |> ignore
Run Code Online (Sandbox Code Playgroud)
查看Subscribe的文档,我看到它需要一个IObserver作为参数,但我传递的是ConcurrentBag.Add方法.
这里发生了什么?这是F#的一个特色吗?我也可以在c#中这样做吗?
为什么你会订阅甚至创建一个永远不会产生任何东西的观察,甚至不会产生错误,也不会完成?
system.reactive ×10
c# ×6
.net ×1
c#-5.0 ×1
canexecute ×1
f# ×1
reactiveui ×1
vb.net ×1
visibility ×1
wpf ×1