我是Spring Web-Flux的首发.我写了一个控制器如下:
@RestController
public class FirstController
{
@GetMapping("/first")
public Mono<String> getAllTweets()
{
return Mono.just("I am First Mono")
}
}
Run Code Online (Sandbox Code Playgroud)
我知道其中一个反应性好处是Backpressure,它可以平衡请求或响应率.我想知道如何在Spring Web-Flux中使用背压机制.
我正在写一个模块,它是一个可写的流.我想为我的用户实现管道接口.
如果发生某些错误,我需要暂停可读流并发出错误事件.然后,用户将决定 - 如果他没有错误,他应该能够恢复数据处理.
var writeable = new BackPressureStream();
writeable.on('error', function(error){
console.log(error);
writeable.resume();
});
var readable = require('fs').createReadStream('somefile.txt');
readable.pipe.(writeable);
Run Code Online (Sandbox Code Playgroud)
我看到该节点为我们提供readable.pause()
了可用于暂停可读流的方法.但我无法得到如何从我的可写流模块中调用它:
var Writable = require('stream').Writable;
function BackPressureStream(options) {
Writable.call(this, options);
}
require('util').inherits(BackPressureStream, Writable);
BackPressureStream.prototype._write = function(chunk, encoding, done) {
done();
};
BackPressureStream.prototype.resume = function() {
this.emit('drain');
}
Run Code Online (Sandbox Code Playgroud)
如何在可写流中实现背压?
PS可以使用pipe/unpipe
提供可读流作为参数的事件.但也有人说,对于管道流,暂停的唯一机会是从可写入中删除可读流.
我做对了吗?我必须取消管理可写流,直到用户呼叫恢复为止?用户调用恢复后,我应该管道可读流回来?
我是RxJava的初学者,我很好奇"背压"的含义.
这是否意味着生产者会给消费者带来压力?
或者这是否意味着消费者对生产者施加压力?(相反方向的压力)
我使用datastax java驱动程序3.1.0连接到cassandra集群,我的cassandra集群版本是2.0.10.我与QUORUM一致性异步编写.
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void save(String process, int clientid, long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex); …
Run Code Online (Sandbox Code Playgroud) java multithreading rate-limiting backpressure datastax-java-driver
据我所知,Node的事件IO模型的一个后果就是无法告诉Node进程(例如)通过TCP套接字接收数据,阻止,一旦你连接了接收事件处理程序(或者否则开始收听数据).
如果接收器无法足够快地处理传入数据,则可能导致"无限并发",从而节点内的节点继续尽可能快地从套接字读取数据,在事件循环上调度新数据事件而不是阻塞在套接字上,直到进程最终耗尽内存并死掉.
接收方无法告诉节点减慢其读取速度,否则将允许TCP的内置流量控制机制启动并向发送方指示它需要减速.
首先,到目前为止我所描述的是准确的吗?有没有我错过了允许节点避免这种情况的东西?
Node Streams备受推崇的功能之一是自动处理背压.
AFAIK,可写流(tcp套接字)可以告诉它是否需要减速的唯一方法是查看socket.bufferSize
(指示写入套接字但尚未发送的数据量).鉴于接收端的节点总是尽可能快地读取,这只能表明发送方和接收方之间的网络连接速度慢,而不是指示接收方是否无法跟上.
那么其次,Node Streams能否以某种方式自动反压在这种情况下处理无法跟上的接收器?
似乎这个问题影响浏览器通过websockets接收数据,原因类似于websockets API没有提供一种机制来告诉浏览器减慢从套接字读取的速度.
Node(以及使用websockets的浏览器)在应用程序级别实现手动流量控制机制,以明确告诉发送进程减速是唯一的解决方案吗?
我在Clojure中编写了一些core.async代码,当我运行它时,它消耗了所有可用的内存并因错误而失败.似乎mapcat
在core.async管道中使用可以打破压力.(由于超出本问题范围的原因,这是不幸的.)
下面是一些通过计算:x
输入和输出mapcat
换能器来演示问题的代码:
(ns mapcat.core
(:require [clojure.core.async :as async]))
(defn test-backpressure [n length]
(let [message (repeat length :x)
input (async/chan)
transform (async/chan 1 (mapcat seq))
output (async/chan)
sent (atom 0)]
(async/pipe input transform)
(async/pipe transform output)
(async/go
(dotimes [_ n]
(async/>! input message)
(swap! sent inc))
(async/close! input))
(async/go-loop [x 0]
(when (= 0 (mod x (/ (* n length) 10)))
(println "in:" (* @sent length) "out:" x))
(when-let [_ (async/<! output)]
(recur (inc x)))))) …
Run Code Online (Sandbox Code Playgroud) 我有一个接收单个事件(字符串)的 CustomReceiver。在火花应用程序的运行时使用接收到的单个事件从 nosql 读取数据并应用转换。当观察到每个批次的处理时间大于批次间隔时,我设置这个属性。
spark.streaming.backpressure.enabled=true
之后,我预计 CustomReceiver 不会在批处理处理时间长于批处理窗口时触发和接收事件,这并没有发生,并且仍然添加了一批积压的批次。我在这里错过了什么吗?
我是反应式编程世界的新手,我正在尝试使用 rxjava 2 创建一个简单的背压感知消息处理。
以下是我试图实现的工作流程:
可流动的连续字符串流。
执行一个耗时的操作并将消息更改为另一个字符串
执行另一个耗时的操作。
现在我使用以下代码:
{
Flowable.create(subscriber -> {
some_stream.forEach(data -> {
subscriber.onNext(data);
});
}, BackpressureStrategy.BUFFER).
subscribeOn(Schedulers.io()). // Data emission will run io scheduler
observeOn(Schedulers.computation()). // Map operation will run on computation scheduler
map(val -> Time_Consuming_Task(val)). // Task returns another string
observeOn(Schedulers.io()). / Next consumer will run on computation scheduler
subscribe(val -> Another_Time_Consuming_Task(val));
}
Run Code Online (Sandbox Code Playgroud)
现在对于小型操作,我没有看到任何与背压相关的问题。
但是对于大流,我不知道它会如何表现。
现在我的问题是:-
BackpressureStrategy.BUFFER的默认缓冲区大小是多少,数据在哪里缓冲?
如果我想在每次耗时任务之前创建两个背压缓冲区,我应该使用onBackpressureBuffer 操作符吗?
如果缓冲区已满,我不想丢失数据,我想等待或在这种情况下什么?
考虑使用zip操作符将两个无限的Observable压缩在一起,其中一个Observable发出的项目频率是另一个的两倍.
当前的实现是无损耗的,即如果我让这些Observables发射一小时然后我在它们的发射速率之间切换,第一个Observable将最终赶上另一个.
随着缓冲区越来越大,这将导致内存爆炸.
如果第一个observable将发出几个小时的项目而第二个将在最后发出一个项目,则会发生同样的情况.
如何为此运营商实现有损行为?我只是想随时从两个流中获得排放,而我不关心我错过的更快流量的排放量.
澄清:
zip
操作员的无损性质导致的内存爆炸.例:
Stream1: 1 2 3 4 5 6 7
Stream2: 10 20 30 40 50 60 70
Run Code Online (Sandbox Code Playgroud)
Regular zip
会产生以下输出:
[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]
Run Code Online (Sandbox Code Playgroud)
const Observable = Rx.Observable;
const Subject = Rx.Subject;
const s1 = new Subject();
const s2 = new Subject();
Observable.zip(s1,s2).subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); …
Run Code Online (Sandbox Code Playgroud)我在 Nodejs 模式下使用优秀的 Papa Parse 库,将超过 100 万行的大型(500 MB)CSV 文件流式传输到缓慢的持久性 API,该 API 一次只能接受一个请求。持久性 API 基于Promise
s,但从 Papa Parse,我在同步事件中收到每个解析的 CSV 行,如下所示:parseStream.on("data", row => { ... }
我面临的挑战是 Papa Parse 从流中转储 CSV 行的速度太快,以至于我缓慢的持久性 API 无法跟上。因为 Papa 是同步的,而我的 API 是基于Promise的,所以我不能只调用事件处理await doDirtyWork(row)
程序on
,因为同步和异步代码不会混合。
或者它们可以混合,但我只是不知道如何混合?
我的问题是,我可以让 Papa 的事件处理程序等待我的 API 调用完成吗?是不是直接在on("data")
事件中执行持久性 API 请求,让on()
函数以某种方式徘徊直到脏 API 工作完成?
就内存占用而言,我到目前为止的解决方案并不比使用 Papa 的非流模式好多少。实际上,我需要以生成器函数迭代的形式对大量事件进行排队。on("data")
我还可以将 Promise 工厂排列在一个数组中,并在循环中进行处理。无论哪种方式,我最终都会将几乎整个 CSV 文件作为未来 Promise(承诺工厂)的巨大集合保存在内存中,直到我缓慢的 API 调用完全完成。
async importCSV(filePath) { …
Run Code Online (Sandbox Code Playgroud) backpressure ×10
java ×3
node.js ×3
stream ×3
rx-java ×2
apache-spark ×1
asynchronous ×1
clojure ×1
core.async ×1
es6-promise ×1
hadoop ×1
javascript ×1
memory-leaks ×1
node-streams ×1
promise ×1
rxjs ×1
rxjs5 ×1