Joh*_*ohn 4 java type-erasure apache-flink
我有一个抽象类,其抽象方法创建一个SourceFunction,因此派生类可以返回简单或更复杂的源(例如KafkaConsumers等)。 ChangeMe是通过 AvroSchema 编译创建的简单自动生成类。
public SourceFunction<ChangeMe> createSourceFunction(ParameterTool params) {\n FromElementsFunction<ChangeMe> dataSource = null;\n\n List<ChangeMe> changeMeList = Arrays.asList(\n ChangeMe.newBuilder().setSomeField("Some field 1").build(),\n ChangeMe.newBuilder().setSomeField("Some field 2").build(),\n ChangeMe.newBuilder().setSomeField("Some field 3").build()\n );\n try {\n dataSource = new FromElementsFunction<>(new AvroSerializer<>(ChangeMe.class), changeMeList);\n }\n catch (IOException ex){\n\n }\n\n return dataSource;\n}\nRun Code Online (Sandbox Code Playgroud)\n\n在我的 Flink 工作中,我基本上是这样的:
\n\nSourceFunction<ChangeMe> source = createSourceFunction(params);\nDataStream<T> sourceDataStream = streamExecutionEnvironment.addSource(source);\n\n\nDataStream<ChangeMe> changeMeEventsStream = this.getSourceDataStream(); // gets sourceDataStream above\nchangeMeEventsStream.print();\nRun Code Online (Sandbox Code Playgroud)\n\n当我运行作业时,我收到与 print() 调用相关的错误:
\n\nException in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function \'Custom Source\' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the \'ResultTypeQueryable\' interface.\n\xe2\x80\xa6\xe2\x80\xa6\nCaused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable \'T\' in \'class org.apache.flink.streaming.api.functions.source.FromElementsFunction\' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).\nRun Code Online (Sandbox Code Playgroud)\n\n我使用的是 Eclipse 编译器,所以我原以为会包含类型信息(尽管我认为这只是针对 lambda,而上面没有)。我需要做什么才能使其正常运行?
\n如果你想直接实例化一个类FromElementsFunction,那么你必须在调用时手动TypeInformation为该类提供一个实例。这对于 Flink 了解元素类型是必要的。ChangeMeaddSource
下面的代码片段应该可以解决问题:
SourceFunction<ChangeMe> source = createSourceFunction();
TypeInformation<ChangeMe> typeInfo = TypeInformation.of(ChangeMe.class);
DataStream<ChangeMe> sourceDataStream = env.addSource(source, typeInfo);
DataStream<ChangeMe> changeMeEventsStream = sourceDataStream;
changeMeEventsStream.print();
Run Code Online (Sandbox Code Playgroud)