Flink keyed stream: key is null

Abh*_*ury 1 java apache-flink flink-streaming

I am trying to perform a map operation on a KeyedStream in Flink:

stream.map(new JsonToMessageObjectMapper())
                    .keyBy("keyfield")
                    .map(new MessageProcessorStateful())

The output of the JsonToMessageObjectMapper operator is a POJO of class MessageObject, which has a String field 'keyfield'. The stream is then keyed by this field.

MessageProcessorStateful is a RichMapFunction that looks like this:

public class MessageAdProcessorStateful extends RichMapFunction<MessageObject, Tuple2<String, String>> {

    private transient MapState<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> state;
    ...
    @Override
    public void open(Configuration config) throws Exception {
        MapStateDescriptor<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> descriptor =
                new MapStateDescriptor<>(
                        "state", // the state name
                        TypeInformation.of(new TypeHint<String>() {}),
                        TypeInformation.of(new TypeHint<Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>>() {})); // type information
        state = getRuntimeContext().getMapState(descriptor);

        state.put(...); // Insert a key, value here. Exception here!

    }
}

The code throws a NullPointerException:

Caused by: java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.checkKeyNamespacePreconditions(CopyOnWriteStateTable.java:528)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.computeHashForOperationAndDoIncrementalRehash(CopyOnWriteStateTable.java:722)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:265)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
    at org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:75)
    at org.apache.flink.runtime.state.UserFacingMapState.put(UserFacingMapState.java:52)
    at org.myorg.quickstart.MessageStreamProcessor$MessageAdProcessorStateful.open(MessageStreamProcessor.java:226)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)

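It seems that the key used for the keyed state of the KeyedStream is null, although I have verified that 'keyfield' is always a valid string. Everything else appears to match the Flink documentation. Any idea what is going on?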

Fab*_*ske 10

The problem is that you are trying to access keyed state in the open() method.

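Keyed state maintains one state instance per key. In your example you use MapState, so there is one MapState instance for each key. Whenever you access the state, you always get the instance that belongs to the key of the record currently being processed. In a MapFunction (as in your example), that is the record passed to the map() method.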
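To make the "one state instance per key, selected by the current record" idea concrete, here is a small plain-Java toy model. It is NOT Flink's implementation or API: the class names ToyKeyedMapState and ToyKeyedStateDemo and the setCurrentKey method are hypothetical stand-ins for what the Flink runtime does internally. The null check only mirrors the precondition message seen in the stack trace in the question.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Toy model (hypothetical, NOT Flink's implementation) of keyed MapState:
// one inner map per key, and every access goes to the map of the "current key".
class ToyKeyedMapState<K, UK, UV> {
    private final Map<K, Map<UK, UV>> perKey = new HashMap<>();
    private K currentKey; // stays null until a record has been "received"

    // In Flink the runtime sets the key before each map() call; here we do it by hand.
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    private Map<UK, UV> current() {
        // Mirrors the precondition message from the question's stack trace.
        Objects.requireNonNull(currentKey,
                "No key set. This method should not be called outside of a keyed context.");
        return perKey.computeIfAbsent(currentKey, k -> new HashMap<>());
    }

    void put(UK userKey, UV value) {
        current().put(userKey, value);
    }

    UV get(UK userKey) {
        return current().get(userKey);
    }
}

class ToyKeyedStateDemo {
    public static void main(String[] args) {
        ToyKeyedMapState<String, String, Long> state = new ToyKeyedMapState<>();

        // Like open(): no record yet, so no current key -> NullPointerException.
        try {
            state.put("count", 0L);
        } catch (NullPointerException e) {
            System.out.println("outside keyed context: " + e.getMessage());
        }

        // Like map() for a record whose keyfield is "a".
        state.setCurrentKey("a");
        state.put("count", 1L);

        // Like map() for a record whose keyfield is "b": it sees its own, empty map.
        state.setCurrentKey("b");
        System.out.println(state.get("count")); // null

        // Back to key "a": its value is still there.
        state.setCurrentKey("a");
        System.out.println(state.get("count")); // 1
    }
}
```

The point of the model is that the same state handle returns different data depending on which key is current, and there is simply no data to return when no key has been set.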

Since open() is not called with a record, the current key inside open() is null, so keyed state cannot be accessed there.
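A common way around this is to obtain the state handle in open() but defer any per-key initialization to map(), where a key is always set, initializing a key's entry the first time a record with that key arrives. In the question's MessageAdProcessorStateful, that would mean moving the state.put(...) call out of open() and into map(), guarded by a check such as MapState's contains(...) method. The sketch below models this pattern in plain Java so it runs without Flink; LazyInitSketch, process() and the "count" entry are hypothetical names chosen for illustration, and process() only imitates the runtime setting the current key before calling map().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical plain-Java model of the fix: per-key initialization moves from open() into map().
class LazyInitSketch {
    // key -> (entry name -> value), standing in for Flink's per-key MapState
    private final Map<String, Map<String, Long>> backend = new HashMap<>();
    private String currentKey; // set by the "runtime" before each map() call

    private Map<String, Long> state() {
        Objects.requireNonNull(currentKey, "No key set.");
        return backend.computeIfAbsent(currentKey, k -> new HashMap<>());
    }

    void open() {
        // Only obtain the state handle here (getMapState in Flink); do not read or write it.
    }

    String map(String keyfield) {
        // Initialize this key's entry on its first record, instead of in open().
        if (!state().containsKey("count")) {
            state().put("count", 0L);
        }
        long n = state().get("count") + 1;
        state().put("count", n);
        return keyfield + ":" + n;
    }

    // Stand-in for the runtime: set the current key from the record, then call map().
    String process(String keyfield) {
        currentKey = keyfield;
        return map(keyfield);
    }

    public static void main(String[] args) {
        LazyInitSketch op = new LazyInitSketch();
        op.open();
        List<String> out = new ArrayList<>();
        for (String k : new String[] {"a", "b", "a"}) {
            out.add(op.process(k));
        }
        System.out.println(out); // [a:1, b:1, a:2]
    }
}
```

Each key gets its own counter initialized lazily on its first record. If the value you wanted to set up in open() is not actually per-key, it probably does not belong in keyed state at all.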