我试图理解Reactive和ReactiveStream之间的区别,特别是在RxJava的上下文中?
我能想到的最多的是Reactive Streams在规范中有一些背压概念,但RxJava/Reactive中已存在request(n)接口.
不介意ELI5的答案.
在Slick的文档中,使用Reactive Streams的示例仅用于读取数据作为DatabasePublisher的一种方式.但是,如果您希望根据插入率将数据库用作接收器和后端,会发生什么?
我找了等效的DatabaseSubscriber,但它不存在.所以问题是,如果我有一个来源,说:
val source = Source(0 to 100)
如何用Slick创建一个Sink,将这些值写入带有模式的表中:
create table NumberTable (value INT)
是否有一些代码示例使用org.reactivestreams库来处理使用Java NIO的大数据流(为了获得高性能)?我的目标是分布式处理,所以使用Akka的例子是最好的,但我可以解决这个问题.
似乎大多数(我希望不是所有)scala中的文件读取Source(非二进制)或直接Java NIO(甚至是类似的东西)的例子都是这样的Files.readAllBytes.
也许有一个我错过的激活模板?(带有Scala的Akka Streams!除了二进制/ NIO端之外,已经接近我需要的所有内容)
我需要akka.stream.scaladsl.Source[T, Unit]从一个集合中创建一个Future[T].
例如,有一组期货返回整数,
val f1: Future[Int] = ???
val f2: Future[Int] = ???
val fN: Future[Int] = ???
val futures = List(f1, f2, fN)
Run Code Online (Sandbox Code Playgroud)
如何创建一个
val source: Source[Int, Unit] = ???
Run Code Online (Sandbox Code Playgroud)
从中.
我不能使用Future.sequence组合器,从那以后我会等待每个未来完成之后从源头获取任何东西.我想在任何未来完成后立即以任何顺序获得结果.
我知道这Source是一个纯粹的功能API,它不应该以某种方式实现它之前运行任何东西.所以,我的想法是使用Iterator(懒惰)来创建一个源:
Source { () =>
new Iterator[Future[Int]] {
override def hasNext: Boolean = ???
override def next(): Future[Int] = ???
}
}
Run Code Online (Sandbox Code Playgroud)
但这将是未来的来源,而不是实际价值.我也可以阻止next使用,Await.result(future)但我不确定哪个胎面池的线程将被阻止.这也将顺序调用期货,而我需要并行执行.
更新2:事实证明有一种更简单的方法(感谢Viktor Klang):
Source(futures).mapAsync(1)(identity)
Run Code Online (Sandbox Code Playgroud)
更新:这是基于@sschaef回答我得到的:
def futuresToSource[T](futures: …Run Code Online (Sandbox Code Playgroud) 我有以下代码重新调整 Mono<Foo>:
try {
return userRepository.findById(id) // step 1
.flatMap(user -> barRepository.findByUserId( user.getId()) // step 2
.map(bar-> Foo.builder().msg("Already exists").build()) // step 3
.switchIfEmpty(barRepository.save(Bar.builder().userId(user.getId()).build()) // step 4
.map(bar-> Foo.builder().msg("Created").build()) // step 5
))
.doOnError(throwable -> Mono.just(handleError(throwable)));
} catch(Exception e) {
log.error("from catch block");
return Mono.just(handleError(e));
}
Run Code Online (Sandbox Code Playgroud)
如果在步骤 1 中发生错误(例如,指定 id 的用户不存在),它会被 doOnError 或 try catch 块捕获还是这两个都没有?
如果在步骤 2、步骤 3、步骤 4 中发生错误,同样的问题。
什么是正确的代码,以便 doOnError 始终捕获错误并消除 try catch?
我public interface UserRepository extends ReactiveMongoRepository<User, String>对 barRepository使用
相同的。
handleError(throwable) 只是执行 log.error(e.getMessage() 并重新调整 Foo。
我从文档中读到flatMap:
将此 Flux 发出的元素异步转换为 Publisher,然后通过合并将这些内部发布者扁平化为单个 Flux,从而允许它们交错。
那flatMapSequential:
将此 Flux 发出的元素异步转换为 Publishers,然后将这些内部发布者扁平化为单个 Flux,但按照源元素的顺序合并它们。
然后concatMap:
将此 Flux 发出的元素异步转换为发布者,然后将这些内部发布者扁平化为单个 Flux,按顺序并使用串联保留顺序。该运算符有三个维度可以与 flatMap 和 flatMapSequential 进行比较:
内部函数的生成和订阅:该运算符等待一个内部函数完成,然后再生成下一个内部函数并订阅它。
展平值的排序:该运算符自然地保留与源元素相同的顺序,按顺序连接每个源元素的内部。
交错:此运算符不会让来自不同内部的值交错(串联)。
和 其他两者之间的区别flatMap是可以理解的,但我不明白concatMap和之间的区别何时flatMapSequential发生。两者之间有性能差异吗?我读过它flatMapSequential有一些队列的缓冲区大小,但我不明白为什么concatMap不需要它。
我正在寻找替换本土的日志处理库,它看起来非常接近ReactiveStreams io.projectreactor.目标是减少我们维护的代码,并利用社区添加的任何新功能(瞄准操作员融合).
首先,我需要使用stdio并将多行日志条目合并到流向管道的文本blob中.在Filebeat文档的多行日志条目章节中详细解释了用例(除了我们希望它在进程中).
到目前为止,我的代码是:
BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); }));
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length()))
.subscribe();
Run Code Online (Sandbox Code Playgroud)
这会在检测到新的日志标头时关注多行合并,但在现有库中,我们还会在超时后刷新累积的行(即如果在5秒内没有收到文本,则刷新记录).
在Reactor中对此进行建模的正确方法是什么?我是否需要编写自己的运算符,还是可以自定义任何现有运算符?
任何有关在Project Reactor或RxJava中实现此用例的相关示例和文档的指针都将非常受欢迎.
reactive-programming rx-java project-reactor reactive-streams
在 ProjectReactor 或 Reactive Streams 中,在您 subscribe() 之前不会发生任何事情。
除非有人订阅,否则响应式流数据流不会发生,但我看到所有 REST API(如查找、保存和插入)都没有显式调用订阅,但数据在生产者和订阅者之间流动。
@RestController
class PersonController {
private final PersonRepository repository;
public PersonController(PersonRepository repository) {
this.repository = repository;
}
@GetMapping("/all")
public Flux<Person> index() {
return repository.findAll();
}
@GetMapping("/people")
Flux<String> namesByLastname(@RequestParam Mono<String> lastname) {
Flux<Person> result = repository.findByLastname(lastname);
return result.map(it -> it.getFullName());
}
@PostMapping("/people")
Flux<People> AddPeople(@RequestBody Flux<Person> people) {
return repository.saveAll(people);
}
}
Run Code Online (Sandbox Code Playgroud)
为什么我们不需要调用REST 端点的订阅来启动 Project Reactor 中的数据流?
当我从浏览器调用时,REST 端点(HTTP 请求)如何自动订阅 Reactive Streams 以获取数据流?
我在这里错过了什么吗?
reactive-programming project-reactor reactive-streams spring-webflux
我有一个 Flux 和 Mono,我不确定如何将它们组合起来,以便在 Flux 的每个项目中都有单声道值。
我正在尝试这种方法,但它不起作用:
Mono<String> mono1 = Mono.just("x");
Flux<String> flux1 = Flux.just("{1}", "{2}", "{3}", "{4}");
Flux.zip(mono1, flux1, (itemMono1, itemFlux1) -> "-[" + itemFlux1 + itemMono1 + "]-").subscribe(System.out::println);
The outcome that I'm getting is -[{1}x]-
How could I combine them in order to get -[{1}x, {2}x, {3}x, {4}x]-?
Run Code Online (Sandbox Code Playgroud) 我有一个外部(即,我无法更改它)Java API,如下所示:
public interface Sender {
void send(Event e);
}
Run Code Online (Sandbox Code Playgroud)
我需要实现一个Sender接受每个事件,将其转换为 JSON 对象,将其中一些收集到一个包中并通过 HTTP 发送到某个端点的。这一切都应该异步完成,没有send()阻塞调用线程,使用一些固定大小的缓冲区并在缓冲区已满时丢弃新事件。
使用 akka-streams 这很简单:我创建了一个阶段图(它使用 akka-http 发送 HTTP 请求),将它具体化并使用具体化ActorRef将新事件推送到流:
lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
.via(CustomBuffer(bufferSize)) // buffer all events
.groupedWithin(batchSize, flushDuration) // group events into chunks
.map(toBundle) // convert each chunk into a JSON message
.mapAsyncUnordered(1)(sendHttpRequest) // send an HTTP request
.toMat(Sink.foreach { response =>
// print HTTP response for debugging
})(Keep.both)
lazy val (eventsActor, completeFuture) = eventPipeline.run()
override def …Run Code Online (Sandbox Code Playgroud)