如何链接两个希望您提供流的C#API?

Tec*_*et9 6 c# stream amazon-s3 parquet .net-core

我正在使用两个C#流API,其中一个是数据源,另一个是数据接收器.

两个API都没有实际公开流对象; 两者都希望您将流传递给它们,并且它们处理流中的写入/读取.

有没有办法将这些API链接在一起,以便源的输出流入接收器,而不必在MemoryStream中缓冲整个源?这是一个非常敏感的RAM应用程序.

这是一个使用我正在尝试避免的MemoryStream方法的示例,因为它在将整个流缓存到RAM之前将其缓存在RAM中:

using (var buffer = new MemoryStream())
using (var transferUtil = new TransferUtility(s3client))
{
    // This destructor finishes the file and transferUtil closes 
    // the stream, so we need this weird using nesting to keep everyone happy.
    using (var parquetWriter = new ParquetWriter(schema, buffer)) 
        using (var rowGroupWriter = parquetWriter.CreateRowGroup())
        {
            rowGroupWriter.WriteColumn(...);
            ...
        }
    transferUtil.Upload(buffer, _bucketName, _key.Replace(".gz", "") + ".parquet");
}
Run Code Online (Sandbox Code Playgroud)

Ste*_*les 5

您正在寻找可以传递给数据源和接收器的流,并且可以异步地"转移"两者之间的数据.有许多可能的解决方案,我可能已经考虑过围绕BlockingCollection的生产者 - 消费者模式.

最近,System.IO.Pipelines,Span和Memory类型的添加确实专注于高性能IO,我认为它非常适合这里.Pipe类与它相关联的Reader和Writer,可以自动处理它们之间的流量控制,背压和IO,同时利用所有新的Span和Memory相关类型.

我在PipeStream上传了一个Gist ,它将为您提供一个带有内部Pipe实现的自定义流,您可以将其传递给两个API类.写入WriteAsync(或Write)方法的任何内容都将可用于ReadAsync(或Read)方法,而无需任何进一步的byte []或MemoryStream分配

在你的情况下,你只需将MemoryStream替换为这个新类,它应该是开箱即用的.我还没有完整的S3测试工作,但直接从Parquet流中读取并将其转储到控制台窗口,这表明它是异步工​​作的.

// Create some very badly 'mocked' data
var idColumn = new DataColumn(
    new DataField<int>("id"),
    Enumerable.Range(0, 10000).Select(i => i).ToArray());

var cityColumn = new DataColumn(
    new DataField<string>("city"),
    Enumerable.Range(0, 10000).Select(i => i % 2 == 0 ? "London" : "Grimsby").ToArray());

var schema = new Schema(idColumn.Field, cityColumn.Field);

using (var pipeStream = new PipeStream())
{
    var buffer = new byte[4096];
    int read = 0;

    var readTask = Task.Run(async () =>
    {
        //transferUtil.Upload(readStream, "bucketName", "key"); // Execute this in a Task / Thread 
        while ((read = await pipeStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            var incoming = Encoding.ASCII.GetString(buffer, 0, read);
            Console.WriteLine(incoming);
            // await Task.Delay(5000); uncomment this to simulate very slow consumer
        }
    });

    using (var parquetWriter = new ParquetWriter(schema, pipeStream)) // This destructor finishes the file and transferUtil closes the stream, so we need this weird using nesting to keep everyone happy.
    using (var rowGroupWriter = parquetWriter.CreateRowGroup())
    {
        rowGroupWriter.WriteColumn(idColumn);  // Step through both these statements to see data read before the parquetWriter completes
        rowGroupWriter.WriteColumn(cityColumn);
    }       
}
Run Code Online (Sandbox Code Playgroud)

实现还没有完全完成,但我认为它显示了一个很好的方法.在控制台'readTask'中,您可以取消注释Task.Delay以模拟慢速读取(transferUtil),您应该看到管道自动限制写入任务.

对于其中一个Span扩展方法,您需要使用C#7.2或更高版本(VS 2017 - >项目属性 - >构建 - >高级 - >语言版本),但它应与任何.Net Framework兼容.您可能需要Nuget包

流是可读写的(显然!)但不可搜索,在这种情况下应该适合您,但不能从需要可搜索流的Parquet SDK中读取.

希望能帮助到你