我一直在尝试在节点应用程序中使用 Rxjs。fileList$是从fs.readdirsync(字符串数组)的返回。
第一个map()有一个名为 filename 的参数。
flatMap() readFileAsObservable()用于bindNodeCallback(fs.readFile)读取文件。
我的班级Testian需要 2 个参数;通过yaml-js读取文件和filename从第一张地图创建的对象。如何filename在我指定的管道中访问?
fileList$
.pipe(
map((filename: string) => `${resolvedDirPath}/${filename}`),
flatMap(
(filePath: string) => readFileAsObservable(filePath, 'utf8') as Observable<string>
),
map((fileData: string) => yaml.safeLoad(fileData) as ITestYaml),
map((testYaml: ITestYaml) => new Testian(testYaml, [I want to use filename here])),
flatMap((testYaml: Testian) => {
const prom: Promise<{}> = activeTests.set(testYaml);
outgoing.sendTest(testYaml);
return from(prom);
})
)
Run Code Online (Sandbox Code Playgroud)
这在任何涉及链式函数(例如承诺)的 API 中的处理方式类似。
临时变量可用于存储超出应访问它的函数范围的值。这是一个简单但非惯用的解决方法:
let filename;
fileList$.pipe(
map((_filename) => {
filename = _filename;
return `${resolvedDirPath}/${filename}`;
}),
flatMap((filePath) => readFileAsObservable(filePath, 'utf8')),
map((fileData) => yaml.safeLoad(fileData)),
map((testYaml) => new Testian(testYaml, filename)),
flatMap((testYaml) => {
const prom = activeTests.set(testYaml);
outgoing.sendTest(testYaml);
return from(prom);
})
)
Run Code Online (Sandbox Code Playgroud)
竞争条件可能存在问题,具体取决于特定的可观察对象。
filename可以嵌套使用的函数从父作用域访问变量:
fileList$.pipe(
flatMap((filename) => of(`${resolvedDirPath}/${filename}`).pipe(
flatMap((filePath) => readFileAsObservable(filePath, 'utf8')),
map((fileData) => yaml.safeLoad(fileData)),
map((testYaml) => new Testian(testYaml, filename)
),
flatMap((testYaml) => {
const prom = activeTests.set(testYaml);
outgoing.sendTest(testYaml);
return from(prom);
})
)
Run Code Online (Sandbox Code Playgroud)
在可能的情况下,该变量可以与其他其他结果一起传递:
fileList$.pipe(
map((filename) => [filename, `${resolvedDirPath}/${filename}`]),
flatMap(
([filename, filePath]) => forkJoin(filename, readFileAsObservable(filePath, 'utf8')),
),
map(([filename, fileData]) => [filename, yaml.safeLoad(fileData) as ITestYaml)],
map(([filename, testYaml]) => new Testian(testYaml, filename)),
flatMap((testYaml) => {
const prom = activeTests.set(testYaml);
outgoing.sendTest(testYaml);
return from(prom);
})
)
Run Code Online (Sandbox Code Playgroud)
如果流允许切换到 promises and async..await,则可以这样做,因为 function 中不存在函数作用域的问题async。
fileList$.pipe(
flatMap(async (filename) => {
const filePath = `${resolvedDirPath}/${filename}`;
const fileData = await readFileAsObservable(filePath, 'utf8').toPromise();
let testYaml = yaml.safeLoad(fileData);
testYaml = new Testian(testYaml, filename);
const prom = activeTests.set(testYaml);
outgoing.sendTest(testYaml);
return prom;
})
)
Run Code Online (Sandbox Code Playgroud)
由于这个 observable 已经使用了flatMap和 promises,它可以安全地单独使用 promises 来编写。RxJS observables 有一些不适合 promises 的用例,但这不是其中之一。