标签: system.reactive

何时为可观察对象调用Dispose

我的项目中有很多代码,如Hit和静音,通过这种方式使用Reactive扩展:

    IDisposable dsp = null;
    dsp = TargetObservable.Subscribe((incomingContent) =>
    {
        if (incomingContent == "something")
        {
            myList.Add(incomingContent);
            dsp.Dispose();
        }
    });
Run Code Online (Sandbox Code Playgroud)

首先,我担心线程的安全性,因为我的Observable非常繁忙并且一直有大量内容推送,但后来,我被告知我应该结合ObserveOn(thread)保证线程安全,我完全同意,所以让我们忘记了线程安全的事情.

在这里,我想知道:

  1. 我应该如何或何时将Dispose称为可观察的.
  2. 什么是满足命中和静音的正确方法,结合一些完整的扩展方法,如Take(count)'TakeWhile(预测)'?
  3. 如果OnComplete()被调用,Dispose()将在内部调用,对吗?然后Observer和Observable之间的引用关系将中断(因为我的observable是一个长寿命的静态实例,引用会导致内存泄漏).

c# multithreading thread-safety system.reactive

0
推荐指数
1
解决办法
406
查看次数

是否存在以下操作的通用函数编程名称

通过我的C#代码中的组合,我最终得到了一个类型

IObservable<Maybe<IObservable<T>>> events;
Run Code Online (Sandbox Code Playgroud)

其中Maybe是monad选项,IObservable是被动monad.

现在我想将其转化为

IObservable<Maybe<T>> flatEvents;
Run Code Online (Sandbox Code Playgroud)

我认为它表达了几乎相同的东西.注意我想保留可能体现的"无所事事"事件.我不想完全展平它,所以当只有T的实例可用时我才会看到事件.

经过一些试验和错误后,我发现我可以进行转换

var flatEvents =  events
            .Select(p=>p.Select(q=>q.Select(v=>v.ToMaybe()))
                        .Else(Observable.Return(None<T>.Default)))
            .Switch();
Run Code Online (Sandbox Code Playgroud)

其中None<T>.Default返回一个Maybe<T>里面是空的.

T Maybe<T>.Else(T v) 提取选项monad中包含的值,如果没有,则提供替代值.

考虑到命名事情是最难的事情CS我正在寻找这个扁平化运算符的名称.我不是一个haskell程序员,但我确信我没有在这里发明任何东西,这是一种常见的投影.这有名字吗?

c# monads naming functional-programming system.reactive

0
推荐指数
1
解决办法
119
查看次数

CompositDisposable 还不够懒?

我很惊讶反应式扩展CompositeDisposable类未能通过以下测试

[Test]
public void TestDisposable()
{
    var ds = new List<IDisposable>();
    int[] a = { 1, 2, 3 };
    using (new CompositeDisposable(ds))
    {
        ds.Add(Disposable.Create(() => a[0] = 3));
        ds.Add(Disposable.Create(() => a[2] = 1));
    }
    Assert.That(a[0],Is.EqualTo(3)); //Failed here
    Assert.That(a[1], Is.EqualTo(2));
    Assert.That(a[2], Is.EqualTo(1));
}
Run Code Online (Sandbox Code Playgroud)

所以这意味着如果我CompositeDisposable通过给定 a创建 aIEnumerable<IDisposable>它实际上迭代它的所有元素,而不是将其推迟到Dispose被调用者。

这在其他情况下可能很有用,但在我的情况下却很不方便。是否还有其他课程可以更轻松地完成上述任务?

c# idisposable system.reactive

0
推荐指数
1
解决办法
195
查看次数

配置等待 IObservable&lt;T&gt;

我在使用阻塞代码时遇到死锁Task.Wait(),等待async内部等待 Rx LINQ 查询的方法。

这是一个例子:

public void BlockingCode() 

{

    this.ExecuteAsync().Wait();

}

public async Task ExecuteAsync() 

{

    await this.service.GetFooAsync().ConfigureAwait(false);

    //This is the RX query that doesn't support ConfigureAwaitawait 
    await this.service.Receiver
      .FirstOrDefaultAsync(x => x == "foo")
      .Timeout(TimeSpan.FromSeconds(1));

}
Run Code Online (Sandbox Code Playgroud)

所以,我的问题是是否有任何等效的 ConfigureAwait on awaitable IObservable 以确保延续不会在相同的SynchronizationContext.

c# asynchronous system.reactive async-await

0
推荐指数
1
解决办法
936
查看次数

IObservable&lt;T&gt; 或 IObserver&lt;T&gt; 中的“I”是什么意思?

我正在尝试学习/理解 Rx,特别是 RxJS,并不断看到对 IObservable、IObserver 等的引用。

谁能告诉我领先I意味着什么和/或它来自哪里?

从我的搜索来看,它看起来像是<T>类型。如果这是错误的或幼稚的,我也希望对此进行一些澄清。

谢谢!

system.reactive rxjs

0
推荐指数
1
解决办法
95
查看次数

返回当前线程观察

我正在尝试在新线程上安排一个可观察对象,并将结果返回到当前线程上。

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            Log("init");

            Observable.Return(0)
                .ObserveOn(NewThreadScheduler.Default)
                .Do(_ => Log("Do1 method"))
                .ObserveOn(CurrentThreadScheduler.Instance)
                .Do(_ => Log("Do2 method"))
                .Subscribe(_ => Log("subscribe method"));

            Console.ReadKey();
        }

        static void Log(string label)
        {
            Console.WriteLine("{0} on {1}", label, Thread.CurrentThread.ManagedThreadId);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这是我得到的结果:

init on 9
Do1 method on 10
Do2 method on 10
subscribe method on 10
Run Code Online (Sandbox Code Playgroud)

为什么 Do2 方法和 subscribe 方法在线程 #10 上而不是在线程 #9 上?我期待这个结果:

init on 9
Do1 method on 10
Do2 method on 9
subscribe …
Run Code Online (Sandbox Code Playgroud)

c# concurrency system.reactive

0
推荐指数
1
解决办法
672
查看次数

按组在单独的线程中运行处理

我正在尝试在我的 Kafka 消费者中使用 Rx。

public static event EventHandler<ConsumeResult<string, string>> GenericEvent;
Run Code Online (Sandbox Code Playgroud)

然后我有以下代码

var observable = Observable.FromEventPattern<ConsumeResult<string, string>>(
     x => GenericEvent += x,
     x => GenericEvent -= x)
  .Select(x => x.EventArgs);

while (!cancellationToken.IsCancellationRequested)
{
    ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
    GenericEvent(consumeResult.Topic, consumeResult);
}
Run Code Online (Sandbox Code Playgroud)

然后在某处我使用它

var subscription = observable.Subscribe(message =>
{
    Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ** {message.Topic}/{message.Partition} @{message.Offset}: '{message.Value}'");

    //_kafkaConsumer.Commit(messages);
});
Run Code Online (Sandbox Code Playgroud)

是否有可能按主题名称 ( consumeResult.Topic)运行单独的线程?当消费者收到一条消息时,它应该将其按主题重定向到相应的线程

在此处输入图片说明

c# asynchronous system.reactive async-await rx.net

0
推荐指数
1
解决办法
82
查看次数

C#lambda表达式中的"我"是什么(_,me)=>是什么意思?我应该什么时候使用我?

完整的代码如下

if (CurrentThreadScheduler.Instance.ScheduleRequired)
{
    CurrentThreadScheduler.Instance.Schedule(this, (_, me) => subscription.Disposable = me.Run(observer, subscription, s => sink.Disposable = s));
}
else
{
    subscription.Disposable = this.Run(observer, subscription, s => sink.Disposable = s);
}
Run Code Online (Sandbox Code Playgroud)

c# lambda system.reactive

-1
推荐指数
1
解决办法
132
查看次数

订阅未来的可观察性

我有基于事件的API(Geolocator),我想转换为Rx.

问题是某些操作要求取消订阅所有事件,并且我不想将该burdon传递给Rx API的用户.

因此,用户将订阅一些可观察对象,并且当订阅事件时,它们将被发布到那些可观察对象.

最好的方法是什么?

我想到了创建用户订阅的主题,然后通过另一组可观察对象将事件发布给那些主题.

这是最好的方法吗?如果是这样,怎么样?

reactive-programming system.reactive

-2
推荐指数
1
解决办法
297
查看次数

在RX中切换流:Sodium相当于RX中的合并和切换

如何能在电视频道的问题,因为在此解释的谈话在第31分钟的中通过RX解决?

Rx中表达的问题如下:

两个电视频道(channel1channel2)传输图像流,加上其中的流fuzz表示没有频道或白噪声.

有两个按钮可以发送事件eButton1eButton2按下它们.

按下这些按钮应该导致各个通道被发送到屏幕.

每个按钮按下应该被投影(映射)到相应的频道,然后所有频道组合成选择流作为以流开始的fuzz流的流.最后,交换机操作员将选定的流发送给screen.

什么相当于Sodiumswitch并在RX中合并?

是否有可能用纯高阶函数解决它?即不使用闭包?我不明白这是怎么可能的.

在此输入图像描述

c# reactive-programming system.reactive sodium

-3
推荐指数
1
解决办法
1257
查看次数