标签: system.reactive

如何创建一个IObservable <T>,每隔-n-秒返回一个值而不跳过任何值

以下示例是我尝试这样做的:

var source
    = Observable.Sample(
          Observable.Range(1, int.MaxValue), TimeSpan.FromSeconds(2));
Run Code Online (Sandbox Code Playgroud)

但当我.subscribe()到Observable并将其输出到控制台时,它会显示一个这样的序列,每2秒输出一行:

OnNext: 312969
OnNext: 584486
OnNext: 862009
Run Code Online (Sandbox Code Playgroud)

显然.Range()observable正在运行,而.Sample()observable在每个输出之间等待2秒.我想知道如何创建一个observable但是不允许跳过值,所以显然看起来像这样:

OnNext: 1
OnNext: 2
OnNext: 3
Run Code Online (Sandbox Code Playgroud)

使用.Range()中的一个值每2秒输出一次.如何在.NET的Reactive Extensions中实现这一目标?

c# system.reactive

2
推荐指数
1
解决办法
1999
查看次数

使用List <String>和Reactive扩展

我已经开始研究Reactive扩展,我想知道如何执行以下操作(生病并尝试保持简单):

  1. 有一个字符串列表(或任何其他类型)

  2. 将项目添加到所述列表时,请对其执行某些操作.

c# vb.net reactive-programming system.reactive

2
推荐指数
2
解决办法
1848
查看次数

如果我在订阅回调中为Observable抛出异常会发生什么?

我正在使用最新的Reactive Extensions并遇到了一个设计问题:

如果我从委托中抛出异常,我会进入Subscribe会发生什么?

通过源步骤,我发现:

  • 主题将忽略该异常.
  • 从Producer派生的运算符(例如Where)在异常通过时处理订阅.

所以当然我发现在通过标准RX运算符传递一个observable的任何地方,任何异常都会导致我的事件因处置而停在那里.至少,除非我重新订阅.

这让我质疑我的设计.从我的代表那里抛出异常是一个坏主意吗?显然,RX团队是这么认为的.(虽然我怀疑是否正确地处理"坏"订阅是正确的方法.)

但是,看看我的设计,我不明白为什么这是一个问题.我有些保护行动的时候,我开始烧一些OnNext的通知听众(我们已经完全切换从旧派.NET事件的可观),如果有什么错在那里,它会抛出的堆栈,直到遇到一个处理程序.在我的情况下,处理程序回滚事务它的工作,这也通知回滚的听众.这一切都是异常安全的,工作正常.至少,它没有在Where运算符的Producer基础上进行Dispose时工作正常.

更进一步..主题和同行不做这种行为是不一致的吗?而对于我们自己的ISubjects和我们已经写在这里观察到的运营商,我们应该做这同一处置上的异常行为?

我期待着任何见解!

.net c# system.reactive

2
推荐指数
1
解决办法
2761
查看次数

读取可观察的TAP模式

经过对StackOverflow的大量努力和研究 - 其中大部分已经过时,因为Reactive Extensions代码最近发生了变化 - 我终于能够消除此Observable方法从套接字读取数据的所有编译错误,我理解这一点代码比我最初做的要好得多.但还不完全.有人可以用英语回复给我,回答两三个问题吗?

是从这种方法中提取的缓冲数据(或者如果我有错误,应该怎么做?)?是否有不再需要它的部分?虽然我真的喜欢与业务代码分离,并且只用一两种方法保留所有套接字代码,但有没有更好的方法(解耦和可读)?

    public static IObservable<int> WhenDataReceived(this Socket socket, int byteCount, SocketFlags flags = SocketFlags.None)
    {
        Contract.Requires(byteCount > 0);

        return Observable.Create<int>(
            observer =>
            {
                byte[] buffer = new byte[byteCount];
                int remainder = byteCount;
                bool shutdown = false;

                return Observable.Defer<int>(() =>
                        Task.Factory.FromAsync<int>(socket.BeginReceive(buffer, buffer.Length - remainder, remainder, flags,
                        (result) =>
                        {
                            var read = (int)result.AsyncState;
                            remainder -= read;

                            if (read == 0)
                                shutdown = true;
                        },
                        null), socket.EndReceive).ToObservable())
                    .TakeWhile(_ => remainder > 0 && !shutdown)
                    .TakeLast(1)
                    .Subscribe(
                        observer.OnNext, …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive observer-pattern c#-5.0

2
推荐指数
1
解决办法
502
查看次数

发布().RefCount()不起作用?

我通过IObservable属性暴露了一个温和的"昂贵"计算.

如果有多个订阅者,我想保护它不被多次运行,所以我在它后面放了一个Publish().RefCount(),但是当我坚持使用断点时,我仍然看到它被调用了两次.

public IObservable<int> Property
{
    get { return _Source.Select(Expensive).Publish().RefCount(); }
}
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

2
推荐指数
1
解决办法
161
查看次数

将可见性绑定到ReactiveCommand CanExecute

我的xaml中有几个Tiles(TileLayoutControl类)(本例中只显示了2个),其可见性绑定到布尔属性并通过BooleanToVisibilityConverter转换.
这很好用.我的问题是

我可以将可见性绑定到Command,以便我可以删除那些几个布尔属性的需要吗?

类似于将Visibility绑定到Command.CanExecute的东西

如果是,我该如何实现?任何帮助将非常感谢!谢谢.

<dxlc:Tile Command="{Binding Tile1Command}"
           Visibility="{Binding Path=IsTile1Visible , Converter={StaticResource BooleanToVisibilityConverter}}"/>
<dxlc:Tile Command="{Binding Tile2Command}"
           Visibility="{Binding Path=IsTile2Visible , Converter={StaticResource BooleanToVisibilityConverter}}"/>
Run Code Online (Sandbox Code Playgroud)

视图模型

private bool _isTile1Visible;
public bool IsTile1Visible
{
    get { return _isTile1Visible; }
    set { this.RaiseAndSetIfChanged(ref _isTile1Visible, value); }
}

public ReactiveCommand Tile1Command { get; private set; }

Tile1Command = new ReactiveCommand();
Tile1Command.Subscribe(p => PerformTile1Operation());
Run Code Online (Sandbox Code Playgroud)

wpf visibility system.reactive canexecute reactiveui

2
推荐指数
1
解决办法
2376
查看次数

在一个按钮上协调多个IObservable

我有一个在播放/暂停之间交替的按钮.我想看看是否有一个使用反应式扩展的好解决方案,而不是翻转布尔值.这是对我的想法的粗略估计.

var whenPlayClicked = Observable.FromEventPattern<EventArgs>(_playPauseButton, "Click")
    .Take(1)
    .Skip(1)
    .Repeat();

var whenPauseclicked = Observable.FromEventPattern<EventArgs>(_playPauseButton, "Click")
    .Skip(1)
    .Take(1)
    .Repeat();
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

2
推荐指数
1
解决办法
69
查看次数

使用rx以随机时间间隔生成数字

如何使用Rx.Net生成一个数字列表,如0-100,其中每个数字是随机生成的?

编辑:

看起来像是这样的

    public void NonBlocking_event_driven()
    {
        var random = new Random();
        var ob = Observable.Create<int>(
        observer =>
        {
            timer.Interval = 1000;
            timer.Elapsed += (s, e) => observer.OnNext(random.Next(0, 3));
            timer.Elapsed += OnTimerElapsed;
            timer.Start();
            return Disposable.Empty;
        });
        var subscription = ob.Subscribe(UserAction);
        Console.ReadLine();
        subscription.Dispose();
    }
    private void OnTimerElapsed(object sender, ElapsedEventArgs e)
    {
        var random = new Random();
        timer.Interval = random.Next(1, 10)*1000;
        Console.WriteLine(e.SignalTime);
    }
Run Code Online (Sandbox Code Playgroud)

system.reactive

2
推荐指数
1
解决办法
1098
查看次数

F#:Add方法和IObserver <>的关系是什么

我有以下代码工作代码,我从Mark Seeman的Pluralsight视频中获得.我不明白最后一行是如何工作的.

let sharpObjectCollection = ConcurrentBag<Envelope<SharpObject>>()
let sharpObjectSubject = new Subjects.Subject<Envelope<SharpObject>>()
sharpObjectSubject.Subscribe sharpObjectCollection.Add |> ignore
Run Code Online (Sandbox Code Playgroud)

查看Subscribe文档,我看到它需要一个IObserver作为参数,但我传递的是ConcurrentBag.Add方法.

这里发生了什么?这是F#的一个特色吗?我也可以在c#中这样做吗?

f# system.reactive

2
推荐指数
1
解决办法
57
查看次数

什么是真实世界的使用或需要Rx运营商从来没有?

为什么你会订阅甚至创建一个永远不会产生任何东西的观察,甚至不会产生错误,也不会完成?

system.reactive

2
推荐指数
1
解决办法
307
查看次数