我想出了这个解决方案。(尚未测试)通过网络上的大量弹跳。
Private Function ObserveUDP() As IObservable(Of bytes())
Dim f = Function(observer)
Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
Dim client = New UdpClient(endpoint)
Dim obs = observable.*emphasized text*Generate(Of Task(Of UdpReceiveResult), UdpReceiveResult) _
( Nothing _
, Function(task As Task(Of UdpReceiveResult)) task Is Nothing Or Not task.IsCompleted() _
, Function(task As Task(Of UdpReceiveResult)) client.ReceiveAsync() _
, Function(task As Task(Of UdpReceiveResult)) task.Result)
Dim observable = obs.Select(Function(r) r.Buffer)
dim handle = observable.Subscribe(observer)
Dim df = Sub()
client.Close()
handle.Dispose()
End Sub
Return Disposable.Create(df)
End Function …Run Code Online (Sandbox Code Playgroud) 我有一个由大量事件填充的可观察集合。我想从 EventArg 中获取信息,按名称对其进行分组,然后为每个名称选择最大日期。我试过这个:
_subscription = Observable
.FromEventPattern<NewLoanEventHandler, NewLoanEventArgs>(
h => loan.NewLoanEvent += h,
h => loan.NewLoanEvent -= h)
.Select(a => a.EventArgs.Counterpatry)
.GroupBy(c => c.Name)
.SelectMany(grp => grp.Max( c => c.MaturityDate ).Select( maturity => new {grp.Key, maturity}) )
.Subscribe(
i => Console.WriteLine("{0} --> {1}", i.Key, i.maturity),
Console.WriteLine,
() => Console.WriteLine("completed")
);
Run Code Online (Sandbox Code Playgroud)
我认为它可能会做我想做的事,但订阅永远不会完成:我永远不会收到完整的消息,也没有得到任何输出。也就是说,我怀疑,因为 Observable 仍在等待更多事件。我如何告诉它停止等待并给我我的输出?
我的问题是:对于给定的事件序列,我想缓存它们的值,直到流中出现暂停。然后,我将批量处理所有缓存数据并清除缓存状态。
一种天真的方法是(不是工作代码,可能存在一些错误):
struct FlaggedData
{
public EventData Data { get; set; }
public bool Reset { get; set; }
}
...
IObservable<EventData> eventsStream = GetStream();
var resetSignal = new Subject<FlaggedData>();
var flaggedDataStream = eventsStream
.Select(data => new FlaggedData { Data = data })
.Merge(resetSignal)
.Scan(
new List<EventData>(),
(cache, flaggedData) =>
{
if (!flaggedData.Reset())
{
cache.Add(flaggedData.Data);
return cache;
}
return new List<EventData>();
})
.Throttle(SomePeriodOfTime)
.Subscribe(batch =>
{
resetSignal.OnNext(new FlaggedData { Reset = true});
ProcessBatch(batch);
});
Run Code Online (Sandbox Code Playgroud)
所以在这里,在收到任何要处理的批处理后,我请求重置缓存。问题是因为Throttle缓存中可能有一些数据(或者我相信),在这种情况下会丢失。
我想要的是一些操作,如:
ScanWithThrottling<TAccumulate, TSource>( …Run Code Online (Sandbox Code Playgroud) 我正在从 C# 移植一些严重依赖 Rx 的代码,而且我很难找到一些最常用的 C# 方法的 C++ 等价物。
特别是,我想从订阅/取消订阅逻辑创建一个 observable。在 C# 中,我使用Observable.Create<TSource> Method (Func<IObserver<TSource>, Action>)覆盖来创建一个 observable。例如
var observable = Observable.Create<int>(observer =>
{
observers.Add(observer);
return () =>
{
observers.Remove(observer)
};
});
Run Code Online (Sandbox Code Playgroud)
是否可以用RxCpp做同样的事情?我认为答案在于rx::observable<>::create(OnSubscribe os)方法,但我不知道如何使用它来“注册”取消订阅的 lambda。
使用案例:我正在编写一个监视更改并自动保存的内容.我想要节流,以便我不会比每五秒钟更多地保存.如果有连续的变化流,我想每30秒保存一次.
无法在文档中找到observable.Throttle(mergeTime,maxTime),只能想到编写自己的丑陋方式,因此这个问题.
我想检查是否System.Reactive.IObserable<TElement>包含任何元素.该Any()扩展方法没用,因为它期待返回IObserable<bool>,而不只是布尔的.
我是反应式世界的新手,我仍在努力学习。为了练习,我决定编写一个非常简单的 WPF 倒数计时器应用程序。基本上,这就是我想要做的:
我正在尝试使用 ReactiveUI 来实现这一点。以下是我到目前为止...
XAML:
<Window x:Class="ReactiveTimer.MainWindow"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
xmlns:local="clr-namespace:ReactiveTimer"
mc:Ignorable="d"
Title="MainWindow" Height="350" Width="525">
<Window.DataContext>
<local:MainViewModel/>
</Window.DataContext>
<Grid HorizontalAlignment="Center" VerticalAlignment="Center">
<Grid.RowDefinitions>
<RowDefinition/>
<RowDefinition Height="Auto"/>
</Grid.RowDefinitions>
<TextBlock FontSize="45" FontWeight="Bold">
<TextBlock.Text>
<MultiBinding StringFormat="{}{0:00}:{1:00}">
<Binding Path="RemainingTime.Minutes"/>
<Binding Path="RemainingTime.Seconds"/>
</MultiBinding>
</TextBlock.Text>
</TextBlock>
<Button Command="{Binding StartCommand}" Content="Start" Grid.Row="1"/>
</Grid>
Run Code Online (Sandbox Code Playgroud)
视图模型:
public interface IMainViewModel
{
TimeSpan RemainingTime { get; }
ICommand StartCommand { get; }
}
public class MainViewModel : ReactiveObject, IMainViewModel
{
const double InitialTimeInSeconds = 10; …Run Code Online (Sandbox Code Playgroud) 我有一个像这样的Observable列表:
List<Observable<MyObj>> listObservables = new ArrayList<Observable<MyObj>>();
Run Code Online (Sandbox Code Playgroud)
我想将所有Observable组合在一个中,如果我知道Observable的使用数量,我可以处理它zip(),例如我们有3个Observable:
Observable<MyObj1> obs1= MyRestClient.getSomeData1();
Observable<MyObj2> obs2= MyRestClient.getSomeData2();
Observable<MyObj3> obs3= MyRestClient.getSomeData3();
Run Code Online (Sandbox Code Playgroud)
我有一个包装器obj:
class MyWrapperObj {
private MyObj1 onj1;
private MyObj2 onj2;
private MyObj3 onj3;
public MyWrapperObj(MyObj1 onj1, MyObj2 onj2, MyObj3 onj3) {
this.onj1 = onj1;
this.onj2 = onj2;
this.onj3 = onj3;
}
}
Run Code Online (Sandbox Code Playgroud)
所以我可以这样组合它们:
Observable<MyWrapperObj> combinedObservable = Observable.zip(obs1, obs2, obs3, new Func3<MyObj1, MyObj2, MyObj3, MyWrapperObj>() {
@Override
public MyWrapperObj call(MyObj1 obj1, MyObj2 obj2, MyObj3 obj3) {
return new MyWrapperObj(obj1, obj2, obj3);
}
}); …Run Code Online (Sandbox Code Playgroud) 我的目标是从源可观察源创建一组observable,以便我可以单独订阅它们.
当我手动执行此操作(即手动创建每个子源)时,事情按预期工作:添加到原始源的值充分传播到子源.
但是当我在循环中创建它们,将它们添加到a时List<IObservable<T>>,从该列表中获取的元素的订阅似乎不起作用:
class Program
{
static void Main(string[] args)
{
// using Subject for the sake of example
var source = new Subject<int>();
// manually creating each subSource
var source0 = source.Where((t, i) => i % 3 == 0);
var source1 = source.Where((t, i) => i % 3 == 1);
var source2 = source.Where((t, i) => i % 3 == 2);
// creating a List of subsources
List<IObservable<int>> sources = new List<IObservable<int>>();
int count = 3;
for …Run Code Online (Sandbox Code Playgroud) 我想知道如何单元测试序列Observable.Never.
我怎么能确定没有任何东西被推到序列中?
system.reactive ×10
c# ×6
.net ×3
events ×3
wpf ×2
android ×1
c++ ×1
dispose ×1
observable ×1
reactiveui ×1
retrofit2 ×1
rx-android ×1
rx-java ×1
rxcpp ×1
throttling ×1
udp ×1
vb.net ×1