如何在RxJS中将可变长度数据包流转换为固定长度数据包?

mie*_*war 6 javascript node.js observable rxjs

我是RxJS的新手。

鉴于以下流

[ 'foo ', 'bar', ' b', 'az 12', '3', '4567' ]
Run Code Online (Sandbox Code Playgroud)

我想将其转换为固定大小的数据包(例如3个字节)+其余的数据包

['foo', ' ba', 'r b', 'az ', '123', '456', '7']
Run Code Online (Sandbox Code Playgroud)

在现实生活中,它实际上是二进制数据的缓冲区。

我想知道什么是惯用的RxJS方法。

我发现的简单方法是:

from([ 'foo ', 'bar', ' b', 'az 12', '3', '4567' ])
.pipe(
    Rx.concatMap(v => from(v)),
    Rx.bufferCount(3),
    Rx.map(v => v.join(''))
)
.subscribe(v => console.log(v))
Run Code Online (Sandbox Code Playgroud)

将所有内容分割为一个字符似乎很浪费,所以我发现的另一种方法是使用.slice()可能更好,但更冗长。

const bufferToSize = (chunkSize) => (source) =>
    Observable.create(subscriber => {
        let buffer = new Buffer('')

        return source.subscribe({
            next: (value) => {
                buffer += value

                while (buffer.length > chunkSize) {
                    subscriber.next(buffer.slice(0, chunkSize))
                    buffer = buffer.slice(chunkSize, buffer.length)
                }
            },
            complete: () => {
                subscriber.next(buffer)
                subscriber.complete()
            }
        })
    });

from([ 'foo ', 'bar', ' b', 'az 12', '3', '4567' ])
    .pipe(bufferToSize(3))
    .subscribe(v => console.log(v))
Run Code Online (Sandbox Code Playgroud)

都返回预期结果

foo
 ba
r b
az
123
456
7
Run Code Online (Sandbox Code Playgroud)

有更好的方法吗?或至少是更惯用的方式?

谢谢

kur*_*441 1

你的第一个选项是完美的(除了from(),只需使用v => v)。

@Mark 说它将等待可观察的完成以获取全部值,但事实并非如此。它只是等待收集到 3 个字符,然后发出缓冲区。

我创建了延迟版本来向您展示这是连续流。

https://stackblitz.com/edit/buffer-mxsltx?file=index.ts&devtoolsheight=50