标签: system.reactive

如何使用 .net 中的 RX 扩展来使用 UDP 字节流

我想出了这个解决方案。(尚未测试)通过网络上的大量弹跳。

Private Function ObserveUDP() As IObservable(Of bytes())


    Dim f = Function(observer)
                Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
                Dim client = New UdpClient(endpoint)

                Dim obs = observable.*emphasized text*Generate(Of Task(Of UdpReceiveResult), UdpReceiveResult) _
                      ( Nothing _
                      , Function(task As Task(Of UdpReceiveResult)) task Is Nothing Or Not task.IsCompleted() _
                      , Function(task As Task(Of UdpReceiveResult)) client.ReceiveAsync() _
                      , Function(task As Task(Of UdpReceiveResult)) task.Result)

                Dim observable = obs.Select(Function(r) r.Buffer)

                dim handle = observable.Subscribe(observer)

                Dim df = Sub() 
                    client.Close()
                    handle.Dispose()
                End Sub

                Return Disposable.Create(df)

    End Function …
Run Code Online (Sandbox Code Playgroud)

.net vb.net dispose udp system.reactive

3
推荐指数
1
解决办法
1519
查看次数

FromEventPattern 如何知道事件何时完成?

我有一个由大量事件填充的可观察集合。我想从 EventArg 中获取信息,按名称对其进行分组,然后为每个名称选择最大日期。我试过这个:

_subscription = Observable
            .FromEventPattern<NewLoanEventHandler, NewLoanEventArgs>(
                h => loan.NewLoanEvent += h, 
                h => loan.NewLoanEvent -= h)
            .Select(a => a.EventArgs.Counterpatry)
            .GroupBy(c => c.Name)
            .SelectMany(grp => grp.Max( c => c.MaturityDate ).Select( maturity => new {grp.Key, maturity}) )
            .Subscribe( 
                i => Console.WriteLine("{0} --> {1}", i.Key, i.maturity),
                Console.WriteLine,
                () => Console.WriteLine("completed")
                );
Run Code Online (Sandbox Code Playgroud)

我认为它可能会做我想做的事,但订阅永远不会完成:我永远不会收到完整的消息,也没有得到任何输出。也就是说,我怀疑,因为 Observable 仍在等待更多事件。我如何告诉它停止等待并给我我的输出?

c# events system.reactive

3
推荐指数
1
解决办法
197
查看次数

Rx for .Net:如何将 Scan 与 Throttle 结合使用

我的问题是:对于给定的事件序列,我想缓存它们的值,直到流中出现暂停。然后,我将批量处理所有缓存数据并清除缓存状态。

一种天真的方法是(不是工作代码,可能存在一些错误):

struct FlaggedData
{
    public EventData Data { get; set; }
    public bool Reset { get; set; }
}

...

IObservable<EventData> eventsStream = GetStream();
var resetSignal = new Subject<FlaggedData>();

var flaggedDataStream = eventsStream
    .Select(data => new FlaggedData { Data = data })
    .Merge(resetSignal)
    .Scan(
        new List<EventData>(),
        (cache, flaggedData) =>
        {
            if (!flaggedData.Reset())
            {
                cache.Add(flaggedData.Data);
                return cache;
            }

            return new List<EventData>();
        })
    .Throttle(SomePeriodOfTime)
    .Subscribe(batch => 
        {
            resetSignal.OnNext(new FlaggedData { Reset = true});
            ProcessBatch(batch);
        });
Run Code Online (Sandbox Code Playgroud)

所以在这里,在收到任何要处理的批处理后,我请求重置缓存。问题是因为Throttle缓存中可能有一些数据(或者我相信),在这种情况下会丢失。

我想要的是一些操作,如:

ScanWithThrottling<TAccumulate, TSource>( …
Run Code Online (Sandbox Code Playgroud)

.net c# events reactive-programming system.reactive

3
推荐指数
1
解决办法
1610
查看次数

创建一个可以在 RxCpp 中取消订阅的 Observable

我正在从 C# 移植一些严重依赖 Rx 的代码,而且我很难找到一些最常用的 C# 方法的 C++ 等价物。

特别是,我想从订阅/取消订阅逻辑创建一个 observable。在 C# 中,我使用Observable.Create<TSource> Method (Func<IObserver<TSource>, Action>)覆盖来创建一个 observable。例如

var observable = Observable.Create<int>(observer =>
{
    observers.Add(observer);
    return () =>
    {
        observers.Remove(observer)
    };
});
Run Code Online (Sandbox Code Playgroud)

是否可以用RxCpp做同样的事情?我认为答案在于rx::observable<>::create(OnSubscribe os)方法,但我不知道如何使用它来“注册”取消订阅的 lambda。

c# c++ system.reactive rxcpp

3
推荐指数
1
解决办法
2079
查看次数

在rx中有类似ThrottleOrMax的东西吗?

使用案例:我正在编写一个监视更改并自动保存的内容.我想要节流,以便我不会比每五秒钟更多地保存.如果有连续的变化流,我想每30秒保存一次.

无法在文档中找到observable.Throttle(mergeTime,maxTime),只能想到编写自己的丑陋方式,因此这个问题.

c# events throttling system.reactive

3
推荐指数
1
解决办法
220
查看次数

如何在活动扩展C#中将IObserable <bool>转换为bool

我想检查是否System.Reactive.IObserable<TElement>包含任何元素.该Any()扩展方法没用,因为它期待返回IObserable<bool>,而不只是布尔的.

c# wpf system.reactive

3
推荐指数
1
解决办法
1353
查看次数

如何使用 ReactiveUI 实现倒数计时器?

我是反应式世界的新手,我仍在努力学习。为了练习,我决定编写一个非常简单的 WPF 倒数计时器应用程序。基本上,这就是我想要做的:

  • 有一个显示当前剩余时间的 TextBlock。
  • 单击按钮启动计时器。
  • 当计时器运行时,按钮应该被禁用。

我正在尝试使用 ReactiveUI 来实现这一点。以下是我到目前为止...

XAML:

<Window x:Class="ReactiveTimer.MainWindow"
    xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
    xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
    xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
    xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
    xmlns:local="clr-namespace:ReactiveTimer"
    mc:Ignorable="d"
    Title="MainWindow" Height="350" Width="525">
<Window.DataContext>
    <local:MainViewModel/>
</Window.DataContext>
<Grid HorizontalAlignment="Center" VerticalAlignment="Center">
    <Grid.RowDefinitions>
        <RowDefinition/>
        <RowDefinition Height="Auto"/>
    </Grid.RowDefinitions>

    <TextBlock FontSize="45" FontWeight="Bold">
        <TextBlock.Text>
            <MultiBinding StringFormat="{}{0:00}:{1:00}">
                <Binding Path="RemainingTime.Minutes"/>
                <Binding Path="RemainingTime.Seconds"/>
            </MultiBinding>
        </TextBlock.Text>
    </TextBlock>

    <Button Command="{Binding StartCommand}" Content="Start" Grid.Row="1"/>
</Grid>
Run Code Online (Sandbox Code Playgroud)

视图模型:

public interface IMainViewModel
{
    TimeSpan RemainingTime { get; }
    ICommand StartCommand { get; }
}

public class MainViewModel : ReactiveObject, IMainViewModel
{
    const double InitialTimeInSeconds = 10; …
Run Code Online (Sandbox Code Playgroud)

wpf system.reactive reactiveui

3
推荐指数
1
解决办法
1457
查看次数

动态组合多个Retrofit Observable

我有一个像这样的Observable列表:

 List<Observable<MyObj>> listObservables = new ArrayList<Observable<MyObj>>();
Run Code Online (Sandbox Code Playgroud)

我想将所有Observable组合在一个中,如果我知道Observable的使用数量,我可以处理它zip(),例如我们有3个Observable:

 Observable<MyObj1> obs1= MyRestClient.getSomeData1();
 Observable<MyObj2> obs2= MyRestClient.getSomeData2();
 Observable<MyObj3> obs3= MyRestClient.getSomeData3();
Run Code Online (Sandbox Code Playgroud)

我有一个包装器obj:

class MyWrapperObj {
    private MyObj1 onj1;
    private MyObj2 onj2;
    private MyObj3 onj3;

    public MyWrapperObj(MyObj1 onj1, MyObj2 onj2, MyObj3 onj3) {
        this.onj1 = onj1;
        this.onj2 = onj2;
        this.onj3 = onj3;
    }
}
Run Code Online (Sandbox Code Playgroud)

所以我可以这样组合它们:

 Observable<MyWrapperObj> combinedObservable = Observable.zip(obs1, obs2, obs3, new Func3<MyObj1, MyObj2, MyObj3, MyWrapperObj>() {
        @Override
        public MyWrapperObj call(MyObj1 obj1, MyObj2 obj2, MyObj3 obj3) {
            return new MyWrapperObj(obj1, obj2, obj3);
        }
    }); …
Run Code Online (Sandbox Code Playgroud)

android system.reactive rx-java rx-android retrofit2

3
推荐指数
1
解决办法
2367
查看次数

为什么订阅从集合中获取的IObservable不起作用(以及如何处理)

我的目标是从源可观察源创建一组observable,以便我可以单独订阅它们.

当我手动执行此操作(即手动创建每个子源)时,事情按预期工作:添加到原始源的值充分传播到子源.

但是当我在循环中创建它们,将它们添加到a时List<IObservable<T>>,从该列表中获取的元素的订阅似乎不起作用:

class Program
{
    static void Main(string[] args)
    {
        // using Subject for the sake of example
        var source = new Subject<int>(); 


        // manually creating each subSource
        var source0 = source.Where((t, i) => i % 3 == 0);
        var source1 = source.Where((t, i) => i % 3 == 1);
        var source2 = source.Where((t, i) => i % 3 == 2);


        // creating a List of subsources
        List<IObservable<int>> sources = new List<IObservable<int>>();

        int count = 3;

        for …
Run Code Online (Sandbox Code Playgroud)

system.reactive

3
推荐指数
1
解决办法
59
查看次数

如何断言Observable不会推送任何项目?

我想知道如何单元测试序列Observable.Never.

我怎么能确定没有任何东西被推到序列中?

.net c# system.reactive observable

3
推荐指数
1
解决办法
58
查看次数