gRPC file upload always returns "The request stream was aborted"

DaI*_*mTo 5 c# grpc grpc-dotnet

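I'm trying to upload a file from the client to the server in chunks. It's been ten years since I last tried to write a chunked uploader myself, and it's harder than I remember.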

upload.proto

syntax = "proto3";

option csharp_namespace = "GrpcService1";

package upload;

// The file upload service definition.
service Uploader {
  // Upload a file as a client stream of chunks
  rpc UploadFileStream(stream UploadFileRequest) returns (UploadFileResponse) {}
}

// The request message containing file data, and file name
message UploadFileRequest {
  bytes data = 1;
  string fileName = 2;
}

// The response from the upload containing the filePath
message UploadFileResponse {
  string filePath =1;
}

Upload service

using Grpc.Core;

namespace GrpcService1.Services;

public class UploaderService : Uploader.UploaderBase
{
    private readonly ILogger<GreeterService> _logger;

    public UploaderService(ILogger<GreeterService> logger)
    {
        _logger = logger;
    }
    
    public override async Task<UploadFileResponse> UploadFileStream(IAsyncStreamReader<UploadFileRequest> request, ServerCallContext context)
    {
        while (await request.MoveNext())
        {
            Console.WriteLine(request.Current.Data);
        }

        return new UploadFileResponse(){ FilePath = @"c:\uploaded\file1.txt"};
    }
}

Client upload test method

 public static async Task Upload(GrpcChannel channel, string filePath)
    {
        var clientUpFile = new Uploader.UploaderClient(channel);
        using var uploadStream = clientUpFile.UploadFileStream();

        var x = File.Exists(filePath);
        
        using (FileStream fileStream = File.OpenRead(filePath) )
        {
            var bufferSize = 512000;
            var buffer = new byte[bufferSize];
            var lastBytesRead = 0;
            var byteCount = 0;

            while ((lastBytesRead = fileStream.Read(buffer, 0, bufferSize)) != 0)
            {
                Console.WriteLine(lastBytesRead);
                
                if (lastBytesRead > 0)
                {
                    await uploadStream.RequestStream.WriteAsync(new UploadFileRequest()
                    {
                        Data = await ByteString.FromStreamAsync(fileStream, CancellationToken.None),
                        FileName = filePath
                    });
                }
            }
        }
    }

The problem

Judging by its console output, the client appears to be uploading chunks. The log:

Hello World
[ { "name": "Files\DaimtoLogo.jpg" } ]
512

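The server, however, seems to receive only one chunk, which makes sense because there appears to be only one chunk. It then throws an error while reading the stream.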

Google.Protobuf.ByteString
info: Grpc.AspNetCore.Server.ServerCallHandler[14]
      Error reading message.
System.IO.IOException: The request stream was aborted.
 ---> Microsoft.AspNetCore.Connections.ConnectionAbortedException: The HTTP/2 connection faulted.
 ---> Microsoft.AspNetCore.Connections.ConnectionResetException: An existing connection was forcibly closed by the remote host.
 ---> System.Net.Sockets.SocketException (10054): An existing connection was forcibly closed by the remote host.
   at Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal.SocketConnection.DoReceive()
   --- End of inner exception stack trace ---
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
   at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.DuplexPipeStream.ReadAsyncInternal(Memory`1 destination, CancellationToken cancellationToken)
   at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1.StateMachineBox`1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
   at System.Net.Security.SslStream.ReadAsyncInternal[TIOAdapter](TIOAdapter adapter, Memory`1 buffer)
   at System.IO.Pipelines.StreamPipeReader.g__Core|36_0(StreamPipeReader reader, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2Connection.ReadInputAsync()
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
   at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2Connection.ProcessRequestsAsync[TContext](IHttpApplication`1 application)
   --- End of inner exception stack trace ---
   --- End of inner exception stack trace ---
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
   at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2MessageBody.ReadAsync(CancellationToken cancellationToken)
   at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1.StateMachineBox`1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
   at Grpc.AspNetCore.Server.Internal.PipeExtensions.ReadStreamMessageAsync[T](PipeReader input, HttpContextServerCallContext serverCallContext, Func`2 deserializer, CancellationToken cancellationToken)
fail: Grpc.AspNetCore.Server.ServerCallHandler[6]
      Error when executing service method 'UploadFileStream'.
System.IO.IOException: The request stream was aborted.
 ---> Microsoft.AspNetCore.Connections.ConnectionAbortedException: The HTTP/2 connection faulted.
 ---> Microsoft.AspNetCore.Connections.ConnectionResetException: An existing connection was forcibly closed by the remote host.
 ---> System.Net.Sockets.SocketException (10054): An existing connection was forcibly closed by the remote host.
   at Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal.SocketConnection.DoReceive()
   --- End of inner exception stack trace ---
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
   at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.DuplexPipeStream.ReadAsyncInternal(Memory`1 destination, CancellationToken cancellationToken)
   at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1.StateMachineBox`1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
   at System.Net.Security.SslStream.ReadAsyncInternal[TIOAdapter](TIOAdapter adapter, Memory`1 buffer)
   at System.IO.Pipelines.StreamPipeReader.g__Core|36_0(StreamPipeReader reader, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2Connection.ReadInputAsync()
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
   at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2Connection.ProcessRequestsAsync[TContext](IHttpApplication`1 application)
   --- End of inner exception stack trace ---
   --- End of inner exception stack trace ---
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
   at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2MessageBody.ReadAsync(CancellationToken cancellationToken)
   at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1.StateMachineBox`1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
   at Grpc.AspNetCore.Server.Internal.PipeExtensions.ReadStreamMessageAsync[T](PipeReader input, HttpContextServerCallContext serverCallContext, Func`2 deserializer, CancellationToken cancellationToken)
   at Grpc.AspNetCore.Server.Internal.HttpContextStreamReader`1.<MoveNext>g__MoveNextAsync|9_0(ValueTask`1 readStreamTask)
   at GrpcService1.Services.UploaderService.UploadFileStream(IAsyncStreamReader`1 request, ServerCallContext context) in C:\Development\FreeLance\Glassix\gRCPTest\grcp-samaple\gRCPService\GrpcService1\Services\UploaderService.cs:line 16
   at Grpc.Shared.Server.ClientStreamingServerMethodInvoker`3.Invoke(HttpContext httpContext, ServerCallContext serverCallContext, IAsyncStreamReader`1 requestStream)
   at Grpc.Shared.Server.ClientStreamingServerMethodInvoker`3.Invoke(HttpContext httpContext, ServerCallContext serverCallContext, IAsyncStreamReader`1 requestStream)
   at Grpc.AspNetCore.Server.Internal.CallHandlers.ClientStreamingServerCallHandler`3.HandleCallAsyncCore(HttpContext httpContext, HttpContextServerCallContext serverCallContext)
   at Grpc.AspNetCore.Server.Internal.CallHandlers.ServerCallHandlerBase`3.<HandleCallAsync>g__AwaitHandleCall|8_0(HttpContextServerCallContext serverCallContext, Method`2 method, Task handleCall)

  1. What is closing the stream?
  2. Shouldn't the stream stay open until the server returns the response saying the file has been uploaded?

Note: updated to fix some typos pointed out in the answer below.

DaI*_*mTo 5

I finally found the problem. The key here is await uploadStream.RequestStream.CompleteAsync();

I found this line in the documentation, under client streaming calls:

When the client has finished sending messages, RequestStream.CompleteAsync() should be called to notify the service.
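
As a loose analogy for why that call matters, here is a minimal sketch that uses System.Threading.Channels instead of gRPC. The CompletionDemo class and its method names are hypothetical and only for illustration; channel.Writer.Complete() plays the role that RequestStream.CompleteAsync() plays in a client streaming call. The reader's loop only ends once the writer has signalled that no more messages are coming.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CompletionDemo
{
    // Stand-in for the server handler: drains messages until the writer signals completion.
    public static async Task<int> ReceiveAllAsync(ChannelReader<byte[]> reader)
    {
        var total = 0;
        await foreach (var chunk in reader.ReadAllAsync())
        {
            total += chunk.Length;
        }
        // Only reached after the writer has called Complete().
        return total;
    }

    public static async Task<int> RunAsync()
    {
        var channel = Channel.CreateUnbounded<byte[]>();
        var receiver = ReceiveAllAsync(channel.Reader);

        await channel.Writer.WriteAsync(new byte[10]);
        await channel.Writer.WriteAsync(new byte[5]);

        // Analogous to RequestStream.CompleteAsync(): without this line
        // the receiver would keep waiting for another message.
        channel.Writer.Complete();

        return await receiver;
    }
}
```

The corrected client method follows the same shape: write every chunk, complete the request stream, then await the response.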

public static async Task Upload(GrpcChannel channel, string filePath)
    {
        var clientUpFile = new Uploader.UploaderClient(channel);
        using var uploadStream = clientUpFile.UploadFileStream();

        // check that the file exists before uploading
        if (!File.Exists(filePath)) throw new FileNotFoundException();

        using (FileStream fileStream = File.OpenRead(filePath) )
        {
            var buffer =  ArrayPool<byte>.Shared.Rent(32 * 1024);
            var lastBytesRead = 0;

            while ((lastBytesRead = fileStream.Read(buffer, 0, buffer.Length)) != 0)
            {
                if (lastBytesRead > 0)
                {
                    await uploadStream.RequestStream.WriteAsync(new UploadFileRequest()
                    {
                        Data = ByteString.CopyFrom(buffer,0 , lastBytesRead),
                        FileName = Path.GetFileName(filePath)
                    });
                }
            }
            
            // Notify the service when the client has finished sending messages
            await uploadStream.RequestStream.CompleteAsync();  
            ArrayPool<byte>.Shared.Return(buffer);
        }

        // read the response
        var response = await uploadStream.ResponseAsync;
        Console.WriteLine(response.FilePath);
        
    }
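
The read loop above could also be pulled out into a small helper, so the chunking can be checked without a gRPC channel. This is only a sketch: ChunkReader and ReadChunks are hypothetical names, not part of the code above. It also makes the bug in my original version easier to see. There, the bytes read into the buffer were never sent, and ByteString.FromStreamAsync reads the stream to its end, so a single message carried whatever remained of the file.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;

public static class ChunkReader
{
    // Hypothetical helper: yields the stream content as arrays of at most chunkSize bytes.
    public static IEnumerable<byte[]> ReadChunks(Stream source, int chunkSize = 32 * 1024)
    {
        // Rent may hand back a larger array, so reads are capped at chunkSize.
        byte[] buffer = ArrayPool<byte>.Shared.Rent(chunkSize);
        try
        {
            int bytesRead;
            while ((bytesRead = source.Read(buffer, 0, chunkSize)) > 0)
            {
                // Copy only the bytes actually read; the last chunk is usually shorter.
                var chunk = new byte[bytesRead];
                Buffer.BlockCopy(buffer, 0, chunk, 0, bytesRead);
                yield return chunk;
            }
        }
        finally
        {
            // Return the buffer even if the caller stops enumerating early.
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }
}
```

In the client, each yielded chunk would be wrapped with ByteString.CopyFrom(chunk) and written to uploadStream.RequestStream, followed by CompleteAsync() once the loop ends.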

Final upload service

For anyone who needs this in the future: this stores the uploaded file.

public class UploaderService : Uploader.UploaderBase
{
    private readonly ILogger<UploaderService> _logger;

    public UploaderService(ILogger<UploaderService> logger)
    {
        _logger = logger;
    }

    public override async Task<UploadFileResponse> UploadFileStream(IAsyncStreamReader<UploadFileRequest> request,
        ServerCallContext context)
    {
        try
        {
            var dir = "Files";
            var fileName = "temp";

            await using (var fs = System.IO.File.OpenWrite($"{dir}\\temp"))
            {
                await foreach (var chunkMsg in request.ReadAllAsync().ConfigureAwait(false))
                {
                    fileName = chunkMsg.FileName;

                    fs.Write(chunkMsg.Data.ToByteArray());
                }
            }

            System.IO.File.Move($"{dir}\\temp", $"{dir}\\{fileName}", true);

            _logger.LogDebug(@"[FileUploaded] 'Files\{FileName}' uploaded", fileName);
            return new UploadFileResponse() { FilePath = $@"Files\{fileName}" };
        }
        catch (Exception e)
        {
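            // Assumes UploadFileResponse also defines a string errorMessage field,
            // which the upload.proto shown in the question does not include.
            return new UploadFileResponse() { ErrorMessage = e.Message };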
        }
    }
}