标签: reactive-programming

Reactive Extensions基于特定数字的并行处理

我是Reactive Extensions的新手.我有对象集合并为每个对象调用方法,并返回布尔值.通过使用每个循环并调用方法,而不是循环遍历每个循环,是否有一种方法在响应式扩展中同时调用(fork和join)给定数量的对象的方法(一次是ex 5)并且在第一次完成之后,第六个应该调用方法,它应该继续,直到所有对象都完成.

感谢您的回复.

.net throttling reactive-programming system.reactive

1
推荐指数
1
解决办法
553
查看次数

建议在长时间运行的过程中使用rx与众不同?

我正在使用rx不同运算符在长时间运行的过程中基于某个键来过滤外部数据流。

这会导致内存泄漏吗?假设将收到许多不同的密钥。rx区分运算符如何跟踪先前收到的密钥?

我应该将groupbyuntil与持续时间选择器一起使用吗?

c# reactive-programming system.reactive

1
推荐指数
1
解决办法
1073
查看次数

有没有办法将观察者订阅为异步

给定一个同步观察者,有没有办法做到这一点:

observable.SubscribeAsync(observer);
Run Code Online (Sandbox Code Playgroud)

并且observer异步调用所有方法或者是在创建观察者时必须处理的内容吗?

c# multithreading asynchronous reactive-programming system.reactive

1
推荐指数
2
解决办法
5259
查看次数

如果中间有linq方法,则rx处理异常订阅

当我订阅有时抛出异常的方法时,我会得到2个不同的行为.如果我在中间连接LINQ方法,那么订阅就会被处理掉,另外一点也不是,为什么呢?

void main(){
  var numbersSubject=new Subject<int>();

  numbersSubject.subscribe(throwMethod);   // 1,2,3,4,6,7,8,9,10
  // numbersSubject.select(num=>num).subscribe(throwMethod);   // 1,2,3,4

  for(int i=0;i<10;i++)
  {
    try{
      numbersSubject.OnNext(i);
    }catch{}
  }
}

void throwMethod(int num)
{
   if(num==5)
       throw new Exception();
   Console.writeLine(i);
}
Run Code Online (Sandbox Code Playgroud)

c# reactive-programming system.reactive

1
推荐指数
1
解决办法
188
查看次数

将参数传递给Observable.create

我在Android上使用RXJava异步访问数据库.

我想在我的数据库中保存一个对象.通过这种方式,我创建了一个方法,它接受一个最终参数(我想要保存的对象)并返回一个Observable.

在这一点上,我不在乎发射任何东西,所以我最后会打电话subscriber.onComplete().

这是我的代码:

public Observable saveEventLog(@NonNull final EventLog eventLog) {
    return Observable.create(new Observable.OnSubscribe<Object>() {
        @Override
        public void call(Subscriber<? super Object> subscriber) {
            DBEventLog log = new DBEventLog(eventLog);
            log.save();
            subscriber.onCompleted();
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

问题是,我看到许多人使用参数的final关键字回答,但我想在没有它的情况下这样做.原因是我不喜欢声明一个final变量的方法,以便在另一个线程中使用它.

还有其他选择吗?谢谢.

android reactive-programming rx-java

1
推荐指数
1
解决办法
2342
查看次数

如何使用Reactor创建随机数生成器?

我想通过重复发射随机数来模拟测量数据.我尝试使用Reactor,但它不会产生任何输出:

private static Random random = new Random();

public static void main(String[] args) throws InterruptedException {
    Flux<Double> doubleGenerator = Flux.generate(
            () -> random.nextDouble(),
            (ignored, sink) -> {
                sink.next(random.nextDouble());
                return 0.0;
            });
    Flux<Long> timer = Flux.intervalMillis(1000);
    Flux.combineLatest(doubleGenerator, timer, (value, ignored) -> value)
            .subscribe(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)

如何使用Reactor创建随机值流?

java reactive-programming project-reactor

1
推荐指数
1
解决办法
973
查看次数

用RxSwift循环

我是Reactive Programming的新手,我有一个我不能独自解决的大问题...我需要按顺序上传几个视频资产,但我不知道该怎么做,我有很多PHAssets,我正在尝试遍历每个元素并通过网络发送它。到目前为止,这是我的代码并带有注释:

for item in items {
                    let fileName = item.media.localIdentifier

                    //Observable to generate local url to be used to save the compressed video
                    let compressedVideoOutputUrl = videoHelper.getDocumentsURL().appendingPathComponent(fileName)

                    //Observable to generate a thumbnail image for the video
                    let thumbnailObservable =  videoHelper.getBase64Thumbnail(myItem: item)

                    //Observable to request the video from the iPhone library
                    let videoObservable = videoHelper.requestVideo(myItem: item)

                        //Compress the video and save it on the previously generated local url
                        .flatMap { videoHelper.compressVideo(inputURL: $0, outputURL: compressedVideoOutputUrl) }
                    //Generate the thumbnail and share …
Run Code Online (Sandbox Code Playgroud)

reactive-programming ios swift rx-swift

1
推荐指数
1
解决办法
2151
查看次数

为什么Reactor 3需要单声道类型?

Reactor 3有2种主要数据类型,它们都是反应流发布者

  • reactor.core.publisher.Mono<T>
  • reactor.core.publisher.Flux<T>

我理解Mono是0或1个元素的流,而Flux是0或N个元素的流.

由于Mono和Flush都在实施,org.reactivestreams.Publisher<T>为什么我们需要两种类型,为什么不只是使用Flux来做所有事情?

spring reactive-programming project-reactor reactive-streams spring-webflux

1
推荐指数
1
解决办法
221
查看次数

当从数据库中获取任何内容时,如何使服务层返回Mono :: error

我正在使用此代码块从Couchbase检索一些数据,但是当它无法获取任何内容时,它不会调用该onErrorResume块。onErrorResume当找不到与密钥匹配的文档时,有什么方法可以使该代码调用?

return referenceService.getReferenceTable(referenceKey)
    .flatMap(referenceTable -> {
        logger.info("reference table: {}", referenceTable.toString());
        Market market = getMarket(aggregate.getDate(), aggregate.getMarket(), referenceTable);
        aggregate.setMarket(market);
        return Mono.just(aggregate);
    })
    .onErrorResume(e -> {
        logger.info("Error getting reference table");
        return Mono.error(e);
    });
Run Code Online (Sandbox Code Playgroud)

正在使用的服务层如下所示:

@Service("referenceService")
public class CouchbaseReferenceService implements ReferenceService {

    @Autowired
    private ReferenceRepository referenceRepository;

    @Override
    public Mono<ReferenceTable> getReferenceTable(String key) {
        return referenceRepository.getReferenceTable(key);
    }

}
Run Code Online (Sandbox Code Playgroud)

java reactive-programming couchbase spring-boot project-reactor

1
推荐指数
1
解决办法
56
查看次数

如何使用反应式编程实现嵌套的异步代码?

我对反应式编程非常陌生。尽管我对函数式编程和Kotlin协程非常熟悉,但是我仍然无法弄清楚如何使用反应式编程范例来重构普通的嵌套CRUD代码,尤其是那些具有嵌套异步操作的代码。

例如,以下是基于Java 8的简单异步CRUD代码段 CompletableFuture


        getFooAsync(id)
                .thenAccept(foo -> {
                    if (foo == null) {
                        insertFooAsync(id, new Foo());
                    } else {
                        getBarAsync(foo.bar)
                                .thenAccept(bar -> {
                                   updateBarAsync(foo, bar);
                                });
                    }
                });

Run Code Online (Sandbox Code Playgroud)

使用Kotlin协程进行重构非常容易,这使得它在不失去异步性的情况下更具可读性。

 val foo = suspendGetFoo(id)
 if(foo==null) {
   suspendInsertFoo(id, Foo())
 } else {
   val bar = suspendGetBar(foo.bar)
   suspendUpdateBar(foo, bar);-
}
Run Code Online (Sandbox Code Playgroud)

但是,这样的代码是否适合于反应式编程?

如果是这样,给定一个Flux<String> idFlux,如何使用Reactor 3对其进行重构?

它是一个好主意,只需更换所有的CompletableFutureMono

java asynchronous reactive-programming project-reactor

1
推荐指数
1
解决办法
160
查看次数