我正在尝试使用 RxJS 流实现可切换的自动保存功能。目标是:
这是我遇到的:
autoSave$ = new BehaviorSubject(true);
change$ = new Subject();
change$.pipe(
bufferToggle(
autoSave$.pipe(filter(autoSave => autoSave === false)),
() => autoSave$.pipe(filter(autoSave => autoSave === true)),
),
concatMap(changes => changes),
concatMap(change => apiService.patch(change)),
).subscribe(
() => console.log('Change sent'),
(error) => console.error(error),
);
Run Code Online (Sandbox Code Playgroud)
感谢bufferToggle,我能够在autoSave关闭时缓冲更改并在重新启用时发送它们。
问题是,虽然autoSave启用,但没有通过。我理解这是因为bufferToggle忽略了即将到来的流量,而其开放的 observable 不发出。
我觉得我应该有一个条件来绕过bufferTogglewhileautoSave启用,但我所有的尝试都惨遭失败。
有什么想法可以实现这一目标吗?
我是一名RabbitMQ正在探索的用户Redis,有两个关于 pub/sub 机制的问题
我可以将消息发布到每个客户端(消费者)删除其他客户端的条目的系统吗?我想发布 100 个任务,但每个任务只能由 1 个订阅者处理。
AFAIK,默认情况下,所有消息始终广播/发布给所有客户端。如果一个客户端需要 1 秒来处理消息,而另一个客户端需要 1 分钟,该怎么办?这里的限制是什么?某些消息会在某个时候被丢弃吗?
多谢!