在RxJs 5中分享Angular Http网络调用结果的正确方法是什么?

Ang*_*ity 281 rxjs angular2-services rxjs5 angular

通过使用Http,我们调用一个执行网络调用并返回http observable的方法:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json());
}
Run Code Online (Sandbox Code Playgroud)

如果我们采用这个可观察的并添加多个订阅者:

let network$ = getCustomer();

let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);
Run Code Online (Sandbox Code Playgroud)

我们想要做的是确保这不会导致多个网络请求.

这可能看起来像是一个不寻常的场景,但实际上很常见:例如,如果调用者订阅了observable以显示错误消息,并使用异步管道将其传递给模板,那么我们已经有两个订阅者.

在RxJs 5中这样做的正确方法是什么?

也就是说,这似乎工作正常:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json()).share();
}
Run Code Online (Sandbox Code Playgroud)

但这是在RxJs 5中这样做的惯用方式,还是我们应该做其他事情呢?

注意:根据Angular 5 new HttpClient,.map(res => res.json())所有示例中的部分现在都没用,因为现在默认采用JSON结果.

Gün*_*uer 216

缓存数据,如果可用缓存,则返回此项,否则发出HTTP请求.

import {Injectable} from '@angular/core';
import {Http, Headers} from '@angular/http';
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of'; //proper way to import the 'of' operator
import 'rxjs/add/operator/share';
import 'rxjs/add/operator/map';
import {Data} from './data';

@Injectable()
export class DataService {
  private url:string = 'https://cors-test.appspot.com/test';

  private data: Data;
  private observable: Observable<any>;

  constructor(private http:Http) {}

  getData() {
    if(this.data) {
      // if `data` is available just return it as `Observable`
      return Observable.of(this.data); 
    } else if(this.observable) {
      // if `this.observable` is set then the request is in progress
      // return the `Observable` for the ongoing request
      return this.observable;
    } else {
      // example header (not necessary)
      let headers = new Headers();
      headers.append('Content-Type', 'application/json');
      // create the request, store the `Observable` for subsequent subscribers
      this.observable = this.http.get(this.url, {
        headers: headers
      })
      .map(response =>  {
        // when the cached data is available we don't need the `Observable` reference anymore
        this.observable = null;

        if(response.status == 400) {
          return "FAILURE";
        } else if(response.status == 200) {
          this.data = new Data(response.json());
          return this.data;
        }
        // make it shared so more than one subscriber can get the result
      })
      .share();
      return this.observable;
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

Plunker的例子

这个artile https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with-rxjs.html是一个很好的解释如何缓存shareReplay.

  • @Günter,谢谢您的代码,它的工作原理.但是,我试图理解为什么要分别跟踪Data和Observable.难道你不能通过像这样缓存Observable <Data>来有效地达到同样的效果吗?`if(this.observable){return this.observable; } else {this.observable = this.http.get(url).map(res => res.json().data); 返回this.observable; }` (8认同)
  • `do()`与`map()`相反,不会修改事件.你也可以使用`map()`但是你必须确保在回调结束时返回正确的值. (3认同)
  • 如果执行`.subscribe()`的调用站点不需要值,那么你可以这样做,因为它可能只是`null`(取决于`this.extractData`返回的内容),但恕我直言这不是很好地表达代码的意图. (3认同)
  • @HarleenKaur这是一个收到的JSON被反序列化的类,以获得强大的类型检查和自动完成.没有必要使用它,但它很常见. (3认同)
  • 当`this.extraData`结束时,`extraData(){if(foo){doSomething();}}`否则返回最后一个表达式的结果,这可能不是你想要的. (2认同)
  • 如果我有多个组件请求相同的数据,并且我知道它不会改变,那么使用该方法是否有意义,或者是否有其他方法可以这样做?我已经设法使这个示例正常工作并且工作正常,我唯一的问题是我的用例是否需要此设置。 (2认同)
  • 万分感谢。我工作了,但我不得不改变 return Observable.of(this.data); 返回(this.data);。“导入‘of’运算符的正确方法”是什么意思? (2认同)
  • 我认为这是演示和学习实际问题和所涉及步骤的好方法,但是“shareReplay”通常更方便https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with -rxjs.html (2认同)

Ang*_*ity 42

根据@Cristian的建议,这是一种适用于HTTP observable的方法,它只发出一次然后完成:

getCustomer() {
    return this.http.get('/someUrl')
        .map(res => res.json()).publishLast().refCount();
}
Run Code Online (Sandbox Code Playgroud)

  • 小澄清...虽然严格来说,由 `publishLast().refCount()` 共享的源 observable 无法取消,但一旦取消对 `refCount` 返回的 observable 的所有订阅,最终效果是源 observable 将被取消取消订阅,如果“正在飞行”则取消订阅 (2认同)

小智 36

更新:Ben Lesh说5.2.0之后的下一个小版本,你将能够只调用shareReplay()来真正缓存.

先前.....

首先,不要使用share()或publishReplay(1).refCount(),它们是相同的,并且它的问题在于,只有在observable处于活动状态时才建立连接,如果在完成后连接它,它再次创建一个新的可观察,翻译,而不是真正的缓存.

Birowski在上面提供了正确的解决方案,即使用ReplaySubject.在我们的案例1中,ReplaySubject将缓存你给它的值(bufferSize).一旦refCount达到零并且你建立一个新的连接,这将是一个新的observable,这是一个新的连接,这是缓存的正确行为.

这是一个可重用的功能

export function cacheable<T>(o: Observable<T>): Observable<T> {
  let replay = new ReplaySubject<T>(1);
  o.subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  return replay.asObservable();
}
Run Code Online (Sandbox Code Playgroud)

这是如何使用它

import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { cacheable } from '../utils/rxjs-functions';

@Injectable()
export class SettingsService {
  _cache: Observable<any>;
  constructor(private _http: Http, ) { }

  refresh = () => {
    if (this._cache) {
      return this._cache;
    }
    return this._cache = cacheable<any>(this._http.get('YOUR URL'));
  }
}
Run Code Online (Sandbox Code Playgroud)

下面是可缓存功能的更高级版本.这个允许有自己的查找表+提供自定义查找表的能力.这样,您不必像上面的示例中那样检查this._cache.另请注意,不是将observable作为第一个参数传递,而是传递一个返回observable的函数,这是因为Angular的Http立即执行,所以通过返回一个惰性执行函数,我们可以决定不调用它,如果它已经在我们的缓存.

let cacheableCache: { [key: string]: Observable<any> } = {};
export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> {
  if (!!key && (customCache || cacheableCache)[key]) {
    return (customCache || cacheableCache)[key] as Observable<T>;
  }
  let replay = new ReplaySubject<T>(1);
  returnObservable().subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  let observable = replay.asObservable();
  if (!!key) {
    if (!!customCache) {
      customCache[key] = observable;
    } else {
      cacheableCache[key] = observable;
    }
  }
  return observable;
}
Run Code Online (Sandbox Code Playgroud)

用法:

getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache")
Run Code Online (Sandbox Code Playgroud)


Arl*_*rlo 27

rxjs 5.4.0有一个新的shareReplay方法.

作者明确表示"理想的处理诸如缓存AJAX结果之类的东西"

rxjs PR#2443 feat(shareReplay):添加shareReplay变体publishReplay

shareReplay返回一个observable,它是ReplaySubject上的多播源.该重放主题在来自源的错误时被回收,但不是在源完成时回收.这使得shareReplay非常适合处理诸如缓存AJAX结果之类的事情,因为它是可重试的.然而,它的重复行为与分享的不同之处在于它不会重复源观察,而是重复源可观察的值.

  • 我尝试将.shareReplay(1,10000)添加到一个observable,但我没有注意到任何缓存或行为更改.有可用的工作示例吗? (4认同)

小智 23

根据这篇文章

事实证明,我们可以通过添加publishReplay(1)和refCount轻松地将缓存添加到observable.

所以 内部if语句只是附加

.publishReplay(1)
.refCount();
Run Code Online (Sandbox Code Playgroud)

.map(...)


Bir*_*sky 9

我主演了这个问题,但我会试着去试试这个问题.

//this will be the shared observable that 
//anyone can subscribe to, get the value, 
//but not cause an api request
let customer$ = new Rx.ReplaySubject(1);

getCustomer().subscribe(customer$);

//here's the first subscriber
customer$.subscribe(val => console.log('subscriber 1: ' + val));

//here's the second subscriber
setTimeout(() => {
  customer$.subscribe(val => console.log('subscriber 2: ' + val));  
}, 1000);

function getCustomer() {
  return new Rx.Observable(observer => {
    console.log('api request');
    setTimeout(() => {
      console.log('api response');
      observer.next('customer object');
      observer.complete();
    }, 500);
  });
}
Run Code Online (Sandbox Code Playgroud)

这是证明 :)

只有一个外卖: getCustomer().subscribe(customer$)

我们没有订阅api响应getCustomer(),我们正在订阅一个可观察的ReplaySubject,它也可以订阅一个不同的Observable并且(这很重要)保存它的最后一个值并将其重新发布到它的任何一个(ReplaySubject的)订阅者.


Arl*_*rlo 6

您选择的实现将取决于您是否希望 unsubscribe() 取消您的 HTTP 请求。

无论如何,TypeScript 装饰器是一种标准化行为的好方法。这是我写的一篇:

  @CacheObservableArgsKey
  getMyThing(id: string): Observable<any> {
    return this.http.get('things/'+id);
  }
Run Code Online (Sandbox Code Playgroud)

装饰器定义:

/**
 * Decorator that replays and connects to the Observable returned from the function.
 * Caches the result using all arguments to form a key.
 * @param target
 * @param name
 * @param descriptor
 * @returns {PropertyDescriptor}
 */
export function CacheObservableArgsKey(target: Object, name: string, descriptor: PropertyDescriptor) {
  const originalFunc = descriptor.value;
  const cacheMap = new Map<string, any>();
  descriptor.value = function(this: any, ...args: any[]): any {
    const key = args.join('::');

    let returnValue = cacheMap.get(key);
    if (returnValue !== undefined) {
      console.log(`${name} cache-hit ${key}`, returnValue);
      return returnValue;
    }

    returnValue = originalFunc.apply(this, args);
    console.log(`${name} cache-miss ${key} new`, returnValue);
    if (returnValue instanceof Observable) {
      returnValue = returnValue.publishReplay(1);
      returnValue.connect();
    }
    else {
      console.warn('CacheHttpArgsKey: value not an Observable cannot publishReplay and connect', returnValue);
    }
    cacheMap.set(key, returnValue);
    return returnValue;
  };

  return descriptor;
}
Run Code Online (Sandbox Code Playgroud)


all*_*kim 5

我找到了一种将http get结果存储到sessionStorage并将其用于会话的方法,这样它就永远不会再次调用服务器。

我用它来调用github API以避免使用限制。

@Injectable()
export class HttpCache {
  constructor(private http: Http) {}

  get(url: string): Observable<any> {
    let cached: any;
    if (cached === sessionStorage.getItem(url)) {
      return Observable.of(JSON.parse(cached));
    } else {
      return this.http.get(url)
        .map(resp => {
          sessionStorage.setItem(url, resp.text());
          return resp.json();
        });
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

仅供参考,sessionStorage限制为5M(或4.75M)。因此,不应将其像这样用于大量数据。

------编辑-------------
如果要使用F5刷新数据,则使用内存数据代替sessionStorage;

@Injectable()
export class HttpCache {
  cached: any = {};  // this will store data
  constructor(private http: Http) {}

  get(url: string): Observable<any> {
    if (this.cached[url]) {
      return Observable.of(this.cached[url]));
    } else {
      return this.http.get(url)
        .map(resp => {
          this.cached[url] = resp.text();
          return resp.json();
        });
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

  • 但这会给用户带来意想不到的行为。当用户点击浏览器的 F5 或刷新按钮时,他期望来自服务器的新鲜数据。但实际上他从 localStorage 获取过时的数据。错误报告、支持票证等传入...正如名称“sessionStorage”所示,我只会将其用于**预计**在整个会话中保持一致的数据。 (2认同)

Obj*_*eTC 5

使用 Rxjs Observer/Observable + 缓存 + 订阅的可缓存 HTTP 响应数据

请参阅下面的代码

*免责声明:我是 rxjs 的新手,所以请记住,我可能会滥用 observable/observer 方法。我的解决方案纯粹是我找到的其他解决方案的集合,并且是未能找到一个简单的、有据可查的解决方案的结果。因此,我提供了我的完整代码解决方案(正如我希望找到的那样),希望它能帮助其他人。

*注意,这种方法松散地基于 GoogleFirebaseObservables。不幸的是,我缺乏适当的经验/时间来复制他们在幕后所做的事情。但以下是提供对某些可缓存数据的异步访问的简单方法。

情况:“产品列表”组件的任务是显示产品列表。该站点是一个单页网络应用程序,带有一些菜单按钮,可以“过滤”页面上显示的产品。

解决方案:组件“订阅”一个服务方法。service 方法返回一个产品对象数组,组件通过订阅回调访问这些对象。服务方法将其活动包装在新创建的观察者中并返回观察者。在这个观察者内部,它搜索缓存的数据并将其传递回订阅者(组件)并返回。否则,它会发出一个 http 调用来检索数据,订阅响应,您可以在其中处理该数据(例如,将数据映射到您自己的模型),然后将数据传回给订阅者。

编码

产品列表.component.ts

import { Component, OnInit, Input } from '@angular/core';
import { ProductService } from '../../../services/product.service';
import { Product, ProductResponse } from '../../../models/Product';

@Component({
  selector: 'app-product-list',
  templateUrl: './product-list.component.html',
  styleUrls: ['./product-list.component.scss']
})
export class ProductListComponent implements OnInit {
  products: Product[];

  constructor(
    private productService: ProductService
  ) { }

  ngOnInit() {
    console.log('product-list init...');
    this.productService.getProducts().subscribe(products => {
      console.log('product-list received updated products');
      this.products = products;
    });
  }
}
Run Code Online (Sandbox Code Playgroud)

产品.服务.ts

import { Injectable } from '@angular/core';
import { Http, Headers } from '@angular/http';
import { Observable, Observer } from 'rxjs';
import 'rxjs/add/operator/map';
import { Product, ProductResponse } from '../models/Product';

@Injectable()
export class ProductService {
  products: Product[];

  constructor(
    private http:Http
  ) {
    console.log('product service init.  calling http to get products...');

  }

  getProducts():Observable<Product[]>{
    //wrap getProducts around an Observable to make it async.
    let productsObservable$ = Observable.create((observer: Observer<Product[]>) => {
      //return products if it was previously fetched
      if(this.products){
        console.log('## returning existing products');
        observer.next(this.products);
        return observer.complete();

      }
      //Fetch products from REST API
      console.log('** products do not yet exist; fetching from rest api...');
      let headers = new Headers();
      this.http.get('http://localhost:3000/products/',  {headers: headers})
      .map(res => res.json()).subscribe((response:ProductResponse) => {
        console.log('productResponse: ', response);
        let productlist = Product.fromJsonList(response.products); //convert service observable to product[]
        this.products = productlist;
        observer.next(productlist);
      });
    }); 
    return productsObservable$;
  }
}
Run Code Online (Sandbox Code Playgroud)

product.ts(模型)

export interface ProductResponse {
  success: boolean;
  msg: string;
  products: Product[];
}

export class Product {
  product_id: number;
  sku: string;
  product_title: string;
  ..etc...

  constructor(product_id: number,
    sku: string,
    product_title: string,
    ...etc...
  ){
    //typescript will not autoassign the formal parameters to related properties for exported classes.
    this.product_id = product_id;
    this.sku = sku;
    this.product_title = product_title;
    ...etc...
  }



  //Class method to convert products within http response to pure array of Product objects.
  //Caller: product.service:getProducts()
  static fromJsonList(products:any): Product[] {
    let mappedArray = products.map(Product.fromJson);
    return mappedArray;
  }

  //add more parameters depending on your database entries and constructor
  static fromJson({ 
      product_id,
      sku,
      product_title,
      ...etc...
  }): Product {
    return new Product(
      product_id,
      sku,
      product_title,
      ...etc...
    );
  }
}
Run Code Online (Sandbox Code Playgroud)

这是我在 Chrome 中加载页面时看到的输出示例。请注意,在初始加载时,产品是从 http 获取的(调用我的节点休息服务,该服务在端口 3000 上本地运行)。当我然后单击导航到产品的“过滤”视图时,可以在缓存中找到这些产品。

我的 Chrome 日志(控制台):

core.es5.js:2925 Angular is running in the development mode. Call enableProdMode() to enable the production mode.
app.component.ts:19 app.component url: /products
product.service.ts:15 product service init.  calling http to get products...
product-list.component.ts:18 product-list init...
product.service.ts:29 ** products do not yet exist; fetching from rest api...
product.service.ts:33 productResponse:  {success: true, msg: "Products found", products: Array(23)}
product-list.component.ts:20 product-list received updated products
Run Code Online (Sandbox Code Playgroud)

...[点击菜单按钮过滤产品]...

app.component.ts:19 app.component url: /products/chocolatechip
product-list.component.ts:18 product-list init...
product.service.ts:24 ## returning existing products
product-list.component.ts:20 product-list received updated products
Run Code Online (Sandbox Code Playgroud)

结论:这是我发现(到目前为止)实现可缓存 http 响应数据的最简单方法。在我的 angular 应用程序中,每次我导航到产品的不同视图时,产品列表组件都会重新加载。ProductService 似乎是一个共享实例,因此在导航过程中会保留 ProductService 中 'products: Product[]' 的本地缓存,随后对“GetProducts()”的调用返回缓存值。最后一个注意事项,我已经阅读了有关如何在完成后关闭 observables/subscriptions 以防止“内存泄漏”的评论。我没有在这里包括这个,但这是需要记住的。

  • 如果 observable 仅打算接收一次数据,则替代方法是 .first() 或 .take(1) 。应在“ngOnDestroy()”中取消订阅所有其他 observable 的“无限流”,如果不这样做,则最终可能会出现重复的“observable”回调。/sf/ask/1960544421/ (3认同)
  • 注意 - 我已经找到了一个更强大的解决方案,涉及 RxJS BehaviorSubjects,它简化了代码并大大减少了“开销”。在 products.service.ts 中, 1. import { BehaviorSubject } from 'rxjs'; 2. 将 'products:Product[]' 改为 'product$: BehaviorSubject&lt;Product[]&gt; = new BehaviorSubject&lt;Product[]&gt;([]);' 3. 现在您可以简单地调用 http 而不返回任何内容。http_getProducts(){this.http.get(...).map(res =&gt; res.json()).subscribe(products =&gt; this.product$.next(products))}; (2认同)
  • 局部变量'product$' 是一个behaviorSubject,它将EMIT 和STORE 最新产品(来自第3 部分中的product$.next(..) 调用)。现在在你的组件中,像往常一样注入服务。您可以使用 productService.product$.value 获得最近分配的 product$ 值。或者,如果您想在 product$ 收到新值时执行操作(即,在第 3 部分中调用 product$.next(...) 函数),请订阅 product$。 (2认同)
  • 例如,在 products.component.ts... this.productService.product$ .takeUntil(this.ngUnsubscribe) .subscribe((products) =&gt; {this.category); 让filteredProducts = this.productService.getProductsByCategory(this.category); this.products = 过滤产品;}); (2认同)
  • 关于从 observable 取消订阅的重要说明:“.takeUntil(this.ngUnsubscribe)”。请参阅此堆栈溢出问题/答案,它似乎显示了取消订阅事件的“事实上的”推荐方法:/sf/ask/2660583411/ -订阅 (2认同)

Igo*_*gor 5

rxjs版本5.4.0(2017-05-09)添加了对shareReplay的支持。

为什么要使用shareReplay?

如果您不希望在多个订户之间执行副作用或繁重的计算,则通常需要使用shareReplay。在您知道您的流的后期订阅者需要访问以前发出的值的情况下,这也可能很有价值。重播订阅值的这种能力是share和shareReplay的与众不同之处。

您可以轻松地修改角度服务以使用此服务,并返回具有可缓存结果的observable,该结果将仅使http调用一次(假设第一次调用成功)。

Angular服务示例

这是使用的非常简单的客户服务shareReplay

客户服务

import { shareReplay } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';

@Injectable()
export class CustomerService {

    private readonly _getCustomers: Observable<ICustomer[]>;

    constructor(private readonly http: HttpClient) {
        this._getCustomers = this.http.get<ICustomer[]>('/api/customers/').pipe(shareReplay());
    }

    getCustomers() : Observable<ICustomer[]> {
        return this._getCustomers;
    }
}

export interface ICustomer {
  /* ICustomer interface fields defined here */
}
Run Code Online (Sandbox Code Playgroud)

请注意,可以将构造函数中的赋值移至该方法,getCustomers但是由于返回的可观察对象HttpClient是“冷”的,因此在构造函数中执行此操作是可以接受的,因为http调用只会在第一次调用时进行subscribe

此处还假设初始返回的数据在应用程序实例的生存期内不会过时。


Bra*_*don -5

您是否尝试过运行已有的代码?

因为您是根据 产生的 Promise 构建 Observable getJSON(),所以网络请求是在任何人订阅之前发出的。由此产生的承诺是由所有订阅者共享的。

var promise = jQuery.getJSON(requestUrl); // network call is executed now
var o = Rx.Observable.fromPromise(promise); // just wraps it in an observable
o.subscribe(...); // does not trigger network call
o.subscribe(...); // does not trigger network call
// ...
Run Code Online (Sandbox Code Playgroud)