标签: backpressure

Spring Web-Flux中的背压机制

我是Spring Web-Flux的首发.我写了一个控制器如下:

@RestController
public class FirstController 
{
    @GetMapping("/first")
    public Mono<String> getAllTweets() 
    {
        return Mono.just("I am First Mono")
    }
}
Run Code Online (Sandbox Code Playgroud)

我知道其中一个反应性好处是Backpressure,它可以平衡请求或响应率.我想知道如何在Spring Web-Flux中使用背压机制.

java reactive-programming backpressure spring-webflux

24
推荐指数
1
解决办法
6054
查看次数

在nodejs中从可写入的流中暂停管道可读流的正确方法是什么?

我正在写一个模块,它是一个可写的流.我想为我的用户实现管道接口.

如果发生某些错误,我需要暂停可读流并发出错误事件.然后,用户将决定 - 如果他没有错误,他应该能够恢复数据处理.

var writeable = new BackPressureStream();
writeable.on('error', function(error){
    console.log(error);
    writeable.resume();
});

var readable = require('fs').createReadStream('somefile.txt');
readable.pipe.(writeable);
Run Code Online (Sandbox Code Playgroud)

我看到该节点为我们提供readable.pause()了可用于暂停可读流的方法.但我无法得到如何从我的可写流模块中调用它:

var Writable = require('stream').Writable;

function BackPressureStream(options) {
    Writable.call(this, options);
}
require('util').inherits(BackPressureStream, Writable);

BackPressureStream.prototype._write = function(chunk, encoding, done) {
    done();
};

BackPressureStream.prototype.resume = function() {
    this.emit('drain');
}
Run Code Online (Sandbox Code Playgroud)

如何在可写流中实现背压?

PS可以使用pipe/unpipe提供可读流作为参数的事件.但也有人说,对于管道流,暂停的唯一机会是从可写入中删除可读流.

我做对了吗?我必须取消管理可写流,直到用户呼叫恢复为止?用户调用恢复后,我应该管道可读流回来?

stream node.js backpressure

15
推荐指数
1
解决办法
1946
查看次数

Rxjava中的"背压"一词是什么意思?

我是RxJava的初学者,我很好奇"背压"的含义.

这是否意味着生产者会给消费者带来压力?

或者这是否意味着消费者对生产者施加压力?(相反方向的压力)

reactive-programming backpressure rx-java

15
推荐指数
3
解决办法
2372
查看次数

在使用"executeAsync"时如何限制写入请求到cassandra?

我使用datastax java驱动程序3.1.0连接到cassandra集群,我的cassandra集群版本是2.0.10.我与QUORUM一致性异步编写.

  private final ExecutorService executorService = Executors.newFixedThreadPool(10);

  public void save(String process, int clientid, long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
        }
      }, executorService);
    } catch (Exception ex) {
      logger.logError("error= ", ex); …
Run Code Online (Sandbox Code Playgroud)

java multithreading rate-limiting backpressure datastax-java-driver

14
推荐指数
1
解决办法
1850
查看次数

Node.JS TCP上的无限并发/流背压

据我所知,Node的事件IO模型的一个后果就是无法告诉Node进程(例如)通过TCP套接字接收数据,阻止,一旦你连接了接收事件处理程序(或者否则开始收听数据).

如果接收器无法足够快地处理传入数据,则可能导致"无限并发",从而节点内的节点继续尽可能快地从套接字读取数据,在事件循环上调度新数据事件而不是阻塞在套接字上,直到进程最终耗尽内存并死掉.

接收方无法告诉节点减慢其读取速度,否则将允许TCP的内置流量控制机制启动并向发送方指示它需要减速.

首先,到目前为止我所描述的是准确的吗?有没有我错过了允许节点避免这种情况的东西?

Node Streams备受推崇的功能之一是自动处理背压.

AFAIK,可写流(tcp套接字)可以告诉它是否需要减速的唯一方法是查看socket.bufferSize(指示写入套接字但尚未发送的数据量).鉴于接收端的节点总是尽可能快地读取,这只能表明发送方和接收方之间的网络连接速度慢,而不是指示接收方是否无法跟上.

那么其次,Node Streams能否以某种方式自动反压在这种情况下处理无法跟上的接收器?

似乎这个问题影响浏览器通过websockets接收数据,原因类似于websockets API没有提供一种机制来告诉浏览器减慢从套接字读取的速度.

Node(以及使用websockets的浏览器)在应用程序级别实现手动流量控制机制,以明确告诉发送进程减速是唯一的解决方案吗?

stream node.js backpressure

9
推荐指数
1
解决办法
1786
查看次数

当mapcat在core.async中破坏背压时,内存泄漏在哪里?

我在Clojure中编写了一些core.async代码,当我运行它时,它消耗了所有可用的内存并因错误而失败.似乎mapcat在core.async管道中使用可以打破压力.(由于超出本问题范围的原因,这是不幸的.)

下面是一些通过计算:x输入和输出mapcat换能器来演示问题的代码:

(ns mapcat.core
  (:require [clojure.core.async :as async]))

(defn test-backpressure [n length]
  (let [message (repeat length :x)
        input (async/chan)
        transform (async/chan 1 (mapcat seq))
        output (async/chan)
        sent (atom 0)]
    (async/pipe input transform)
    (async/pipe transform output)
    (async/go
      (dotimes [_ n]
        (async/>! input message)
        (swap! sent inc))
      (async/close! input))
    (async/go-loop [x 0]
      (when (= 0 (mod x (/ (* n length) 10)))
        (println "in:" (* @sent length) "out:" x))
      (when-let [_ (async/<! output)]
        (recur (inc x)))))) …
Run Code Online (Sandbox Code Playgroud)

memory-leaks asynchronous clojure backpressure core.async

9
推荐指数
1
解决办法
247
查看次数

Spark Streaming 中的背压属性是如何工作的?

我有一个接收单个事件(字符串)的 CustomReceiver。在火花应用程序的运行时使用接收到的单个事件从 nosql 读取数据并应用转换。当观察到每个批次的处理时间大于批次间隔时,我设置这个属性。

spark.streaming.backpressure.enabled=true

之后,我预计 CustomReceiver 不会在批处理处理时间长于批处理窗口时触发和接收事件,这并没有发生,并且仍然添加了一批积压的批次。我在这里错过了什么吗?

hadoop backpressure apache-spark spark-streaming

7
推荐指数
1
解决办法
6229
查看次数

rxjava 2 中 BackpressureStrategy.BUFFER 和 onBackpressureBuffer 运算符之间的区别

我是反应式编程世界的新手,我正在尝试使用 rxjava 2 创建一个简单的背压感知消息处理。

以下是我试图实现的工作流程:

  1. 可流动的连续字符串流。

  2. 执行一个耗时的操作并将消息更改为另一个字符串

  3. 执行另一个耗时的操作。

现在我使用以下代码:

{
    Flowable.create(subscriber -> {
             some_stream.forEach(data -> {
                subscriber.onNext(data);
            });
        }, BackpressureStrategy.BUFFER).
    subscribeOn(Schedulers.io()). // Data emission will run io scheduler
    observeOn(Schedulers.computation()). // Map operation will run on computation scheduler
    map(val -> Time_Consuming_Task(val)). // Task returns another string
    observeOn(Schedulers.io()).  / Next consumer will run on computation scheduler
    subscribe(val -> Another_Time_Consuming_Task(val));
}
Run Code Online (Sandbox Code Playgroud)

现在对于小型操作,我没有看到任何与背压相关的问题。

但是对于大流,我不知道它会如何表现。

现在我的问题是:-

  1. BackpressureStrategy.BUFFER的默认缓冲区大小是多少,数据在哪里缓冲?

  2. 如果我想在每次耗时任务之前创建两个背压缓冲区,我应该使用onBackpressureBuffer 操作符吗?

  3. 如果缓冲区已满,我不想丢失数据,我想等待或在这种情况下什么?

java stream backpressure rx-java

7
推荐指数
1
解决办法
4882
查看次数

RxJs:zip运算符的有损形式

考虑使用zip操作符将两个无限的Observable压缩在一起,其中一个Observable发出的项目频率是另一个的两倍.
当前的实现是无损耗的,即如果我让这些Observables发射一小时然后我在它们的发射速率之间切换,第一个Observable将最终赶上另一个.
随着缓冲区越来越大,这将导致内存爆炸.
如果第一个observable将发出几个小时的项目而第二个将在最后发出一个项目,则会发生同样的情况.

如何为此运营商实现有损行为?我只是想随时从两个流中获得排放,而我不关心我错过的更快流量的排放量.

澄清:

  • 我试图在这里解决的主要问题是由于zip操作员的无损性质导致的内存爆炸.
  • 即使两个流每次都发出相同的值,我想随时从两个流中获得发射

例:

Stream1: 1 2    3 4    5 6 7                
Stream2:     10     20       30 40 50 60 70
Run Code Online (Sandbox Code Playgroud)

Regular zip会产生以下输出:

[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]
Run Code Online (Sandbox Code Playgroud)

const Observable = Rx.Observable;
const Subject = Rx.Subject;


const s1 = new Subject();
const s2 = new Subject();

Observable.zip(s1,s2).subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30); 
 
s2.next(40); s2.next(50); s2.next(60); …
Run Code Online (Sandbox Code Playgroud)

javascript reactive-programming rxjs backpressure rxjs5

6
推荐指数
1
解决办法
1677
查看次数

可以让事件处理程序等到基于异步/Promise 的代码完成吗?

我在 Nodejs 模式下使用优秀的 Papa Parse 库,将超过 100 万行的大型(500 MB)CSV 文件流式传输到缓慢的持久性 API,该 API 一次只能接受一个请求。持久性 API 基于Promises,但从 Papa Parse,我在同步事件中收到每个解析的 CSV 行,如下所示:parseStream.on("data", row => { ... }

我面临的挑战是 Papa Parse 从流中转储 CSV 行的速度太快,以至于我缓慢的持久性 API 无法跟上。因为 Papa 是同步的,而我的 API 是基于Promise的,所以我不能只调用事件处理await doDirtyWork(row)程序on,因为同步和异步代码不会混合。

或者它们可以混合,但我只是不知道如何混合?

我的问题是,我可以让 Papa 的事件处理程序等待我的 API 调用完成吗?是不是直接在on("data")事件中执行持久性 API 请求,让on()函数以某种方式徘徊直到脏 API 工作完成?

就内存占用而言,我到目前为止的解决方案并不比使用 Papa 的非流模式好多少。实际上,我需要以生成器函数迭代的形式对大量事件进行排队。on("data")我还可以将 Promise 工厂排列在一个数组中,并在循环中进行处理。无论哪种方式,我最终都会将几乎整个 CSV 文件作为未来 Promise(承诺工厂)的巨大集合保存在内存中,直到我缓慢的 API 调用完全完成。

async importCSV(filePath) { …
Run Code Online (Sandbox Code Playgroud)

node.js promise backpressure es6-promise node-streams

6
推荐指数
1
解决办法
1612
查看次数