标签: system.reactive

带有UDPClient.EndReceive和ref远程端点参数的Observable.FromAsyncPattern

我正在学习反应式扩展并试图重新考虑我的一些代码.

UDPClient.EndReceive有一个ref IPEndPoint参数,所以我目前有这个工作:

UdpClient receiverUDP = new UdpClient();
receiverUDP.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
receiverUDP.EnableBroadcast = true;
receiverUDP.Client.ExclusiveAddressUse = false;
receiverUDP.Client.Bind(new IPEndPoint(IPAddress.Any, 1234));

IPEndPoint ep = null;
var async = Observable.FromAsyncPattern<byte[]>(receiverUDP.BeginReceive, (i) => receiverUDP.EndReceive(i, ref ep));
var subscr = async().Subscribe(x => Console.WriteLine(ASCIIEncoding.ASCII.GetString(x)));
Run Code Online (Sandbox Code Playgroud)

如果我的订阅者需要访问远程IPEndPoint怎么办?在我目前的化身中,我正在使用事件,并传回一个包装byte[]和自定义类IPEndPoint.我无法为我的生活,研究如何用Rx做到这一点.

c# asynchronous system.reactive

2
推荐指数
1
解决办法
1841
查看次数

为什么我不需要在这个冷的可观察面上发布?

既然我在Observable这里感冒了,我多次订阅"分组",为什么我不需要在这里发布?当我运行它时,我会期望它会带来不必要的结果,但令我惊讶的是它可以使用和不使用Publish.这是为什么?

var subject = new List<string>
    {                            
    "test",                        
    "test",                 
    "hallo",
    "test",
    "hallo"
    }.ToObservable();
subject
    .GroupBy(x => x)
    .SelectMany(grouped => grouped.Scan(0, (count, _) => ++count)
         .Zip(grouped, (count, chars) => new { Chars = chars, Count = count }))
    .Subscribe(result => Console.WriteLine("You typed {0} {1} times", 
         result.Chars, result.Count));

// I Would have expect that I need to use Publish like that
//subject
//   .GroupBy(x => x)
//   .SelectMany(grouped => grouped.Publish(sharedGroup => 
//       sharedGroup.Scan(0, (count, _) => ++count)
//       .Zip(sharedGroup, (count, …
Run Code Online (Sandbox Code Playgroud)

c# reactive-programming system.reactive observable

2
推荐指数
1
解决办法
456
查看次数

如何观察物品的集合是否有效?

我正在使用ReactiveUI和提供的ReactiveCollection<>类.

在ViewModel中我有一组对象,我希望创建一个observable来监视这些项目的IsValid属性.

这是我想要解决的方案.在我的ViewModel的构造函数中.

this.Items = new ReactiveCollection<object>();

IObservable<bool> someObservable = // ... how do I watch Items so when 
                                   // any items IsValid property changes, 
                                   // this observable changes. There
                                   // is an IValidItem interface.

this.TheCommand = new ReactiveCommand(someObservable);

...

interface IValidItem { bool IsValid { get; } }
Run Code Online (Sandbox Code Playgroud)

编辑保罗的回答让我大部分都在那里.解决方案如下.

this.Items = new ReactiveCollection<object>();
this.Items.ChangeTrackingEnabled = true;

var someObservable = this.Items.Changed
    .Select(_ => this.Items.All(i => i.IsValid));
Run Code Online (Sandbox Code Playgroud)

system.reactive reactiveui

2
推荐指数
1
解决办法
2204
查看次数

找不到Rx任务ToObservable()

我正在使用Visual Studio 2013和.NET framework 4.5.1版

我正在阅读有关Reactive Extensions的教程,我正在尝试运行此代码:

var t = Task.Factory.StartNew(() => "Test");
var source = t.ToObservable();
Run Code Online (Sandbox Code Playgroud)

t.ToObservable() 加下划线并给我错误

System.Threading.Tasks.Task <.string>不包含ToObservable的定义...

我在另一篇文章中读到,我应该下载Silverlight Toolkit并将System.Reactive.dll其作为参考包含在内,但这并没有帮助.

我该怎么做才能解决这个问题?

.net c# task system.reactive

2
推荐指数
1
解决办法
774
查看次数

将轮询Web服务转换为RX

鉴于:

public partial class Weather
{
    private readonly DispatcherTimer _timer = new DispatcherTimer();
    private readonly IWeatherDataProvider _weatherDataProvider;

    public Weather(IWeatherDataProvider weatherDataProvider)
    {
        InitializeComponent();
        _weatherDataProvider = weatherDataProvider;
        Loaded += async (sender, args) =>
        {
            _timer.Interval = new TimeSpan(0, 15, 0);
            _timer.Tick += async (o, eventArgs) => DataContext = await UpdateWeather();
            _timer.Start();
            DataContext = await UpdateWeather();
        };

        Unloaded += (sender, args) => _timer.Stop();
    }

    private async Task<WeatherData> UpdateWeather()
    {
        var weatherData = await _weatherDataProvider.GetWeather();
        return weatherData;
    }
}
Run Code Online (Sandbox Code Playgroud)

我很困惑如何转换这个使用Reactive Extensions.我开始时:

var weather = …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

2
推荐指数
1
解决办法
946
查看次数

反应性扩展:Zip运算符但不同

这不是现实生活中的例子(这段代码可能无法编译),但我试图让它比我实际拥有的问题简单一些.

假设我有图像集:

private void IEnumerable<Image> GetImages()
{
    foreach (var filename in GetFilenames())
    {
         yield return Image.LoadFile(filename);
    }
}
Run Code Online (Sandbox Code Playgroud)

我想显示用户按'空格'驱动的幻灯片:

var images = Observable.FromEvent(form, "KeyPress")
  .Where(e => e.KeyCode == KeyCode.Space)
  .Zip(GetImages.ToObservable(), (k, i) => i);
Run Code Online (Sandbox Code Playgroud)

而这种作品.当按下空间时,它会发出下一个图像.问题是它实际上是以全速加载它们,因此它们被缓冲并消耗大量内存(加载时的处理能力).我可以将过滤后的按键输入到GetImages中并在那里进行压缩,但我不会保留GetImages的纯度.

有没有办法防止枚举.ToObservable()如果不需要提前枚举?

另一个例子(这个将编译):

var observable = 
    Observable.Interval(TimeSpan.FromSeconds(1))
    .Zip(
        Observable.Range(0, 1000000).Do(x => Console.WriteLine("produced {0}", x)), 
        (_, v) => v
    );

var subscription = observable.Subscribe(x => Console.WriteLine("consumed {0}", x));

Console.WriteLine("Press <enter>...");
Console.ReadLine();
Run Code Online (Sandbox Code Playgroud)

它将产生大量"生产"(提前),但每秒仅消耗一个"消耗".

c# system.reactive

2
推荐指数
1
解决办法
219
查看次数

Observable.Using()取消

我有一个由Using帮助者制作的观察结果:

var o = Observable.Using(
 () => { 
          return new MyResource 
       },
 res => {
          return new Observable.Create<string>(observer => ....);
        });
Run Code Online (Sandbox Code Playgroud)

如何取消观察?并确保MyResource被处理掉?

我看到有一个Observable.Using( )包括a cancellationToken,但签名是如此不同,我无法使其工作......

更新: 詹姆斯指出,通过处置可观察量,我的资源也将被处理掉.在我的情况下,一个简单的处置是不够的.我需要先在资源上调用一个方法.怎么归档?

.net c# system.reactive

2
推荐指数
1
解决办法
876
查看次数

使用Rx为webservice调用创建轮询请求

在C#中使用Rx我正在尝试创建REST API的轮询请求.我面临的问题是,Observable需要按顺序发送回复.意味着如果请求A在X时间进行并且请求B在X + dx时间进行并且B的响应在A之前出现,则Observable表达式应该忽略或取消请求A.

我编写了一个示例代码,试图描述该场景.我如何解决它只获得最新的响应,取消或忽略以前的响应.

 class Program
    {
        static int i = 0;

        static void Main(string[] args)
        {
            GenerateObservableSequence();

            Console.ReadLine();
        }

        private static void GenerateObservableSequence()
        {
            var timerData = Observable.Timer(TimeSpan.Zero,
                TimeSpan.FromSeconds(1));

            var asyncCall = Observable.FromAsync<int>(() =>
            {
                TaskCompletionSource<int> t = new TaskCompletionSource<int>();
                i++;

                int k = i;
                var rndNo = new Random().Next(3, 10);
                Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
                return t.Task;
            });

            var obs = from t in timerData
            from data in asyncCall
            select data;

            var hot = obs.Publish();
            hot.Connect();

                hot.Subscribe(j …
Run Code Online (Sandbox Code Playgroud)

c# recursion polling reactive-programming system.reactive

2
推荐指数
1
解决办法
464
查看次数

Reactive Extensions:使用适用于文件的Rx创建管道

我有一个包含3个步骤的流程管道:

  1. 视频到图像:我有一个视频转换为静止图像(帧)
  2. zip文件的帧:当视频中的所有帧都已处理完毕后,我应该用它们创建一个Zip文件.
  3. zip文件=>上传到FTP

它涉及2个一次性用品:视频捕获和zip文件.

我怎么能用Rx处理它?有任何想法吗?很抱歉没有发布任何代码,我不知道如何开始.

提前致谢!

.net c# idisposable ziparchive system.reactive

2
推荐指数
1
解决办法
550
查看次数

为什么我需要在完成后处理订阅?

在介绍到RX本书描述了OnSubscribe作为返回值IDisposible,并指出,订阅应该布置时OnErrorOnCompleted被调用.

一个有趣的事情是,当一个序列完成或错误时,你仍然应该处理你的订阅.

Intro到RX:Lifetime Management,OnError和OnCompleted

为什么是这样?


作为参考,这是我目前正在研究的课程.我可能会在某些时候将其提交给代码审查.

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

/// <summary>
/// Provides a timeout mechanism that will not timeout if it is signalled often enough
/// </summary>
internal class TrafficTimeout
{
    private readonly Action onTimeout;
    private object signalLock = new object();
    private IObserver<Unit> signals;

    /// <summary>
    /// Initialises a new instance of the <see cref="TrafficTimeout"/> class.
    /// </summary>
    /// <param name="timeout">The duration to wait after receiving signals …
Run Code Online (Sandbox Code Playgroud)

idisposable object-lifetime reactive-programming system.reactive reactive

2
推荐指数
1
解决办法
1559
查看次数