我正在尝试建模一个不简单的Rx查询(对我而言):
每个人都有以下属性:
class Man
{
public const int LookingAtNobody = 0;
public int Id { get; set; }
public double Location { get; set; }
public int LookingAt { get; set; }
}
Run Code Online (Sandbox Code Playgroud)每个女人都有以下特性:
class Woman
{
public int Id { get; set; }
public double Location { get; set; }
}
Run Code Online (Sandbox Code Playgroud)代表我们拥有的男人IObservable<IObservable<Man>>,代表我们拥有的女人IObservable<IObservable<Woman>>.
你如何使用Rx生成从男人到他们正在看的女人的载体:IObservable<IObservable<Tuple<double,double>>>?
为了帮助,这里有一些针对一些简单情况的单元测试:
public class Tests : ReactiveTest
{
[Test]
public void Puzzle1()
{
var scheduler = new TestScheduler(); …Run Code Online (Sandbox Code Playgroud) 我在另一个主题中对这个主题进行了一次小型讨论,并希望人们对主题的"坏"方面提出意见.
频繁参与RX论坛的人都知道E.Meijer不喜欢受试者.虽然我对RX创作者的观点有最深刻的敬意,但我已经在多个项目中广泛使用了Subjects几年,并且因为它们没有任何架构问题或错误.
我可以命名的主题的唯一"陷阱"是它们不是"可重用的" - 在完成主题上的可观察对象之后,您需要在新订阅者可以从其接收事件之前重新实例化它.
"代码味道"和"不喜欢它们"需要"实用"示例支持 - 您是否可以引起我们注意使用主题可能导致错误或问题的可能情况?或许你认为它们完全是容易和无害的 - 然后尝试定义它们将被使用的区域.
我正在尝试使用Rx在wpf中实现标准的拖放图像.
var mouseDown = from evt in Observable.FromEventPattern<MouseButtonEventArgs>(image, "MouseLeftButtonDown")
select evt.EventArgs.GetPosition(image);
var mouseUp = Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseLeftButtonUp");
var mouseMove = from evt in Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove")
select evt.EventArgs.GetPosition(this);
var q = from startLocation in mouseDown
from endLocation in mouseMove.TakeUntil(mouseUp)
select new Point
{
X = endLocation.X - startLocation.X,
Y = endLocation.Y - startLocation.Y
};
q.ObserveOn(SynchronizationContext.Current).Subscribe(point =>
{
Canvas.SetLeft(image, point.X);
Canvas.SetTop(image, point.Y);
});
Run Code Online (Sandbox Code Playgroud)
我收到错误错误 Cannot convert lambda expression to type 'System.IObserver<System.Windows.Point>' because it is not a delegate type
我错过了什么?
我对这些高级并发范例很陌生,我已经开始使用scala RX绑定了.所以我试图理解RX与RabbitMQ或ZeroMQ等消息队列的区别?
他们似乎都使用订阅/发布范例.在某个地方,我看到一条关于RX在RabbitMQ上运行的推文.
有人可以解释RX和消息队列之间的差异吗?为什么我会选择一个而不是另一个?可以用一个替代另一个,还是互相排斥?它们在哪些区域重叠?
我正在研究使用Observable.Generate来创建一系列结果,这些结果是以msdn网站上的示例为起点间隔采样的.
以下代码没有TimeSpan选择器不会出现内存泄漏:
IObservable<string> obs = Observable.Generate(initialState: 1,
condition: x => x < 1000,
iterate: x => x + 1,
resultSelector: x => x.ToString());
obs.Subscribe(x => Console.WriteLine(x));
Run Code Online (Sandbox Code Playgroud)
但是,以下代码与TimeSpan选择器显示内存泄漏:
TimeSpan timeSpan = TimeSpan.FromSeconds(1);
IObservable<string> obs = Observable.Generate(initialState: 1,
condition: x => x < 1000,
iterate: x => x + 1,
resultSelector: x => x.ToString(),
timeSelector: x => timeSpan);
obs.Subscribe(x => Console.WriteLine(x));
Run Code Online (Sandbox Code Playgroud)
例如,这个玩具应用程序将使用VS 2015社区附带的Memory Profiler快速显示内存泄漏:
using System;
using System.Reactive.Linq;
namespace Sample
{
public class Program
{
static void Main()
{
IObservable<string> obs = …Run Code Online (Sandbox Code Playgroud) 我一直在玩Reactive Extension一段时间,但主要限于处理/编写WPF前端内的用户驱动事件.
这是一种强大的,新的异步编程方式,我很好奇其他人正在做什么,你认为它可以改善我们目前做事的方式?
我正在使用反应式扩展将数据整理到100毫秒的缓冲区:
this.subscription = this.dataService
.Where(x => !string.Equals("FOO", x.Key.Source))
.Buffer(TimeSpan.FromMilliseconds(100))
.ObserveOn(this.dispatcherService)
.Where(x => x.Count != 0)
.Subscribe(this.OnBufferReceived);
Run Code Online (Sandbox Code Playgroud)
这很好用.但是,我想要的行为与Buffer操作提供的行为略有不同.基本上,如果收到另一个数据项,我想重置计时器.只有当整个100毫秒没有收到数据时我才能处理它.这开启了永不处理数据的可能性,因此我还应该能够指定最大计数.我会想象一下:
.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)
Run Code Online (Sandbox Code Playgroud)
我已经环顾四周,在Rx中找不到这样的东西?任何人都可以确认/否认这个吗?
我想要做的是确保如果对我的观察者的唯一引用是可观察的,它会被垃圾收集并停止接收消息.
假设我有一个带有列表框的控件,名为Messages,后面是这个代码:
//Short lived display of messages (only while the user's viewing incoming messages)
public partial class MessageDisplay : UserControl
{
public MessageDisplay()
{
InitializeComponent();
MySource.IncomingMessages.Subscribe(m => Messages.Items.Add(m));
}
}
Run Code Online (Sandbox Code Playgroud)
哪个连接到此来源:
//Long lived location for message store
static class MySource
{
public readonly static IObservable<string> IncomingMessages = new ReplaySubject<string>;
}
Run Code Online (Sandbox Code Playgroud)
我不想要的是让消息显示器在不再可见后很长时间内保存在内存中.理想情况下,我想要一点延伸,所以我可以写:
MySource.IncomingMessages.ToWeakObservable().Subscribe(m => Messages.Items.Add(m));
Run Code Online (Sandbox Code Playgroud)
我也不想依赖MessageDisplay是一个用户控件的事实,因为我稍后想要使用MessageDisplayViewModel进行MVVM设置,这不是用户控件.
因此,在C#4.0的悲伤时期,我创建了以下"WorkflowExecutor"类,它允许在GUI线程中通过入侵IEnumerable的"yield return"延续来等待可观察的异步工作流.因此,以下代码将在button1Click处启动一个简单的工作流程来更新文本,等待您单击button2,并在1秒后循环.
public sealed partial class Form1 : Form {
readonly Subject<Unit> _button2Subject = new Subject<Unit>();
readonly WorkflowExecutor _workflowExecutor = new WorkflowExecutor();
public Form1() {
InitializeComponent();
}
IEnumerable<IObservable<Unit>> CreateAsyncHandler() {
Text = "Initializing";
var scheduler = new ControlScheduler(this);
while (true) {
yield return scheduler.WaitTimer(1000);
Text = "Waiting for Click";
yield return _button2Subject;
Text = "Click Detected!";
yield return scheduler.WaitTimer(1000);
Text = "Restarting";
}
}
void button1_Click(object sender, EventArgs e) {
_workflowExecutor.Run(CreateAsyncHandler());
}
void button2_Click(object sender, EventArgs e) {
_button2Subject.OnNext(Unit.Default);
} …Run Code Online (Sandbox Code Playgroud) system.reactive ×10
c# ×7
.net ×2
.net-4.5 ×1
async-await ×1
asynchronous ×1
buffer ×1
c#-5.0 ×1
lambda ×1
linq ×1
rabbitmq ×1
wpf ×1
zeromq ×1