我正在学习反应式扩展并试图重新考虑我的一些代码.
UDPClient.EndReceive有一个ref IPEndPoint参数,所以我目前有这个工作:
UdpClient receiverUDP = new UdpClient();
receiverUDP.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
receiverUDP.EnableBroadcast = true;
receiverUDP.Client.ExclusiveAddressUse = false;
receiverUDP.Client.Bind(new IPEndPoint(IPAddress.Any, 1234));
IPEndPoint ep = null;
var async = Observable.FromAsyncPattern<byte[]>(receiverUDP.BeginReceive, (i) => receiverUDP.EndReceive(i, ref ep));
var subscr = async().Subscribe(x => Console.WriteLine(ASCIIEncoding.ASCII.GetString(x)));
Run Code Online (Sandbox Code Playgroud)
如果我的订阅者需要访问远程IPEndPoint怎么办?在我目前的化身中,我正在使用事件,并传回一个包装byte[]和自定义类IPEndPoint.我无法为我的生活,研究如何用Rx做到这一点.
既然我在Observable这里感冒了,我多次订阅"分组",为什么我不需要在这里发布?当我运行它时,我会期望它会带来不必要的结果,但令我惊讶的是它可以使用和不使用Publish.这是为什么?
var subject = new List<string>
{
"test",
"test",
"hallo",
"test",
"hallo"
}.ToObservable();
subject
.GroupBy(x => x)
.SelectMany(grouped => grouped.Scan(0, (count, _) => ++count)
.Zip(grouped, (count, chars) => new { Chars = chars, Count = count }))
.Subscribe(result => Console.WriteLine("You typed {0} {1} times",
result.Chars, result.Count));
// I Would have expect that I need to use Publish like that
//subject
// .GroupBy(x => x)
// .SelectMany(grouped => grouped.Publish(sharedGroup =>
// sharedGroup.Scan(0, (count, _) => ++count)
// .Zip(sharedGroup, (count, …Run Code Online (Sandbox Code Playgroud) 我正在使用ReactiveUI和提供的ReactiveCollection<>类.
在ViewModel中我有一组对象,我希望创建一个observable来监视这些项目的IsValid属性.
这是我想要解决的方案.在我的ViewModel的构造函数中.
this.Items = new ReactiveCollection<object>();
IObservable<bool> someObservable = // ... how do I watch Items so when
// any items IsValid property changes,
// this observable changes. There
// is an IValidItem interface.
this.TheCommand = new ReactiveCommand(someObservable);
...
interface IValidItem { bool IsValid { get; } }
Run Code Online (Sandbox Code Playgroud)
编辑保罗的回答让我大部分都在那里.解决方案如下.
this.Items = new ReactiveCollection<object>();
this.Items.ChangeTrackingEnabled = true;
var someObservable = this.Items.Changed
.Select(_ => this.Items.All(i => i.IsValid));
Run Code Online (Sandbox Code Playgroud) 我正在使用Visual Studio 2013和.NET framework 4.5.1版
我正在阅读有关Reactive Extensions的教程,我正在尝试运行此代码:
var t = Task.Factory.StartNew(() => "Test");
var source = t.ToObservable();
Run Code Online (Sandbox Code Playgroud)
t.ToObservable() 加下划线并给我错误
System.Threading.Tasks.Task <.string>不包含ToObservable的定义...
我在另一篇文章中读到,我应该下载Silverlight Toolkit并将System.Reactive.dll其作为参考包含在内,但这并没有帮助.
我该怎么做才能解决这个问题?
鉴于:
public partial class Weather
{
private readonly DispatcherTimer _timer = new DispatcherTimer();
private readonly IWeatherDataProvider _weatherDataProvider;
public Weather(IWeatherDataProvider weatherDataProvider)
{
InitializeComponent();
_weatherDataProvider = weatherDataProvider;
Loaded += async (sender, args) =>
{
_timer.Interval = new TimeSpan(0, 15, 0);
_timer.Tick += async (o, eventArgs) => DataContext = await UpdateWeather();
_timer.Start();
DataContext = await UpdateWeather();
};
Unloaded += (sender, args) => _timer.Stop();
}
private async Task<WeatherData> UpdateWeather()
{
var weatherData = await _weatherDataProvider.GetWeather();
return weatherData;
}
}
Run Code Online (Sandbox Code Playgroud)
我很困惑如何转换这个使用Reactive Extensions.我开始时:
var weather = …Run Code Online (Sandbox Code Playgroud) 这不是现实生活中的例子(这段代码可能无法编译),但我试图让它比我实际拥有的问题简单一些.
假设我有图像集:
private void IEnumerable<Image> GetImages()
{
foreach (var filename in GetFilenames())
{
yield return Image.LoadFile(filename);
}
}
Run Code Online (Sandbox Code Playgroud)
我想显示用户按'空格'驱动的幻灯片:
var images = Observable.FromEvent(form, "KeyPress")
.Where(e => e.KeyCode == KeyCode.Space)
.Zip(GetImages.ToObservable(), (k, i) => i);
Run Code Online (Sandbox Code Playgroud)
而这种作品.当按下空间时,它会发出下一个图像.问题是它实际上是以全速加载它们,因此它们被缓冲并消耗大量内存(加载时的处理能力).我可以将过滤后的按键输入到GetImages中并在那里进行压缩,但我不会保留GetImages的纯度.
有没有办法防止枚举.ToObservable()如果不需要提前枚举?
另一个例子(这个将编译):
var observable =
Observable.Interval(TimeSpan.FromSeconds(1))
.Zip(
Observable.Range(0, 1000000).Do(x => Console.WriteLine("produced {0}", x)),
(_, v) => v
);
var subscription = observable.Subscribe(x => Console.WriteLine("consumed {0}", x));
Console.WriteLine("Press <enter>...");
Console.ReadLine();
Run Code Online (Sandbox Code Playgroud)
它将产生大量"生产"(提前),但每秒仅消耗一个"消耗".
我有一个由Using帮助者制作的观察结果:
var o = Observable.Using(
() => {
return new MyResource
},
res => {
return new Observable.Create<string>(observer => ....);
});
Run Code Online (Sandbox Code Playgroud)
如何取消观察?并确保MyResource被处理掉?
我看到有一个Observable.Using( )包括a cancellationToken,但签名是如此不同,我无法使其工作......
更新: 詹姆斯指出,通过处置可观察量,我的资源也将被处理掉.在我的情况下,一个简单的处置是不够的.我需要先在资源上调用一个方法.怎么归档?
在C#中使用Rx我正在尝试创建REST API的轮询请求.我面临的问题是,Observable需要按顺序发送回复.意味着如果请求A在X时间进行并且请求B在X + dx时间进行并且B的响应在A之前出现,则Observable表达式应该忽略或取消请求A.
我编写了一个示例代码,试图描述该场景.我如何解决它只获得最新的响应,取消或忽略以前的响应.
class Program
{
static int i = 0;
static void Main(string[] args)
{
GenerateObservableSequence();
Console.ReadLine();
}
private static void GenerateObservableSequence()
{
var timerData = Observable.Timer(TimeSpan.Zero,
TimeSpan.FromSeconds(1));
var asyncCall = Observable.FromAsync<int>(() =>
{
TaskCompletionSource<int> t = new TaskCompletionSource<int>();
i++;
int k = i;
var rndNo = new Random().Next(3, 10);
Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
return t.Task;
});
var obs = from t in timerData
from data in asyncCall
select data;
var hot = obs.Publish();
hot.Connect();
hot.Subscribe(j …Run Code Online (Sandbox Code Playgroud) 我有一个包含3个步骤的流程管道:
它涉及2个一次性用品:视频捕获和zip文件.
我怎么能用Rx处理它?有任何想法吗?很抱歉没有发布任何代码,我不知道如何开始.
提前致谢!
在介绍到RX本书描述了OnSubscribe作为返回值IDisposible,并指出,订阅应该布置时OnError和OnCompleted被调用.
一个有趣的事情是,当一个序列完成或错误时,你仍然应该处理你的订阅.
为什么是这样?
作为参考,这是我目前正在研究的课程.我可能会在某些时候将其提交给代码审查.
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
/// <summary>
/// Provides a timeout mechanism that will not timeout if it is signalled often enough
/// </summary>
internal class TrafficTimeout
{
private readonly Action onTimeout;
private object signalLock = new object();
private IObserver<Unit> signals;
/// <summary>
/// Initialises a new instance of the <see cref="TrafficTimeout"/> class.
/// </summary>
/// <param name="timeout">The duration to wait after receiving signals …Run Code Online (Sandbox Code Playgroud) idisposable object-lifetime reactive-programming system.reactive reactive
system.reactive ×10
c# ×8
.net ×3
idisposable ×2
asynchronous ×1
observable ×1
polling ×1
reactive ×1
reactiveui ×1
recursion ×1
task ×1
ziparchive ×1