如何将一个Observable转换为BehaviorSubject?

awm*_*eer 7 javascript observable rxjs typescript behaviorsubject

我正在尝试将Observable转换为BehaviorSubject。像这样:

a$ = new Observable()
b$ = BehaviorSubject.create(new BehaviorSubject(123), a$)
// 
Run Code Online (Sandbox Code Playgroud)

我也尝试过:

a$ = new Observable()
b$ = new BehaviorSubject(a$, 123)
// 
Run Code Online (Sandbox Code Playgroud)

和:

a$ = new Observable()
b$ = a$.asBehaviorSubject(123)
// 
Run Code Online (Sandbox Code Playgroud)

和:

a$ = new Observable()
b$ = a$.pipe(
  toBehaviorSubject(123)
)
// 
Run Code Online (Sandbox Code Playgroud)

但是这些都不起作用。现在,我必须像这样实现:

a$ = new Observable()
b$ = new BehaviorSubject(123)
a$.subscribe(b$)
// 
Run Code Online (Sandbox Code Playgroud)

在课堂上,这有点难看:

class Foo() {
  a$ = new Observable() // Actually, a$ is more complicated than this.
  b$ = new BehaviorSubject(123)

  constructor() {
    this.a$.subscribe(this.b$)
  }
}
Run Code Online (Sandbox Code Playgroud)

因此,有没有更简单的方法可以在不使用类构造函数的情况下将Observable转换为BehaviorSubject?


这是我的真实情况:

export class Foo {
  autoCompleteItems$ = new BehaviorSubject<string[]>(null)
  autoCompleteSelected$ = new BehaviorSubject<number>(-1)
  autoCompleteSelectedChange$ = new Subject<'up'|'down'>()

  constructor() {
    this.autoCompleteItems$.pipe(
      switchMap((items) => {
        if (!items) return EMPTY
        return this.autoCompleteSelectedChange$.pipe(
          startWith('down'),
          scan<any, number>((acc, value) => {
            if (value === 'up') {
              if (acc <= 0) {
                return items.length - 1
              } else {
                return acc - 1
              }
            } else {
              if (acc >= items.length - 1) {
                return 0
              } else {
                return acc + 1
              }
            }
          }, -1)
        )
      })
    ).subscribe(this.autoCompleteSelected$)
  }

  doAutoComplete = () => {
    const item = this.autoCompleteItems$.value[this.autoCompleteSelected$.value]
    // do something with `item`
  }
}
Run Code Online (Sandbox Code Playgroud)

yay*_*aya 55

不需要转换它。

只需创建一个主题并将可观察对象附加到它: obs.subscribe(sub)

例子:

var obs = new rxjs.Observable((s) => {setTimeout(()=>{s.next([1])} , 500)}) //observable
var sub = new rxjs.BehaviorSubject([0]) //create subject
obs.subscribe(sub) //<----- HERE ----- attach observable to subject
setTimeout(() => {sub.next([2, 3])}, 1500) //subject updated
sub.subscribe(a => console.log(a)) //subscribe to subject
Run Code Online (Sandbox Code Playgroud)

注意obs.subscribe(sub)相当于:

obs.subscribe({
  next: v => sub.next(v),
  error: v => sub.error(v),
  complete: () => sub.complete()
})
Run Code Online (Sandbox Code Playgroud)

在线运行

  • 没有“取消订阅”? (7认同)
  • 主题可以被视为观察者,因此可以直接用作订阅的参数。obs.订阅(子)。而不是手动连接到 sub.next、error 到 sub.error 等。 (2认同)

And*_*son 9

将可观察量通过管道传输到shareReplay(1). 任何订阅者都shareReplay(1)将:

  1. 立即接收从源发出的最新事件
  2. 源发出的任何未来事件。

这使得它完全充当一个行为主体。

https://rxjs.dev/api/operators/shareReplay

const { Subject, shareReplay } = rxjs; 
    
console.log("Start")
let source = new Subject();
    
let converted = source.pipe(shareReplay(1));
    
    
console.log("Subscription A starting")
converted.subscribe(s=>console.log("Subscription A emits",s));

console.log("Source next 1")
source.next(1);

console.log("Subscription B starting")
converted.subscribe(s=>console.log("Subscription B emits",s));

console.log("Source next 2")
source.next(2);
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@7.5.7/dist/bundles/rxjs.umd.min.js"></script>
Run Code Online (Sandbox Code Playgroud)


tmu*_*sch 7

这是我将Observables转换为BehaviorSubjects的方式:

import { Observable, BehaviorSubject } from 'rxjs';

export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> {
    const subject = new BehaviorSubject(initValue);

    observable.subscribe({
        complete: () => subject.complete(),
        error: x => subject.error(x),
        next: x => subject.next(x)
    });

    return subject;
}
Run Code Online (Sandbox Code Playgroud)

  • @JuozasRastenis 不,这段代码显然不会取消订阅。但您可以自己实现该功能。此代码片段只是为您自己的解决方案提供一个有用的起点。 (4认同)
  • 在“return”之后,订阅会被取消订阅吗?我们如何确定没有可能的内存泄漏? (3认同)