标签: reactive-programming

有用或非trival双接口的示例

最近Erik Meijer和其他人已经展示了如何IObservable/IObserver双重IEnumerable/IEnumerator.它们是双重的这一事实意味着一个接口上的任何操作在另一个接口上都是有效的,从而为.Net反应扩展提供了理论基础.

是否存在其他双接口?我对任何一个例子感兴趣,而不仅仅是.Net.

c# functional-programming reactive-programming category-theory system.reactive

11
推荐指数
1
解决办法
515
查看次数

Reactive Framework与F#Events有何不同?

如果我已经熟悉F#事件,并且我不打算有太多的C#interop,是否有充分的理由考虑使用Reactive Framework?

f# reactive-programming

11
推荐指数
1
解决办法
2091
查看次数

使用ReactiveCocoa对小时,每小时执行操作

尝试遵循ReactiveCocoa 的最佳实践,每小时更新我的​​UI.这就是我所拥有的:

NSDateComponents *components = [[[NSCalendar sharedCalendar] calendar] components:NSMinuteCalendarUnit fromDate:[NSDate date]];
// Generalization, I know (not every hour has 60 minutes, but bear with me).
NSInteger minutesToNextHour = 60 - components.minute;

RACSubject *updateEventSignal = [RACSubject subject];
[updateEventSignal subscribeNext:^(NSDate *now) {
    // Update some UI
}];

[[[RACSignal interval:(60 * minutesToNextHour)] take:1] subscribeNext:^(id x) {
    [updateEventSignal sendNext:x];
    [[RACSignal interval:3600] subscribeNext:^(id x) {
        [updateEventSignal sendNext:x];
    }];
}];
Run Code Online (Sandbox Code Playgroud)

这有一些明显的缺陷:手动订阅和发送,只是"感觉不对劲".关于如何使这更"反应"的任何想法?

objective-c reactive-programming ios reactive-cocoa

11
推荐指数
2
解决办法
2974
查看次数

RxJava- CombineLatest但是只有一个Observable的发射?

假设我有两个可以在任何时刻发出值的无限Observable.他们结合起来创造了一个Observable<ProcessFileEvent>.

Observable<Integer>  selectedFileId= ...
Observable<MouseClick> buttonClick = ...

Observable<ProcessFileEvent> `processFileEvent` = Observable.combineLatest(selectedFileId, buttonClick, (s,b) -> {
    //create ProcessFileEvent here
});
Run Code Online (Sandbox Code Playgroud)

问题是我只希望processFileEvent在发出内容时buttonClick发出,而不是selectedFileId.这绝对不是用户在输入文件ID时所期望的行为并且它开始了ProcessFileEvent.我如何组合,但只在发出时buttonClick发出?

java reactive-programming rx-java

11
推荐指数
2
解决办法
3725
查看次数

onNext和onComplete的行为

我有一个Observable,无需发出值即可完成某些操作.我还有一个我希望Observable可以使用的对象列表.因此对于此列表中的所有元素:doSomething()

Observable.from(uris)
        .flatMap(new Func1<Uri, Observable<Void>>() {
            @Override
            public Observable<Void> call(Uri uri) {
                return createDoSomethingObservable(uri);
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribe(new Observer<Void>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "completed");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Void aVoid) {
                Log.d(TAG, "next");
            }
        });
Run Code Online (Sandbox Code Playgroud)

以及创建Observable的方法:

Observable<Void> createDoSomethingObservable(final Uri uri) {
    return Observable.create(new Observable.OnSubscribe<Void>() {
        @Override
        public void call(Subscriber<? super Void> subscriber) {
            //doSomething
            subscriber.onNext(null);
            subscriber.onCompleted();
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

现在,当我使用带有3个元素的List运行时,我得到:

next
next
next
completed
Run Code Online (Sandbox Code Playgroud)

这很好,因为这就是我想要的,但我不知道它为什么会起作用.首先我开始调用onComplete,因为最终observable完成了它的工作并完成了.但当然onNext永远不会在订阅者上调用.反过来也是如此.

所以我的问题是:

  1. 为什么onComplete只调用最后一个列表元素?
  2. 有没有更好的方法来解决这个问题?

android reactive-programming rx-java

11
推荐指数
2
解决办法
2万
查看次数

如何在后台线程上执行flatMap

我正在使用Retrofit和RxJava来执行一些后台任务.代码如下所示:

public class MyLoader{  
  public Observable<MyData> getMyData(){
      return setupHelper().flatMap(new Func1<MyHelper, Observable<MyData>>() {
              @Override
              public Observable<MyData> call(MyHelper myHelper) {
                  return queryData(myHelper);
              }
      });
  }

  private Observable<MyData> queryData(MyHelper myHelper){
      ...
  }

  private Observable<MyHelper> setupHelper(){
     return Observable.create(new Observable.OnSubscribe<MyHelper>() {
          @Override
          public void call(final Subscriber<? super MyHelper> subscriber) {
              try{
                MyHelper helper = makeRetrofitCall();//Using Retrofit blocking call to get some data
                subscriber.onNext(helper);
                subscriber.onCompleted();
              }catch(RetrofitError e){
                subscriber.onError(e)
              }
          }
     }
  }
}
Run Code Online (Sandbox Code Playgroud)

由于NetworkOnMainThread此行的异常导致RetrofitError失败:

  MyHelper helper = makeRetrofitCall();//Using Retrofit blocking call to get some …
Run Code Online (Sandbox Code Playgroud)

java multithreading android reactive-programming rx-java

11
推荐指数
1
解决办法
4757
查看次数

带有nearPoints()的动态ggplot图层

我熟悉闪亮的基础知识但在这里挣扎.我希望能够在点击一个点时添加ggplot图层以突出显示该点.我知道这是可能的ggvis和画廊中有一个很好的例子,但我希望能够nearPoints()用来捕获点击作为ui输入.

我尝试了一些东西(见下文),它与ggplot图层分开显示然后消失.我已尝试过各种编辑reactive(),eventReactive()依此类推.

任何帮助深表感谢...

library(shiny)
library(ggplot2)

shinyApp(
  ui = shinyUI(
        plotOutput("plot", click = "clicked")
    ),

  server = shinyServer(function(input, output) {
    output$plot <- renderPlot({
      ggplot(mtcars, aes(x = mpg, y = wt)) +
        geom_point() +
        geom_point(data = nearPoints(mtcars, input$clicked), colour = "red", size = 5)
    })
  })
)
Run Code Online (Sandbox Code Playgroud)

我想我从概念上理解为什么这不起作用.该图具有依赖性,input$clicked这意味着当input$clicked更改时,图重新渲染,但这又会重置input$clicked.有点抓人22的情况.

r reactive-programming ggplot2 shiny

11
推荐指数
1
解决办法
1283
查看次数

Apache Kafka Streams将KTables物化为一个主题似乎很慢

我正在使用kafka流,我正在尝试将KTable实现为主题.

它工作但似乎每30秒左右完成一次.

Kafka Stream如何/何时决定将KTable的当前状态实现为主题?

有没有办法缩短这个时间并使其更"实时"?

这是我正在使用的实际代码

// Stream of random ints: (1,1) -> (6,6) -> (3,3)
// one record every 500ms
KStream<Integer, Integer> kStream = builder.stream(Serdes.Integer(), Serdes.Integer(), RandomNumberProducer.TOPIC);

// grouping by key
KGroupedStream<Integer, Integer> byKey = kStream.groupByKey(Serdes.Integer(), Serdes.Integer());

// same behaviour with or without the TimeWindow
KTable<Windowed<Integer>, Long> count = byKey.count(TimeWindows.of(1000L),"total");

// same behaviour with only count.to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
count.toStream().map((k,v) -> new KeyValue<>(k.key(), v)).to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
Run Code Online (Sandbox Code Playgroud)

stream reactive-programming apache-kafka

11
推荐指数
1
解决办法
3150
查看次数

谁在响应式 Web 应用程序中调用 Flux 或 Mono 上的订阅

我正在查看一些响应式 Web 应用程序的示例,我看到它们是这样的

@RequestMapping(value = "/{id}", method = RequestMethod.GET)
@ResponseBody    
public Mono<Person> findById(...) {
    return exampleService.findById(...);
}

@RequestMapping(method = RequestMethod.GET, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Person> findAll() {
    Flux<Person> persons = exampleService.findAll();
    return persons;
}
Run Code Online (Sandbox Code Playgroud)

当我在文档中阅读有关 Mono 和 Flux 时,它提到必须为 Mono 或 Flux 调用subscribe以发出数据。

因此,当我在本地运行这些反应式 Web 应用程序并在我点击端点时使用邮递员/chrome 浏览器时,我得到了结果。

在服务端,虽然端点返回 Mono 或 Flux,但我如何在浏览器/邮递员中看到实际结果。每当我点击返回 Mono/Flux 类型的端点时,浏览器是否会在内部调用订阅

reactive-programming project-reactor spring-webflux

11
推荐指数
1
解决办法
3264
查看次数

如何模拟返回`Mono&lt;Void&gt;`的方法

如何模拟返回的方法Mono<Void>

我有这个返回的方法 Mono<Void>

public Mono<Void> deleteMethod(Post post) {

        return statusRepository.delete(post);
    }
Run Code Online (Sandbox Code Playgroud)

在我的测试课中,我想做这样的事情

given(statusRepository.delete(any(Post.class))).willReturn(Mono.empty());
Run Code Online (Sandbox Code Playgroud)

有没有更好的方法来做到这一点?

有人能帮我吗?

谢谢。

java unit-testing mockito reactive-programming spring-webflux

11
推荐指数
1
解决办法
6944
查看次数