CBl*_*lew 5 javascript observable rxjs typescript angular
Angular 7 文档提供了rxjs Observables在为AJAX请求实现指数补偿中的实际用法的示例:
import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';
function backoff(maxTries, ms) {
return pipe(
retryWhen(attempts => range(1, maxTries)
.pipe(
zip(attempts, (i) => i),
map(i => i * i),
mergeMap(i => timer(i * ms))
)
)
);
}
ajax('/api/endpoint')
.pipe(backoff(3, 250))
.subscribe(data => handleData(data));
function handleData(data) {
// ...
}
Run Code Online (Sandbox Code Playgroud)
虽然我了解了Observables和backoff的概念,但我还不太清楚,如何精确retryWhen计算重新订阅源的时间间隔ajax。
具体如何做zip,map以及mapMerge在此设置的工作?
并将attempts其发射到对象中将包含retryWhen什么?
我浏览了他们的参考页,但仍然无法解决这个问题。
我花了很多时间对此进行研究(出于学习目的),并将尝试尽可能全面地解释此代码的工作原理。
首先,这是原始代码,带有注释:
import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';
function backoff(maxTries, ms) { // (1)
return pipe( // (2)
retryWhen(attempts => range(1, maxTries) // (3)
.pipe(
zip(attempts, (i) => i), // (4)
map(i => i * i), // (5)
mergeMap(i => timer(i * ms)) // (6)
)
)
); // (7)
}
ajax('/api/endpoint')
.pipe(backoff(3, 250))
.subscribe(data => handleData(data));
function handleData(data) {
// ...
}
Run Code Online (Sandbox Code Playgroud)
backoff运算符之外创建自定义运算retryWhen符。我们稍后可以在pipe函数中应用它。pipemethod返回一个自定义运算符。我们的自定义运算符将成为修改后的retryWhen运算符。它带有一个函数参数。该函数将被调用一次,特别retryWhen是在首次遇到/调用时。顺便说一下,只有在可观察的源产生错误时才retryWhen起作用。然后,它可以防止错误进一步传播并重新订阅源。如果源产生非错误结果(无论是第一次订阅还是重试),则将其传递且不涉及。retryWhen
几句话attempts。这是可观察的。这不是可观察到的源。它是专门为创建的retryWhen。它只有一种用途:只有可观察到的对源可观察项的订阅(或重新订阅)导致错误时,将attempts触发a next。我们可以attempts免费使用它,以便以某种方式对可观察到的源的每次失败订阅做出反应。
这就是我们要做的。
首先,我们创建range(1, maxTries)一个可观察的对象,对于每个我们愿意执行的重试,它都有一个整数。range准备好随时随地发射所有数字,但是我们必须坚决反对:只有在发生其他重试时,我们才需要一个新数字。因此,这就是我们...
...用attempts。含义,将的每个发射值attempts与的单个值相结合range。
请记住,我们当前使用的函数将仅被调用一次,并且在那时,attempts将仅被触发next一次-对于初始失败的订阅。因此,在这一点上,我们的两个压缩观测值仅产生了一个值。
顺便说一句,将两个可观察值压缩为一个的值是多少?此功能决定:(i) => i。为清楚起见,可以将其编写(itemFromRange, itemFromAttempts) => itemFromRange。第二个参数未使用,因此将其删除,第一个参数重命名为i。
这里发生的是,我们只是不理会由激发的值attempts,我们只对它们被激发的事实感兴趣。每当发生这种情况时,我们都会从range可观察值中提取下一个值...
...并将其平方。这是用于指数补偿的指数部分。
因此,现在无论何时(重新)订阅源失败,我们手中的整数都会不断增加(1、4、9、16 ...)。我们如何将该整数转换为时间延迟,直到下一次重新订阅?
请记住,我们当前内置的此函数必须使用attempts输入作为返回可观察到的函数。此结果可观察到的对象仅构建一次。retryWhen然后订阅产生的可观察结果,并:当产生可观察的火灾时,重试订阅可观察的源next;只要可观察到的结果触发了那些相应的事件,就可以调用complete或error在源上观察到。
长话短说,我们需要retryWhen稍等一下。delay也许可以使用运算符,但是设置延迟的指数增长可能会很痛苦。相反,mergeMap操作员开始起作用。
mergeMap是两个运算符组合在一起的快捷方式:map和mergeAll。map只需将每个递增的整数(1、4、9、16 ...)转换为timer可观察值,该值next在经过毫秒数后就会触发。mergeAll部队retryWhen实际订阅timer。如果最后一点没有发生,我们得到的可观察对象将next立即以timer可观察实例作为值触发。
至此,我们已经构建了自定义的Observable,它将用于retryWhen确定何时确切尝试重新订阅源Observable。
就目前而言,我看到此实现存在两个问题:
一旦我们产生的可观察结果触发了最后一个next(导致最后一次尝试重新订阅),它也会立即触发complete。除非源可观察到的返回结果非常迅速(假设最后一次重试将是成功的),否则该结果将被忽略。
这是因为一旦retryWhen就听complete从我们观察到的,它调用complete源,其仍可能将AJAX请求的过程。
如果所有重试均未成功,则源实际上会调用complete而不是更合逻辑error。
为了解决这两个问题,我认为我们得到的可观察值应该error在给最后一次重试提供一些合理的时间来尝试完成其工作后,最终触发。
这是我所说的修复程序的实现,zip最近还考虑了运算符的弃用rxjs v6:
import { delay, dematerialize, map, materialize, retryWhen, switchMap } from "rxjs/operators";
import { concat, pipe, range, throwError, timer, zip } from "rxjs";
function backoffImproved(maxTries, ms) {
return pipe(
retryWhen(attempts => {
const observableForRetries =
zip(range(1, maxTries), attempts)
.pipe(
map(([elemFromRange, elemFromAttempts]) => elemFromRange),
map(i => i * i),
switchMap(i => timer(i * ms))
);
const observableForFailure =
throwError(new Error('Could not complete AJAX request'))
.pipe(
materialize(),
delay(1000),
dematerialize()
);
return concat(observableForRetries, observableForFailure);
})
);
}
Run Code Online (Sandbox Code Playgroud)
我测试了这段代码,它似乎在所有情况下都能正常工作。我现在不愿意详细解释它。我怀疑有人甚至不会读上面的文字墙。
无论如何,非常感谢@BenjaminGruenbaum和@cartant将我设置在正确的路径上,以解决所有这些问题。
| 归档时间: |
|
| 查看次数: |
1190 次 |
| 最近记录: |