我是Spring Web-Flux的首发.我写了一个控制器如下:
@RestController
public class FirstController
{
@GetMapping("/first")
public Mono<String> getAllTweets()
{
return Mono.just("I am First Mono")
}
}
Run Code Online (Sandbox Code Playgroud)
我知道其中一个反应性好处是Backpressure,它可以平衡请求或响应率.我想知道如何在Spring Web-Flux中使用背压机制.
我正在写一个模块,它是一个可写的流.我想为我的用户实现管道接口.
如果发生某些错误,我需要暂停可读流并发出错误事件.然后,用户将决定 - 如果他没有错误,他应该能够恢复数据处理.
var writeable = new BackPressureStream();
writeable.on('error', function(error){
console.log(error);
writeable.resume();
});
var readable = require('fs').createReadStream('somefile.txt');
readable.pipe.(writeable);
Run Code Online (Sandbox Code Playgroud)
我看到该节点为我们提供readable.pause()了可用于暂停可读流的方法.但我无法得到如何从我的可写流模块中调用它:
var Writable = require('stream').Writable;
function BackPressureStream(options) {
Writable.call(this, options);
}
require('util').inherits(BackPressureStream, Writable);
BackPressureStream.prototype._write = function(chunk, encoding, done) {
done();
};
BackPressureStream.prototype.resume = function() {
this.emit('drain');
}
Run Code Online (Sandbox Code Playgroud)
如何在可写流中实现背压?
PS可以使用pipe/unpipe提供可读流作为参数的事件.但也有人说,对于管道流,暂停的唯一机会是从可写入中删除可读流.
我做对了吗?我必须取消管理可写流,直到用户呼叫恢复为止?用户调用恢复后,我应该管道可读流回来?
我是RxJava的初学者,我很好奇"背压"的含义.
这是否意味着生产者会给消费者带来压力?
或者这是否意味着消费者对生产者施加压力?(相反方向的压力)
我使用datastax java驱动程序3.1.0连接到cassandra集群,我的cassandra集群版本是2.0.10.我与QUORUM一致性异步编写.
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void save(String process, int clientid, long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex); …Run Code Online (Sandbox Code Playgroud) java multithreading rate-limiting backpressure datastax-java-driver
据我所知,Node的事件IO模型的一个后果就是无法告诉Node进程(例如)通过TCP套接字接收数据,阻止,一旦你连接了接收事件处理程序(或者否则开始收听数据).
如果接收器无法足够快地处理传入数据,则可能导致"无限并发",从而节点内的节点继续尽可能快地从套接字读取数据,在事件循环上调度新数据事件而不是阻塞在套接字上,直到进程最终耗尽内存并死掉.
接收方无法告诉节点减慢其读取速度,否则将允许TCP的内置流量控制机制启动并向发送方指示它需要减速.
首先,到目前为止我所描述的是准确的吗?有没有我错过了允许节点避免这种情况的东西?
Node Streams备受推崇的功能之一是自动处理背压.
AFAIK,可写流(tcp套接字)可以告诉它是否需要减速的唯一方法是查看socket.bufferSize(指示写入套接字但尚未发送的数据量).鉴于接收端的节点总是尽可能快地读取,这只能表明发送方和接收方之间的网络连接速度慢,而不是指示接收方是否无法跟上.
那么其次,Node Streams能否以某种方式自动反压在这种情况下处理无法跟上的接收器?
似乎这个问题影响浏览器通过websockets接收数据,原因类似于websockets API没有提供一种机制来告诉浏览器减慢从套接字读取的速度.
Node(以及使用websockets的浏览器)在应用程序级别实现手动流量控制机制,以明确告诉发送进程减速是唯一的解决方案吗?
我在Clojure中编写了一些core.async代码,当我运行它时,它消耗了所有可用的内存并因错误而失败.似乎mapcat在core.async管道中使用可以打破压力.(由于超出本问题范围的原因,这是不幸的.)
下面是一些通过计算:x输入和输出mapcat换能器来演示问题的代码:
(ns mapcat.core
(:require [clojure.core.async :as async]))
(defn test-backpressure [n length]
(let [message (repeat length :x)
input (async/chan)
transform (async/chan 1 (mapcat seq))
output (async/chan)
sent (atom 0)]
(async/pipe input transform)
(async/pipe transform output)
(async/go
(dotimes [_ n]
(async/>! input message)
(swap! sent inc))
(async/close! input))
(async/go-loop [x 0]
(when (= 0 (mod x (/ (* n length) 10)))
(println "in:" (* @sent length) "out:" x))
(when-let [_ (async/<! output)]
(recur (inc x)))))) …Run Code Online (Sandbox Code Playgroud) 我有一个接收单个事件(字符串)的 CustomReceiver。在火花应用程序的运行时使用接收到的单个事件从 nosql 读取数据并应用转换。当观察到每个批次的处理时间大于批次间隔时,我设置这个属性。
spark.streaming.backpressure.enabled=true
之后,我预计 CustomReceiver 不会在批处理处理时间长于批处理窗口时触发和接收事件,这并没有发生,并且仍然添加了一批积压的批次。我在这里错过了什么吗?
我是反应式编程世界的新手,我正在尝试使用 rxjava 2 创建一个简单的背压感知消息处理。
以下是我试图实现的工作流程:
可流动的连续字符串流。
执行一个耗时的操作并将消息更改为另一个字符串
执行另一个耗时的操作。
现在我使用以下代码:
{
Flowable.create(subscriber -> {
some_stream.forEach(data -> {
subscriber.onNext(data);
});
}, BackpressureStrategy.BUFFER).
subscribeOn(Schedulers.io()). // Data emission will run io scheduler
observeOn(Schedulers.computation()). // Map operation will run on computation scheduler
map(val -> Time_Consuming_Task(val)). // Task returns another string
observeOn(Schedulers.io()). / Next consumer will run on computation scheduler
subscribe(val -> Another_Time_Consuming_Task(val));
}
Run Code Online (Sandbox Code Playgroud)
现在对于小型操作,我没有看到任何与背压相关的问题。
但是对于大流,我不知道它会如何表现。
现在我的问题是:-
BackpressureStrategy.BUFFER的默认缓冲区大小是多少,数据在哪里缓冲?
如果我想在每次耗时任务之前创建两个背压缓冲区,我应该使用onBackpressureBuffer 操作符吗?
如果缓冲区已满,我不想丢失数据,我想等待或在这种情况下什么?
考虑使用zip操作符将两个无限的Observable压缩在一起,其中一个Observable发出的项目频率是另一个的两倍.
当前的实现是无损耗的,即如果我让这些Observables发射一小时然后我在它们的发射速率之间切换,第一个Observable将最终赶上另一个.
随着缓冲区越来越大,这将导致内存爆炸.
如果第一个observable将发出几个小时的项目而第二个将在最后发出一个项目,则会发生同样的情况.
如何为此运营商实现有损行为?我只是想随时从两个流中获得排放,而我不关心我错过的更快流量的排放量.
澄清:
zip操作员的无损性质导致的内存爆炸.例:
Stream1: 1 2 3 4 5 6 7
Stream2: 10 20 30 40 50 60 70
Run Code Online (Sandbox Code Playgroud)
Regular zip会产生以下输出:
[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]
Run Code Online (Sandbox Code Playgroud)
const Observable = Rx.Observable;
const Subject = Rx.Subject;
const s1 = new Subject();
const s2 = new Subject();
Observable.zip(s1,s2).subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); …Run Code Online (Sandbox Code Playgroud)我在 Nodejs 模式下使用优秀的 Papa Parse 库,将超过 100 万行的大型(500 MB)CSV 文件流式传输到缓慢的持久性 API,该 API 一次只能接受一个请求。持久性 API 基于Promises,但从 Papa Parse,我在同步事件中收到每个解析的 CSV 行,如下所示:parseStream.on("data", row => { ... }
我面临的挑战是 Papa Parse 从流中转储 CSV 行的速度太快,以至于我缓慢的持久性 API 无法跟上。因为 Papa 是同步的,而我的 API 是基于Promise的,所以我不能只调用事件处理await doDirtyWork(row)程序on,因为同步和异步代码不会混合。
或者它们可以混合,但我只是不知道如何混合?
我的问题是,我可以让 Papa 的事件处理程序等待我的 API 调用完成吗?是不是直接在on("data")事件中执行持久性 API 请求,让on()函数以某种方式徘徊直到脏 API 工作完成?
就内存占用而言,我到目前为止的解决方案并不比使用 Papa 的非流模式好多少。实际上,我需要以生成器函数迭代的形式对大量事件进行排队。on("data")我还可以将 Promise 工厂排列在一个数组中,并在循环中进行处理。无论哪种方式,我最终都会将几乎整个 CSV 文件作为未来 Promise(承诺工厂)的巨大集合保存在内存中,直到我缓慢的 API 调用完全完成。
async importCSV(filePath) { …Run Code Online (Sandbox Code Playgroud) backpressure ×10
java ×3
node.js ×3
stream ×3
rx-java ×2
apache-spark ×1
asynchronous ×1
clojure ×1
core.async ×1
es6-promise ×1
hadoop ×1
javascript ×1
memory-leaks ×1
node-streams ×1
promise ×1
rxjs ×1
rxjs5 ×1