标签: reactive-programming

Reactive Framework/DoubleClick

我知道有一种简单的方法可以做到这一点 - 但它今晚打败了我......

我想知道两个事件是否在300毫秒之内发生,就像双击一样.

在300毫秒内点击两次左击鼠标 - 我知道这就是反应框架的构建 - 但是如果我能找到一个包含所有扩展操作系统的简单示例的好文档 - Throttle,BufferWithCount,BufferWithTime - 所有这些都不是'为我做这件事......

reactive-programming system.reactive

2
推荐指数
1
解决办法
862
查看次数

Reactive NSMutableDictionary?

如何使用ReactiveCocoa订阅要从NSMutableDictionary添加和删除的对象?此外,我想在更改时广播通知.我的猜测是可以使用RACMulticastConnection完成广播,但如何将其与字典更改联系起来?我正在尝试在我的项目中第一次使用ReactiveCocoa并坚持我想要做的第一件事:(

multicast objective-c reactive-programming nsmutabledictionary reactive-cocoa

2
推荐指数
1
解决办法
1314
查看次数

F#Rx扩展具有并发事件的IObservable

我有以下使用FSharp.Reactive的F#代码.函数reactToEvents接受两个事件源并生成一个应用程序状态源.然后UI使用应用程序状态

open FSharp.Reactive

let reactToEvents (initialSettings:Settings) 
            (incomingTweets: ITweet IObservable) 
            (uiActions: UserInput IObservable) :  State IObservable = 
    let initialState = { settings = initialSettings; tweets = [] }

    let tweetInput = Observable.map TweetReceived incomingTweets 
    let userInput = Observable.map UserAction uiActions
    let allInput = Observable.merge userInput tweetInput

    Observable.scan transition initialState allInput
Run Code Online (Sandbox Code Playgroud)

在上面的函数中,在UI线程上生成uiActions时,在一个线程上生成incomingTweets事件.

像我上面那样使用Observable.merge合并两个源是否安全?

使用上面的Observable.scan扫描生成的源是否安全?

如果这不正确,那么正确的方法是什么?

谢谢!

更新1:

我希望至少Observable.merge不关心线程.我发现了这个,似乎说它们使用起来并不安全:

http://msdn.microsoft.com/en-us/library/ee353488.aspx

http://msdn.microsoft.com/en-us/library/ee353749.aspx

"对于每个观察者来说,注册的中间观察对象不是线程安全的.也就是说,源不会在不同的线程上同时触发源的观察."

那么正确的方法是什么呢?

更新2:

这是一个混乱.我链接的文档是用于从名称空间Microsoft.FSharp.Control.Observable合并和扫描的函数.这与我在我的代码中使用的Rx不同.对于真正的Rx,您需要使用库FSharp.Reactive和FSharp.Reactive.Observable下的函数.

f# multithreading reactive-programming system.reactive

2
推荐指数
1
解决办法
404
查看次数

当一次足够时,RxJs反应图被调用两次

在下面的示例中,每次onNext调用都会调用map两次,这是不必要的,因为ds的相同值可以重用于两个观察者.如何编写代码,使每个onNext调用只调用一次映射?或者我应该使用诺言?

var subject = new Rx.BehaviorSubject(42);

var ds = subject.map(function(x) {
        console.log("processing");
        return x + 100;
});

var subscription1 = ds.subscribe(
    function (x) {
        console.log('first observed: ' + x.toString());
    }
);

var subscription2 = ds.subscribe(
    function (x) {
        console.log('second observed: ' + x.toString());
    }
);

subject.onNext(56);
subject.onNext(134);
Run Code Online (Sandbox Code Playgroud)

javascript reactive-programming promise rxjs

2
推荐指数
1
解决办法
1853
查看次数

链接两个Observable返回另一个

我有两个名为A <'ModelA'>和B <'ModelB'>的可观察对象.它们中的每一个都对不同的REST服务执行请求,因此它们扩展了如上所述的不同模型.任何人执行的请求都可能失败.现在,我需要能够链接它们并返回一个ModelC对象.所以,伪编码流将是这样的:

一个<'ModelA'>执行请求,如果失败则执行某些操作,如果没有则将其结果(responseModelA)传递给B <'ModelB'>,这样它就可以执行另一个涉及使用responseModelA部分的REST请求.如果B失败会发生某些事情,如果没有,则将其响应(responseModelB)与responseModelA(手动设置POJO字段)组合在一起以创建ModelC,这是订户应该在其call()方法上接收的参数.

使用rxJava可以远程编码吗?我完全坚持这一点,所以我对任何吸烟都持开放态度.

谢谢.

java reactive-programming rx-java retrofit

2
推荐指数
1
解决办法
1792
查看次数

使用Rx为webservice调用创建轮询请求

在C#中使用Rx我正在尝试创建REST API的轮询请求.我面临的问题是,Observable需要按顺序发送回复.意味着如果请求A在X时间进行并且请求B在X + dx时间进行并且B的响应在A之前出现,则Observable表达式应该忽略或取消请求A.

我编写了一个示例代码,试图描述该场景.我如何解决它只获得最新的响应,取消或忽略以前的响应.

 class Program
    {
        static int i = 0;

        static void Main(string[] args)
        {
            GenerateObservableSequence();

            Console.ReadLine();
        }

        private static void GenerateObservableSequence()
        {
            var timerData = Observable.Timer(TimeSpan.Zero,
                TimeSpan.FromSeconds(1));

            var asyncCall = Observable.FromAsync<int>(() =>
            {
                TaskCompletionSource<int> t = new TaskCompletionSource<int>();
                i++;

                int k = i;
                var rndNo = new Random().Next(3, 10);
                Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
                return t.Task;
            });

            var obs = from t in timerData
            from data in asyncCall
            select data;

            var hot = obs.Publish();
            hot.Connect();

                hot.Subscribe(j …
Run Code Online (Sandbox Code Playgroud)

c# recursion polling reactive-programming system.reactive

2
推荐指数
1
解决办法
464
查看次数

How to iterate Flux and mix with Mono

I have a use case when I should send email to the users. First I create email body.

Mono<String> emailBody = ...cache();
Run Code Online (Sandbox Code Playgroud)

And then I select users and send the email to them:

Flux.fromIterable(userRepository.findAllByRole(Role.USER))
            .map(User::getEmail)
            .doOnNext(email -> sendEmail(email, emailBody.block(), massSendingSubject))
            .subscribe();
Run Code Online (Sandbox Code Playgroud)

What I don't like

  1. Without cache() method emailBody Mono calculates in each iteration step.
  2. To get emailBody value I use emailBody.block() but maybe there's a reactive way and not call block method inside Flux flow?

java reactive-programming project-reactor spring-webflux

2
推荐指数
1
解决办法
7205
查看次数

为什么Spring ReactiveMongoRepository没有Mono的保存方法?

我有一个扩展ReactiveMongoRepository的MovieRepository.我想以反应方式保存单个POJO.但ReactiveMongoRepository不为Mono或Publisher提供保存方法.我必须使用block()方法或使用saveAllReactiveMongoRepository中的方法.

public Mono<ServerResponse> create(ServerRequest request) {

    Mono<Movie> movieMono = request.bodyToMono(Movie.class);
    return movieRepository.save(movieMono.block()) //
            .flatMap((movie) -> ServerResponse.ok().body(fromObject(movie)));
}
Run Code Online (Sandbox Code Playgroud)

有没有更好的方法来解决这类问题?我不认为使用块方法是反应式编程的好主意.

spring mongodb reactive-programming spring-data-mongodb project-reactor

2
推荐指数
1
解决办法
2341
查看次数

响应式编程和消息队列之间的区别

这些天,我忙于反应式概念。我已经了解了两个独立的概念,即反应式系统反应式编程。此外,我知道反应式系统是一个较大的概念,其中包含四个属性:

  1. 反应灵敏
  2. 弹性的
  3. 可扩展
  4. 事件驱动

在此处输入图片说明

图像参考:medium.com

我的问题是关于响应式编程的,我知道它的目标是通过Observable / Subscriber模型进行异步编程。 在此处输入图片说明

图片参考:https//hub.packtpub.com/introduction-reactive-programming/

现在,我对反应式编程Message Queue之间的区别感到困惑。我在面向消息的中间件和相关标准(例如JMS)方面有一些经验,我认为在侦听器模式而非阻塞模式下使用消息传递队列的反应式编程是相同的。

我想在反应式编程的真正概念中变得清晰。

java queue jms reactive-programming

2
推荐指数
1
解决办法
1440
查看次数

合并RXJS可观察变量,但等待第一个开始发射值

我有一个视频播放器事件流,当按下某些事件(例如暂停或播放)时会触发该流。该流来自使用CombineLatest的两个可观察对象,每当发出事件时都将抓住当前视频位置。

我需要每10秒钟发出另一个事件,然后将其投入混合。一切工作正常,但是ping $可观察值在发出第一个events值之前开始发出事件,这是标志玩家已加载的事件,并且可能记录了所有其他事件。

如何将ping $和events $合并到一个流中,但是只有在event $开始发出值后才让ping $开始?

const events$ = mediaStreams.events$;
const currTime$ = mediaStreams.currentTime$;

const intervalSource$ = interval(2000);
const ping$ = intervalSource$.pipe(map(() => "ping"));

const concatEvents$ = merge(events$, ping$);


const combined = concatEvents$.pipe(
 withLatestFrom(currTime$),
   map(([first, second]) => {
    return {
       event: first,
       position: second
     }
  })
)


combined.subscribe(val => console.log('COMBINED', val));
Run Code Online (Sandbox Code Playgroud)

谢谢

javascript reactive-programming rxjs

2
推荐指数
1
解决办法
34
查看次数