语境:
我们的网络应用程序可以同时显示不同的帮助面板。
当面板需要给定的 ID(例如help-id-1)时,我们会访问一个 API,传递该 ID,然后我们会得到所需的帮助。
现在,由于某些原因,我们可能会显示同一帮助面板两次或多次。但是,当然,如果提取时没有错误或当前正在提取,我们不想对同一个项目多次调用 API。
我们的“生产者”给我们一个冷流来检索它:
const getHelpContentById = (id: string) => fromPromise(
httpCallToGetHelpResultFromThirdLib(id)
).pipe(
catchError(error => of({ status: 'ERROR', item: null, id })),
// extracting the body of the response
map(getHelpItemFromResponse),
// wrapping the response into an object { status: 'SUCCES', item, id }
map(item => setStatusOnHelpItem(item, id)),
startWith({ status: 'LOADING', item: null, id }),
)
Run Code Online (Sandbox Code Playgroud)
我们使用包含状态的对象启动流,然后我们收到另一个具有新状态的对象,该状态为SUCCESS或ERROR。
预期的解决方案应该:
- 在第一次调用给定 ID 时从 API 中获取
- 如果在前一个订阅完成(状态LOADING)之前同一 ID 发生另一个订阅,它应该获得与第一次调用相同的流,而无需再次获取API
- 如果应用程序的一部分正在显示,并且失败,则不应关闭help-id-1流,而应关闭其中类型的值,这样,如果另一个组件尝试再次显示,因为该 ID 的最后状态是,它应再次尝试访问 API,并且两个订阅者都应收到实时更新,然后出现错误或成功next{ status: 'ERROR', item: null, id }help-id-1ERROR{ status: 'LOADING', item: null, id }
第一次尝试:
这是我想出的第一种非 RxJs 方法:(
从服务中提取的代码,它是一个类)
private helpItems: Map<
string,
{ triggerNewFetchForItem: () => void; obs: Observable<HelpItemWithStatus> }
> = new Map();
private getFromCacheOrFetchHelpItem(id: string): Observable<HelpItemWithStatus> {
let triggerNewFetchForItem$: BehaviorSubject<HelpItemWithStatus>;
const idNotInCache = !this.helpItems.has(id);
if (idNotInCache) {
triggerNewFetchForItem$ = new BehaviorSubject<HelpItemWithStatus>(null);
this.helpItems.set(id, {
triggerNewFetchForItem: () => triggerNewFetchForItem$.next(null),
obs: triggerNewFetchForItem$.pipe(
switchMap(() => getHelpContentById(id)),
shareReplay(1),
),
});
return this.helpItems.get(id).obs;
} else {
return this.helpItems.get(id).obs.pipe(
tap(item => {
if (item.status === ContentItemStatus.ERROR) {
this.helpItems.get(id).triggerNewFetchForItem();
}
})
);
}
}
public getHelpItemById(id: string): Observable<HelpItemWithStatus> {
return this.getFromCacheOrFetchHelpItem(id);
}
Run Code Online (Sandbox Code Playgroud)
我同事的尝试:
private getFromCacheOrFetchHelpItem4(id: string): Observable<HelpItemWithStatus> {
let item = this.items.get(id);
if (item && item.status !== ContentItemStatus.ERROR) {
return of(item);
}
return getNewWrappedHelpItem(this.contentfulClient, id).pipe(
tap(item => this.items.set(id, bs),
shareReplay(1)),
)
}
Run Code Online (Sandbox Code Playgroud)
问题
- 如果您订阅它一次,最终会出现错误
- 您从新组件再次订阅它
- 它会按预期进行另一次获取,但会更改引用
- API 调用成功,第二个组件被更新,第一个组件被更新一个不是
结论:
我确信有一种更好的 Rx 方法可以做到这一点,甚至可能不需要依赖“外部缓存”(此处Map)。显然最好的办法是为此有一个新的操作员:)
我在工作中也遇到了基本相同的情况。我已经考虑了一段时间,但最终还是决定放弃它。我的解决方案有点粗糙,但我会在改进时尝试更新答案。我希望得到关于更好的处理方法的反馈。
这是一个相当复杂的问题,所以我将逐步构建它。为了简单起见,我们将从不带参数的 api 方法开始。
share()只需在末尾添加运算符即可使其成为多播。
return api().pipe(share());
Run Code Online (Sandbox Code Playgroud)
只需更改share()为shareReplay(1). 该参数指示要共享的先前响应的数量。我们只想要最后一个发出的,所以我们把1.
或者,您可以使用tap运算符来保留对最后发出的值的引用,of(data)如果最后一次成功,则执行操作而不是返回流。这仅适用于您不想再次调用 api 的情况(就像 OP 所讨论的那样),但我将其保持通用,以便灵活地用于其他解决方案。
return api().pipe(shareReplay(1));
Run Code Online (Sandbox Code Playgroud)
这是一个愚蠢的。很容易获得最后一个值,甚至可以为新订阅者重新运行流。但这并不能让以前的订阅者受益。您可能会获得成功的结果,但以前的订阅者都不会收到通知。本质上,您要求的是让您的主题在有新订阅时发出新值。据我所知,这是不可能的。
我的解决方法是设置自己的主题,每次有人请求直播时我都可以触发该主题。这不是同一件事,但我认为这确实是我们想要的。我们真正想要的是某种重试的方法。如果它不是通过使用retryWhen操作符来自动化,那么我们需要一些手动方式,例如新组件加载。当加载新组件时,它们会请求流,因此可以找到。
因此,我们创建一个主题并在超时后调用 next。我更愿意使用ReplaySubjectorBehaviorSubject来避免超时,但当我这样做时,我遇到了角度变化检测问题 ( ExpressionChangedAfterItHasBeenCheckedError)。我需要更深入地研究它。
请注意,它share位于外部流上。我们想分享的是这一点,而不是内心的。另请注意,我使用的是switchMap而不是switchMapTo因为我们每次都想要一个新的内部流。
const trigger = new Subject<void>();
setTimeout(() => trigger.next());
return trigger.pipe(
switchMap(() => api()),
shareReplay(1)
);
Run Code Online (Sandbox Code Playgroud)
该catchError运算符允许您返回一个可观察值。因为我们希望这成为一条消息,所以我们就这么做了catchError(e => of(e))。问题是这会结束流。解决方法是将捕获器放在内部switchMap,以便内部流可以消失,而外部流可以继续流动。
return trigger.pipe(
switchMap(() => api().pipe(
catchError(err => of(err))
),
shareReplay(1)
);
Run Code Online (Sandbox Code Playgroud)
为此,我们将创建一个具有类型属性的通用响应包装器。可能的值为“FETCHING”、“SUCCESS”和“FAILURE”。我们将使用startWith操作符在 api 调用开始后立即发送获取通知(这就是为什么它在最后)。
return trigger.pipe(
switchMap(() => api().pipe(
map((data) => ({ state: 'SUCCESS', data })),
catchError(err => of({ state: 'FAILURE', err })),
startWith({ state: 'FETCHING' })
),
shareReplay(1)
);
Run Code Online (Sandbox Code Playgroud)
基本上,如果请求正在进行中,我们不希望调用触发器。我们可以使用标志来执行此操作,也可以使用distinct带有触发器的运算符在 api 调用解析时重置它。第二种方法很棘手,因为您在构建流时需要引用流。因此,我们将只使用一个变量,并且可以将其包装trigger.next()在 if 中或在流上放置一个过滤器。我要做一个过滤器。
private state: string;
...
return trigger.pipe(
filter(() => this.state !== 'FETCHING'),
switchMap(() => api().pipe(
map((data) => ({ state: 'SUCCESS', data })),
catchError(err => of({ state: 'FAILURE', err })),
startWith({ state: 'FETCHING' }),
tap(x => { this.state = x.state; })
),
shareReplay(1)
);
Run Code Online (Sandbox Code Playgroud)
为此,您所要做的就是不调用触发器。因此,只需将触发器的条件更改为状态未初始化或“失败”即可。
...
filter(() => this.state == null || this.state === 'FAILURE'),
...
Run Code Online (Sandbox Code Playgroud)
您基本上只需要散列参数并将其用作映射的键即可。请参阅下面的完整示例。
这是所有内容的汇总。我创建了一个辅助函数来生成 api 方法。开发人员必须提供 api 方法和参数的哈希方法,因为推断起来过于复杂。
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { tap, switchMap, map, startWith, catchError, shareReplay, filter } from 'rxjs/operators';
import { of } from 'rxjs/observable/of';
// posible states of the api request
export enum ApiStateType {
Fetching,
Success,
Failure
}
// wrapper for the api status messages
export interface ApiStatus<T> {
state: ApiStateType;
params: any[],
data: T
}
// information related to a stream for a unique set of parameters
interface StreamConfig<T> {
state: ApiStateType;
trigger: Subject<void>;
stream: Observable<ApiStatus<T>>;
}
export function generateCachedApi<T>(
api: (...params) => Observable<T>,
generateKey: (...params) => string
): (...params) => Observable<ApiStatus<T>> {
const cache = new Map<string, StreamConfig<T>>();
return (...params): Observable<ApiStatus<T>> => {
const key = generateKey(...params);
let config = cache.get(key);
if (!config) {
console.log(`created new stream (${key})`);
config = <StreamConfig<T>> { trigger: new Subject<void>() };
config.stream = config.trigger.pipe(
filter(() => config.state == null || config.state === ApiStateType.Failure),
switchMap(() => {
return api(...params).pipe(
map((data) => (<ApiStatus<T>>{ state: ApiStateType.Success, params, data })),
catchError((data, source) => of(<ApiStatus<T>>{ state: ApiStateType.Failure, params, data })),
startWith(<ApiStatus<T>>{ state: ApiStateType.Fetching, params }),
tap(x => { config.state = x.state; })
);
}),
tap(x => { console.log('PUBLISH', x)}),
shareReplay(1),
);
cache.set(key, config);
} else {
console.log(`returned existing stream (${key})`);
}
setTimeout(() => { config.trigger.next() });
return config.stream;
}
}
Run Code Online (Sandbox Code Playgroud)
这是我一起编写的一个运行示例:https ://stackblitz.com/edit/api-cache
我确信有一种更好的 Rx 方法可以做到这一点,甚至可能不需要依赖“外部缓存”(此处的地图)。显然最好的办法是为此有一个新的操作员:)
我创建了一个cacheMap运算符来尝试做到这一点。我有一个发出 api 参数的源,cacheMap操作员将为唯一的参数集找到或创建流,并返回它的mergeMap样式。问题是每个订阅者现在都会订阅该内部可观察值。所以你必须添加一个过滤器(参见下面的替代解决方案)。
这是我想到的替代解决方案。您可以拥有一个主要流,然后通过过滤器将其提供给订阅者,而不是维护多个流。
单个流的问题是重播将应用于所有参数。因此,您要么必须使用无缓冲区的重播,要么自行管理重播。
如果您使用没有缓冲区的重播,那么它将重播所有内容FETCHING,SUCCESS这可能会导致额外的处理,尽管用户可能不会注意到。理想情况下我们会有一个replayByKey运算符,但我还没有时间去写它。所以现在我只使用地图。使用地图的问题是我们仍然向已经收到它的订阅者发出相同的值。因此我们distinctUntilChanged向实例流添加一个运算符。或者,您可以创建实例流,然后takeUntil在其上放置一个触发器,该触发器是成功过滤的实例流,并在关闭管道之前放置一个delay(0)以允许最后一个值通过管道。这将完成流,这是可以的,因为一旦成功,您就永远不会获得新的值。我选择了不同的,因为如果您想更改其要求,它可以让您获得新的值。
我们使用 amergeMap而不是 a,switchMap因为我们可以对不同参数进行并发的请求,并且我们不想取消对不同参数的请求。
这是解决方案:
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { tap, mergeMap, map, startWith, catchError, share, filter, distinctUntilChanged } from 'rxjs/operators';
import { of } from 'rxjs/observable/of';
// posible states of the api request
export enum ApiStateType {
Fetching,
Success,
Failure
}
// wrapper for the api status messages
export interface ApiStatus<T> {
state: ApiStateType;
key: string;
params: any[];
data: T;
}
export function generateCachedApi<T>(
api: (...params) => Observable<T>,
generateKey: (...params) => string
): (...params) => Observable<ApiStatus<T>> {
const trigger = new Subject<any[]>();
const stateCache = new Map<string, ApiStatus<T>>();
const stream = trigger.pipe(
map<any[], [any[], string]>((params) => [ params, generateKey(...params) ]),
tap(([_, key]) => {
if (!stateCache.has(key)) {
stateCache.set(key, <ApiStatus<T>> {})
}
}),
mergeMap(([params, key]) => {
const apiStatus = stateCache.get(key);
if (apiStatus.state === ApiStateType.Fetching || apiStatus.state === ApiStateType.Success) {
return of(apiStatus);
}
return api(...params).pipe(
map((data) => (<ApiStatus<T>>{ state: ApiStateType.Success, key, params, data })),
catchError((data, source) => of(<ApiStatus<T>>{ state: ApiStateType.Failure, key, params, data })),
startWith(<ApiStatus<T>>{ state: ApiStateType.Fetching, key, params }),
tap(state => { stateCache.set(key, state); })
)
}),
tap(x => { console.log('PUBLISH', x)}),
share()
);
return (...params): Observable<ApiStatus<T>> => {
const key = generateKey(...params);
const instanceStream = stream.pipe(
filter((response) => response.key === key),
distinctUntilChanged()
);
setTimeout(() => { trigger.next(params) });
return instanceStream;
}
}
Run Code Online (Sandbox Code Playgroud)
让我们将我们的关注点分开,一次处理一个:
\n\n{id, status, item}具有初始值的元数据,而不是materialize()我们将创建一个高阶函数,它接受getHelpContentById()并返回元数据对象的可观察值,该对象首先发出“LOADING”,然后根据响应发出“SUCCESS”或“ERROR”:
const toMeta = fetch => id =>\n fetch(id)\n .pipe(\n map(response => ({id, status: \'SUCCESS\', item: response})),\n catchError(e => [{id, status: \'ERROR\', item: null}]),\n startWith( {id, status: \'LOADING\', item: null}),\n )\nRun Code Online (Sandbox Code Playgroud)\n\n让我们忽略缓存:
\n\n这两个逻辑位都可以作为函数传递给我们的缓存运算符:
\n\n/**\n * `predicate` returns a boolean\n * `action` returns an observable\n */\nconst cacheUntil = (predicate, action) => {\n const cache = new Map()\n\n // Use our predicate to check if we should uncache an item\n //\n const cacheUpdate = key => cachedValue => {\n if (predicate(cachedValue)) cache.delete(key)\n }\n\n // Prep an item, then cache it\n //\n const cache_set = key => {\n const out$ = action(key) // fetching it in our case\n .pipe(\n tap(cacheUpdate(key)), // see cacheUpdate above this function\n shareReplay(1) // make Observable returned from `action` \n // a Hot observable\n // so that we don\'t fetch \n // on new subscriptions\n )\n cache.set(key, out$)\n return out$\n }\n\n // Glue it all together\n //\n return pipe(\n mergeMap(key => // flatten the cached Observable returned from `action()`\n cache.has(key)\n ? cache.get(key)\n : cache_set(key)\n )\n )\n}\n\n// make a stream of help ids to fetch from api\n// that we can call `next(helpId)` on\nconst helpId$ = new Subject()\n\n// use a higher-order function to make our apiCall\n// return our metadata object of `{id, status, item}`\nconst help$ = helpId$.pipe(\n cacheUntil(hasMetaError, toMeta(getHelpContentById))\n)\nRun Code Online (Sandbox Code Playgroud)\n\n我们的第一个参数的谓词,相当不言自明:
\n\nconst hasMetaError = meta => \n meta.status === \'ERROR\'\nRun Code Online (Sandbox Code Playgroud)\n\n这样,我们就可以通过主题推送帮助 id 并获取缓存的 api 调用的可观察值:
\n\nconst helpIds$ = new Subject()\n\nconst help$ = helpId$.pipe(\n cacheUntil(hasMetaError, toMeta(apiCall$))\n)\n\n// When we need a panel...\n//\nconst addPanel(panel, helpId) => {\n helpIds$.subscribe(data => panel.display(data))\n helpIds$.next(\'help-id-999\')\n}\nRun Code Online (Sandbox Code Playgroud)\n\n可是等等!每次我们访问时,每个面板都会通过其订阅接收缓存中的每个项目next(helpId)。
那么我们来过滤一下。
\n\nconst addPanel(panel, helpId) => {\n helpIds$\n .filter(meta => meta.id === helpId)\n .subscribe(data => panel.display(data))\n helpIds$.next(\'help-id-999\')\n}\nRun Code Online (Sandbox Code Playgroud)\n\n就是这样。这是一个运行示例:\n https://stackblitz.com/edit/rxjs-cache-until
\n\n通过分离元数据部分,缓存运算符在其他情况下变得更加可重用:
\n\ntype Porridge = {celsius: number}\n\nconst makePorridge = celsius => {\n console.log("Make porridge at " + celsius + " celcius") \n return Observable.of({celsius})\n}\n\nconst justRight = (p: Porridge) => p.celsius >= 60 && p.celsius <= 70\n\nconst porridge$ = new Subject<number>()\nconst cachedPorridge$ = porridge$.pipe(\n cacheUntil(justRight, makePorridge)\n)\n\nlet temperature = 30\nconst addBear = () => {\n cachedPorridge$.subscribe(() => console.log(\'Add bear\'))\n porridge$.next(temperature += 10)\n}\n\nfor(var i = 10; i; i--) {\n addBear()\n}\n// between the 60\xcb\x9a and 70\xcb\x9a \n// `makePorridge` gets called for every new bear\n// because Goldilocks keeps eating it all\nRun Code Online (Sandbox Code Playgroud)\n