Ste*_*lis 12 javascript rxjs angular
我正在尝试创建一个满足以下要求的可观察流:
我知道如何执行步骤(3)有很多不同的答案,但是当我试图一起执行这些步骤时,我正在寻找关于我提出的解决方案是否最简洁的指导可以(我怀疑!).
这是一个展示我当前方法的示例:
var cachedRequestToken;
var cachedRequest;
function getOrUpdateValue() {
return loadFromStorage()
.flatMap(data => {
// data doesn't exist, shortcut out
if (!data || !data.refreshtoken)
return Rx.Observable.empty();
// data still valid, return the existing value
if (data.expires > new Date().getTime())
return Rx.Observable.return(data.value);
// if the refresh token is different or the previous request is
// complete, start a new request, otherwise return the cached request
if (!cachedRequest || cachedRequestToken !== data.refreshtoken) {
cachedRequestToken = data.refreshtoken;
var pretendHttpBody = {
value: Math.random(),
refreshToken: Math.random(),
expires: new Date().getTime() + (10 * 60 * 1000) // set by server, expires in ten minutes
};
cachedRequest = Rx.Observable.create(ob => {
// this would really be a http request that exchanges
// the one use refreshtoken for new data, then saves it
// to storage for later use before passing on the value
window.setTimeout(() => { // emulate slow response
saveToStorage(pretendHttpBody);
ob.next(pretendHttpBody.value);
ob.completed();
cachedRequest = null; // clear the request now we're complete
}, 2500);
});
}
return cachedRequest;
});
}
function loadFromStorage() {
return Rx.Observable.create(ob => {
var storedData = { // loading from storage goes here
value: 15, // wrapped in observable to delay loading until subscribed
refreshtoken: 63, // other process may have updated this between requests
expires: new Date().getTime() - (60 * 1000) // pretend to have already expired
};
ob.next(storedData);
ob.completed();
})
}
function saveToStorage(data) {
// save goes here
}
// first request
getOrUpdateValue().subscribe(function(v) { console.log('sub1: ' + v); });
// second request, can occur before or after first request finishes
window.setTimeout(
() => getOrUpdateValue().subscribe(function(v) { console.log('sub2: ' + v); }),
1500);
Run Code Online (Sandbox Code Playgroud)
首先,看看一个工作的jsbin示例.
解决方案与您的初始代码有点不同,我想解释原因.需要继续返回本地存储,保存,保存标志(缓存和令牌)并不适合我的反应性,功能性方法.我给出的解决方案的核心是:
var data$ = new Rx.BehaviorSubject(storageMock);
var request$ = new Rx.Subject();
request$.flatMapFirst(loadFromServer).share().startWith(storageMock).subscribe(data$);
data$.subscribe(saveToStorage);
function getOrUpdateValue() {
return data$.take(1)
.filter(data => (data && data.refreshtoken))
.switchMap(data => (data.expires > new Date().getTime()
? data$.take(1)
: (console.log('expired ...'), request$.onNext(true) ,data$.skip(1).take(1))));
}
Run Code Online (Sandbox Code Playgroud)
关键是data$保存最新数据并始终保持最新状态,通过执行操作即可轻松访问data$.take(1).在take(1)确保您的订阅得到一个单一的价值观和终止是很重要的(因为你试图在一个程序性的工作,而不是功能性的,方式).如果没有take(1)您的订阅将保持活动状态,您将拥有多个处理程序,即您将在代码中处理未来的更新,该代码仅适用于当前更新.
另外,我还有一个request$主题,可以从服务器开始获取新数据.该功能的工作原理如下:
return Rx.Observable.empty()您的数据.data$.take(1)哪个是您可以订阅的单个元素序列.request$.onNext(true)并返回data$.skip(1).take(1).这skip(1)是为了避免当前的,过时的价值.为了简洁,我用了(console.log('expired ...'), request$.onNext(true) ,data$.skip(1).take(1))).这可能看起来有点神秘.它使用js逗号分隔语法,这在minifiers/uglifiers中很常见.它执行所有语句并返回最后一个语句的结果.如果你想要一个更易读的代码,你可以像这样重写它:
.switchMap(data => {
if(data.expires > new Date().getTime()){
return data$.take(1);
} else {
console.log('expired ...');
request$.onNext(true);
return data$.skip(1).take(1);
}
});
Run Code Online (Sandbox Code Playgroud)
最后一部分是使用flatMapFirst.这可确保在请求正在进行时,将删除所有后续请求.您可以在控制台打印输出中看到它的工作原理."从服务器加载"被打印多次,但实际的序列只被调用一次,你可以从打印输出中获得一次"从服务器完成加载".对于原始的refreshtoken标记检查,这是一个更具反应性的解决方案.
虽然我不需要保存的数据,但它会被保存,因为您提到您可能希望在将来的会话中阅读它.
关于rxjs的一些提示:
setTimeout你可以简单地做,而不是使用可能导致许多问题的问题Rx.Observable.timer(time_out_value).subscribe(...).
创建观察到的是繁琐的(你甚至不得不打电话next(...)和complete()).你有一个更清洁的方法来使用它Rx.Subject.请注意,您有此类的规范,BehaviorSubject和ReplaySubject.这些课程值得了解,可以提供很多帮助.
最后一点.这是一个非常挑战:-)我不熟悉您的服务器端代码和设计考虑因素,但抑制呼叫的需要让我觉得不舒服.除非有一个与你的后端相关的非常好的理由,否则我的自然方法是使用flatMap并让最后一个请求'win',即删除之前未终止的调用并设置值.
代码是基于rxjs 4(所以它可以在jsbin中运行),如果你使用angular2(因此rxjs 5),你需要调整它.请查看迁移指南.
================回答史蒂夫的其他问题(在下面的评论中)=======
有一篇文章,我可以推荐.它的标题说明了一切:-)
至于程序与功能方法,我会在服务中添加另一个变量:
let token$ = data$.pluck('refreshtoken');
Run Code Online (Sandbox Code Playgroud)
然后在需要时消耗它.
我的一般方法是首先映射我的数据流和关系,然后像一个好的"键盘管道工"(就像我们都是),构建管道.我的服务的顶级草案将是(跳过angular2手续和提供者为简洁):
class UserService {
data$: <as above>;
token$: data$.pluck('refreshtoken');
private request$: <as above>;
refresh(){
request.onNext(true);
}
}
Run Code Online (Sandbox Code Playgroud)
您可能需要进行一些检查,以确保pluck不会失败.
然后,需要数据或令牌的每个组件都可以直接访问它.
现在假设您有一项服务需要对数据或令牌进行更改:
class SomeService {
constructor(private userSvc: UserService){
this.userSvc.token$.subscribe(() => this.doMyUpdates());
}
}
Run Code Online (Sandbox Code Playgroud)
如果您需要合成数据,意思是使用数据/令牌和一些本地数据:
Rx.Observable.combineLatest(this.userSvc.data$, this.myRelevantData$)
.subscribe(([data, myData] => this.doMyUpdates(data.someField, myData.someField));
Run Code Online (Sandbox Code Playgroud)
同样,理念是你构建数据流和管道,连接它们,然后你所要做的就是触发东西.
我提出的"迷你模式"是在触发序列后传递给服务并注册到结果.让我们以自动完成为例:
class ACService {
fetch(text: string): Observable<Array<string>> {
return http.get(text).map(response => response.json().data;
}
}
Run Code Online (Sandbox Code Playgroud)
然后,每次文本更改时都必须调用它并将结果分配给组件:
<div class="suggestions" *ngFor="let suggestion; of suggestions | async;">
<div>{{suggestion}}</div>
</div>
Run Code Online (Sandbox Code Playgroud)
在您的组件中:
onTextChange(text) {
this.suggestions = acSVC.fetch(text);
}
Run Code Online (Sandbox Code Playgroud)
但这也可以这样做:
class ACService {
createFetcher(textStream: Observable<string>): Observable<Array<string>> {
return textStream.flatMap(text => http.get(text))
.map(response => response.json().data;
}
}
Run Code Online (Sandbox Code Playgroud)
然后在你的组件中:
textStream: Subject<string> = new Subject<string>();
suggestions: Observable<string>;
constructor(private acSVC: ACService){
this.suggestions = acSVC.createFetcher(textStream);
}
onTextChange(text) {
this.textStream.next(text);
}
Run Code Online (Sandbox Code Playgroud)
模板代码保持不变.
这似乎是一件小事,但是一旦应用程序变得越来越大,数据流越来越复杂,这样做效果会更好.您有一个包含数据的序列,您可以在任何需要它的地方使用它,您甚至可以进一步转换它.例如,假设你需要知道建议的数量,在第一种方法中,一旦得到结果,你需要进一步查询它以获得它,因此:
onTextChange(text) {
this.suggestions = acSVC.fetch(text);
this.suggestionsCount = suggestions.pluck('length'); // in a sequence
// or
this.suggestions.subscribe(suggestions => this.suggestionsCount = suggestions.length); // in a numeric variable.
}
Run Code Online (Sandbox Code Playgroud)
现在在第二种方法中,您只需定义:
constructor(private acSVC: ACService){
this.suggestions = acSVC.createFetcher(textStream);
this.suggestionsCount = this.suggestions.pluck('length');
}
Run Code Online (Sandbox Code Playgroud)
希望这可以帮助 :-)
在写作的过程中,我试图反思我采取反应的路径.毋庸置疑,在进行实验时,无数的jsbins和奇怪的失败都是其中的重要组成部分.我认为有助于塑造我的方法的另一件事(虽然我目前没有使用它)是学习redux和读取/尝试一些ngrx(angular的redux端口).哲学和方法不会让你想到程序,所以你必须调整功能,数据,关系和流动的心态.