最短的代码缓存Rxjs http请求而不完整?

Ste*_*lis 12 javascript rxjs angular

我正在尝试创建一个满足以下要求的可观察流:

  1. 在订阅时加载来自存储的数据
  2. 如果数据尚未过期,则返回存储值的可观察值
  3. 如果数据过期,则返回一个HTTP请求observable,该请求使用刷新令牌获取新值存储它
    • 如果在请求完成之前再次访问此代码,则返回相同的请求observable
    • 如果在上一个请求完成后使用此代码或使用其他刷新令牌,则启动新请求

我知道如何执行步骤(3)有很多不同的答案,但是当我试图一起执行这些步骤时,我正在寻找关于我提出的解决方案是否最简洁的指导可以(我怀疑!).

这是一个展示我当前方法的示例:

var cachedRequestToken;
var cachedRequest;

function getOrUpdateValue() {
    return loadFromStorage()
      .flatMap(data => {
        // data doesn't exist, shortcut out
        if (!data || !data.refreshtoken) 
            return Rx.Observable.empty();

        // data still valid, return the existing value
        if (data.expires > new Date().getTime())
            return Rx.Observable.return(data.value);

        // if the refresh token is different or the previous request is 
        // complete, start a new request, otherwise return the cached request
        if (!cachedRequest || cachedRequestToken !== data.refreshtoken) { 
            cachedRequestToken = data.refreshtoken;

            var pretendHttpBody = {
                value: Math.random(),
                refreshToken: Math.random(),
                expires: new Date().getTime() + (10 * 60 * 1000) // set by server, expires in ten minutes
            };

            cachedRequest = Rx.Observable.create(ob => { 
                // this would really be a http request that exchanges
                // the one use refreshtoken for new data, then saves it
                // to storage for later use before passing on the value

                window.setTimeout(() => { // emulate slow response   
                    saveToStorage(pretendHttpBody);
                    ob.next(pretendHttpBody.value);                 
                    ob.completed(); 
                    cachedRequest = null; // clear the request now we're complete
                }, 2500);
          });
        }

        return cachedRequest;
    });
}

function loadFromStorage() {
    return Rx.Observable.create(ob => {
        var storedData = {  // loading from storage goes here
            value: 15,      // wrapped in observable to delay loading until subscribed
            refreshtoken: 63, // other process may have updated this between requests
            expires: new Date().getTime() - (60 * 1000) // pretend to have already expired
        };

        ob.next(storedData);
        ob.completed();
    })
}

function saveToStorage(data) {
    // save goes here
}

// first request
getOrUpdateValue().subscribe(function(v) { console.log('sub1: ' + v); }); 

// second request, can occur before or after first request finishes
window.setTimeout(
    () => getOrUpdateValue().subscribe(function(v) { console.log('sub2: ' + v); }),
    1500); 
Run Code Online (Sandbox Code Playgroud)

Mei*_*eir 5

首先,看看一个工作的jsbin示例.

解决方案与您的初始代码有点不同,我想解释原因.需要继续返回本地存储,保存,保存标志(缓存和令牌)并不适合我的反应性,功能性方法.我给出的解决方案的核心是:

var data$ = new Rx.BehaviorSubject(storageMock);
var request$ = new Rx.Subject();
request$.flatMapFirst(loadFromServer).share().startWith(storageMock).subscribe(data$);
data$.subscribe(saveToStorage);

function getOrUpdateValue() {
    return data$.take(1)
      .filter(data => (data && data.refreshtoken))
      .switchMap(data => (data.expires > new Date().getTime() 
                    ? data$.take(1)
                    : (console.log('expired ...'), request$.onNext(true) ,data$.skip(1).take(1))));
}
Run Code Online (Sandbox Code Playgroud)

关键是data$保存最新数据并始终保持最新状态,通过执行操作即可轻松访问data$.take(1).在take(1)确保您的订阅得到一个单一的价值观和终止是很重要的(因为你试图在一个程序性的工作,而不是功能性的,方式).如果没有take(1)您的订阅将保持活动状态,您将拥有多个处理程序,即您将在代码中处理未来的更新,该代码仅适用于当前更新.

另外,我还有一个request$主题,可以从服务器开始获取新数据.该功能的工作原理如下:

  1. 过滤器确保如果您的数据为空或没有令牌,则没有任何内容通过,类似于return Rx.Observable.empty()您的数据.
  2. 如果数据是最新的,则返回data$.take(1)哪个是您可以订阅的单个元素序列.
  3. 如果没有,则需要刷新.为此,它会触发request$.onNext(true)并返回data$.skip(1).take(1).这skip(1)是为了避免当前的,过时的价值.

为了简洁,我用了(console.log('expired ...'), request$.onNext(true) ,data$.skip(1).take(1))).这可能看起来有点神秘.它使用js逗号分隔语法,这在minifiers/uglifiers中很常见.它执行所有语句并返回最后一个语句的结果.如果你想要一个更易读的代码,你可以像这样重写它:

.switchMap(data => {
  if(data.expires > new Date().getTime()){ 
    return data$.take(1);
  } else {
    console.log('expired ...');
    request$.onNext(true);
    return data$.skip(1).take(1);
  }
});
Run Code Online (Sandbox Code Playgroud)

最后一部分是使用flatMapFirst.这可确保在请求正在进行时,将删除所有后续请求.您可以在控制台打印输出中看到它的工作原理."从服务器加载"被打印多次,但实际的序列只被调用一次,你可以从打印输出中获得一次"从服务器完成加载".对于原始的refreshtoken标记检查,这是一个更具反应性的解决方案.

虽然我不需要保存的数据,但它会被保存,因为您提到您可能希望在将来的会话中阅读它.

关于rxjs的一些提示:

  1. setTimeout你可以简单地做,而不是使用可能导致许多问题的问题Rx.Observable.timer(time_out_value).subscribe(...).

  2. 创建观察到的是繁琐的(你甚至不得不打电话next(...)complete()).你有一个更清洁的方法来使用它Rx.Subject.请注意,您有此类的规范,BehaviorSubjectReplaySubject.这些课程值得了解,可以提供很多帮助.

最后一点.这是一个非常挑战:-)我不熟悉您的服务器端代码和设计考虑因素,但抑制呼叫的需要让我觉得不舒服.除非有一个与你的后端相关的非常好的理由,否则我的自然方法是使用flatMap并让最后一个请求'win',即删除之前未终止的调用并设置值.

代码是基于rxjs 4(所以它可以在jsbin中运行),如果你使用angular2(因此rxjs 5),你需要调整它.请查看迁移指南.

================回答史蒂夫的其他问题(在下面的评论中)=======

一篇文章,我可以推荐.它的标题说明了一切:-)

至于程序与功能方法,我会在服务中添加另一个变量:

let token$ = data$.pluck('refreshtoken');
Run Code Online (Sandbox Code Playgroud)

然后在需要时消耗它.

我的一般方法是首先映射我的数据流和关系,然后像一个好的"键盘管道工"(就像我们都是),构建管道.我的服务的顶级草案将是(跳过angular2手续和提供者为简洁):

class UserService {
  data$: <as above>;
  token$: data$.pluck('refreshtoken');
  private request$: <as above>;

  refresh(){
    request.onNext(true);
  }
}
Run Code Online (Sandbox Code Playgroud)

您可能需要进行一些检查,以确保pluck不会失败.

然后,需要数据或令牌的每个组件都可以直接访问它.

现在假设您有一项服务需要对数据或令牌进行更改:

class SomeService {
  constructor(private userSvc: UserService){
    this.userSvc.token$.subscribe(() => this.doMyUpdates());
  }
}
Run Code Online (Sandbox Code Playgroud)

如果您需要合成数据,意思是使用数据/令牌和一些本地数据:

Rx.Observable.combineLatest(this.userSvc.data$, this.myRelevantData$)
  .subscribe(([data, myData] => this.doMyUpdates(data.someField, myData.someField));
Run Code Online (Sandbox Code Playgroud)

同样,理念是你构建数据流和管道,连接它们,然后你所要做的就是触发东西.

我提出的"迷你模式"是在触发序列后传递给服务并注册到结果.让我们以自动完成为例:

class ACService {
   fetch(text: string): Observable<Array<string>> {
     return http.get(text).map(response => response.json().data;
   }
}
Run Code Online (Sandbox Code Playgroud)

然后,每次文本更改时都必须调用它并将结果分配给组件:

<div class="suggestions" *ngFor="let suggestion; of suggestions | async;">
  <div>{{suggestion}}</div>
</div>
Run Code Online (Sandbox Code Playgroud)

在您的组件中:

onTextChange(text) {
  this.suggestions = acSVC.fetch(text);
}
Run Code Online (Sandbox Code Playgroud)

但这也可以这样做:

class ACService {
   createFetcher(textStream: Observable<string>): Observable<Array<string>> {
     return textStream.flatMap(text => http.get(text))
       .map(response => response.json().data;
   }
}
Run Code Online (Sandbox Code Playgroud)

然后在你的组件中:

textStream: Subject<string> = new Subject<string>();
suggestions: Observable<string>;

constructor(private acSVC: ACService){
  this.suggestions = acSVC.createFetcher(textStream);
}

onTextChange(text) {
  this.textStream.next(text);
}
Run Code Online (Sandbox Code Playgroud)

模板代码保持不变.

这似乎是一件小事,但是一旦应用程序变得越来越大,数据流越来越复杂,这样做效果会更好.您有一个包含数据的序列,您可以在任何需要它的地方使用它,您甚至可以进一步转换它.例如,假设你需要知道建议的数量,在第一种方法中,一旦得到结果,你需要进一步查询它以获得它,因此:

onTextChange(text) {
  this.suggestions = acSVC.fetch(text);
  this.suggestionsCount = suggestions.pluck('length'); // in a sequence
  // or
  this.suggestions.subscribe(suggestions => this.suggestionsCount = suggestions.length); // in a numeric variable.
}
Run Code Online (Sandbox Code Playgroud)

现在在第二种方法中,您只需定义:

constructor(private acSVC: ACService){
  this.suggestions = acSVC.createFetcher(textStream);
  this.suggestionsCount = this.suggestions.pluck('length');
}
Run Code Online (Sandbox Code Playgroud)

希望这可以帮助 :-)

在写作的过程中,我试图反思我采取反应的路径.毋庸置疑,在进行实验时,无数的jsbins和奇怪的失败都是其中的重要组成部分.我认为有助于塑造我的方法的另一件事(虽然我目前没有使用它)是学习redux和读取/尝试一些ngrx(angular的redux端口).哲学和方法不会让你想到程序,所以你必须调整功能,数据,关系和流动的心态.