使用 Parquet.NET 写入 Parquet 文件适用于本地文件,但会导致 Blob 存储中的空文件

Sch*_*zIT 6 c# parquet azure-blob-storage azure-functions parquet.net

我们正在使用parquet.net来编写 parquet 文件。我设置了一个包含 3 列和 2 行的简单架构:

        // Set up the file structure
        var UserKey = new Parquet.Data.DataColumn(
            new DataField<Int32>("UserKey"),
            new Int32[] { 1234, 12345}
        );

        var AADID = new Parquet.Data.DataColumn(
            new DataField<string>("AADID"),
            new string[] { Guid.NewGuid().ToString(), Guid.NewGuid().ToString() }
        );

        var UserLocale = new Parquet.Data.DataColumn(
            new DataField<string>("UserLocale"),
            new string[] { "en-US", "en-US" }
        );

        var schema = new Schema(UserKey.Field, AADID.Field, UserLocale.Field
        );
Run Code Online (Sandbox Code Playgroud)

当使用 FileStream 写入本地文件时,会创建一个文件,当代码完成时,我可以在文件中看到两行(后面是 1 kb):

            using (Stream fileStream = System.IO.File.OpenWrite("C:\\Temp\\Users.parquet")) {
                using (var parquetWriter = new ParquetWriter(schema, fileStream)) {
                    // Creare a new row group in the file
                    using (ParquetRowGroupWriter groupWriter = parquetWriter.CreateRowGroup()) {
                        groupWriter.WriteColumn(UserKey);
                        groupWriter.WriteColumn(AADID);
                        groupWriter.WriteColumn(UserLocale);
                    }
                }
            }
Run Code Online (Sandbox Code Playgroud)

然而,当我尝试使用相同的方法写入我们的 blob 存储时,只会生成一个空文件,并且数据丢失:

// Open reference to Blob Container
CloudAppendBlob blob = OpenBlobFile(blobEndPoint, fileName);

using (MemoryStream stream = new MemoryStream()) {

    blob.CreateOrReplaceAsync();

    using (var parquetWriter = new ParquetWriter(schema, stream)) {
        // Creare a new row group in the file
        using (ParquetRowGroupWriter groupWriter = parquetWriter.CreateRowGroup()) {
            groupWriter.WriteColumn(UserKey);
            groupWriter.WriteColumn(AADID);
            groupWriter.WriteColumn(UserLocale);
        }
    
    // Set stream position to 0
    stream.Position = 0;
    blob.AppendBlockAsync(stream);
    return true;
}

...

public static CloudAppendBlob OpenBlobFile (string blobEndPoint, string fileName) {
    CloudBlobContainer container = new CloudBlobContainer(new System.Uri(blobEndPoint));
    CloudAppendBlob blob = container.GetAppendBlobReference(fileName);

    return blob;
}
Run Code Online (Sandbox Code Playgroud)

阅读文档,我认为我的 blob.AppendBlocAsync 实现应该可以解决问题,但最终我得到的是一个空文件。有人会建议我为什么会出现这种情况以及如何解决它以便我实际上最终在文件中得到数据吗?

提前致谢。

Sch*_*zIT 1

文件最终为空的解释是:

blob.AppendBlockAsync(stream);
Run Code Online (Sandbox Code Playgroud)

请注意被调用的函数如何具有后缀Async。这意味着它期望任何调用它的东西等待。我将代码所在的函数转换为异步函数,并让 Visual Studio 建议对该行进行以下更改:

_ = await blob.AppendBlockAsync(stream);
Run Code Online (Sandbox Code Playgroud)

我不完全确定_代表什么,并且将鼠标悬停在它上面并不会显示更多信息,除了它是一种long数据类型之外,但代码现在按预期工作。