标签: system.reactive

你如何注册/取消注册 Observable.FromEventPattern 中使用的事件的处理程序?

我从 Observable.FromEventPattern 得到一个 IObservable,如下所示:

SomeObject target = new SomeObject();
string eventName = "SomeEvent";
IObservable<T> obs = Observable.FromEventPattern<T>(target, eventName);
Run Code Online (Sandbox Code Playgroud)

据我了解,FromEventPattern 调用将自动为我生成添加/删除事件处理程序。但是处理程序何时真正被添加/删除?

我假设在订阅 IObservable 时添加了处理程序。处理订阅者时,处理程序是否也会自动取消注册?

.net c# reactive-programming system.reactive

1
推荐指数
1
解决办法
658
查看次数

如何在 Visual Studio 2012 解决方案中安装 System.Reactive 扩展?

Visual Studio 2012 有 nuget 版本 2.8.60318.667,与 Reactive.Extensions 3.1.1 不兼容。

如果将 System.Reactive.* 包手动复制到我的解决方案的包目录中,当我从 Visual Studio UI 启动包管理器并浏览已安装的包时:而不是 habing 已安装包的列表,将显示以下错误:

安装 System.Reactive 包时出错 “System.Reactive.Core”已经为“System.Reactive.Interfaces”定义了一个依赖项。错误

如果我使用 de package manager 命令安装包: install-package System.Reactive 会显示以下错误:“System.Reactive 3.1.1”包需要 NuGet 客户端版本“2.8.1”或更高版本,但当前的 NuGet 版本是'2.8.60318.667'。

c# system.reactive visual-studio-2012

1
推荐指数
1
解决办法
989
查看次数

Rx – 与超时不同?

我想知道是否有任何方法可以在 .NET 的 Reactive Extensions 中实现 Distinct,使其在给定的时间内工作,并且在此之后它应该重置并允许再次返回值。我需要这个应用程序中的热源,该应用程序将全年工作,现在停止,所以我担心性能,我需要在一段时间后允许这些值。还有 DistinctUntilChanged 但在我的情况下,值可以混合 - 例如:AAXA,DistinctUntilChanged 会给我 AXA,我需要结果 AX,并且在给定时间后应该重置 distinct。

.net c# timeout distinct system.reactive

1
推荐指数
1
解决办法
718
查看次数

Hot observable 和 IDisposable

我想找到关于热可观察和 IDisposable 对象作为事件类型的最佳实践。

假设我的代码将 Bitmap 对象生成为热可观察对象,并且我有多个订阅者。例如:

    public static IObservable<Bitmap> ImagesInFolder(string path, IScheduler scheduler)
    {
        return Directory.GetFiles(path, "*.bmp")
            .ToObservable(scheduler)
            .Select(x => new Bitmap(x))
            .Publish()
            .RefCount();
    }

public void Main()
{
    var images = ImagesInFolder("c:\Users\VASIYA\Desktop\Sample Images", TaskPoolScheduler.Instance);
    var process1 = images.Subscribe(SaveBwImages);
    var process2 = images.Subscribe(SaveScaledImages);
    var process3 = images.Select(Cats).Subscribe(SaveCatsImages);
}
Run Code Online (Sandbox Code Playgroud)

所以问题是:处理作为热 observable 来源的一次性资源的最佳实践是什么?

在这个例子中,我想在使用后处理图像,但我不知道 - 究竟是什么时候?

订阅事件的调用顺序并不明显,因此我无法处理“最后一个”事件。

提前致谢。

c# system.reactive rx.net

1
推荐指数
1
解决办法
363
查看次数

取消使用异步函数创建的可观察对象

我是 rx 的新手,并且一直在使用 dot net 中的反应式扩展来处理一些网络代码。我的问题是,当我通过我提供的令牌触发取消时,我使用异步函数创建的 tcpClients 的 observable 没有像我预期的那样完成。这是我遇到问题的代码的简化版本:

public static class ListenerExtensions
{
    public static IObservable<TcpClient> ToListenerObservable(
        this IPEndPoint endpoint,
        int backlog)
    {
        return new TcpListener(endpoint).ToListenerObservable(backlog);
    }

    public static IObservable<TcpClient> ToListenerObservable(
        this TcpListener listener,
        int backlog)
    {
        return Observable.Create<TcpClient>(async (observer, token) => 
        {
            listener.Start(backlog);

            try
            {
                while (!token.IsCancellationRequested)
                    observer.OnNext(await Task.Run(() => listener.AcceptTcpClientAsync(), token));
                //This never prints and onCompleted is never called.
                Console.WriteLine("Completing..");
                observer.OnCompleted();
            }
            catch (System.Exception error)
            {
                observer.OnError(error);   
            }
            finally
            {
                //This is never executed and my …
Run Code Online (Sandbox Code Playgroud)

c# task system.reactive async-await .net-core

1
推荐指数
1
解决办法
1001
查看次数

如何使用RxNET发出异步HTTP请求?

目前我在Observable.Create()中使用await.ExecuteRequestAsync是一个调用HttpClient.GetAsync方法(String)的包装类

    public IObservable<IList<ExampleResponseModel>> ListExamplesRx()
    {
        return Observable.Create<IList<ExampleResponseModel>>(
            o =>
            {
                try
                {
                    var url = string.Format(Routes.Examples.List);
                    IList<ExampleResponseModel> exampleResponse = await ExecuteRequestAsync<IList<ExampleResponseModel>>(url, HttpMethod.Get);
                    o.OnNext(exampleResponse);
                    o.OnCompleted();
                }
                catch (Exception e)
                {
                    o.OnError(e);
                }
                return Disposable.Empty;
            }
        );
    }
Run Code Online (Sandbox Code Playgroud)

这是最好的做法吗?有没有更合适的rx解决方案?

c# system.reactive

1
推荐指数
1
解决办法
328
查看次数

与 flatMapIterable 等效的 Rx.NET 是什么?

今天我终于偶然发现了一个非常简单的 RX 问题的解决方案:假设你有一个 Observable,它返回项目列表。喜欢Observable<List<String>>。您经常会从 Web API 收到类似这样的响应。

但是,您可能希望对单个项目进行操作,在本例中是字符串。

flatMapIterable来救援!这个方便的运算符通过映射函数Iterables将一个流扁平化为从这些单个项目生成的流Iterables

RxJava:扁平化迭代流 | 斯文·本德尔

如果这很重要,我正在用 .NET Core 编写。

实施例在RxJava:转换Observable<List<Car>>到的序列Observable<Car>中RxJava

c# system.reactive

1
推荐指数
1
解决办法
924
查看次数

可观察订阅未触发

我开始研究 Reactive Extensions 以及如何将它们应用于常见场景以启用更易于管理和可读的代码。

我现在正在研究基本概念,并构建了一个简单的类:

public class ValidatableObject<TValue>
{
    public bool IsValid { get; private set; } = true;
    public TValue Value { get; }
    public ICollection<IValidationRule<TValue>> Rules { get; }

    public ValidatableObject(TValue value)
    {
        Value = value;
        Rules = new List<IValidationRule<TValue>>();
        Rules.ToObservable()
             .All(rule => rule.Check(Value))
             .Subscribe(b => IsValid = b);
    }
}

public interface IValidationRule<T>
{
    bool Check(T value);
}

public class FailingValidationRule<T> : IValidationRule<T>
{
    public bool Check(T value) => false;
}

public static void main()
{
    var …
Run Code Online (Sandbox Code Playgroud)

c# reactive-programming system.reactive

1
推荐指数
1
解决办法
985
查看次数

Rx 扩展刷新 Buffered observable 中的剩余项目

所以我有这个代码:

ISubject<int> _processed = new ReplaySubject<int>();
_processed.Buffer(5000).Subscribe(UpdateProcessed);

// Start some process which calls _processed.OnNext
Run Code Online (Sandbox Code Playgroud)

我遇到的问题是有时缓冲区没有填满,因为最后一批小于 5000 并且进程在没有调用UpdateProcessed执行的情况下退出。

_processed处理完成后有没有办法刷新observable中剩余的项目?

c# system.reactive

1
推荐指数
1
解决办法
133
查看次数

为什么 C# Rx Subscribe() 函数不适用于“async”关键字?

我有这个代码片段:

static void Main(string[] args)
{
    Observable.Range(1, 5).Subscribe(async x => await DoTheThing(x));
    Console.WriteLine("done");
}

static async Task DoTheThing(int x)
{
    await Task.Delay(TimeSpan.FromSeconds(x));
    Console.WriteLine(x);
}
Run Code Online (Sandbox Code Playgroud)

我希望它会循环 5 次,每次循环后都会有一行打印为

1
2
3
4
5
Run Code Online (Sandbox Code Playgroud)

但令人惊讶的是,这将打印“完成”并立即终止。似乎 async+await 没有等待 Task.Delay 并退出。

语义似乎没有问题,那么我在 Subscribe 或 async 哪里出错了,如何修复它以满足我从 Rx 调用异步任务的请求?

谢谢。

c# asynchronous subscribe system.reactive async-await

1
推荐指数
1
解决办法
280
查看次数