Scala: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field Child1.myfun of type scala.Function1

Mit*_*ani 0 java scala apache-flink flink-streaming

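I want to have a function name as a parameter in the parent class so that child classes can set it. This variable will then be used in one of the parent class's methods.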

abstract class Parent[T: TypeInformation] {
   val myfun: T => Unit

   // A different method uses myfun
}

class Child1 extends Parent[User] {
   val service = new Service()

   val myfun: User => Unit = service.callme
}

class Service {
   def callme(user: User): Unit = {
      println("We are here for user")
   }
}

I am new to Scala, but this looks fine to me. The compiler does not complain, yet I get a runtime exception and the job fails to start:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:250)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:427)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field Child1.myfun of type scala.Function1 in instance of Child1
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2372)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2054)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1635)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:235)
    ... 11 more


Any idea how to set a function name as the value of a variable in the child class so that the parent class can use it?

Mar*_*ijn 5

You talk about passing a function name, but what you are passing is the actual function itself. It looks like Flink cannot serialize this.
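To make the "name versus function" distinction concrete, here is a minimal sketch. `User` and `Service` are hypothetical stand-ins for the types in the question, and `EtaExpansionDemo` is a name invented for illustration. Assigning `service.callme` to a `val` of type `User => Unit` makes the compiler wrap the method in a `Function1` object, and that object is what ends up stored in the field.

```scala
// Hypothetical stand-ins for the question's types (not the asker's real classes).
case class User(name: String)

class Service {
  def callme(user: User): Unit = println(s"We are here for user ${user.name}")
}

object EtaExpansionDemo {
  def main(args: Array[String]): Unit = {
    val service = new Service()

    // The method is converted into a function object that captures `service`;
    // no method name is stored here.
    val myfun: User => Unit = service.callme

    println(myfun.isInstanceOf[Function1[_, _]]) // true
    myfun(User("alice"))
  }
}
```

Because the field holds an object, anything that serializes the enclosing class (as the `ObjectInputStream` frames in the stack trace suggest Flink does) has to serialize that function object as well.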

An alternative encoding would be to not use a function value at all, but a plain old method:

abstract class Parent[T: TypeInformation] {
   def myfun(t: T): Unit

   // A different method uses myfun
}

class Child1 extends Parent[User] {
   val service = new Service()

   def myfun(t: User): Unit = service.callme(t)
}

class Service {
   def callme(user: User): Unit = {
      println("We are here for user")
   }
}

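If this does not work out of the box, it should at least give you a clearer error message (possibly about the availability of the Service class on the classpath).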

Fixing that might allow you to go back to your previous encoding.
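One way to check the method-based encoding outside Flink is a plain Java serialization round trip, since the stack trace shows the operator being read back through `java.io.ObjectInputStream`. The following is only a sketch under several assumptions: `User` is a hypothetical stand-in, the `TypeInformation` context bound is dropped so the example has no Flink dependency, `process` stands for the "different method" that uses `myfun`, and `extends Serializable` on `Parent` and `Service` is added here (the original code does not show it). It does not reproduce Flink's class loading, so it cannot rule out classpath-related causes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the question's User type.
case class User(name: String)

// Assumption: Service is made Serializable because Child1 keeps it as a field.
class Service extends Serializable {
  def callme(user: User): Unit = println(s"We are here for user ${user.name}")
}

// Assumption: the TypeInformation context bound is omitted to avoid a Flink dependency.
abstract class Parent[T] extends Serializable {
  def myfun(t: T): Unit

  // Hypothetical stand-in for the "different method" that uses myfun.
  def process(t: T): Unit = myfun(t)
}

class Child1 extends Parent[User] {
  val service = new Service()

  def myfun(t: User): Unit = service.callme(t)
}

object RoundTrip {
  // Serializes a value to bytes and reads it back with plain Java serialization.
  def roundTrip[A <: AnyRef](value: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new Child1)
    copy.process(User("alice"))
  }
}
```

If the round trip fails here with a `NotSerializableException`, the message names the offending class, which is usually easier to act on than the `SerializedLambda` cast error.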