VSO*_*VSO 45 javascript observable rxjs angular
我根本不明白其目的mergeMap.我听过两个"解释:
merge和map" - nope的组合(或者我不能复制这个).请考虑以下代码:
var obs1 = new Rx.Observable.interval(1000);
var obs2 = new Rx.Observable.interval(1000);
//Just a merge and a map, works fine
obs1.merge(obs2).map(x=> x+'a').subscribe(
next => console.log(next)
)
//Who know what - seems to do the same thing as a plain map on 1 observable
obs1.mergeMap(val => Rx.Observable.of(val + `B`))
.subscribe(
next => console.log(next)
)
Run Code Online (Sandbox Code Playgroud)
标有"谁知道什么"的最后一篇文章只不过是一张地图obs1- 重点是什么?
什么是mergeMap真正做到?什么是有效用例的示例?(最好带一些代码)
art*_*iak 102
TL;博士; mergeMap比...更强大map.理解mergeMap是访问Rx全部功能的必要条件.
二者mergeMap并map作用于单流(相对于zip,combineLatest)
二者mergeMap并map可以改变一个流的元件(相对于filter,delay)
不能改变源流的大小(假设:map本身不会throw); 对于源中的每个元素,mapped只发出一个元素; map不能忽略元素(例如filter);
在默认调度程序的情况下,转换同步发生; 100%清除:源流可以异步传递其元素,但每个下一个元素都会立即mapped重新发出; map例如,不能及时移动元素delay
对返回值没有限制
id: x => x
可以改变源流的大小; 对于每个元素,可能存在创建/发出的任意数量(0,1或许多)新元素
它提供了对异步性的完全控制 - 既可以创建/发出新元素,也可以同时处理源流中有多少元素; 例如,假设源流发出了10个元素但maxConcurrency设置为2则会立即处理两个第一个元素,其余8个缓冲; 一旦处理complete完一个,源流中的下一个元素将被处理,等等 - 这有点棘手,但请看下面的例子
所有其他运算符都可以使用just mergeMap和Observableconstructor 来实现
可用于递归异步操作
返回值必须是Observable类型(或者Rx必须知道如何从中创建observable - 例如promise,array)
id: x => Rx.Observable.of(x)
let array = [1,2,3]
fn map mergeMap
x => x*x [1,4,9] error /*expects array as return value*/
x => [x,x*x] [[1,1],[2,4],[3,9]] [1,1,2,4,3,9]
Run Code Online (Sandbox Code Playgroud)
类比不显示完整图像,它基本上对应于.mergeMap与maxConcurrency设置为1.在这种情况下的元素将被如上述排序,但在一般情况下,它不必须是如此.我们唯一的保证是新元素的排放将按其在基础流中的位置排序.例如:[3,1,2,4,9,1]和[2,3,1,1,9,4]有效,但[1,1,4,2,3,9]不是(因为在底层流4之后发出2).
mergeMap:// implement .map with .mergeMap
Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
}
Rx.Observable.range(1, 3)
.mapWithMergeMap(x => x * x)
.subscribe(x => console.log('mapWithMergeMap', x))
// implement .filter with .mergeMap
Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
return this.mergeMap(x =>
filterFn(x) ?
Rx.Observable.of(x) :
Rx.Observable.empty()); // return no element
}
Rx.Observable.range(1, 3)
.filterWithMergeMap(x => x === 3)
.subscribe(x => console.log('filterWithMergeMap', x))
// implement .delay with .mergeMap
Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
return this.mergeMap(x =>
Rx.Observable.create(obs => {
// setTimeout is naive - one should use scheduler instead
const token = setTimeout(() => {
obs.next(x);
obs.complete();
}, delayMs)
return () => clearTimeout(token);
}))
}
Rx.Observable.range(1, 3)
.delayWithMergeMap(500)
.take(2)
.subscribe(x => console.log('delayWithMergeMap', x))
// recursive count
const count = (from, to, interval) => {
if (from > to) return Rx.Observable.empty();
return Rx.Observable.timer(interval)
.mergeMap(() =>
count(from + 1, to, interval)
.startWith(from))
}
count(1, 3, 1000).subscribe(x => console.log('count', x))
// just an example of bit different implementation with no returns
const countMoreRxWay = (from, to, interval) =>
Rx.Observable.if(
() => from > to,
Rx.Observable.empty(),
Rx.Observable.timer(interval)
.mergeMap(() => countMoreRxWay(from + 1, to, interval)
.startWith(from)))
const maxConcurrencyExample = () =>
Rx.Observable.range(1,7)
.do(x => console.log('emitted', x))
.mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
.do(x => console.log('processed', x))
.subscribe()
setTimeout(maxConcurrencyExample, 3100)Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>Run Code Online (Sandbox Code Playgroud)
Mar*_*ten 20
.mergeMap()允许您将更高阶的Observable压缩为单个流.例如:
Rx.Observable.from([1,2,3,4])
.map(i => getFreshApiData())
.subscribe(val => console.log('regular map result: ' + val));
//vs
Rx.Observable.from([1,2,3,4])
.mergeMap(i => getFreshApiData())
.subscribe(val => console.log('mergeMap result: ' + val));
function getFreshApiData() {
return Rx.Observable.of('retrieved new data')
.delay(1000);
}Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>Run Code Online (Sandbox Code Playgroud)
有关.xxxMap()运算符的详细解释,请参阅我在这个问题的答案:Rxjs - 如何在数组中提取多个值并同步将它们反馈给可观察流
| 归档时间: |
|
| 查看次数: |
28673 次 |
| 最近记录: |