我在相同的通量上使用 publishOn 和 subscribeOn,如下所示:
System.out.println("*********Calling Concurrency************");
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.map(i -> i * 2)
.log()
.publishOn(Schedulers.elastic())
.subscribeOn(Schedulers.parallel())
.subscribe(elements::add);
System.out.println("-------------------------------------");
Run Code Online (Sandbox Code Playgroud)
虽然,当我同时使用两者时,日志中没有打印任何内容。但是当我只使用 publishOn 时,我得到了以下信息日志:
*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Run Code Online (Sandbox Code Playgroud)
是不是publishOn 比subscribeOn 更值得推荐?或者它比 subscribeOn 有更多的偏好?两者有什么区别以及何时使用哪个?
publisher publish-subscribe reactive-programming project-reactor reactive-streams
我想在我的新 Spring 应用程序中保持完全的反应性。因此,我将 web-flux/ reactor 和 ReactiveRepository 与 MongoDB 一起使用。
您知道如何将 java-mail 反应式地集成到技术堆栈中吗?任何替代方案?
我正在为我的spring-boot应用程序使用WebClient和自定义BodyExtractor类
WebClient webLCient = WebClient.create();
webClient.get()
.uri(url, params)
.accept(MediaType.APPLICATION.XML)
.exchange()
.flatMap(response -> {
return response.body(new BodyExtractor());
})
Run Code Online (Sandbox Code Playgroud)
BodyExtractor.java
@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
Flux<DataBuffer> body = response.getBody();
body.map(dataBuffer -> {
try {
JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
Unmarshaller unmarshaller = jc.createUnmarshaller();
return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
} catch(Exception e){
return null;
}
}).next();
}
Run Code Online (Sandbox Code Playgroud)
上面的代码使用小的有效负载而不是大的有效负载,我认为这是因为我只读取一个通量值,next我不知道如何组合和读取所有dataBuffer.
我是反应堆的新手,所以我不知道很多使用flux/mono的技巧.
我正在玩Akka Stream,我试图在实现后弄清楚它的灵活性.
一种方法是使用低级反应流API:http: //doc.akka.io/api/akka-stream-and-http-experimental/1.0-M3/#akka.stream.scaladsl.PublisherSource
但是,您需要定义这些点以进行发布或订阅.有没有办法发布或订阅任意物化流程图节点?这应该是可能的,因为物化流程图只不过是一组参与者.
例如:首先,部署流程图1:A~> B~> C.
然后,部署流程图2和3:D~> BB~> E
2015-10-30更新
基于Roland Kuhn Awnser:
Akka Streams在Actors之间使用异步消息传递来实现流处理阶段.在异步边界上传递数据有一个开销,你在这里看到:你的计算似乎只需要大约160ns(来自单线程测量),而流式解决方案每个元素需要大约1μs,这是由消息传递决定的.
另一个误解是说"流"意味着并行性:在你的代码中,所有计算都在一个Actor(映射阶段)中顺序运行,因此对原始单线程解决方案没有任何好处.
为了从Akka Streams提供的并行性中受益,您需要具有多个处理阶段,每个阶段都执行任务
每个元素1μs,另见文档.
我做了一些改变.我的代码现在看起来像:
object MultiThread {
implicit val actorSystem = ActorSystem("Sys")
implicit val materializer = ActorMaterializer()
var counter = 0
var oldProgess = 0
//RunnableFlow: in -> flow -> sink
val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))
val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))
val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform)
val eventToFactorial = Flow[Event].map(SharedFunctions.transform2)
val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder …Run Code Online (Sandbox Code Playgroud) 在Slick的文档中,使用Reactive Streams的示例仅用于读取数据作为DatabasePublisher的一种方式.但是,如果您希望根据插入率将数据库用作接收器和后端,会发生什么?
我找了等效的DatabaseSubscriber,但它不存在.所以问题是,如果我有一个来源,说:
val source = Source(0 to 100)
如何用Slick创建一个Sink,将这些值写入带有模式的表中:
create table NumberTable (value INT)
我无法通过JavaScript的新Streams API从我的WebFlux服务器中获取响应.
我可以通过Curl(在帮助下--limit-rate)看到服务器正在按预期放慢速度,但是当我尝试在Google Chrome(64.0.3282.140)中使用正文时,它并没有像它应该的那样减速.实际上,即使只传递了大约187 kB,Chrome也会从服务器下载并缓冲大约32 MB write().
我的JavaScript有问题吗?
async function fetchStream(url, consumer) {
const response = await fetch(url, {
headers: {
"Accept": "application/stream+json"
}
});
const decoder = new TextDecoder("utf-8");
let buffer = "";
await response.body.pipeTo(new WritableStream({
async write(chunk) {
buffer += decoder.decode(chunk);
const blocks = buffer.split("\n");
if (blocks.length === 1) {
return;
}
const indexOfLastBlock = blocks.length - 1;
for (let index = 0; index < indexOfLastBlock; index ++) {
const block = blocks[index];
const item …Run Code Online (Sandbox Code Playgroud) javascript google-chrome reactive-programming reactive-streams spring-webflux
我需要在 Observable 完成时执行一些代码,具体取决于是否最终确定有错误。我有这个代码:
const obs = getMyObservable().pipe(finalize(() => {
//here
}));
Run Code Online (Sandbox Code Playgroud)
如您所见,我正在使用finalize运算符,但我不知道是否已最终确定是否有错误。中是否有某种doOnComplete或doOnError运算符rxjs?
我对 Reactive Stream 有点陌生,所以在使用Spring Webflux和Reactor时遇到了一个问题。
我制作了一个如下所示的片段:
@RestController
public class TestController {
@GetMapping("responsebody/flux")
public Flux<String> tt2() {
return Flux.range(1, 5)
.delayElements(Duration.ofMillis(1000))
.map(l -> "hi");
}
}
Run Code Online (Sandbox Code Playgroud)
而且,有趣的是,镶边分别显示序列中的每个元素,而不是当我仅使用浏览器请求时一次公开所有元素。(但开发工具立即显示全身)
但我想知道,即使 HTTP 1 只使用一个连接,并且服务器发送的数据放在 HTTP 协议的正文中,它是如何工作的。客户端如何知道哪个元素分隔每个元素以及序列何时完成?如果客户端还没有准备好使用反应流怎么办?
我不需要任何使用反应式库的代码,但想知道协议是如何工作的。
http project-reactor reactive-streams reactive spring-webflux
reactive-streams ×10
akka-stream ×3
javascript ×2
scala ×2
akka ×1
http ×1
jakarta-mail ×1
java ×1
java-9 ×1
jetty ×1
mysql ×1
publisher ×1
reactive ×1
rxjs ×1
servlets ×1
slick ×1
slick-3.0 ×1
spring ×1
spring-boot ×1