我正在尝试熟悉反应背压处理的问题,特别是通过阅读这个维基:https://github.com/ReactiveX/RxJava/wiki/Backpressure
在缓冲段落中,我们有更多涉及的示例代码:
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
Run Code Online (Sandbox Code Playgroud)
如果我理解正确,我们通过为缓冲区运算符生成去抖动信号流来有效地去除突发源流.
但为什么我们需要在这里使用发布和引用计数器?如果我们放弃它们会导致什么问题?评论并没有让我更清楚,默认情况下RxJava Observables不是多播吗?
我想mousewheel使用RxJS-DOM 观察事件,以便在第一个事件触发时,我将其转发然后删除任何和所有值,直到后续值之间的延迟超过先前指定的持续时间.
我想象的运算符可能看起来像:
Rx.DOM.fromEvent(window, 'mousewheel', (e) => e.deltaY)
.timegate(500 /* ms */)
Run Code Online (Sandbox Code Playgroud)
想象一下以下数据流:
0 - (200 ms) - 1 - (400ms) - 2 - (600ms) - 3
Run Code Online (Sandbox Code Playgroud)
其中发送的值是数字,时间描述下一个值到达的时间.由于0是第一个值,它将被发射,然后所有值都将被删除,因为后续值之间的各个延迟不大于500ms.
与节流阀不同,计算值之间的时间延迟,无论是否发出最后接收的值.使用油门,0将被发送,200 ms将被过去,1将被评估并失败,400 ms将被过去,2将被评估和PASS,因为最后一个发射值(0)和当前接收的值之间经过的时间(2 )是600毫秒,而对于我的操作员,它将相对于1进行评估,并且经过的时间将是400毫秒,因此测试失败.
而且这个运营商也没有去抖动.它不是等待间隔过去才发出,而是先发送第一个值,然后根据所有未来值进行评估,依此类推.
像这样的运营商是否已经存在?如果没有,我将如何制作一个?
我正在使用实时数据将状态从视图模型发布到片段,这可能会导致状态频繁发布。但是可变实时数据正在跳过初始值并采用可用的最新值。
有一篇文章谈到了这个特性,但是有没有办法处理这种情况,比如RxJava 中的 Flowable或设置背压策略,或者我是否需要回到使用 RxJava 并处理基于生命周期的发布?
以下是显示此行为的示例代码。发布了 1 到 10 的值,但只收到了两个值,0 和 10。我们可以在 Live Data 中更改此行为,还是应该为此使用 RxJava?
片段(订阅者):
class ParentFragment : Fragment() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
viewModel = ViewModelProviders.of(
this, ParentViewModelFactory(this, null)
).get(ParentViewModel::class.java)
viewModel.fastLiveData.observe(this, Observer {
Timber.i(it.toString())
})
viewModel.startPublishing()
}
}
Run Code Online (Sandbox Code Playgroud)
查看模型(发布者):
class ParentViewModel(private val savedState : SavedStateHandle)
: ViewModel<ParentState>() {
val fastLiveData : MutableLiveData<Int> = MutableLiveData(0)
fun startPublishing() {
for(x in 1..10) {
Timber.i(x.toString())
fastLiveData.postValue(x)
}
}
}
Run Code Online (Sandbox Code Playgroud)
输出 : …
我亲眼目睹了onBackpressureBuffer的奇怪行为,我不确定它是一个有效的行为还是一个bug.
我有一个tcp调用,以一定的速率发出项目(使用流和inputStream,但只是为了一些信息)
最重要的是,我创建了一个使用create的observable,每次准备就会发出一个项目.
我们称之为message().
然后我这样做:
messages()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({//do some work});
Run Code Online (Sandbox Code Playgroud)
我注意到使用很少抛出MissingBackPressureException的分析工具,所以我在调用中添加了onBackpressureBuffer.
如果我在之后添加observeOn:
messages()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.onBackpressureBuffer()
.subscribe({//do some work})
Run Code Online (Sandbox Code Playgroud)
Everyting工作得很好,但这意味着只有在获得UI主线程之后它才会缓冲,所以我更喜欢它是这样的:
messages()
.onBackpressureBuffer()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({//do some work});
Run Code Online (Sandbox Code Playgroud)
事情开始变得奇怪.
我注意到while messages()会继续发出项目,但在某些时候它们将停止发送给订阅者.
更准确地说,正好在16个项目之后,显而易见的是缓冲区将开始持有物品而不会将它们向前传递.
一旦我取消了messages()某种超时机制,它将导致messages()发出onError(),缓冲区将立即发出它保存的所有项目(它们将被处理).
我已经检查过是否是订购太多工作的订户错误但是没有,他已经完成但他仍然没有得到这些物品......
我也尝试request(n)在订阅者中使用该方法,在onNext()完成后请求一个项目,但缓冲区不起作用.
我怀疑Main Android UI Thread的消息系统带有背压导致这个,但我无法解释原因.
谁能解释为什么会这样?这是一个错误还是一个有效的行为?TNX!
我在卡夫卡(Kafka)遇到过这样的情况,生产者以比消费者消费率高得多的速度发布消息。我必须在kafka中实施反压实现,以进一步消耗和处理。
请让我知道如何在spark和普通的Java API中实现。
假设我有一个自定义订阅者,它请求订阅一个值,然后在收到前一个值三秒后请求一个附加值:
class MySubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never
private var subscription: Subscription?
func receive(subscription: Subscription) {
print("Subscribed")
self.subscription = subscription
subscription.request(.max(1))
}
func receive(_ input: Int) -> Subscribers.Demand {
print("Value: \(input)")
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(3)) {
self.subscription?.request(.max(1))
}
return .none
}
func receive(completion: Subscribers.Completion<Never>) {
print("Complete")
subscription = nil
}
}
Run Code Online (Sandbox Code Playgroud)
如果我使用它来订阅无限范围的发布者,则可以很好地处理背压,发布者每次等待 3 秒,直到收到下一个发送值的请求:
(1...).publisher.subscribe(MySubscriber())
// Prints values infinitely with ~3 seconds between each:
//
// Subscribed
// Value: 1
// Value: 2
// Value: …Run Code Online (Sandbox Code Playgroud) 我希望优化当前使用HTTP/REST进行内部节点到节点通信的微服务架构.
一种选择是在服务中实现背压功能,(例如)通过将类似Quasar的东西集成到堆栈中.这无疑会改善一切.但我看到了几个挑战.一个是,异步客户端线程是瞬态的(在内存中),在客户端故障(崩溃)时,这些重试线程将丢失.第二,理论上,如果目标服务器停机一段时间,客户端最终可能会尝试重试OOM,因为线程最终受到限制,甚至是Quasar Fibers.
我知道这有点偏执,但我想知道基于队列的替代方案是否会在非常大的范围内更有利.
它仍然像Quasar /光纤一样异步工作,除了a)队列是集中管理的并且离开客户端JVM,以及b)队列可以是持久的,因此在客户端和/或目标服务器发生故障的情况下,没有在线消息迷路了.
排队的缺点当然是有更多的跳跃,它会减慢系统的速度.但我认为Quasar ROI可能达到峰值,而集中且持久的队列对于扩展和HA来说更为关键.
我的问题是:
是否讨论了这种权衡?是否有任何关于使用集中式外部队列/路由器方法进行内部服务通信的文章.
TL; DR; 我刚才意识到我可以将这个问题说成:
"什么时候使用基于消息总线的内部服务通信,而不是微服务架构中的直接HTTP."
我已经在RxJava上阅读了一些关于背压的文档,但我无法找到一个详细的解释,比如它是如何在图书馆内部发生的,每个人都只是总结它就像"生产者"太快而"消费者"太慢.
例如,如下面的代码:
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) { }
},
System.out::println);
Run Code Online (Sandbox Code Playgroud)
我一直在浏览RxJava源代码,所以我的理解是在主线程中我们将在每毫秒发出事件,一旦我们发出它,我们将值传递给System.out.println(i)方法并将其抛入newThead scheduler的线程池并在runnable中运行该方法.
所以我的问题是,异常是如何在内部发生的?因为当我们调用Thread.sleep()时,我们只是睡眠处理方法调用的线程 - > System.out.println()而不影响线程池中的其他线程,它怎么会导致异常.是因为线程池不再具有足够的可用线程了吗?
谢谢
对于操作员,输入流比其输出流快,因此其输入缓冲区将阻塞前一个操作员的输出线程,该线程将数据传输到该操作员。对?
Flink和Spark是否都通过阻塞线程来处理背压?那么它们之间有什么区别?
对于数据源,它会持续生成数据,如果其输出线程被阻塞怎么办?缓冲区会溢出吗?
我使用RxJava观察几个按钮上的点击.
这些订阅将在一个对象上调用不同的函数,这需要几毫秒.这些功能是同步的.
问题是,当按下太多按钮时,我会得到一个背压异常.对我来说有用的是丢弃几个输入(最好是oldes).RxJava可以吗?
当生产者生产事件的速度快于客户消费时.
我想用可流动与onBackpressureLatest() ,我可以得到最新的情况下发出的.
但事实证明有一个128的默认缓冲区.我得到的是之前缓冲的过时事件.
那么如何才能获得最新的实际活动?
这是示例代码:
Flowable.interval(40, TimeUnit.MILLISECONDS)
.doOnNext{
println("doOnNext $it")
}
.onBackpressureLatest()
.observeOn(Schedulers.single())
.subscribe {
println("subscribe $it")
Thread.sleep(100)
}
Run Code Online (Sandbox Code Playgroud)
我的期望:
doOnNext 0
subscribe 0
doOnNext 1
doOnNext 2
subscribe 2
doOnNext 3
doOnNext 4
doOnNext 5
subscribe 5
doOnNext 6
doOnNext 7
subscribe 7
doOnNext 8
doOnNext 9
doOnNext 10
subscribe 10
...
Run Code Online (Sandbox Code Playgroud)
我得到了什么:
doOnNext 0
subscribe 0
doOnNext 1
doOnNext 2
subscribe 1
doOnNext 3
doOnNext 4
doOnNext 5
subscribe 2
doOnNext 6
doOnNext 7 …Run Code Online (Sandbox Code Playgroud) 背景
我正在尝试使用Spring Project Reactor 3.3.0 版实现类似于简单的非阻塞速率限制器的东西。例如,要将数量限制为每秒 100 个请求,我使用以下实现:
myFlux
.bufferTimeout(100, Duration.ofSeconds(1))
.delayElements(Duration.ofSeconds(1))
..
Run Code Online (Sandbox Code Playgroud)
这适用于我的用例,但如果订阅者没有跟上myFlux发布者的速度,它会(正确地)抛出OverflowException:
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxLift] :
reactor.core.publisher.Flux.bufferTimeout(Flux.java:2780)
Run Code Online (Sandbox Code Playgroud)
在我的情况下,重要的是所有元素都被订阅者消耗,因此例如降低背压 ( onBackpressureDrop()) 是不可接受的。
题
有没有办法,而不是在背压下丢弃元素,只是暂停消息的发布,直到订阅者赶上?在我的情况下myFlux,发布了一个有限但大量的元素,这些元素持久存在于持久数据库中,因此恕我直言,不应该要求删除元素。
我正在 Apache flink sql api 中构建管道。该管道执行简单的投影查询。但是,我需要在查询之前编写一次元组(确切地说是每个元组中的一些元素),在查询之后编写一次。事实证明,我用来写入 Redis 的代码严重降低了性能。即flink在很小的数据速率下做出反压。我的代码有什么问题以及如何改进。有什么建议请。
当我停止写入redis之前和之后,性能都非常好。这是我的管道代码:
public class QueryExample {
public static Long throughputCounterAfter=new Long("0");
public static void main(String[] args) {
int k_partitions = 10;
reamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(5 * 32);
Properties props = new Properties();
props.setProperty("zookeeper.connect", "zookeeper-node-01:2181");
props.setProperty("bootstrap.servers", "kafka-node-01:9092,kafka-node-02:9092,kafka-node-03:9092");
// not to be shared with another job consuming the same topic
props.setProperty("group.id", "flink-group");
props.setProperty("enable.auto.commit","false");
FlinkKafkaConsumer011<String> purchasesConsumer=new FlinkKafkaConsumer011<String>("purchases",
new SimpleStringSchema(),
props);
DataStream<String> purchasesStream = env
.addSource(purchasesConsumer)
.setParallelism(Math.min(5 * 32, k_partitions));
DataStream<Tuple4<Integer, Integer, Integer, Long>> purchaseWithTimestampsAndWatermarks …Run Code Online (Sandbox Code Playgroud) backpressure ×13
rx-java ×5
java ×4
android ×2
apache-flink ×2
apache-kafka ×1
combine ×1
ecmascript-6 ×1
flink-sql ×1
javascript ×1
message-bus ×1
quasar ×1
redis ×1
rx-android ×1
rx-java2 ×1
rxjs ×1
spring ×1
spring-boot ×1
swift ×1
ui-thread ×1