我有一个TypeScript/Angular 2 Observable,在我第一次调用它时效果很好.但是,我有兴趣将多个订阅者附加到同一个observable,并以某种方式刷新可观察的和附加的订阅者.这是我得到的:
query(): Rx.Observable<any> {
return this.server.get('http://localhost/rawData.json').toRx().concatMap(
result =>
result.json().posts
)
.map((post: any) => {
var refinedPost = new RefinedPost();
refinedPost.Message = post.Message.toLowerCase();
return refinedPost;
}).toArray();
}
Run Code Online (Sandbox Code Playgroud)
假设有一个刷新按钮,当按下该按钮时,重新执行此observable,并且连接到它的任何订阅者都会获得一组更新的数据.
我怎么能做到这一点?
我有连接的服务Subject()做分页.我正在使用next(newData)传递给主题,这让事情保持活力,现在我需要complete()在每个ajax调用上使用并将其传递给主题.但在做了一个之后,complete()我开始得到错误.
我想知道,如果一旦被触发,我们仍然可以传递Subject可观察量吗?next(newData)completed()
如何对一个Observable进行分组,并从每个GroupedObservable中仅保留内存中最后一个发出的项目?这样每个组的行为就像BehaviorSubject一样.
像这样的东西:
{user: 1, msg: "Anyone here?"}
{user: 2, msg: "Hi"}
{user: 2, msg: "How are you?"}
{user: 1, msg: "Hello"}
{user: 1, msg: "Good"}
Run Code Online (Sandbox Code Playgroud)
所以在内存中我们只有最后一项user:
{user: 2, msg: "How are you?"}
{user: 1, msg: "Good"}
Run Code Online (Sandbox Code Playgroud)
当订户订阅时,这两个项目立即发布(每个项目都在其自己的排放中).就像我们每个都有BehaviorSubject一样user.
因为人们可能会永远聊天,所以onCompleted()永远不会被激发.
我事先并不知道user可以有什么价值.
为什么为每个订户评估地图运营商而不是一次?
const obs1 = Rx.Observable.interval(1000).take(1).map((x, i) => {
console.log(i+1 + ':1 map')
return 'obs1';
})
const obs2 = Rx.Observable.interval(1300).take(1).map((x, i) => {
console.log(i+1 + ':2 map')
return 'obs2';
})
const obs3 = Rx.Observable.interval(1700).take(2).map((x, i) => {
console.log(i+1 + ':3 map')
return 'obs3';
})
const view = obs1.combineLatest(obs2, obs3, (obs1, obs2, obs3) => { return obs1 + ', ' + obs2 + ', ' + obs3; });
// Every subscriber adds more calls to map - why is it called multiple …Run Code Online (Sandbox Code Playgroud) 我想使用RXJS设置一个ORDERED数据流,该数据流以随机间隔(例如每1-5秒)发出一个数字,我想将其用作时间随机数据源来测试RXJS的其他部分。以下代码以随机顺序(由于延迟)生成项目,但我希望该顺序仅保留随机时间。
function randomDelay(bottom, top) {
return Math.floor( Math.random() * ( 1 + top - bottom ) ) + bottom;
}
var source = Rx.Observable
.range(1, 10)
.flatMap(function (x) {
return Rx.Observable
.of(x)
.delay(randomDelay(1000,5000));
})
.timeInterval();
var subscription = source.subscribe(
function (x) {
$("#result").append('Next: ' + JSON.stringify(x) + '<br>');
},
function (err) {
$("#result").append('Error: ' + err);
},
function () {
$("#result").append('Completed');
});
Run Code Online (Sandbox Code Playgroud)
给我以下输出的变体:
Next: {"value":1,"interval":1229}
Next: {"value":2,"interval":321}
Next: {"value":4,"interval":645}
Next: {"value":5,"interval":28}
Next: {"value":9,"interval":728}
Next: {"value":10,"interval":269}
Next: {"value":3,"interval":107}
Next: {"value":6,"interval":265}
Next: …Run Code Online (Sandbox Code Playgroud) 有没有办法从一个EventEmitter(或Angular 2 alpha 46/RxJS 5 alpha中可用的等效物)获得热观察?即,如果我们在值解析后进行订阅,则会使用先前解析的值触发.与我们总是返回相同承诺时的情况类似.
理想情况下,只使用Angular 2对象(我稍后会在某处读取轻量级RxJS以删除依赖项),否则导入RxJS就可以了.AsyncSubject似乎符合我的需要,但在RxJS 5 alpha中不可用.
我尝试了以下,没有成功(从不触发).有关如何使用它的任何想法?
let emitter = new EventEmitter<MyObj>();
setTimeout(() => {emitter.next(new MyObj());});
this.observable = emitter;
return this.observable.share();
Run Code Online (Sandbox Code Playgroud)
用例:只访问一些异步对象(例如,一系列HTTP调用合并/包装在新的中EventEmitter),但将已解析的异步对象提供给订阅它的任何服务/组件,即使它们在解析后订阅(HTTP)收到回复).
编辑:问题不是关于如何合并HTTP响应,而是如何从EventEmitter或任何等效的Angular 2 alpha 46/RxJS 5 alpha获得(hot?)observable,允许在检索/解析异步结果后进行订阅(HTTP只是异步原点的一个例子).myEventEmitter.share()不工作(以上CF plunker),虽然它与可观察到通过HTTP(CF返回工作从plunker @Eric马丁内斯).从Angular 2 alpha 46开始,.toRx()方法不再存在,EventEmitter是observable和subject本身.
只要我们总是返回相同的promise对象,这对promises来说效果很好.由于我们有观察者介绍了HTTP Angular 2服务,我想避免混合使用promises和观察者(据说观察者比promises更强大,因此它应该允许使用promise进行简单的操作).
有关share()的规范(我没有找到版本5 alpha的文档 - Angular 2使用的版本) - 处理Observable由Angular 2 HTTP服务返回的,而不是在EventEmitter上工作.
编辑:澄清为什么不使用HTTP返回的Observable,并补充说不直接使用RxJS会更好.
编辑:更改说明:关注的是多个订阅,而不是合并HTTP结果.
谢谢!
我想采用Observable <T []>并将其转换为Observable <T>,以便Observable <T []>中的每个数组都被分解,然后通过单独的方式发出数组的各个元素Observable <T>.
这样做有标准的操作员吗?我已经四处寻找但没找到任何东西.谢谢.
在指向concatMap/flatMap的方向后,我提出了以下一般解决方案:
var source: Observable<T[]>;
...
var splitSource = source.flatMap<T>((x:T[]) => { return Rx.Observable.fromArray(x); });
Run Code Online (Sandbox Code Playgroud) 我可以从文档中看到:
Cold Observables从开始到每个Observer发出整个值序列.
和
无论观察者何时订阅,ReplaySubject都会向任何观察者发出源Observable发出的所有项目.
那么这两个概念有什么区别呢?
谢谢
我猜这应该有点容易实现,但我遇到了麻烦(从概念上讲,我猜)找出如何解决它.
我所拥有的是一个返回JSON对象数组的API.我需要逐步浏览这些对象,并为每个对象进行另一个AJAX调用.问题是处理每个AJAX调用的系统一次只能处理两个活动调用(因为它是一个CPU密集型任务,可以挂钩到桌面应用程序中).
我想知道如何使用RxJS(使用版本5或4)来实现这一目标?
编辑:此外,是否可以同时运行一系列步骤.即
Downloading File: 1
Processing File: 1
Converting File: 1
Uploading File: 1
Downloading File: 2
Processing File: 2
Converting File: 2
Uploading File: 2
Downloading File: 3
Processing File: 3
Converting File: 3
Uploading File: 3
我尝试过这样的事情:
Rx.Observable.fromPromise(start())
.concatMap(arr => Rx.Observable.from(arr))
.concatMap(x => downloadFile(x))
.concatMap((entry) => processFile(entry))
.concatMap((entry) => convertFile(entry))
.concatMap((entry) => UploadFile(entry))
.subscribe(
data => console.log('data', new Date().getTime(), data),
error => logger.warn('err', error),
complete => logger.info('complete')
);
Run Code Online (Sandbox Code Playgroud)
然而,这似乎不起作用.例如,downloadFile不等待processFile,convertFile和uploadFile全部完成,而是下一个将在前一个完成后再次运行.
我有一个带有输入字段的表单。当我键入 (keyup)Enter键时,必须清除这些字段之一。我知道我可以将此字段作为受控字段处理(意味着监听keyup并维护该字段的副本),或者使用双向绑定,但在我的用例中我不能做后者,我宁愿不做前者。因此,我的问题是,如何强制渲染 Svelte 组件?
通常,表单显示为<Component field="value" />,用户修改字段并单击Enter键,我想app.set(field, value)字段重置为value,即使在 Svelte 看来,该field属性没有改变。那可能吗?
我尝试不成功的转变在于更新Svelte 组件内的属性,希望当 时app.set(field, value),Svelte 将看到该field属性的两个不同值并更新组件。但这似乎不起作用:
<script>
const watchForEnter = ev => {
if (ev.keyCode === 13) {
ev.preventDefault();
const formData = new FormData(ev.target.closest("form"));
const tag = formData.get("tag");
dispatch({ [ADDED_TAG]: tag });
}
};
const updateCurrentTag = ev => {
currentTag = new FormData(ev.target.closest("form")).get("tag");
console.log(`currentTag!`, currentTag)
}
</script>
<form>
<fieldset class="form-group"> …Run Code Online (Sandbox Code Playgroud)