Ang*_*ity 281 rxjs angular2-services rxjs5 angular
通过使用Http,我们调用一个执行网络调用并返回http observable的方法:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
Run Code Online (Sandbox Code Playgroud)
如果我们采用这个可观察的并添加多个订阅者:
let network$ = getCustomer();
let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);
Run Code Online (Sandbox Code Playgroud)
我们想要做的是确保这不会导致多个网络请求.
这可能看起来像是一个不寻常的场景,但实际上很常见:例如,如果调用者订阅了observable以显示错误消息,并使用异步管道将其传递给模板,那么我们已经有两个订阅者.
在RxJs 5中这样做的正确方法是什么?
也就是说,这似乎工作正常:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json()).share();
}
Run Code Online (Sandbox Code Playgroud)
但这是在RxJs 5中这样做的惯用方式,还是我们应该做其他事情呢?
注意:根据Angular 5 new HttpClient,.map(res => res.json())所有示例中的部分现在都没用,因为现在默认采用JSON结果.
Gün*_*uer 216
缓存数据,如果可用缓存,则返回此项,否则发出HTTP请求.
import {Injectable} from '@angular/core';
import {Http, Headers} from '@angular/http';
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of'; //proper way to import the 'of' operator
import 'rxjs/add/operator/share';
import 'rxjs/add/operator/map';
import {Data} from './data';
@Injectable()
export class DataService {
private url:string = 'https://cors-test.appspot.com/test';
private data: Data;
private observable: Observable<any>;
constructor(private http:Http) {}
getData() {
if(this.data) {
// if `data` is available just return it as `Observable`
return Observable.of(this.data);
} else if(this.observable) {
// if `this.observable` is set then the request is in progress
// return the `Observable` for the ongoing request
return this.observable;
} else {
// example header (not necessary)
let headers = new Headers();
headers.append('Content-Type', 'application/json');
// create the request, store the `Observable` for subsequent subscribers
this.observable = this.http.get(this.url, {
headers: headers
})
.map(response => {
// when the cached data is available we don't need the `Observable` reference anymore
this.observable = null;
if(response.status == 400) {
return "FAILURE";
} else if(response.status == 200) {
this.data = new Data(response.json());
return this.data;
}
// make it shared so more than one subscriber can get the result
})
.share();
return this.observable;
}
}
}
Run Code Online (Sandbox Code Playgroud)
这个artile https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with-rxjs.html是一个很好的解释如何缓存shareReplay.
Ang*_*ity 42
根据@Cristian的建议,这是一种适用于HTTP observable的方法,它只发出一次然后完成:
getCustomer() {
return this.http.get('/someUrl')
.map(res => res.json()).publishLast().refCount();
}
Run Code Online (Sandbox Code Playgroud)
小智 36
更新:Ben Lesh说5.2.0之后的下一个小版本,你将能够只调用shareReplay()来真正缓存.
先前.....
首先,不要使用share()或publishReplay(1).refCount(),它们是相同的,并且它的问题在于,只有在observable处于活动状态时才建立连接,如果在完成后连接它,它再次创建一个新的可观察,翻译,而不是真正的缓存.
Birowski在上面提供了正确的解决方案,即使用ReplaySubject.在我们的案例1中,ReplaySubject将缓存你给它的值(bufferSize).一旦refCount达到零并且你建立一个新的连接,这将是一个新的observable,这是一个新的连接,这是缓存的正确行为.
这是一个可重用的功能
export function cacheable<T>(o: Observable<T>): Observable<T> {
let replay = new ReplaySubject<T>(1);
o.subscribe(
x => replay.next(x),
x => replay.error(x),
() => replay.complete()
);
return replay.asObservable();
}
Run Code Online (Sandbox Code Playgroud)
这是如何使用它
import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { cacheable } from '../utils/rxjs-functions';
@Injectable()
export class SettingsService {
_cache: Observable<any>;
constructor(private _http: Http, ) { }
refresh = () => {
if (this._cache) {
return this._cache;
}
return this._cache = cacheable<any>(this._http.get('YOUR URL'));
}
}
Run Code Online (Sandbox Code Playgroud)
下面是可缓存功能的更高级版本.这个允许有自己的查找表+提供自定义查找表的能力.这样,您不必像上面的示例中那样检查this._cache.另请注意,不是将observable作为第一个参数传递,而是传递一个返回observable的函数,这是因为Angular的Http立即执行,所以通过返回一个惰性执行函数,我们可以决定不调用它,如果它已经在我们的缓存.
let cacheableCache: { [key: string]: Observable<any> } = {};
export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> {
if (!!key && (customCache || cacheableCache)[key]) {
return (customCache || cacheableCache)[key] as Observable<T>;
}
let replay = new ReplaySubject<T>(1);
returnObservable().subscribe(
x => replay.next(x),
x => replay.error(x),
() => replay.complete()
);
let observable = replay.asObservable();
if (!!key) {
if (!!customCache) {
customCache[key] = observable;
} else {
cacheableCache[key] = observable;
}
}
return observable;
}
Run Code Online (Sandbox Code Playgroud)
用法:
getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache")
Run Code Online (Sandbox Code Playgroud)
Arl*_*rlo 27
rxjs 5.4.0有一个新的shareReplay方法.
作者明确表示"理想的处理诸如缓存AJAX结果之类的东西"
rxjs PR#2443 feat(shareReplay):添加shareReplay变体publishReplay
shareReplay返回一个observable,它是ReplaySubject上的多播源.该重放主题在来自源的错误时被回收,但不是在源完成时回收.这使得shareReplay非常适合处理诸如缓存AJAX结果之类的事情,因为它是可重试的.然而,它的重复行为与分享的不同之处在于它不会重复源观察,而是重复源可观察的值.
小智 23
根据这篇文章
事实证明,我们可以通过添加publishReplay(1)和refCount轻松地将缓存添加到observable.
所以 内部if语句只是附加
.publishReplay(1)
.refCount();
Run Code Online (Sandbox Code Playgroud)
至 .map(...)
我主演了这个问题,但我会试着去试试这个问题.
//this will be the shared observable that
//anyone can subscribe to, get the value,
//but not cause an api request
let customer$ = new Rx.ReplaySubject(1);
getCustomer().subscribe(customer$);
//here's the first subscriber
customer$.subscribe(val => console.log('subscriber 1: ' + val));
//here's the second subscriber
setTimeout(() => {
customer$.subscribe(val => console.log('subscriber 2: ' + val));
}, 1000);
function getCustomer() {
return new Rx.Observable(observer => {
console.log('api request');
setTimeout(() => {
console.log('api response');
observer.next('customer object');
observer.complete();
}, 500);
});
}
Run Code Online (Sandbox Code Playgroud)
这是证明 :)
只有一个外卖: getCustomer().subscribe(customer$)
我们没有订阅api响应getCustomer(),我们正在订阅一个可观察的ReplaySubject,它也可以订阅一个不同的Observable并且(这很重要)保存它的最后一个值并将其重新发布到它的任何一个(ReplaySubject的)订阅者.
您选择的实现将取决于您是否希望 unsubscribe() 取消您的 HTTP 请求。
无论如何,TypeScript 装饰器是一种标准化行为的好方法。这是我写的一篇:
@CacheObservableArgsKey
getMyThing(id: string): Observable<any> {
return this.http.get('things/'+id);
}
Run Code Online (Sandbox Code Playgroud)
装饰器定义:
/**
* Decorator that replays and connects to the Observable returned from the function.
* Caches the result using all arguments to form a key.
* @param target
* @param name
* @param descriptor
* @returns {PropertyDescriptor}
*/
export function CacheObservableArgsKey(target: Object, name: string, descriptor: PropertyDescriptor) {
const originalFunc = descriptor.value;
const cacheMap = new Map<string, any>();
descriptor.value = function(this: any, ...args: any[]): any {
const key = args.join('::');
let returnValue = cacheMap.get(key);
if (returnValue !== undefined) {
console.log(`${name} cache-hit ${key}`, returnValue);
return returnValue;
}
returnValue = originalFunc.apply(this, args);
console.log(`${name} cache-miss ${key} new`, returnValue);
if (returnValue instanceof Observable) {
returnValue = returnValue.publishReplay(1);
returnValue.connect();
}
else {
console.warn('CacheHttpArgsKey: value not an Observable cannot publishReplay and connect', returnValue);
}
cacheMap.set(key, returnValue);
return returnValue;
};
return descriptor;
}
Run Code Online (Sandbox Code Playgroud)
我找到了一种将http get结果存储到sessionStorage并将其用于会话的方法,这样它就永远不会再次调用服务器。
我用它来调用github API以避免使用限制。
@Injectable()
export class HttpCache {
constructor(private http: Http) {}
get(url: string): Observable<any> {
let cached: any;
if (cached === sessionStorage.getItem(url)) {
return Observable.of(JSON.parse(cached));
} else {
return this.http.get(url)
.map(resp => {
sessionStorage.setItem(url, resp.text());
return resp.json();
});
}
}
}
Run Code Online (Sandbox Code Playgroud)
仅供参考,sessionStorage限制为5M(或4.75M)。因此,不应将其像这样用于大量数据。
------编辑-------------
如果要使用F5刷新数据,则使用内存数据代替sessionStorage;
@Injectable()
export class HttpCache {
cached: any = {}; // this will store data
constructor(private http: Http) {}
get(url: string): Observable<any> {
if (this.cached[url]) {
return Observable.of(this.cached[url]));
} else {
return this.http.get(url)
.map(resp => {
this.cached[url] = resp.text();
return resp.json();
});
}
}
}
Run Code Online (Sandbox Code Playgroud)
使用 Rxjs Observer/Observable + 缓存 + 订阅的可缓存 HTTP 响应数据
请参阅下面的代码
*免责声明:我是 rxjs 的新手,所以请记住,我可能会滥用 observable/observer 方法。我的解决方案纯粹是我找到的其他解决方案的集合,并且是未能找到一个简单的、有据可查的解决方案的结果。因此,我提供了我的完整代码解决方案(正如我希望找到的那样),希望它能帮助其他人。
*注意,这种方法松散地基于 GoogleFirebaseObservables。不幸的是,我缺乏适当的经验/时间来复制他们在幕后所做的事情。但以下是提供对某些可缓存数据的异步访问的简单方法。
情况:“产品列表”组件的任务是显示产品列表。该站点是一个单页网络应用程序,带有一些菜单按钮,可以“过滤”页面上显示的产品。
解决方案:组件“订阅”一个服务方法。service 方法返回一个产品对象数组,组件通过订阅回调访问这些对象。服务方法将其活动包装在新创建的观察者中并返回观察者。在这个观察者内部,它搜索缓存的数据并将其传递回订阅者(组件)并返回。否则,它会发出一个 http 调用来检索数据,订阅响应,您可以在其中处理该数据(例如,将数据映射到您自己的模型),然后将数据传回给订阅者。
编码
产品列表.component.ts
import { Component, OnInit, Input } from '@angular/core';
import { ProductService } from '../../../services/product.service';
import { Product, ProductResponse } from '../../../models/Product';
@Component({
selector: 'app-product-list',
templateUrl: './product-list.component.html',
styleUrls: ['./product-list.component.scss']
})
export class ProductListComponent implements OnInit {
products: Product[];
constructor(
private productService: ProductService
) { }
ngOnInit() {
console.log('product-list init...');
this.productService.getProducts().subscribe(products => {
console.log('product-list received updated products');
this.products = products;
});
}
}
Run Code Online (Sandbox Code Playgroud)
产品.服务.ts
import { Injectable } from '@angular/core';
import { Http, Headers } from '@angular/http';
import { Observable, Observer } from 'rxjs';
import 'rxjs/add/operator/map';
import { Product, ProductResponse } from '../models/Product';
@Injectable()
export class ProductService {
products: Product[];
constructor(
private http:Http
) {
console.log('product service init. calling http to get products...');
}
getProducts():Observable<Product[]>{
//wrap getProducts around an Observable to make it async.
let productsObservable$ = Observable.create((observer: Observer<Product[]>) => {
//return products if it was previously fetched
if(this.products){
console.log('## returning existing products');
observer.next(this.products);
return observer.complete();
}
//Fetch products from REST API
console.log('** products do not yet exist; fetching from rest api...');
let headers = new Headers();
this.http.get('http://localhost:3000/products/', {headers: headers})
.map(res => res.json()).subscribe((response:ProductResponse) => {
console.log('productResponse: ', response);
let productlist = Product.fromJsonList(response.products); //convert service observable to product[]
this.products = productlist;
observer.next(productlist);
});
});
return productsObservable$;
}
}
Run Code Online (Sandbox Code Playgroud)
product.ts(模型)
export interface ProductResponse {
success: boolean;
msg: string;
products: Product[];
}
export class Product {
product_id: number;
sku: string;
product_title: string;
..etc...
constructor(product_id: number,
sku: string,
product_title: string,
...etc...
){
//typescript will not autoassign the formal parameters to related properties for exported classes.
this.product_id = product_id;
this.sku = sku;
this.product_title = product_title;
...etc...
}
//Class method to convert products within http response to pure array of Product objects.
//Caller: product.service:getProducts()
static fromJsonList(products:any): Product[] {
let mappedArray = products.map(Product.fromJson);
return mappedArray;
}
//add more parameters depending on your database entries and constructor
static fromJson({
product_id,
sku,
product_title,
...etc...
}): Product {
return new Product(
product_id,
sku,
product_title,
...etc...
);
}
}
Run Code Online (Sandbox Code Playgroud)
这是我在 Chrome 中加载页面时看到的输出示例。请注意,在初始加载时,产品是从 http 获取的(调用我的节点休息服务,该服务在端口 3000 上本地运行)。当我然后单击导航到产品的“过滤”视图时,可以在缓存中找到这些产品。
我的 Chrome 日志(控制台):
core.es5.js:2925 Angular is running in the development mode. Call enableProdMode() to enable the production mode.
app.component.ts:19 app.component url: /products
product.service.ts:15 product service init. calling http to get products...
product-list.component.ts:18 product-list init...
product.service.ts:29 ** products do not yet exist; fetching from rest api...
product.service.ts:33 productResponse: {success: true, msg: "Products found", products: Array(23)}
product-list.component.ts:20 product-list received updated products
Run Code Online (Sandbox Code Playgroud)
...[点击菜单按钮过滤产品]...
app.component.ts:19 app.component url: /products/chocolatechip
product-list.component.ts:18 product-list init...
product.service.ts:24 ## returning existing products
product-list.component.ts:20 product-list received updated products
Run Code Online (Sandbox Code Playgroud)
结论:这是我发现(到目前为止)实现可缓存 http 响应数据的最简单方法。在我的 angular 应用程序中,每次我导航到产品的不同视图时,产品列表组件都会重新加载。ProductService 似乎是一个共享实例,因此在导航过程中会保留 ProductService 中 'products: Product[]' 的本地缓存,随后对“GetProducts()”的调用返回缓存值。最后一个注意事项,我已经阅读了有关如何在完成后关闭 observables/subscriptions 以防止“内存泄漏”的评论。我没有在这里包括这个,但这是需要记住的。
rxjs版本5.4.0(2017-05-09)添加了对shareReplay的支持。
为什么要使用shareReplay?
如果您不希望在多个订户之间执行副作用或繁重的计算,则通常需要使用shareReplay。在您知道您的流的后期订阅者需要访问以前发出的值的情况下,这也可能很有价值。重播订阅值的这种能力是share和shareReplay的与众不同之处。
您可以轻松地修改角度服务以使用此服务,并返回具有可缓存结果的observable,该结果将仅使http调用一次(假设第一次调用成功)。
这是使用的非常简单的客户服务shareReplay。
客户服务
import { shareReplay } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';
@Injectable()
export class CustomerService {
private readonly _getCustomers: Observable<ICustomer[]>;
constructor(private readonly http: HttpClient) {
this._getCustomers = this.http.get<ICustomer[]>('/api/customers/').pipe(shareReplay());
}
getCustomers() : Observable<ICustomer[]> {
return this._getCustomers;
}
}
export interface ICustomer {
/* ICustomer interface fields defined here */
}
Run Code Online (Sandbox Code Playgroud)
请注意,可以将构造函数中的赋值移至该方法,getCustomers但是由于返回的可观察对象HttpClient是“冷”的,因此在构造函数中执行此操作是可以接受的,因为http调用只会在第一次调用时进行subscribe。
此处还假设初始返回的数据在应用程序实例的生存期内不会过时。
Bra*_*don -5
您是否尝试过运行已有的代码?
因为您是根据 产生的 Promise 构建 Observable getJSON(),所以网络请求是在任何人订阅之前发出的。由此产生的承诺是由所有订阅者共享的。
var promise = jQuery.getJSON(requestUrl); // network call is executed now
var o = Rx.Observable.fromPromise(promise); // just wraps it in an observable
o.subscribe(...); // does not trigger network call
o.subscribe(...); // does not trigger network call
// ...
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
80039 次 |
| 最近记录: |