标签: reactive-programming

订阅者缺少消息; 这是Rx的错误还是我做错了?

我有一个Window WPF应用程序与以下构造函数

        numbers = Observable.Generate(DateTime.Now,
                                         time => true,
                                         time => DateTime.Now,
                                         time => { return new     Random(DateTime.Now.Millisecond).NextDouble() * 99 + 2; },
                                         time => TimeSpan.FromSeconds(1.0));


        numbers.ObserveOnDispatcher()
            .Subscribe(s => list1.Items.Add(s.ToString("##.00")));

        numbers.Where(n => n < 10).ObserveOnDispatcher().
            Subscribe(s => list2.Items.Add(s.ToString("##.00")));
Run Code Online (Sandbox Code Playgroud)

现在这里是列表的屏幕截图 - 左侧列表中缺少通知3.76 ...此行为是间歇性的.

图片

c# wpf reactive-programming system.reactive

1
推荐指数
1
解决办法
153
查看次数

没有Akka的反应性服务和可扩展性

我已经阅读了Reactive Manifesto几次,并试图绕过所有这些反应性,异步,无阻塞的东西.很清楚如何在Actors之上构建可扩展系统,但是如果我会Future在我的代码中主动使用scala,那么我将在可扩展性,异步执行执行方面获得相同的效果,每个方法都会接受或返回Future.这种服务是否具有可扩展性和响应性?让我们说在这个问题中,我对事件驱动和弹性部分服务并不感兴趣.

asynchronous scala future reactive-programming akka

1
推荐指数
1
解决办法
243
查看次数

Kotlin泛型方法和继承

在我的代码中,我想在抽象类中创建一个方法,它返回一些Observable.然后,这个抽象类的实现将返回某个(指定)类型的Observable.不幸的是,Android Studio将在实现方法()中返回错误"类型不匹配":

  • 预期:可观察
  • 找到:Observable <{package} .DrawerItemEntity>

我的MockDrawerList.getList()回报Observable<DrawerItemEntity>

请专注于execute()buildUseCaseObservable

抽象类

public abstract class UseCase(threadExecutor: ThreadExecutor,
    postExecutionThread: PostExecutionThread) {

    private val threadExecutor: ThreadExecutor
    private val postExecutionThread: PostExecutionThread

    private var subscription: Subscription = Subscriptions.empty()

    init {
        this.postExecutionThread = postExecutionThread
        this.threadExecutor = threadExecutor
    }

    protected abstract fun buildUseCaseObservable(): Observable<Any>

    public fun execute(useCaseSubsriber: Subscriber<Any>) {
        subscription = buildUseCaseObservable()
                .subscribeOn(Schedulers.from(threadExecutor))
                .observeOn(postExecutionThread.getScheduler())
                .subscribe(useCaseSubsriber)
    }

    public fun unsubsribe() {
        if (!subscription.isUnsubscribed())
            subscription.unsubscribe()
    }
}
Run Code Online (Sandbox Code Playgroud)

履行

Inject
class GetDrawerListUseCase(threadExecutor: ThreadExecutor, 
postExecutionThread:PostExecutionThread) : UseCase(threadExecutor, …
Run Code Online (Sandbox Code Playgroud)

generics android reactive-programming kotlin rx-java

1
推荐指数
1
解决办法
2466
查看次数

“无止境” TakeWhile,BufferWhile和SkipWhile RX.Net序列

我想知道是否存在一种方法来获取可观察的流并使用* While运算符,尤其是TakeWhile,SkipWhile和BufferWhile,以便在满足bool“ while”条件时,它们的订阅者不会收到.OnComplete?

当我开始使用.TakeWhile / SkipWhile和BufferWhile运算符时,我假定它们不会终止/ .OnComplete(),而只是在满足布尔条件时才发出/不发出。

举个例子可能更有意义:

我有一个bool标志,指示一个实例是否忙,以及一个可观察的数据流:

private bool IsBusy { get;set; }
private bool IgnoreChanges { get;set; }

private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }
Run Code Online (Sandbox Code Playgroud)

..并像这样(简化)使用/设置RX流

private void SetupRx()
{
    ConsumerSubscription = Producer
        .SkipWhile(_ => IgnoreChanges == true) // Drop the producer's stream of ints whenever the IgnoreChanges flag is set to true, but forward them whenever the IgnoreChanges flag is set to false
        .BufferWhile(_ => IsBusy == true) // for all …
Run Code Online (Sandbox Code Playgroud)

.net reactive-programming system.reactive

1
推荐指数
1
解决办法
372
查看次数

TypeError:React.renderToStaticMarkup不是函数

我正在按照本教程"使用React构建SVG图标"

http://jxnblk.com/react-icons/

我在运行命令时遇到困难 "npm run build"

这是 npm-debug.log

0 info it worked if it ends with ok
1 verbose cli [ '/Users/villat/.nvm/versions/node/v5.0.0/bin/node',
1 verbose cli   '/Users/villat/.nvm/versions/node/v5.0.0/bin/npm',
1 verbose cli   'run',
1 verbose cli   'build' ]
2 info using npm@3.3.6
3 info using node@v5.0.0
4 verbose run-script [ 'prebuild', 'build', 'postbuild' ]
5 info lifecycle react-icons@1.0.0~prebuild: react-icons@1.0.0
6 silly lifecycle react-icons@1.0.0~prebuild: no script for prebuild, continuing
7 info lifecycle react-icons@1.0.0~build: react-icons@1.0.0
8 verbose lifecycle react-icons@1.0.0~build: unsafe-perm in lifecycle true
9 verbose …
Run Code Online (Sandbox Code Playgroud)

javascript reactive-programming node.js reactjs

1
推荐指数
1
解决办法
2754
查看次数

使RxJava 1.1.5适应Reactor Core 3.1.0.M3

我想用采用RxJava 1.1.5与Spring WebFlux(即反应堆堆芯3.1.0.M3)库,但我无法适应ObservableFlux

我以为这是相对简单的,但是我的适配器不起作用:

import reactor.core.publisher.Flux;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;

public static <T> Flux<T> toFlux(Observable<T> observable) {
    return Flux.create(emitter -> {
        final Subscription subscription = observable.subscribe(new Subscriber<T>() {
            @Override
            public void onNext(T value) {
                emitter.next(value);
            }
            @Override
            public void onCompleted() {
                emitter.complete();
            }
            @Override
            public void onError(Throwable throwable) {
                emitter.error(throwable);
            }
        });
        emitter.onDispose(subscription::unsubscribe);
    });
}
Run Code Online (Sandbox Code Playgroud)

我已经验证过,onNext并且onCompleted都以正确的顺序被调用,但是我Flux始终是空的。有人看到我在做什么错吗?

在相关说明中,为什么反应堆插件中没有RxJava 1适配器?

java reactive-programming rx-java project-reactor

1
推荐指数
1
解决办法
570
查看次数

将修改后的数组发布到Observable

任务

假设我们实现了Angular服务,并且需要向Observable<number[]>世界发布:

numbers: Observable<number[]>;
Run Code Online (Sandbox Code Playgroud)

我们希望订户:

  1. 订阅时获得最后的价值
  2. 每次我们更改并发布时,都会收到整个修改后的数组

从#1开始,内部Observable<number[]>应至少为a BehaviorSubject<number[]>

但是2号呢?假设我们需要实现一个方法publishNumbersChange(),该方法在我们需要更改和发布更改后的数组时被调用:

private publishNumbersChange() {
    // Get current numbers array
    ...        

    changeArray();

    // Now publish changed numbers array
    ...
}
Run Code Online (Sandbox Code Playgroud)

问题

RxJS 5模式是什么,用于实现根据其先前项目发布修改后的数组的任务?

因为我问这个问题主要是因为当前我正在做Angular的东西,所以这里是问题的第二部分:
Angular(以及基于RxJS的类似框架)在提供Observable哪种类型参数是数组时会使用什么代码?随后发布更新的数组?
他们只是单独保留当前发布的数组的副本吗?

一些想法

看来,分别存储基础数组是最简单的事情,因此我们始终可以访问它。但是同时,它看起来不像RxJS方式(需要在RxJS流外部具有状态)。

另一方面,我们可以执行以下操作:

private publishNumbersChange() {
    // To get the latest value from the stream, we have to subscribe
    const subscription: Subscription = this.numbers.subscribe((numbers: number[]) => {
        // We got the last value in stream in numbers …
Run Code Online (Sandbox Code Playgroud)

javascript reactive-programming rxjs typescript angular

1
推荐指数
1
解决办法
1022
查看次数

在rxjs中取消对新Emmision的Ajax请求

我是反应式编程的新手,在创建AJAX导航侧导航时遇到问题。

<ul>
    <li>change_password</li>
    <li>update_profile</li>
</ul>
<h3> The result </h3>
<div id="theContent"></div>

<script type="text/javascript">
var listClick$ = Rx.Observable.create(function(observer){

            $('li').click(function(){

                let pageName = $(this).html();    
                observer.next(pageName);

            });

        }).startWith('change_password').flatMap(function(pageName){

            return Rx.Observable.fromPromise($.getJSON('http://localhost:3000/settings/'+pageName));        
        });
listClick$.subscribe(function(gottenJson){

$('#theContent').html(gottenJson.page);
}); 
</script>
Run Code Online (Sandbox Code Playgroud)

当我单击change_password或update_profile时,发出了ajax请求,但change_password的请求被延迟,因此,当单击它,然后单击update_profile时,将显示update_profile表单,但在完成时将被change_password表单替换。

单击另一个列表项时,如何取消第一个AJAX请求?

reactive-programming rxjs

1
推荐指数
1
解决办法
1352
查看次数

Spring 5 Reactive - 没有调用WebExceptionHandler

我已经尝试了所有3个解决方案,建议用什么方法来处理spring-webflux中的错误,但是WebExceptionHandler没有被调用.我在用Spring Boot 2.0.0.M7.Github回复这里

@Configuration
class RoutesConfiguration {

  @Autowired
  private lateinit var testService: TestService

  @Autowired
  private lateinit var globalErrorHandler: GlobalErrorHandler

  @Bean
  fun routerFunction():

    RouterFunction<ServerResponse> = router {
    ("/test").nest {

      GET("/") {
        ServerResponse.ok().body(testService.test())
      }
    }
  }


} 


@Component
class GlobalErrorHandler() : WebExceptionHandler {

  companion object {
    private val log = LoggerFactory.getLogger(GlobalErrorHandler::class.java)
  }

  override fun handle(exchange: ServerWebExchange?, ex: Throwable?): Mono<Void> {

    log.info("inside handle")

    /* Handle different exceptions here */
    when(ex!!) {
      is ClientException -> exchange!!.response.statusCode = …
Run Code Online (Sandbox Code Playgroud)

exception-handling reactive-programming spring-boot spring-webflux

1
推荐指数
2
解决办法
5144
查看次数

Flux.compose和Flux.transform之间的区别?

我正在学习反应流,并在Publishers(Flux)上工作,并致力于Flux的转型。为此,我得到了撰写和转换方法。

这是我的代码:

private static void composeStream() {
    System.out.println("*********Calling composeStream************");
    Function<Flux<String>, Flux<String>> alterMap = f -> {
                                                                return f.filter(color -> !color.equals("ram"))
                                                                        .map(String::toUpperCase);
                                                            };

    Flux<String> compose = Flux.fromIterable(Arrays.asList("ram", "sam", "kam", "dam"))
                                    .doOnNext(System.out::println)
                                    .compose(alterMap);

    compose.subscribe(d -> System.out.println("Subscriber to Composed AlterMap :"+d));
    System.out.println("-------------------------------------");

}

private static void transformStream() {
    System.out.println("*********Calling transformStream************");
    Function<Flux<String>, Flux<String>> alterMap = f -> f.filter(color -> !color.equals("ram"))
                                                            .map(String::toUpperCase);

    Flux.fromIterable(Arrays.asList("ram", "sam", "kam", "dam"))
            .doOnNext(System.out::println)
            .transform(alterMap)
            .subscribe(d -> System.out.println("Subscriber to Transformed AlterMap: "+d));
    System.out.println("-------------------------------------");
}
Run Code Online (Sandbox Code Playgroud)

这是输出,这两种情况都相同:

*********Calling transformStream************
ram
sam
Subscriber to Transformed …
Run Code Online (Sandbox Code Playgroud)

publisher publish-subscribe reactive-programming project-reactor reactive-streams

1
推荐指数
1
解决办法
774
查看次数