标签: reactive-streams

Project Reactor 3 中的 publishOn 与 subscribeOn

我在相同的通量上使用 publishOn 和 subscribeOn,如下所示:

    System.out.println("*********Calling Concurrency************");
    List<Integer> elements = new ArrayList<>();
    Flux.just(1, 2, 3, 4)
      .map(i -> i * 2)
      .log()
      .publishOn(Schedulers.elastic())
      .subscribeOn(Schedulers.parallel())
      .subscribe(elements::add);
    System.out.println("-------------------------------------");
Run Code Online (Sandbox Code Playgroud)

虽然,当我同时使用两者时,日志中没有打印任何内容。但是当我只使用 publishOn 时,我得到了以下信息日志:

*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Run Code Online (Sandbox Code Playgroud)

是不是publishOn 比subscribeOn 更值得推荐?或者它比 subscribeOn 有更多的偏好?两者有什么区别以及何时使用哪个?

publisher publish-subscribe reactive-programming project-reactor reactive-streams

13
推荐指数
2
解决办法
1万
查看次数

如何在 spring web-flux 中发送电子邮件反应性

我想在我的新 Spring 应用程序中保持完全的反应性。因此,我将 web-flux/ reactor 和 ReactiveRepository 与 MongoDB 一起使用。

您知道如何将 java-mail 反应式地集成到技术堆栈中吗?任何替代方案?

jakarta-mail spring-boot reactive-streams spring-webflux

13
推荐指数
2
解决办法
2386
查看次数

如何正确读取Flux <DataBuffer>并将其转换为单个inputStream

我正在为我的spring-boot应用程序使用WebClient和自定义BodyExtractor

WebClient webLCient = WebClient.create();
webClient.get()
   .uri(url, params)
   .accept(MediaType.APPLICATION.XML)
   .exchange()
   .flatMap(response -> {
     return response.body(new BodyExtractor());
   })
Run Code Online (Sandbox Code Playgroud)

BodyExtractor.java

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  body.map(dataBuffer -> {
    try {
      JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
      Unmarshaller unmarshaller = jc.createUnmarshaller();

      return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
    } catch(Exception e){
       return null;
    }
  }).next();
}
Run Code Online (Sandbox Code Playgroud)

上面的代码使用小的有效负载而不是大的有效负载,我认为这是因为我只读取一个通量值,next我不知道如何组合和读取所有dataBuffer.

我是反应堆的新手,所以我不知道很多使用flux/mono的技巧.

java spring project-reactor reactive-streams spring-webflux

12
推荐指数
4
解决办法
1万
查看次数

如何发布或订阅物化的Akka Stream流程图?

我正在玩Akka Stream,我试图在实现后弄清楚它的灵活性.

一种方法是使用低级反应流API:http: //doc.akka.io/api/akka-stream-and-http-experimental/1.0-M3/#akka.stream.scaladsl.PublisherSource

但是,您需要定义这些点以进行发布或订阅.有没有办法发布或订阅任意物化流程图节点?这应该是可能的,因为物化流程图只不过是一组参与者.

例如:首先,部署流程图1:A~> B~> C.

然后,部署流程图2和3:D~> BB~> E

scala akka reactive-streams akka-stream

11
推荐指数
1
解决办法
799
查看次数

Servlet中使用java 9 Flow的反应流用例?

我正在寻找在servlet容器(或只是HTTP服务器)中使用反应流的用例.

码头项目已开始被问:"是码头反应" 我们已经注意到将反应流添加到java 9 的提议.

所以我们已经开始使用反应流API进行异步servlet IO的一些实验,这很有趣.....但没有任何关注,因为我们缺乏真正的用例来关注哪些问题最重要.

那么,任何人都有任何可以分享/解释的好用例,以便我们可以指导我们的码头实验来满足他们的需求.我想象的那种事情是让一个基于RS的数据库发布者在HTTP响应或websocket连接上一直发送对象,使用Flow.Processors进行转换.

servlets jetty reactive-programming reactive-streams java-9

11
推荐指数
1
解决办法
880
查看次数

Akka-Stream实现比单线程实现慢

2015-10-30更新


基于Roland Kuhn Awnser:

Akka Streams在Actors之间使用异步消息传递来实现流处理阶段.在异步边界上传递数据有一个开销,你在这里看到:你的计算似乎只需要大约160ns(来自单线程测量),而流式解决方案每个元素需要大约1μs,这是由消息传递决定的.

另一个误解是说"流"意味着并行性:在你的代码中,所有计算都在一个Actor(映射阶段)中顺序运行,因此对原始单线程解决方案没有任何好处.

为了从Akka Streams提供的并行性中受益,您需要具有多个处理阶段,每个阶段都执行任务

每个元素1μs,另见文档.

我做了一些改变.我的代码现在看起来像:

object MultiThread {
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgess = 0

  //RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))

  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform)

  val eventToFactorial = Flow[Event].map(SharedFunctions.transform2)

  val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder …
Run Code Online (Sandbox Code Playgroud)

scala reactive-streams akka-stream

10
推荐指数
2
解决办法
2473
查看次数

如何在Slick中使用反应流来插入数据

Slick的文档中,使用Reactive Streams的示例仅用于读取数据作为DatabasePublisher的一种方式.但是,如果您希望根据插入率将数据库用作接收器和后端,会发生什么?

我找了等效的DatabaseSubscriber,但它不存在.所以问题是,如果我有一个来源,说:

val source = Source(0 to 100)

如何用Slick创建一个Sink,将这些值写入带有模式的表中:

create table NumberTable (value INT)

mysql slick reactive-streams akka-stream slick-3.0

10
推荐指数
2
解决办法
1712
查看次数

fetch()上的Back Pressure在Google Chrome中无效

我无法通过JavaScript的新Streams API从我的WebFlux服务器中获取响应.

我可以通过Curl(在帮助下--limit-rate)看到服务器正在按预期放慢速度,但是当我尝试在Google Chrome(64.0.3282.140)中使用正文时,它并没有像它应该的那样减速.实际上,即使只传递了大约187 kB,Chrome也会从服务器下载并缓冲大约32 MB write().

我的JavaScript有问题吗?

async function fetchStream(url, consumer) {
    const response = await fetch(url, {
        headers: {
            "Accept": "application/stream+json"
        }
    });
    const decoder = new TextDecoder("utf-8");
    let buffer = "";
    await response.body.pipeTo(new WritableStream({
        async write(chunk) {
            buffer += decoder.decode(chunk);
            const blocks = buffer.split("\n");
            if (blocks.length === 1) {
                return;
            }
            const indexOfLastBlock = blocks.length - 1;
            for (let index = 0; index < indexOfLastBlock; index ++) {
                const block = blocks[index];
                const item …
Run Code Online (Sandbox Code Playgroud)

javascript google-chrome reactive-programming reactive-streams spring-webflux

10
推荐指数
1
解决办法
446
查看次数

我如何知道 Observable 是否已完成有错误或没有错误?

我需要在 Observable 完成时执行一些代码,具体取决于是否最终确定有错误。我有这个代码:

const obs = getMyObservable().pipe(finalize(() => {
    //here
}));
Run Code Online (Sandbox Code Playgroud)

如您所见,我正在使用finalize运算符,但我不知道是否已最终确定是否有错误。中是否有某种doOnCompletedoOnError运算符rxjs

javascript rxjs reactive-streams

10
推荐指数
1
解决办法
3695
查看次数

响应式流如何与 HTTP 配合使用?什么是响应式http?

我对 Reactive Stream 有点陌生,所以在使用Spring WebfluxReactor时遇到了一个问题。

我制作了一个如下所示的片段:

@RestController
public class TestController {
    @GetMapping("responsebody/flux")
    public Flux<String> tt2() {
        return Flux.range(1, 5)
            .delayElements(Duration.ofMillis(1000))
            .map(l -> "hi");
    }
}
Run Code Online (Sandbox Code Playgroud)

而且,有趣的是,镶边分别显示序列中的每个元素,而不是当我仅使用浏览器请求时一次公开所有元素。(但开发工具立即显示全身)

但我想知道,即使 HTTP 1 只使用一个连接,并且服务器发送的数据放在 HTTP 协议的正文中,它是如何工作的。客户端如何知道哪个元素分隔每个元素以及序列何时完成?如果客户端还没有准备好使用反应流怎么办?

我不需要任何使用反应式库的代码,但想知道协议是如何工作的。

http project-reactor reactive-streams reactive spring-webflux

10
推荐指数
1
解决办法
1287
查看次数