Tec*_*et9 6 c# stream amazon-s3 parquet .net-core
我正在使用两个C#流API,其中一个是数据源,另一个是数据接收器.
两个API都没有实际公开流对象; 两者都希望您将流传递给它们,并且它们处理流中的写入/读取.
有没有办法将这些API链接在一起,以便源的输出流入接收器,而不必在MemoryStream中缓冲整个源?这是一个非常敏感的RAM应用程序.
这是一个使用我正在尝试避免的MemoryStream方法的示例,因为它在将整个流缓存到RAM之前将其缓存在RAM中:
using (var buffer = new MemoryStream())
using (var transferUtil = new TransferUtility(s3client))
{
// This destructor finishes the file and transferUtil closes
// the stream, so we need this weird using nesting to keep everyone happy.
using (var parquetWriter = new ParquetWriter(schema, buffer))
using (var rowGroupWriter = parquetWriter.CreateRowGroup())
{
rowGroupWriter.WriteColumn(...);
...
}
transferUtil.Upload(buffer, _bucketName, _key.Replace(".gz", "") + ".parquet");
}
Run Code Online (Sandbox Code Playgroud)
您正在寻找可以传递给数据源和接收器的流,并且可以异步地"转移"两者之间的数据.有许多可能的解决方案,我可能已经考虑过围绕BlockingCollection的生产者 - 消费者模式.
最近,System.IO.Pipelines,Span和Memory类型的添加确实专注于高性能IO,我认为它非常适合这里.Pipe类与它相关联的Reader和Writer,可以自动处理它们之间的流量控制,背压和IO,同时利用所有新的Span和Memory相关类型.
我在PipeStream上传了一个Gist ,它将为您提供一个带有内部Pipe实现的自定义流,您可以将其传递给两个API类.写入WriteAsync(或Write)方法的任何内容都将可用于ReadAsync(或Read)方法,而无需任何进一步的byte []或MemoryStream分配
在你的情况下,你只需将MemoryStream替换为这个新类,它应该是开箱即用的.我还没有完整的S3测试工作,但直接从Parquet流中读取并将其转储到控制台窗口,这表明它是异步工作的.
// Create some very badly 'mocked' data
var idColumn = new DataColumn(
new DataField<int>("id"),
Enumerable.Range(0, 10000).Select(i => i).ToArray());
var cityColumn = new DataColumn(
new DataField<string>("city"),
Enumerable.Range(0, 10000).Select(i => i % 2 == 0 ? "London" : "Grimsby").ToArray());
var schema = new Schema(idColumn.Field, cityColumn.Field);
using (var pipeStream = new PipeStream())
{
var buffer = new byte[4096];
int read = 0;
var readTask = Task.Run(async () =>
{
//transferUtil.Upload(readStream, "bucketName", "key"); // Execute this in a Task / Thread
while ((read = await pipeStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
var incoming = Encoding.ASCII.GetString(buffer, 0, read);
Console.WriteLine(incoming);
// await Task.Delay(5000); uncomment this to simulate very slow consumer
}
});
using (var parquetWriter = new ParquetWriter(schema, pipeStream)) // This destructor finishes the file and transferUtil closes the stream, so we need this weird using nesting to keep everyone happy.
using (var rowGroupWriter = parquetWriter.CreateRowGroup())
{
rowGroupWriter.WriteColumn(idColumn); // Step through both these statements to see data read before the parquetWriter completes
rowGroupWriter.WriteColumn(cityColumn);
}
}
Run Code Online (Sandbox Code Playgroud)
实现还没有完全完成,但我认为它显示了一个很好的方法.在控制台'readTask'中,您可以取消注释Task.Delay以模拟慢速读取(transferUtil),您应该看到管道自动限制写入任务.
对于其中一个Span扩展方法,您需要使用C#7.2或更高版本(VS 2017 - >项目属性 - >构建 - >高级 - >语言版本),但它应与任何.Net Framework兼容.您可能需要Nuget包
流是可读写的(显然!)但不可搜索,在这种情况下应该适合您,但不能从需要可搜索流的Parquet SDK中读取.
希望能帮助到你
| 归档时间: |
|
| 查看次数: |
384 次 |
| 最近记录: |