@aws-sdk/lib-storage 使用 JSONStream.stringify() 将 JSON 从 MongoDB 流式传输到 S3

Oro*_*vid 7 javascript mongodb amazon-web-services node.js aws-sdk

我正在尝试使用新版本的 @aws-sdk/lib-storage 将 JSON 从 MongoDB 流式传输到 S3:

"@aws-sdk/client-s3": "^3.17.0"
"@aws-sdk/lib-storage": "^3.34.0"
"JSONStream": "^1.3.5",
Run Code Online (Sandbox Code Playgroud)

尝试#1:看来我没有正确使用 JSONStream.stringify() :

import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    // NOTE(review): find('{}') passes a *string* as the query filter; the
    // intended empty filter is the object {} — confirm against the mongodb
    // driver docs. The cursor stream emits plain objects (object mode).
    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    // BUG: the return value of pipe() — the stringified, Buffer-friendly
    // stream — is discarded here, and the raw object-mode cursor stream is
    // handed to Upload below. That is what triggers error #1:
    // ERR_INVALID_ARG_TYPE, because Buffer.from() receives plain objects.
    readStream.pipe(JSONStream.stringify());
 
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        // BUG: should be the stream returned by pipe(), not readStream.
        Body: readStream,
      },
    });
    
    await upload.done(); 
  }
  catch (err) {
    log.error(err);
    // NOTE(review): throwing err.name throws a bare string and discards the
    // stack trace; prefer rethrowing the Error itself.
    throw err.name;
  }
  finally {
    if (client) {
      // NOTE(review): client.close() is async; the returned Promise is not
      // awaited here, so the connection may still be closing on return.
      client.close();
    }
  }

};
Run Code Online (Sandbox Code Playgroud)

错误#1:

TypeError [ERR_INVALID_ARG_TYPE]:第一个参数必须是字符串、Buffer、ArrayBuffer、Array 或 Array-like Object 类型之一。在 processTicksAndRejections (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18) 处的 Function.from (buffer.js:305:9) 接收类型对象 (内部/process/task_queues.js:94:5) 在Object.getChunkStream (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20) 在Upload.__doConcurrentUpload ( /.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22) 在异步 Promise.all (索引 0) 在 Upload.__doMultipartUpload (/.../node_modules/@aws- sdk/lib-storage/src/Upload.ts:196:5) 在 Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)

尝试#2,使用变量jsonStream

  // Attempt #2: correctly keep the result of pipe() and upload that instead.
  const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    const jsonStream = readStream.pipe(JSONStream.stringify());
 
    // NOTE(review): error #2 ("ReferenceError: ReadableStream is not
    // defined" inside lib-storage's chunker) suggests this lib-storage
    // version checks `instanceof ReadableStream` (the web-streams global),
    // which older Node versions do not define — presumably a
    // version/runtime mismatch; verify against lib-storage release notes.
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: jsonStream,
      },
    });
Run Code Online (Sandbox Code Playgroud)

错误#2:

ReferenceError:ReadableStream 未在 Upload.__doMultipartUpload (/.../node_modules/@) 处的 Object.getChunk (/.../node_modules/@aws-sdk/lib-storage/src/chunker.ts:22:30) 处定义aws-sdk/lib-storage/src/Upload.ts:187:24) 在 Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:37)

尝试#3:使用stream.PassThrough

    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    // BUG: uploadStreamFile is an async function, so this expression passes a
    // *Promise* to pipe(). pipe() expects a writable stream (something with
    // an .on method), hence error #3: "dest.on is not a function".
    readStream.pipe(JSONStream.stringify()).pipe(uploadStreamFile('benda_mongo.json'));

...

const stream = require('stream');
// NOTE(review): as written this cannot work as a pipe destination — it is an
// async function and therefore returns a Promise, not a stream.
export const uploadStreamFile = async(fileName) => {
  try{

    // BUG: nothing is ever written into `pass` — the PassThrough is created
    // here but never connected to the MongoDB read stream, so the upload
    // below has no data to consume.
    const pass = new stream.PassThrough();
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        // NOTE(review): `fileName` is accepted but the Key is hard-coded.
        Key: 'extracted-data/benda_mongo.json',
        Body: pass,
      },
    });
    const res = await upload.done();
    
    log.info('finished uploading file', fileName);
    return res;
  }
  catch(err){
    // NOTE(review): the error is silently swallowed — callers get undefined
    // with no indication that the upload failed.
    return;
  }
};
Run Code Online (Sandbox Code Playgroud)

错误#3:

'dest.on 不是 Stream.pipe 中的函数(internal/streams/legacy.js:30:8'

尝试 #4: mongodb.stream({transform: doc => JSON.stringify...}) 而不是 JSONStream:

import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { env } from '../../../env';
const s3Client = new S3Client({ region: env.AWS_REGION });

// Attempt #4: use the cursor's `transform` option to stringify each document
// instead of JSONStream.
export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    // NOTE(review): find('{}') passes a string filter; the empty-object
    // filter {} is presumably intended — verify against the driver docs.
    const readStream = db.collection(collectionName)
      .find('{}')
      .limit(5)
      // NOTE(review): despite the transform returning strings, error #4 is
      // still ERR_INVALID_ARG_TYPE (objects reaching Buffer.from) — looks
      // like this driver version still emits objects on the stream; confirm
      // where `stream({ transform })` applies in this driver release.
      .stream({ transform: doc => JSON.stringify(doc) + '\n' });
  
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: readStream,
      },
    });
  
    await upload.done(); 
  }
  catch (err) {
    log.error('waaaaa', err);
    // NOTE(review): throws a bare string (err.name), losing stack/message.
    throw err.name;
  }
  finally {
    if (client) {
      // NOTE(review): returned Promise from close() is not awaited.
      client.close();
    }
  }
};
Run Code Online (Sandbox Code Playgroud)

错误:#4:

TypeError [ERR_INVALID_ARG_TYPE]:第一个参数必须是字符串、Buffer、ArrayBuffer、Array 或 Array-like Object 类型之一。在 processTicksAndRejections (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18) 处的 Function.from (buffer.js:305:9) 接收类型对象 (内部/process/task_queues.js:94:5) 在Object.getChunkStream (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20) 在Upload.__doConcurrentUpload ( /.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22) 在异步 Promise.all (索引 0) 在 Upload.__doMultipartUpload (/.../node_modules/@aws- sdk/lib-storage/src/Upload.ts:196:5) 在 Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)

尝试 #5：使用 stream.PassThrough()，并将 pass 返回给 pipe

// Attempt #5: pipe the cursor stream into a PassThrough returned by
// uploadStreamFile.
export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream({ transform: doc => JSON.stringify(doc) + '\n' });
    // BUG: uploadStreamFile is async, so uploadStreamFile() evaluates to a
    // Promise, not a stream — pipe() needs a writable destination with .on,
    // hence error #5: "dest.on is not a function".
    readStream.pipe(uploadStreamFile());
  }
  catch (err) {
    log.error('waaaaa', err);
    // NOTE(review): throws a bare string; prefer rethrowing err itself.
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};


const stream = require('stream');

// NOTE(review): being async, this returns Promise<PassThrough>, not the
// PassThrough itself — so it still cannot serve as a pipe() destination.
export const uploadStreamFile = async() => {
  try{
    const pass = new stream.PassThrough();
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: pass,
      },
    });
    // BUG: awaiting done() *before* returning `pass` means the caller never
    // gets the stream until the upload finishes — but the upload cannot
    // finish because nothing has written to `pass` yet (deadlock).
    await upload.done();
    return pass;
  }
  catch(err){
    log.error('pawoooooo', err);
    // NOTE(review): error swallowed; caller receives undefined.
    return;
  }
};
Run Code Online (Sandbox Code Playgroud)

错误#5:

TypeError:dest.on 不是 Cursor.pipe 中的函数 (_stream_read.js:680:8)

jcc*_*ero 2

查看错误堆栈跟踪后，问题可能与这一事实有关：MongoDB 驱动程序以对象模式提供游标流，而 Upload 的 Body 参数需要的是可以按 Buffer 处理的传统流。

以您的原始代码作为参考,您可以尝试提供一个Transform流来处理这两个要求。

请考虑以下代码:

import { Transform } from 'stream';
import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
const s3Client = new S3Client({ region: env.AWS_REGION });

/**
 * Streams up to 5 documents from a MongoDB collection to S3 as
 * newline-delimited JSON.
 *
 * @param {string} connectionString - MongoDB connection URI.
 * @param {string} collectionName  - Source collection to export.
 * @throws Rethrows any connection/query/upload error after logging it.
 */
export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    // Use the empty *object* filter {}; the string '{}' is not a valid
    // MongoDB query filter.
    const readStream = db.collection(collectionName).find({}).limit(5).stream();

    // Adapter between the two sides: the cursor stream emits plain objects
    // (object mode), while Upload's Body must yield strings/Buffers.
    const toJSONTransform = new Transform({
      writableObjectMode: true,
      transform(chunk, encoding, callback) {
        // Serialize each document as one newline-terminated JSON line.
        this.push(JSON.stringify(chunk) + '\n');
        callback();
      },
    });

    readStream.pipe(toJSONTransform);

    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: toJSONTransform,
      },
    });

    await upload.done();
  }
  catch (err) {
    log.error(err);
    // Rethrow the original Error — `throw err.name` would throw a bare
    // string and discard the message and stack trace.
    throw err;
  }
  finally {
    if (client) {
      // close() is async; await it so the connection is fully released
      // before the function resolves.
      await client.close();
    }
  }
};
Run Code Online (Sandbox Code Playgroud)

在这段代码中，我们将 toJSONTransform 流的可写端定义为对象模式；相应地，其可读端输出的内容将适合被 S3 的 Upload 方法读取……至少，我希望如此。

关于您报告的第二个错误（与 dest.on 相关的那个），我最初认为——并且也和您讨论过这种可能性——其原因在于 uploadStreamFile 返回的是 Promise 而不是流，而您把这个 Promise 传给了需要流作为参数的 pipe 方法，也就是说返回了错误的变量。但我当时没有意识到，您其实是想把 PassThrough 流作为参数传给 Upload 方法：请注意，该流不包含任何信息，因为您从未向它写入任何数据——从 MongoDB 查询获得的可读流的内容既没有被传递给任何回调函数，也没有被传递给 Upload 本身。