标签: backpressure

在这种情况下,为什么我们需要Publish和RefCount Rx运算符?

我正在尝试熟悉反应背压处理的问题,特别是通过阅读这个维基:https://github.com/ReactiveX/RxJava/wiki/Backpressure

在缓冲段落中,我们有更多涉及的示例代码:

// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
Run Code Online (Sandbox Code Playgroud)

如果我理解正确,我们通过为缓冲区运算符生成去抖动信号流来有效地去除突发源流.

但为什么我们需要在这里使用发布和引用计数器?如果我们放弃它们会导致什么问题?评论并没有让我更清楚,默认情况下RxJava Observables不是多播吗?

java reactive-programming backpressure rx-java

5
推荐指数
1
解决办法
1183
查看次数

RxJS首先取得油门然后等待

我想mousewheel使用RxJS-DOM 观察事件,以便在第一个事件触发时,我将其转发然后删除任何和所有值,直到后续值之间的延迟超过先前指定的持续时间.

我想象的运算符可能看起来像:

Rx.DOM.fromEvent(window, 'mousewheel', (e) => e.deltaY)
.timegate(500 /* ms */)
Run Code Online (Sandbox Code Playgroud)

想象一下以下数据流:

0 - (200 ms) - 1 - (400ms) - 2 - (600ms) - 3
Run Code Online (Sandbox Code Playgroud)

其中发送的值是数字,时间描述下一个值到达的时间.由于0是第一个值,它将被发射,然后所有值都将被删除,因为后续值之间的各个延迟不大于500ms.

与节流阀不同,计算值之间的时间延迟,无论是否发出最后接收的值.使用油门,0将被发送,200 ms将被过去,1将被评估并失败,400 ms将被过去,2将被评估和PASS,因为最后一个发射值(0)和当前接收的值之间经过的时间(2 )是600毫秒,而对于我的操作员,它将相对于1进行评估,并且经过的时间将是400毫秒,因此测试失败.

而且这个运营商也没有去抖动.它不是等待间隔过去才发出,而是先发送第一个值,然后根据所有未来值进行评估,依此类推.

像这样的运营商是否已经存在?如果没有,我将如何制作一个?

javascript rxjs ecmascript-6 backpressure

5
推荐指数
1
解决办法
1561
查看次数

Android:实时数据跳过值

我正在使用实时数据将状态从视图模型发布到片段,这可能会导致状态频繁发布。但是可变实时数据正在跳过初始值并采用可用的最新值。

一篇文章谈到了这个特性,但是有没有办法处理这种情况,比如RxJava 中的 Flowable或设置背压策略,或者我是否需要回到使用 RxJava 并处理基于生命周期的发布?

以下是显示此行为的示例代码。发布了 1 到 10 的值,但只收到了两个值,0 和 10。我们可以在 Live Data 中更改此行为,还是应该为此使用 RxJava?

片段(订阅者):

class ParentFragment : Fragment() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        viewModel = ViewModelProviders.of(
            this, ParentViewModelFactory(this, null)
        ).get(ParentViewModel::class.java)

        viewModel.fastLiveData.observe(this, Observer {
            Timber.i(it.toString())
        })

        viewModel.startPublishing()
    }
}
Run Code Online (Sandbox Code Playgroud)

查看模型(发布者):

class ParentViewModel(private val savedState : SavedStateHandle)
    : ViewModel<ParentState>() {

    val fastLiveData : MutableLiveData<Int> = MutableLiveData(0)

    fun startPublishing() {
        for(x in 1..10) {
            Timber.i(x.toString())
            fastLiveData.postValue(x)
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

输出 : …

android backpressure android-livedata

4
推荐指数
2
解决办法
2111
查看次数

RxJava onBackpressureBuffer不会发出项目

我亲眼目睹了onBackpressureBuffer的奇怪行为,我不确定它是一个有效的行为还是一个bug.

我有一个tcp调用,以一定的速率发出项目(使用流和inputStream,但只是为了一些信息)

最重要的是,我创建了一个使用create的observable,每次准备就会发出一个项目.

我们称之为message().

然后我这样做:

messages()
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe({//do some work});
Run Code Online (Sandbox Code Playgroud)

我注意到使用很少抛出MissingBackPressureException的分析工具,所以我在调用中添加了onBackpressureBuffer.

如果我在之后添加observeOn:

messages()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .onBackpressureBuffer()
    .subscribe({//do some work})
Run Code Online (Sandbox Code Playgroud)

Everyting工作得很好,但这意味着只有在获得UI主线程之后它才会缓冲,所以我更喜欢它是这样的:

messages()
    .onBackpressureBuffer()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({//do some work});
Run Code Online (Sandbox Code Playgroud)

事情开始变得奇怪.

我注意到while messages()会继续发出项目,但在某些时候它们将停止发送给订阅者.

更准确地说,正好在16个项目之后,显而易见的是缓冲区将开始持有物品而不会将它们向前传递.

一旦我取消了messages()某种超时机制,它将导致messages()发出onError(),缓冲区将立即发出它保存的所有项目(它们将被处理).

我已经检查过是否是订购太多工作的订户错误但是没有,他已经完成但他仍然没有得到这些物品......

我也尝试request(n)在订阅者中使用该方法,在onNext()完成后请求一个项目,但缓冲区不起作用.

我怀疑Main Android UI Thread的消息系统带有背压导致这个,但我无法解释原因.

谁能解释为什么会这样?这是一个错误还是一个有效的行为?TNX!

android ui-thread backpressure rx-java rx-android

3
推荐指数
1
解决办法
759
查看次数

卡夫卡的背压

我在卡夫卡(Kafka)遇到过这样的情况,生产者以比消费者消费率高得多的速度发布消息。我必须在kafka中实施反压实现,以进一步消耗和处理。

请让我知道如何在spark和普通的Java API中实现。

apache-kafka backpressure spark-streaming

3
推荐指数
1
解决办法
3348
查看次数

为什么 `Publishers.Map` 急切地消耗上游值?

假设我有一个自定义订阅者,它请求订阅一个值,然后在收到前一个值三秒后请求一个附加值:

class MySubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private var subscription: Subscription?

    func receive(subscription: Subscription) {
        print("Subscribed")

        self.subscription = subscription
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        print("Value: \(input)")

        DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(3)) {
            self.subscription?.request(.max(1))
        }

        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        print("Complete")
        subscription = nil
    }
}
Run Code Online (Sandbox Code Playgroud)

如果我使用它来订阅无限范围的发布者,则可以很好地处理背压,发布者每次等待 3 秒,直到收到下一个发送值的请求:

(1...).publisher.subscribe(MySubscriber())

// Prints values infinitely with ~3 seconds between each:
//
//     Subscribed
//     Value: 1
//     Value: 2
//     Value: …
Run Code Online (Sandbox Code Playgroud)

backpressure swift combine

3
推荐指数
1
解决办法
428
查看次数

内部微服务调用的消息总线与Quasar/HTTP

我希望优化当前使用HTTP/REST进行内部节点到节点通信的微服务架构.

一种选择是在服务中实现背压功能,(例如)通过将类似Quasar的东西集成到堆栈中.这无疑会改善一切.但我看到了几个挑战.一个是,异步客户端线程是瞬态的(在内存中),在客户端故障(崩溃)时,这些重试线程将丢失.第二,理论上,如果目标服务器停机一段时间,客户端最终可能会尝试重试OOM,因为线程最终受到限制,甚至是Quasar Fibers.

我知道这有点偏执,但我想知道基于队列的替代方案是否会在非常大的范围内更有利.

它仍然像Quasar /光纤一样异步工作,除了a)队列是集中管理的并且离开客户端JVM,以及b)队列可以是持久的,因此在客户端和/或目标服务器发生故障的情况下,没有在线消息迷路了.

排队的缺点当然是有更多的跳跃,它会减慢系统的速度.但我认为Quasar ROI可能达到峰值,而集中且持久的队列对于扩展和HA来说更为关键.

我的问题是:

是否讨论了这种权衡?是否有任何关于使用集中式外部队列/路由器方法进行内部服务通信的文章.

TL; DR; 我刚才意识到我可以将这个问题说成:

"什么时候使用基于消息总线的内部服务通信,而不是微服务架构中的直接HTTP."

java message-bus backpressure quasar microservices

2
推荐指数
1
解决办法
1046
查看次数

Backpressure如何在RxJava内部发生

我已经在RxJava上阅读了一些关于背压的文档,但我无法找到一个详细的解释,比如它是如何在图书馆内部发生的,每个人都只是总结它就像"生产者"太快而"消费者"太慢.

例如,如下面的代码:

Observable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);
Run Code Online (Sandbox Code Playgroud)

我一直在浏览RxJava源代码,所以我的理解是在主线程中我们将在每毫秒发出事件,一旦我们发出它,我们将值传递给System.out.println(i)方法并将其抛入newThead scheduler的线程池并在runnable中运行该方法.

所以我的问题是,异常是如何在内部发生的?因为当我们调用Thread.sleep()时,我们只是睡眠处理方法调用的线程 - > System.out.println()而不影响线程池中的其他线程,它怎么会导致异常.是因为线程池不再具有足够的可用线程了吗?

谢谢

java multithreading backpressure rx-java

2
推荐指数
1
解决办法
245
查看次数

Apache Flink:如何处理背压?

对于操作员,输入流比其输出流快,因此其输入缓冲区将阻塞前一个操作员的输出线程,该线程将数据传输到该操作员。对?

Flink和Spark是否都通过阻塞线程来处理背压?那么它们之间有什么区别?

对于数据源,它会持续生成数据,如果其输出线程被阻塞怎么办?缓冲区会溢出吗?

backpressure apache-flink

2
推荐指数
1
解决办法
1231
查看次数

RxJava:丢弃物品?- 背压

我使用RxJava观察几个按钮上的点击.

这些订阅将在一个对象上调用不同的函数,这需要几毫秒.这些功能是同步的.

问题是,当按下太多按钮时,我会得到一个背压异常.对我来说有用的是丢弃几个输入(最好是oldes).RxJava可以吗?

java backpressure rx-java

1
推荐指数
1
解决办法
439
查看次数

如何在Rxjava2中获得有关背压的实际最新事件?Flowable.onBackpressureLatest()未按预期工作

当生产者生产事件的速度快于客户消费时.

我想用可流动onBackpressureLatest() ,我可以得到最新的情况下发出的.

但事实证明有一个128的默认缓冲区.我得到的是之前缓冲的过时事件.

那么如何才能获得最新的实际活动?

这是示例代码:

Flowable.interval(40, TimeUnit.MILLISECONDS)
            .doOnNext{
                println("doOnNext $it")
            }
            .onBackpressureLatest()
            .observeOn(Schedulers.single())
            .subscribe {
                println("subscribe $it")
                Thread.sleep(100)
            }
Run Code Online (Sandbox Code Playgroud)

我的期望:

doOnNext    0
    subscribe   0
doOnNext    1
doOnNext    2
    subscribe   2
doOnNext    3
doOnNext    4
doOnNext    5
    subscribe   5
doOnNext    6
doOnNext    7
    subscribe   7
doOnNext    8
doOnNext    9
doOnNext    10
    subscribe   10
...
Run Code Online (Sandbox Code Playgroud)

我得到了什么:

doOnNext    0
    subscribe   0
doOnNext    1
doOnNext    2
    subscribe   1
doOnNext    3
doOnNext    4
doOnNext    5
    subscribe   2
doOnNext    6
doOnNext    7 …
Run Code Online (Sandbox Code Playgroud)

backpressure rx-java rx-java2

1
推荐指数
1
解决办法
45
查看次数

在使用 Spring Project Reactor 延迟背压后重试?

背景

我正在尝试使用Spring Project Reactor 3.3.0 版实现类似于简单的非阻塞速率限制器的东西。例如,要将数量限制为每秒 100 个请求,我使用以下实现:

myFlux
      .bufferTimeout(100, Duration.ofSeconds(1))
      .delayElements(Duration.ofSeconds(1))
      ..
Run Code Online (Sandbox Code Playgroud)

这适用于我的用例,但如果订阅者没有跟上myFlux发布者的速度,它会(正确地)抛出OverflowException

reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxLift] :
    reactor.core.publisher.Flux.bufferTimeout(Flux.java:2780)
Run Code Online (Sandbox Code Playgroud)

在我的情况下,重要的是所有元素都被订阅者消耗,因此例如降低背压 ( onBackpressureDrop()) 是不可接受的。

有没有办法,而不是在背压下丢弃元素,只是暂停消息的发布,直到订阅者赶上?在我的情况下myFlux,发布了一个有限但大量的元素,这些元素持久存在于持久数据库中,因此恕我直言,不应该要求删除元素。

spring backpressure spring-boot project-reactor

1
推荐指数
1
解决办法
601
查看次数

如何高效地将flink pipeline中的数据写入redis

我正在 Apache flink sql api 中构建管道。该管道执行简单的投影查询。但是,我需要在查询之前编写一次元组(确切地说是每个元组中的一些元素),在查询之后编写一次。事实证明,我用来写入 Redis 的代码严重降低了性能。即flink在很小的数据速率下做出反压。我的代码有什么问题以及如何改进。有什么建议请。

当我停止写入redis之前和之后,性能都非常好。这是我的管道代码:

public class QueryExample {
    public static Long throughputCounterAfter=new Long("0");
    public static void main(String[] args) {
        int k_partitions = 10;
        reamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(5 * 32);
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zookeeper-node-01:2181");
        props.setProperty("bootstrap.servers", "kafka-node-01:9092,kafka-node-02:9092,kafka-node-03:9092");
        // not to be shared with another job consuming the same topic
        props.setProperty("group.id", "flink-group");
        props.setProperty("enable.auto.commit","false");
        FlinkKafkaConsumer011<String> purchasesConsumer=new FlinkKafkaConsumer011<String>("purchases",
                new SimpleStringSchema(),
                props);

        DataStream<String> purchasesStream = env
                .addSource(purchasesConsumer)
                .setParallelism(Math.min(5 * 32, k_partitions));
        DataStream<Tuple4<Integer, Integer, Integer, Long>> purchaseWithTimestampsAndWatermarks …
Run Code Online (Sandbox Code Playgroud)

redis backpressure apache-flink flink-streaming flink-sql

0
推荐指数
1
解决办法
2314
查看次数