Scala Grpc 失败,错误为 io.grpc.StatusRuntimeException: CANCELLED: 无法读取消息

Kno*_*uch 1 scala grpc grpc-java

我正在尝试使用 GRPC 在 Scala 中编写流服务。为此我写了这个原型文件

syntax = "proto3";
package com.abhi.grpc;

message TimeRequest{}
message TimeResponse {
    int64 currentTime = 1;
}

service Clock {
    rpc StreamTime(TimeRequest) returns (stream TimeResponse);
}
Run Code Online (Sandbox Code Playgroud)

这是我的服务器端代码

import com.abhi.grpc.clock.{ClockGrpc, TimeRequest, TimeResponse}
import io.grpc.stub.StreamObserver
import monix.execution.Scheduler
import monix.execution.Scheduler.{global => scheduler}
import scala.concurrent.duration._

object ClockGrpcServer extends GrpcServer with App {
   val ssd = ClockGrpc.bindService(new ClockGRPC(), Scheduler.global)
   runServer(ssd, "Clock")
}

class ClockGRPC extends ClockGrpc.Clock {
   override def streamTime(request: TimeRequest, responseObserver: StreamObserver[TimeResponse]): Unit = {
      scheduler.scheduleWithFixedDelay(0.seconds, 3.seconds) {
         responseObserver.onNext(TimeResponse(System.currentTimeMillis))
      }
   }
}
Run Code Online (Sandbox Code Playgroud)

这是我的客户

object ClockGrpcClient extends App {
   val channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext(true).build()
   val stub = ClockGrpc.stub(channel)
   val observer = new StreamObserver[TimeResponse] {
      override def onError(t: Throwable): Unit = println(s"failed with error ${t}")
      override def onCompleted(): Unit = println("closing observer")
      override def onNext(value: TimeResponse): Unit = println(s"received time ${new DateTime(value)}")
   }
   stub.streamTime(TimeRequest(), observer)
   StdIn.readLine()
}
Run Code Online (Sandbox Code Playgroud)

当我运行服务器和客户端时。服务器一旦收到客户端的任何消息就会抛出以下错误

io.grpc.StatusRuntimeException: CANCELLED
        at io.grpc.Status.asRuntimeException(Status.java:534)
        at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:279)
        at com.abhi.ClockGRPC.$anonfun$streamTime$1(ClockGRPC.scala:22)
        at monix.execution.internal.RunnableAction.run(RunnableAction.scala:25)
        at monix.execution.schedulers.ReferenceScheduler$$anon$1.run(ReferenceScheduler.scala:45)
        at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Run Code Online (Sandbox Code Playgroud)

我用谷歌搜索了一下,发现了这篇文章

https://blog.codecentric.de/en/2017/01/hello-grpc-scalapb/

基于此我更改了我的服务器以使用 java.util 调度程序

class ClockGRPC extends ClockGrpc.Clock {
   val scheduler = Executors.newSingleThreadScheduledExecutor()
   override def streamTime(request: TimeRequest, responseObserver: StreamObserver[TimeResponse]): Unit = {
      val tick = new Runnable {
         val counter = new AtomicInteger(10)
         def run() =
            if (counter.getAndDecrement() >= 0) {
               val currentTime = System.currentTimeMillis()
               responseObserver.onNext(TimeResponse(currentTime))
            } else {
               scheduler.shutdown()
               responseObserver.onCompleted()
            }
      }
      scheduler.scheduleAtFixedRate(tick, 0l, 3000l, TimeUnit.SECONDS)
   }
}
Run Code Online (Sandbox Code Playgroud)

但我仍然收到取消错误。所以我无法让流媒体示例正常工作。

Kno*_*uch 7

我几乎已经放弃了这个问题。不过今天回来解决了。

问题出在线路上

override def onNext(value: TimeResponse): Unit = println(s"received time ${new DateTime(value)}")
Run Code Online (Sandbox Code Playgroud)

值无法传递给new DateTime

让事情进一步变得更糟。如果异常发生在回调方法中。Grpc 吞掉它并用通用错误消息替换它

info] Running com.abhi.ClockGrpcClient failed with error io.grpc.StatusRuntimeException: CANCELLED: Failed to read message.
Run Code Online (Sandbox Code Playgroud)

我运气不好,他DateTime使用了一个对象作为参数,所以编译成功了,但是运行时调用失败,异常被Grpc吞掉了。

我将其留在这里,以便对其他人有所帮助。

[info] Running com.abhi.ClockGrpcClient failed with error io.grpc.StatusRuntimeException: CANCELLED: Failed to read message
Run Code Online (Sandbox Code Playgroud)

意味着有人在回调函数中出错了。

  • 很高兴您解决了这个问题。关于 gRPC Java 的 StatusRuntimeException 的简单说明:gRPC 会将异常包装并重新抛出为 StatusRuntimeException,但原始异常不会被吞掉。它被保留为错误的原因(https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/ClientCallImpl.java#L487)并且可以可通过 Status#getCause 访问。 (2认同)