eth*_*nny 10 java serialization dagger apache-flink
我使用Flink(最新通过git)从kafka流到cassandra.为了简化单元测试我通过Dagger添加依赖注入.
ObjectGraph似乎正在正确设置自己,但是'内部对象'被Flink标记为"不可序列化".如果我直接包含这些对象,它们可以工作 - 那么有什么区别?
有问题的类实现了MapFunction和@Inject一个用于cassandra的模块和一个用于读取配置文件的模块.
有没有办法建立这个,所以我可以使用后期绑定或Flink使这不可能?
fwiw - 依赖注入(通过匕首)和RichMapFunction不能共存.Dagger不允许您包含任何在其定义中扩展的对象.
通过Dagger Lazy <t>实例化的对象也不会序列化.
线程"main"中的异常org.apache.flink.api.common.InvalidProgramException:对象com.someapp.SaveMap@2e029d61不可序列化
...
引起:java.io.NotSerializableException:dagger.internal.LazyBinding $ 1
Ste*_*wen 20
在深入研究问题的具体细节之前,有关Apache Flink中函数可序列化的一些背景知识:
Apache Flink使用Java Serialization(java.io.Serializable)将函数对象(此处MapFunction)发送给并行执行它们的worker.因此,函数需要是可序列化的:函数可能不包含任何非可序列化的字段,即非原始(int,long,double,...)和不实现的类型java.io.Serializable.
使用非可序列化构造的典型方法是懒惰地初始化它们.
在Flink函数中使用非可序列化类型的一种方法是懒惰地初始化它们.保存这些类型的字段仍然null是在序列化要发送的函数时,并且仅在工作人员对函数进行反序列化后设置.
例如,在Scala中,您可以简单地使用惰性字段lazy val x = new NonSerializableType().该NonSerializableType类型实际上只在首次访问变量时创建,该变量x通常在worker上.因此,类型可以是不可序列化的,因为x当函数序列化为运送给工作者时,该类型为null.
在Java中,open()如果将其设置为Rich Function,则可以在函数方法上初始化非可序列化字段.丰富的功能(如RichMapFunction)是基本功能的扩展版本(这里MapFunction),让您可以访问生命周期方法,如open()和close().
我不太熟悉依赖注入,但是dagger似乎也提供了类似于懒惰依赖的东西,这可能有助于解决方法,就像Scala中的惰性变量一样:
new MapFunction<Long, Long>() {
@Inject Lazy<MyDependency> dep;
public Long map(Long value) {
return dep.get().doSomething(value);
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5254 次 |
| 最近记录: |