标签: rxjs6

Angular/RxJs 6 - 组合路由参数和查询参数不再起作用

我花了至少 2 个小时试图让事情与版本 6 一起工作,但无济于事。我只是无法同时获得路由参数和查询参数。

这是与旧版本最接近的语法,但它只记录查询参数。

我想要做的是将其包装在全局路由服务中,以便方法调用是干净的,如果发生任何其他更新,我可以在一个地方进行更改。

    import {BehaviorSubject, combineLatest, Observable} from 'rxjs';

constructor(private router: Router, private route: ActivatedRoute)
// body of constructor left out


     // Combine them both into a single observable
    const urlParams: Observable<any> = combineLatest(
        this.route.params,
        this.route.queryParams,
        (params, queryParams) => ({ ...params, ...queryParams})
    );

    urlParams.subscribe(x => console.log(x));
Run Code Online (Sandbox Code Playgroud)

我还注意到,由于某种原因,combinedLatest 不在“rxjs/operators”中。Observable.combineLatest 也不起作用。

谢谢。

angular rxjs6

3
推荐指数
1
解决办法
4133
查看次数

如何触发 rxjs observable

使用 rxjs v6

因此,我有多个要跟踪/观察的变量,当其中任何一个发生变化时,触发 Observable 调用 API,然后根据这些变量更新其他内容。

我以为我可以在 a 上触发一个事件div来强制 observable 做某事,但它不会根据我想要的结果返回结果

例如

var a, b, c, d // if any updates, should trigger div "update" event

let obs = fromEvent(this.$refs.updater, "update")
    .pipe(
        switchMap(i => {return this.callAnAPI()})
    )
obs.subscribe()

var update = new Event("update")
this.$refs.updater.dispatchEvent(update)
Run Code Online (Sandbox Code Playgroud)

但是,在 observable 上没有 subscribe 方法。我试图修改我的其他 Observable 之一(有效)

fromEvent(input, "keyup")
    .pipe(
        map(i => i.currentTarget.value),
        debounceTime(delay),
        distinctUntilChanged(),
        switchMap(value => 
        {
            this.page = 1
            if (ajax)
            {
                return ajax
            }   
            else
            {
                return this.axiosAjax({
                    path,
                    method, …
Run Code Online (Sandbox Code Playgroud)

javascript rxjs rxjs6

3
推荐指数
1
解决办法
6762
查看次数

使用 Rxjs 6 在 Angular 6 中返回一个空/空的 Observable

我正在使用 Rxjs 6处理Angular 6,同时我有一个关于如果响应失败或有异常返回空/空 Observable 的问题,这是我的假设,我的远程 api 将返回一个IOptionResponse对象,它包含一个消息字符串,可以被指示为“成功”或“失败”,它还包含一个模型,它是一个“IOption”对象数组

export interface IOptionResponse {
    message: string;
    model: IOption[];
}
Run Code Online (Sandbox Code Playgroud)

这是我的服务方法名称,它将返回一个 IOption 数组的 Observable,它是我的远程 API 结果的“模型”

loadIOptionMembersRelationship(): Observable<IOption[]> {
    return this.httpClient.get<IOptionResponse>('${environment.apiUrl}/api/member/XXX')
        .map(
            (response) => {
                console.log(response);
                // if response.message is success, return IOption[] model
                if (response.message == responseMessage.Success) {
                    return response.model;
                }
                else {
                    // return Observable.empty<IOption[]>(); failed
                    // return new Observable.empty<IOption[]>(); failed
                    // return new EmptyObservable<IOption[]>(); failed
                    // return new Observable<IOption[]>.from([]); failed
                    // Otherwise …
Run Code Online (Sandbox Code Playgroud)

rxjs typescript angular6 rxjs6

3
推荐指数
1
解决办法
2万
查看次数

rxjs6 分区方法:“UnaryFunction”类型的参数不可分配给“OperatorFunction”类型的参数

使用 Rxjs 6 时,IDE 会在使用partition方法时报告错误- 即使函数工作正常。

例如:我有这个示例代码:

import { of } from 'rxjs';
import { filter, partition } from 'rxjs/operators';

let obs = of(1,2,3,4,6,9,111,13,10,12);

let [a, b] = obs.pipe(
  filter(v => v > 3),
  partition(p => p % 2 == 0)
);

a.subscribe(v => {
  console.log(v);
});

b.subscribe(v => {
  console.log(v);
});
Run Code Online (Sandbox Code Playgroud)

该代码工作正常,并以奇数和偶数分隔输入。但是VSCode和StackBlitz在partition调用方法的那一行报错:

“UnaryFunction, [Observable, Observable]>”类型的参数不可分配给“OperatorFunction”类型的参数。类型 '[Observable, Observable]' 不可分配给类型 'Observable'。类型“[Observable, Observable]”中缺少属性“_isScalar”。

这是stackblitz URL:https ://stackblitz.com/edit/rxjs6-learn

rxjs6

3
推荐指数
1
解决办法
1297
查看次数

如何在史诗中获得以前的状态

史诗中有没有办法接收商店以前的状态?我觉得你不能,因为史诗站在Action -> Reducer -> Epic队列的末尾。

记住:当一个 Epic 收到一个动作时,它已经通过你的减速器运行并且状态更新了。

但也许有人会为我的问题提出一些解决方案:

我有一个处理下载 json 的史诗,然后将其存储在商店中。我认为另一部将检查已更改内容的史诗会很好地发送“新项目”通知,但为此我需要在商店更改之前访问状态:) 也许我应该在同一个史诗中执行此操作?

reactjs redux-observable rxjs6

3
推荐指数
1
解决办法
906
查看次数

保持对管道运算符之间变量的访问

我一直在尝试在节点应用程序中使用 Rxjs。fileList$是从fs.readdirsync(字符串数组)的返回。

第一个map()有一个名为 filename 的参数。

flatMap() readFileAsObservable()用于bindNodeCallback(fs.readFile)读取文件。

我的班级Testian需要 2 个参数;通过yaml-js读取文件和filename从第一张地图创建的对象。如何filename在我指定的管道中访问?

fileList$
    .pipe(
        map((filename: string) => `${resolvedDirPath}/${filename}`),
        flatMap(
            (filePath: string) => readFileAsObservable(filePath, 'utf8') as Observable<string>
        ),
        map((fileData: string) => yaml.safeLoad(fileData) as ITestYaml),
        map((testYaml: ITestYaml) => new Testian(testYaml, [I want to use filename here])),
        flatMap((testYaml: Testian) => {
            const prom: Promise<{}> = activeTests.set(testYaml);
            outgoing.sendTest(testYaml);
            return from(prom);
        })
    )
Run Code Online (Sandbox Code Playgroud)

node.js rxjs6

3
推荐指数
1
解决办法
2429
查看次数

RxJS 6 扩展不是无限的?

我一直在玩 RxJS 来探索边缘并遇到了一些意想不到的事情。

考虑以下代码:

of(1)
.pipe(expand(x => of(x + 1)))
.subscribe(
    x => console.log(x), 
    err => console.log(err), 
    () => console.log('complete'));
Run Code Online (Sandbox Code Playgroud)

在 node 中运行时,我希望这会产生一个无限的数字列表,每个数字增加 1,当它超过 JavaScript 中的最大数字时最终会出错,或者我会用 Ctrl-C(在 Windows 上)杀死它。我没想到的是,在记录的数字少于 1000 个后,它会停止而不会出错。

我得到的是写入控制台的数字 1 - 783。

然后它停止了。

每次。

我运行它。它停在 783(这不是一个特殊数字 - 256、1024 等)。它不会记录错误或“完成”。它只是停止。

我原以为它会“永远”运行,抛出错误,或者至少注销“完成”。但它只是停止并将我返回到命令行。

我做了一些挖掘并认为它可能与可选的并发参数有关,Number.POSITIVE_INFINITY根据expand文档默认

但是,如果我明确指定此参数的值,我会得到同样奇怪的结果 - 即 1 的并发值(始终为 ~860)比并发值 1,000,000(回到 783)产生的值多于 100(~850) ,以及超过 200 (~840)。似乎大多数值都是在并发为 1 的情况下发出的,然后随着并发性的增加,它慢慢地下降到 783 的低值。也就是说,我不知道是否真的是并发影响了这一点,尽管它看起来确实相关。

我快速浏览了 expand 的源代码,但没有看到任何可以解释这一点的内容。

这不是我会在生产中运行的代码,但它激起了我的好奇心。有没有人对什么可以解释这一点有任何想法?

  • RxJS 6.3.2

  • 节点 8.9.4

谢谢,

电讯

rxjs rxjs6

3
推荐指数
1
解决办法
434
查看次数

'Observable&lt;Object&gt;' 类型不存在属性 'timeout'

我正在尝试从 5 升级到 Angular 6 并收到此错误:

src/app/services/http.service.ts(17,14) 中的错误:错误 TS2339:“可观察”类型上不存在属性“超时”。

我在 http.service.ts 中的代码:

import { throwError as observableThrowError,  Observable } from 'rxjs';
import { Injectable } from '@angular/core';
import { environment } from "environments/environment";
import { AppService } from 'app/app.service';
import { HttpClient } from '@angular/common/http';

@Injectable()
export class HttpService {

    private baseUrl = environment.apiUrl;

    constructor(private http: HttpClient, private appService: AppService) { }

    public get(endpoint: string): Observable<any>{
        return this.http.get(this.baseUrl + endpoint)
            .timeout(this.appService.timeoutInterval)
            .retryWhen(error => error.delay(this.appService.waitInterval)
                .take(this.appService.numberOfRetries)
                .concat(observableThrowError(new Error())))
            .share();
    }
} …
Run Code Online (Sandbox Code Playgroud)

rxjs5 angular angular5 angular6 rxjs6

3
推荐指数
1
解决办法
5983
查看次数

带有 RxJS 的可暂停缓冲区

我正在尝试使用 RxJS 流实现可切换的自动保存功能。目标是:

  • 启用自动保存后,将更改发送到服务器。
  • 禁用自动保存时,缓冲更改并在重新启用自动保存时将其发送到服务器。

这是我遇到的:

autoSave$ = new BehaviorSubject(true);
change$ = new Subject();

change$.pipe(
  bufferToggle(
    autoSave$.pipe(filter(autoSave => autoSave === false)),
    () => autoSave$.pipe(filter(autoSave => autoSave === true)),
  ),
  concatMap(changes => changes),
  concatMap(change => apiService.patch(change)),
).subscribe(
  () => console.log('Change sent'),
  (error) => console.error(error),
);
Run Code Online (Sandbox Code Playgroud)

感谢bufferToggle,我能够在autoSave关闭时缓冲更改并在重新启用时发送它们。

问题是,虽然autoSave启用,但没有通过。我理解这是因为bufferToggle忽略了即将到来的流量,而其开放的 observable 不发出。

我觉得我应该有一个条件来绕过bufferTogglewhileautoSave启用,但我所有的尝试都惨遭失败。

有什么想法可以实现这一目标吗?

rxjs rxjs6

3
推荐指数
1
解决办法
608
查看次数

RxJS 6 当页面不活动时暂停或缓冲可观察

所以我有一个流,比如说字母,我需要按正确的顺序将所有字母组合成一个词。一切工作正常,直到用户更改标签,尽量减少浏览器或切换应用程序-行为几乎是一样的,如果我用setTimeout()-搞砸的顺序,丢失物品等我尝试用达到我的目的bufferWhen()bufferToggle()takeUntil()publish()connect(),但没有成功了。我也考虑过使用delayWhen,但它已被弃用并且可能不适合,因为它会立即停止流。我应该使用哪些功能以及如何使用?这是我的代码:

export class MyComponent implements AfterViewInit {
  private visibilityChange$ = fromEvent(document, 'visibilitychange').pipe(startWith('visible'), shareReplay({ refCount: true, bufferSize: 1 }));
  private show$ = this.visibilityChange$.pipe(filter(() => document.visibilityState === 'visible'));
  private hide$ = this.visibilityChange$.pipe(filter(() => document.visibilityState === 'hidden'));

  public ngAfterViewInit() {
    const lettersStream$ = zip( // add delay for each letter
            from(['w', 'o', 'r', 'd']),
            interval(1000))
           // pause when hide$ fires, resume when show$
          .pipe(map(([letter, delayTime]) => letter))
          .subscribe(console.log);
  } …
Run Code Online (Sandbox Code Playgroud)

asynchronous rxjs typescript angular rxjs6

3
推荐指数
1
解决办法
1352
查看次数