我创建了一个流媒体环境的配置,并试图在访问此配置open()的方法RichMapFunction。
例子:
Configuration conf = new Configuration();
conf.setBoolean("a", true);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(8, conf);
DataStreamSource<Integer> source = env.fromElements(5,5,5,5,5);
source.map(new RichMapFunction<Integer, Integer>() {
@Override
public void open(Configuration parameters) throws Exception {
boolean a = parameters.getBoolean("a", false);
super.open(parameters);
}
@Override
public Integer map(Integer value) throws Exception {
return value;
}
}).print();
env.execute();
Run Code Online (Sandbox Code Playgroud)
但是,在调试该open()方法时,我发现配置为空。
我究竟做错了什么?如何RichFunction在流媒体环境中将配置正确传递给 a ?
Flink 的 DataStream 和 DataSet API 共享与RichMapFunction您的示例中相同的用户功能接口。
在Configuration该参数open弗林克的方法RichFunction是从DataSet API的第一个版本,而不是通过Datastream API使用的遗产。Flink 序列化您在map()调用中提供的对象并将其发送给并行工作程序。因此,您可以直接在对象中将参数设置为常规字段。
| 归档时间: |
|
| 查看次数: |
1292 次 |
| 最近记录: |