标签: reactive-streams

Reactive和Reactive Streams有什么区别?

我试图理解Reactive和ReactiveStream之间的区别,特别是在RxJava的上下文中?

我能想到的最多的是Reactive Streams在规范中有一些背压概念,但RxJava/Reactive中已存在request(n)接口.

不介意ELI5的答案.

java reactive-programming rx-java reactive-streams

10
推荐指数
1
解决办法
3558
查看次数

如何在Slick中使用反应流来插入数据

Slick的文档中,使用Reactive Streams的示例仅用于读取数据作为DatabasePublisher的一种方式.但是,如果您希望根据插入率将数据库用作接收器和后端,会发生什么?

我找了等效的DatabaseSubscriber,但它不存在.所以问题是,如果我有一个来源,说:

val source = Source(0 to 100)

如何用Slick创建一个Sink,将这些值写入带有模式的表中:

create table NumberTable (value INT)

mysql slick reactive-streams akka-stream slick-3.0

10
推荐指数
2
解决办法
1712
查看次数

如何使用Reactive Streams进行NIO二进制处理?

是否有一些代码示例使用org.reactivestreams库来处理使用Java NIO的大数据流(为了获得高性能)?我的目标是分布式处理,所以使用Akka的例子是最好的,但我可以解决这个问题.

似乎大多数(我希望不是所有)scala中的文件读取Source(非二进制)或直接Java NIO(甚至是类似的东西)的例子都是这样的Files.readAllBytes.

也许有一个我错过的激活模板?(带有Scala的Akka Streams!除了二进制/ NIO端之外,已经接近我需要的所有内容)

nio scala akka reactive-streams

9
推荐指数
2
解决办法
1818
查看次数

在akka-stream中如何从期货集合中创建无序的来源

我需要akka.stream.scaladsl.Source[T, Unit]从一个集合中创建一个Future[T].

例如,有一组期货返回整数,

val f1: Future[Int] = ???
val f2: Future[Int] = ???
val fN: Future[Int] = ???
val futures = List(f1, f2, fN)
Run Code Online (Sandbox Code Playgroud)

如何创建一个

val source: Source[Int, Unit] = ???
Run Code Online (Sandbox Code Playgroud)

从中.

我不能使用Future.sequence组合器,从那以后我会等待每个未来完成之后从源头获取任何东西.我想在任何未来完成后立即以任何顺序获得结果.

我知道这Source是一个纯粹的功能API,它不应该以某种方式实现它之前运行任何东西.所以,我的想法是使用Iterator(懒惰)来创建一个源:

Source { () =>
  new Iterator[Future[Int]] {
    override def hasNext: Boolean = ???
    override def next(): Future[Int] = ???
  }
}
Run Code Online (Sandbox Code Playgroud)

但这将是未来的来源,而不是实际价值.我也可以阻止next使用,Await.result(future)但我不确定哪个胎面池的线程将被阻止.这也将顺序调用期货,而我需要并行执行.

更新2:事实证明有一种更简单的方法(感谢Viktor Klang):

Source(futures).mapAsync(1)(identity)
Run Code Online (Sandbox Code Playgroud)

更新:这是基于@sschaef回答我得到的:

def futuresToSource[T](futures: …
Run Code Online (Sandbox Code Playgroud)

scala future reactive-streams akka-stream

9
推荐指数
2
解决办法
3012
查看次数

如何处理 Spring reactor Mono 或 Flux 中的错误?

我有以下代码重新调整 Mono<Foo>:

try {
    return userRepository.findById(id)  // step 1
        .flatMap(user -> barRepository.findByUserId( user.getId())  // step 2
        .map(bar-> Foo.builder().msg("Already exists").build())  // step 3
            .switchIfEmpty(barRepository.save(Bar.builder().userId(user.getId()).build())  // step 4
                .map(bar-> Foo.builder().msg("Created").build())   // step 5 
            ))
            .doOnError(throwable -> Mono.just(handleError(throwable)));
    } catch(Exception e) {

        log.error("from catch block");
        return Mono.just(handleError(e));

    }
Run Code Online (Sandbox Code Playgroud)

如果在步骤 1 中发生错误(例如,指定 id 的用户不存在),它会被 doOnError 或 try catch 块捕获还是这两个都没有?

如果在步骤 2、步骤 3、步骤 4 中发生错误,同样的问题。

什么是正确的代码,以便 doOnError 始终捕获错误并消除 try catch?

public interface UserRepository extends ReactiveMongoRepository<User, String>对 barRepository使用 相同的。

handleError(throwable) 只是执行 log.error(e.getMessage() 并重新调整 Foo。

spring project-reactor reactive-streams

9
推荐指数
2
解决办法
4万
查看次数

Project Reactor 中的 flatMap、flatMapSequential 和 concatMap 有什么区别?

我从文档中读到flatMap

将此 Flux 发出的元素异步转换为 Publisher,然后通过合并将这些内部发布者扁平化为单个 Flux,从而允许它们交错。

flatMapSequential

将此 Flux 发出的元素异步转换为 Publishers,然后将这些内部发布者扁平化为单个 Flux,但按照源元素的顺序合并它们。

然后concatMap

将此 Flux 发出的元素异步转换为发布者,然后将这些内部发布者扁平化为单个 Flux,按顺序并使用串联保留顺序。该运算符有三个维度可以与 flatMap 和 flatMapSequential 进行比较:

内部函数的生成和订阅:该运算符等待一个内部函数完成,然后再生成下一个内部函数并订阅它。

展平值的排序:该运算符自然地保留与源元素相同的顺序,按顺序连接每个源元素的内部。

交错:此运算符不会让来自不同内部的值交错(串联)。

和 其他两者之间的区别flatMap是可以理解的,但我不明白concatMap和之间的区别何时flatMapSequential发生。两者之间有性能差异吗?我读过它flatMapSequential有一些队列的缓冲区大小,但我不明白为什么concatMap不需要它。

reactor project-reactor reactive-streams spring-webflux

9
推荐指数
1
解决办法
1万
查看次数

Reactive Streams - 使用超时批处理

我正在寻找替换本土的日志处理库,它看起来非常接近ReactiveStreams io.projectreactor.目标是减少我们维护的代码,并利用社区添加的任何新功能(瞄准操作员融合).

首先,我需要使用stdio并将多行日志条目合并到流向管道的文本blob中.在Filebeat文档的多行日志条目章节中详细解释了用例(除了我们希望它在进程中).

到目前为止,我的代码是:

BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); }));
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length()))
          .subscribe();          
Run Code Online (Sandbox Code Playgroud)

这会在检测到新的日志标头时关注多行合并,但在现有库中,我们还会在超时后刷新累积的行(即如果在5秒内没有收到文本,则刷新记录).

在Reactor中对此进行建模的正确方法是什么?我是否需要编写自己的运算符,还是可以自定义任何现有运算符?

任何有关在Project Reactor或RxJava中实现此用例的相关示例和文档的指针都将非常受欢迎.

reactive-programming rx-java project-reactor reactive-streams

8
推荐指数
1
解决办法
653
查看次数

从浏览器/REST 客户端调用时如何自动订阅 REST 端点?

在 ProjectReactor 或 Reactive Streams 中,在您 subscribe() 之前不会发生任何事情。

除非有人订阅,否则响应式流数据流不会发生,但我看到所有 REST API(如查找、保存和插入)都没有显式调用订阅,但数据在生产者和订阅者之间流动。

@RestController
class PersonController {

      private final PersonRepository repository;

      public PersonController(PersonRepository repository) {
        this.repository = repository;
      }
      @GetMapping("/all")
      public Flux<Person> index() {

         return repository.findAll();

     }
      @GetMapping("/people")
      Flux<String> namesByLastname(@RequestParam Mono<String> lastname) {

        Flux<Person> result = repository.findByLastname(lastname);
        return result.map(it -> it.getFullName());
      }

      @PostMapping("/people")
      Flux<People> AddPeople(@RequestBody Flux<Person> people) {

          return repository.saveAll(people);
      }
}
Run Code Online (Sandbox Code Playgroud)

为什么我们不需要调用REST 端点的订阅来启动 Project Reactor 中的数据流?

当我从浏览器调用时,REST 端点(HTTP 请求)如何自动订阅 Reactive Streams 以获取数据流?

我在这里错过了什么吗?

reactive-programming project-reactor reactive-streams spring-webflux

8
推荐指数
1
解决办法
1417
查看次数

project reactor - 如何将 Mono 和 Flux 结合起来?

我有一个 Flux 和 Mono,我不确定如何将它们组合起来,以便在 Flux 的每个项目中都有单声道值。

我正在尝试这种方法,但它不起作用:

Mono<String> mono1 = Mono.just("x");
Flux<String> flux1 = Flux.just("{1}", "{2}", "{3}", "{4}");

Flux.zip(mono1, flux1, (itemMono1, itemFlux1) ->  "-[" + itemFlux1 + itemMono1 + "]-").subscribe(System.out::println);

The outcome that I'm getting is -[{1}x]-

How could I combine them in order to get -[{1}x, {2}x, {3}x, {4}x]-?
Run Code Online (Sandbox Code Playgroud)

project-reactor reactive-streams spring-webflux

8
推荐指数
2
解决办法
7349
查看次数

将元素从外部推送到 fs2 中的反应流

我有一个外部(即,我无法更改它)Java API,如下所示:

public interface Sender {
    void send(Event e);
}
Run Code Online (Sandbox Code Playgroud)

我需要实现一个Sender接受每个事件,将其转换为 JSON 对象,将其中一些收集到一个包中并通过 HTTP 发送到某个端点的。这一切都应该异步完成,没有send()阻塞调用线程,使用一些固定大小的缓冲区并在缓冲区已满时丢弃新事件。

使用 akka-streams 这很简单:我创建了一个阶段图(它使用 akka-http 发送 HTTP 请求),将它具体化并使用具体化ActorRef将新事件推送到流:

lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
  .via(CustomBuffer(bufferSize))  // buffer all events
  .groupedWithin(batchSize, flushDuration)  // group events into chunks
  .map(toBundle)  // convert each chunk into a JSON message
  .mapAsyncUnordered(1)(sendHttpRequest)  // send an HTTP request
  .toMat(Sink.foreach { response =>
    // print HTTP response for debugging
  })(Keep.both)

lazy val (eventsActor, completeFuture) = eventPipeline.run()

override def …
Run Code Online (Sandbox Code Playgroud)

scala reactive-streams akka-stream fs2

8
推荐指数
1
解决办法
953
查看次数