DaI*_*mTo 5 c# grpc grpc-dotnet
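I am trying to upload a file in chunks from a client to a server. It has been ten years since I last tried to write a chunked uploader myself, and it is harder than I remember.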
syntax = "proto3";
option csharp_namespace = "GrpcService1";
package upload;
// The Uploader service definition.
service Uploader {
// Upload a file as a client-side stream of chunks
rpc UploadFileStream(stream UploadFileRequest) returns (UploadFileResponse) {}
}
// The request message containing file data, and file name
message UploadFileRequest {
bytes data = 1;
string fileName = 2;
}
// The response from the upload containing the filePath
message UploadFileResponse {
string filePath = 1;
}
using Grpc.Core;
namespace GrpcService1.Services;
public class UploaderService : Uploader.UploaderBase
{
private readonly ILogger<UploaderService> _logger;
public UploaderService(ILogger<UploaderService> logger)
{
_logger = logger;
}
public override async Task<UploadFileResponse> UploadFileStream(IAsyncStreamReader<UploadFileRequest> request, ServerCallContext context)
{
while (await request.MoveNext())
{
Console.WriteLine(request.Current.Data);
}
return new UploadFileResponse(){ FilePath = @"c:\uploaded\file1.txt"};
}
}
public static async Task Upload(GrpcChannel channel, string filePath)
{
var clientUpFile = new Uploader.UploaderClient(channel);
using var uploadStream = clientUpFile.UploadFileStream();
var x = File.Exists(filePath);
using (FileStream fileStream = File.OpenRead(filePath) )
{
var bufferSize = 512000;
var buffer = new byte[bufferSize];
var lastBytesRead = 0;
var byteCount = 0;
while ((lastBytesRead = fileStream.Read(buffer, 0, bufferSize)) != 0)
{
Console.WriteLine(lastBytesRead);
if (lastBytesRead > 0)
{
await uploadStream.RequestStream.WriteAsync(new UploadFileRequest()
{
Data = await ByteString.FromStreamAsync(fileStream, CancellationToken.None),
FileName = filePath
});
}
}
}
}
Judging by its console output, the client appears to be uploading the chunks. Log:
Hello World
[ { "name": "Files\DaimtoLogo.jpg" } ]
512
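The server, however, appears to receive a single chunk, which makes sense because there seems to be only one chunk. It then throws an error while reading the stream.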
Google.Protobuf.ByteString
info: Grpc.AspNetCore.Server.ServerCallHandler[14]
      Error reading message.
      System.IO.IOException: The request stream was aborted.
       ---> Microsoft.AspNetCore.Connections.ConnectionAbortedException: The HTTP/2 connection faulted.
       ---> Microsoft.AspNetCore.Connections.ConnectionResetException: An existing connection was forcibly closed by the remote host.
       ---> System.Net.Sockets.SocketException (10054): An existing connection was forcibly closed by the remote host.
         at Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal.SocketConnection.DoReceive()
         --- End of inner exception stack trace ---
         at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
         at System.IO.Pipelines.Pipe.GetReadAsyncResult()
         at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
         at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.DuplexPipeStream.ReadAsyncInternal(Memory`1 destination, CancellationToken cancellationToken)
         at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1.StateMachineBox`1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
         at System.Net.Security.SslStream.ReadAsyncInternal[TIOAdapter](TIOAdapter adapter, Memory`1 buffer)
         at System.IO.Pipelines.StreamPipeReader.g__Core|36_0(StreamPipeReader reader, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
         at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2Connection.ReadInputAsync()
         at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
         at System.IO.Pipelines.Pipe.GetReadAsyncResult()
         at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
         at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2Connection.ProcessRequestsAsync[TContext](IHttpApplication`1 application)
         --- End of inner exception stack trace ---
         --- End of inner exception stack trace ---
         at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
         at System.IO.Pipelines.Pipe.GetReadAsyncResult()
         at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
         at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2MessageBody.ReadAsync(CancellationToken cancellationToken)
         at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1.StateMachineBox`1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
         at Grpc.AspNetCore.Server.Internal.PipeExtensions.ReadStreamMessageAsync[T](PipeReader input, HttpContextServerCallContext serverCallContext, Func`2 deserializer, CancellationToken cancellationToken)
fail: Grpc.AspNetCore.Server.ServerCallHandler[6]
      Error when executing service method 'UploadFileStream'.
      System.IO.IOException: The request stream was aborted.
       ---> Microsoft.AspNetCore.Connections.ConnectionAbortedException: The HTTP/2 connection faulted.
       ---> Microsoft.AspNetCore.Connections.ConnectionResetException: An existing connection was forcibly closed by the remote host.
       ---> System.Net.Sockets.SocketException (10054): An existing connection was forcibly closed by the remote host.
         at Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal.SocketConnection.DoReceive()
         --- End of inner exception stack trace ---
         at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
         at System.IO.Pipelines.Pipe.GetReadAsyncResult()
         at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
         at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.DuplexPipeStream.ReadAsyncInternal(Memory`1 destination, CancellationToken cancellationToken)
         at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1.StateMachineBox`1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
         at System.Net.Security.SslStream.ReadAsyncInternal[TIOAdapter](TIOAdapter adapter, Memory`1 buffer)
         at System.IO.Pipelines.StreamPipeReader.g__Core|36_0(StreamPipeReader reader, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
         at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2Connection.ReadInputAsync()
         at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
         at System.IO.Pipelines.Pipe.GetReadAsyncResult()
         at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
         at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2Connection.ProcessRequestsAsync[TContext](IHttpApplication`1 application)
         --- End of inner exception stack trace ---
         --- End of inner exception stack trace ---
         at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
         at System.IO.Pipelines.Pipe.GetReadAsyncResult()
         at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
         at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2MessageBody.ReadAsync(CancellationToken cancellationToken)
         at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1.StateMachineBox`1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
         at Grpc.AspNetCore.Server.Internal.PipeExtensions.ReadStreamMessageAsync[T](PipeReader input, HttpContextServerCallContext serverCallContext, Func`2 deserializer, CancellationToken cancellationToken)
         at Grpc.AspNetCore.Server.Internal.HttpContextStreamReader`1.<MoveNext>g__MoveNextAsync|9_0(ValueTask`1 readStreamTask)
         at GrpcService1.Services.UploaderService.UploadFileStream(IAsyncStreamReader`1 request, ServerCallContext context) in C:\Development\FreeLance\Glassix\gRCPTest\grcp-samaple\gRCPService\GrpcService1\Services\UploaderService.cs:line 16
         at Grpc.Shared.Server.ClientStreamingServerMethodInvoker`3.Invoke(HttpContext httpContext, ServerCallContext serverCallContext, IAsyncStreamReader`1 requestStream)
         at Grpc.Shared.Server.ClientStreamingServerMethodInvoker`3.Invoke(HttpContext httpContext, ServerCallContext serverCallContext, IAsyncStreamReader`1 requestStream)
         at Grpc.AspNetCore.Server.Internal.CallHandlers.ClientStreamingServerCallHandler`3.HandleCallAsyncCore(HttpContext httpContext, HttpContextServerCallContext serverCallContext)
         at Grpc.AspNetCore.Server.Internal.CallHandlers.ServerCallHandlerBase`3.<HandleCallAsync>g__AwaitHandleCall|8_0(HttpContextServerCallContext serverCallContext, Method`2 method, Task handleCall)
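Note: updated to fix some typos pointed out in the answer below.
I finally found the problem. The key here is await uploadStream.RequestStream.CompleteAsync(); without it the server keeps waiting in MoveNext() for more messages.
I found this line in the documentation under client streaming calls:
When the client has finished sending messages, RequestStream.CompleteAsync() should be called to notify the service.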
using System.Buffers;
using Google.Protobuf;
using Grpc.Net.Client;

public static async Task Upload(GrpcChannel channel, string filePath)
{
    var clientUpFile = new Uploader.UploaderClient(channel);
    using var uploadStream = clientUpFile.UploadFileStream();
    // Check that the file exists before uploading
    if (!File.Exists(filePath)) throw new FileNotFoundException("File not found.", filePath);
    using (FileStream fileStream = File.OpenRead(filePath))
    {
        // Rent may return a buffer larger than requested, so read up to buffer.Length
        var buffer = ArrayPool<byte>.Shared.Rent(32 * 1024);
        try
        {
            int lastBytesRead;
            while ((lastBytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)) != 0)
            {
                // Send only the bytes actually read in this iteration
                await uploadStream.RequestStream.WriteAsync(new UploadFileRequest()
                {
                    Data = ByteString.CopyFrom(buffer, 0, lastBytesRead),
                    FileName = Path.GetFileName(filePath)
                });
            }
            // Notify the service when the client has finished sending messages
            await uploadStream.RequestStream.CompleteAsync();
        }
        finally
        {
            // Return the rented buffer even if a read or write throws
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }
    // Read the response
    var response = await uploadStream.ResponseAsync;
    Console.WriteLine(response.FilePath);
}
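Besides the missing CompleteAsync() call, the loop in the question read into a buffer but then sent ByteString.FromStreamAsync(fileStream), which reads whatever is left in the stream rather than the bytes just read. The chunking step can be sketched on its own, without any gRPC dependency, as below. Chunker and ReadChunks are hypothetical names used only for this illustration; they are not part of gRPC or of the code above.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class Chunker
{
    // Yields the contents of 'source' as arrays of at most 'chunkSize' bytes.
    public static IEnumerable<byte[]> ReadChunks(Stream source, int chunkSize)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));
        var buffer = new byte[chunkSize];
        int bytesRead;
        while ((bytesRead = source.Read(buffer, 0, buffer.Length)) > 0)
        {
            // Copy only the bytes read in this iteration; the rest of the
            // buffer may still hold data from a previous read.
            var chunk = new byte[bytesRead];
            Buffer.BlockCopy(buffer, 0, chunk, 0, bytesRead);
            yield return chunk;
        }
    }
}
```

Each chunk could then be wrapped with ByteString.CopyFrom(chunk) and written to the request stream, with CompleteAsync() called once the enumeration ends.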
For anyone who needs this in the future, the following server-side code stores the uploaded file.
public class UploaderService : Uploader.UploaderBase
{
    private readonly ILogger<UploaderService> _logger;
    public UploaderService(ILogger<UploaderService> logger)
    {
        _logger = logger;
    }
    public override async Task<UploadFileResponse> UploadFileStream(IAsyncStreamReader<UploadFileRequest> request,
        ServerCallContext context)
    {
        try
        {
            var dir = "Files";
            var fileName = "temp";
            var tempPath = Path.Combine(dir, "temp");
            Directory.CreateDirectory(dir);
            // File.Create truncates an existing temp file; File.OpenWrite would leave stale trailing bytes
            await using (var fs = System.IO.File.Create(tempPath))
            {
                await foreach (var chunkMsg in request.ReadAllAsync().ConfigureAwait(false))
                {
                    fileName = chunkMsg.FileName;
                    await fs.WriteAsync(chunkMsg.Data.ToByteArray());
                }
            }
            // Rename the temp file once the client has completed the request stream
            System.IO.File.Move(tempPath, Path.Combine(dir, fileName), true);
            _logger.LogDebug(@"[FileUploaded] 'Files\{FileName}' uploaded", fileName);
            return new UploadFileResponse() { FilePath = $@"Files\{fileName}" };
        }
        catch (Exception e)
        {
            // Note: this needs an errorMessage string field on UploadFileResponse,
            // which the .proto shown in the question does not define.
            return new UploadFileResponse() { ErrorMessage = e.Message };
        }
    }
}
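The service above builds the destination path from the file name sent by the client. A minimal sketch of one way to keep that name inside the upload directory follows; it assumes that stripping any directory component is acceptable for your use case. UploadPaths and ResolveDestination are hypothetical names for this illustration and do not come from the answer or from any library.

```csharp
using System;
using System.IO;

public static class UploadPaths
{
    // Returns a path inside 'uploadDir' built from the last segment of 'clientFileName'.
    public static string ResolveDestination(string uploadDir, string clientFileName)
    {
        if (clientFileName == null) throw new ArgumentNullException(nameof(clientFileName));
        // Treat both '\' and '/' as separators so the result is the same on Windows and Linux.
        var name = Path.GetFileName(clientFileName.Replace('\\', '/'));
        if (string.IsNullOrWhiteSpace(name) || name == "." || name == "..")
            throw new ArgumentException("File name is empty or not a plain file name.", nameof(clientFileName));
        return Path.Combine(uploadDir, name);
    }
}
```

In the service, the result of UploadPaths.ResolveDestination(dir, fileName) could be used as the target of File.Move instead of concatenating the client-supplied name directly.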