我的项目中有很多代码,如Hit和静音,通过这种方式使用Reactive扩展:
IDisposable dsp = null;
dsp = TargetObservable.Subscribe((incomingContent) =>
{
if (incomingContent == "something")
{
myList.Add(incomingContent);
dsp.Dispose();
}
});
Run Code Online (Sandbox Code Playgroud)
首先,我担心线程的安全性,因为我的Observable非常繁忙并且一直有大量内容推送,但后来,我被告知我应该结合ObserveOn(thread)保证线程安全,我完全同意,所以让我们忘记了线程安全的事情.
在这里,我想知道:
Take(count)'TakeWhile(预测)'?OnComplete()被调用,Dispose()将在内部调用,对吗?然后Observer和Observable之间的引用关系将中断(因为我的observable是一个长寿命的静态实例,引用会导致内存泄漏).通过我的C#代码中的组合,我最终得到了一个类型
IObservable<Maybe<IObservable<T>>> events;
Run Code Online (Sandbox Code Playgroud)
其中Maybe是monad选项,IObservable是被动monad.
现在我想将其转化为
IObservable<Maybe<T>> flatEvents;
Run Code Online (Sandbox Code Playgroud)
我认为它表达了几乎相同的东西.注意我想保留可能体现的"无所事事"事件.我不想完全展平它,所以当只有T的实例可用时我才会看到事件.
经过一些试验和错误后,我发现我可以进行转换
var flatEvents = events
.Select(p=>p.Select(q=>q.Select(v=>v.ToMaybe()))
.Else(Observable.Return(None<T>.Default)))
.Switch();
Run Code Online (Sandbox Code Playgroud)
其中None<T>.Default返回一个Maybe<T>里面是空的.
T Maybe<T>.Else(T v) 提取选项monad中包含的值,如果没有,则提供替代值.
考虑到命名事情是最难的事情CS我正在寻找这个扁平化运算符的名称.我不是一个haskell程序员,但我确信我没有在这里发明任何东西,这是一种常见的投影.这有名字吗?
我很惊讶反应式扩展CompositeDisposable类未能通过以下测试
[Test]
public void TestDisposable()
{
var ds = new List<IDisposable>();
int[] a = { 1, 2, 3 };
using (new CompositeDisposable(ds))
{
ds.Add(Disposable.Create(() => a[0] = 3));
ds.Add(Disposable.Create(() => a[2] = 1));
}
Assert.That(a[0],Is.EqualTo(3)); //Failed here
Assert.That(a[1], Is.EqualTo(2));
Assert.That(a[2], Is.EqualTo(1));
}
Run Code Online (Sandbox Code Playgroud)
所以这意味着如果我CompositeDisposable通过给定 a创建 aIEnumerable<IDisposable>它实际上迭代它的所有元素,而不是将其推迟到Dispose被调用者。
这在其他情况下可能很有用,但在我的情况下却很不方便。是否还有其他课程可以更轻松地完成上述任务?
我在使用阻塞代码时遇到死锁Task.Wait(),等待async内部等待 Rx LINQ 查询的方法。
这是一个例子:
public void BlockingCode()
{
this.ExecuteAsync().Wait();
}
public async Task ExecuteAsync()
{
await this.service.GetFooAsync().ConfigureAwait(false);
//This is the RX query that doesn't support ConfigureAwaitawait
await this.service.Receiver
.FirstOrDefaultAsync(x => x == "foo")
.Timeout(TimeSpan.FromSeconds(1));
}
Run Code Online (Sandbox Code Playgroud)
所以,我的问题是是否有任何等效的 ConfigureAwait on awaitable IObservable 以确保延续不会在相同的SynchronizationContext.
我正在尝试学习/理解 Rx,特别是 RxJS,并不断看到对 IObservable、IObserver 等的引用。
谁能告诉我领先I意味着什么和/或它来自哪里?
从我的搜索来看,它看起来像是<T>类型。如果这是错误的或幼稚的,我也希望对此进行一些澄清。
谢谢!
我正在尝试在新线程上安排一个可观察对象,并将结果返回到当前线程上。
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
Log("init");
Observable.Return(0)
.ObserveOn(NewThreadScheduler.Default)
.Do(_ => Log("Do1 method"))
.ObserveOn(CurrentThreadScheduler.Instance)
.Do(_ => Log("Do2 method"))
.Subscribe(_ => Log("subscribe method"));
Console.ReadKey();
}
static void Log(string label)
{
Console.WriteLine("{0} on {1}", label, Thread.CurrentThread.ManagedThreadId);
}
}
}
Run Code Online (Sandbox Code Playgroud)
这是我得到的结果:
init on 9
Do1 method on 10
Do2 method on 10
subscribe method on 10
Run Code Online (Sandbox Code Playgroud)
为什么 Do2 方法和 subscribe 方法在线程 #10 上而不是在线程 #9 上?我期待这个结果:
init on 9
Do1 method on 10
Do2 method on 9
subscribe …Run Code Online (Sandbox Code Playgroud) 我正在尝试在我的 Kafka 消费者中使用 Rx。
public static event EventHandler<ConsumeResult<string, string>> GenericEvent;
Run Code Online (Sandbox Code Playgroud)
然后我有以下代码
var observable = Observable.FromEventPattern<ConsumeResult<string, string>>(
x => GenericEvent += x,
x => GenericEvent -= x)
.Select(x => x.EventArgs);
while (!cancellationToken.IsCancellationRequested)
{
ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
GenericEvent(consumeResult.Topic, consumeResult);
}
Run Code Online (Sandbox Code Playgroud)
然后在某处我使用它
var subscription = observable.Subscribe(message =>
{
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ** {message.Topic}/{message.Partition} @{message.Offset}: '{message.Value}'");
//_kafkaConsumer.Commit(messages);
});
Run Code Online (Sandbox Code Playgroud)
是否有可能按主题名称 ( consumeResult.Topic)运行单独的线程?当消费者收到一条消息时,它应该将其按主题重定向到相应的线程
完整的代码如下
if (CurrentThreadScheduler.Instance.ScheduleRequired)
{
CurrentThreadScheduler.Instance.Schedule(this, (_, me) => subscription.Disposable = me.Run(observer, subscription, s => sink.Disposable = s));
}
else
{
subscription.Disposable = this.Run(observer, subscription, s => sink.Disposable = s);
}
Run Code Online (Sandbox Code Playgroud) 我有基于事件的API(Geolocator),我想转换为Rx.
问题是某些操作要求取消订阅所有事件,并且我不想将该burdon传递给Rx API的用户.
因此,用户将订阅一些可观察对象,并且当订阅事件时,它们将被发布到那些可观察对象.
最好的方法是什么?
我想到了创建用户订阅的主题,然后通过另一组可观察对象将事件发布给那些主题.
这是最好的方法吗?如果是这样,怎么样?
如何能在电视频道的问题,因为在此解释的谈话在第31分钟的中通过RX解决?
Rx中表达的问题如下:
两个电视频道(channel1和channel2)传输图像流,加上其中的流fuzz表示没有频道或白噪声.
有两个按钮可以发送事件eButton1和eButton2按下它们.
按下这些按钮应该导致各个通道被发送到屏幕.
每个按钮按下应该被投影(映射)到相应的频道,然后所有频道组合成选择流作为以流开始的fuzz流的流.最后,交换机操作员将选定的流发送给screen.
什么相当于Sodiumswitch并在RX中合并?
是否有可能用纯高阶函数解决它?即不使用闭包?我不明白这是怎么可能的.

system.reactive ×10
c# ×8
async-await ×2
asynchronous ×2
concurrency ×1
idisposable ×1
lambda ×1
monads ×1
naming ×1
rx.net ×1
rxjs ×1
sodium ×1