Flink 中是否已弃用 JSONDeserializationSchema() ?

Raj*_*war 5 apache-flink flink-streaming

我是 Flink 新手,正在做与以下链接非常相​​似的事情。

下沉 kafka 流时看不到消息,并且在 flink 1.2 中看不到打印消息

我还尝试添加 JSONDeserializationSchema() 作为没有密钥的 Kafka 输入 JSON 消息的反序列化器。

但我发现 JSONDeserializationSchema() 不存在。

如果我做错了什么,请告诉我。

在此输入图像描述

Dav*_*son 8

JSONDeserializationSchema在之前被弃用后,在 Flink 1.8 中被删除。

推荐的方法是编写一个实现DeserializationSchema<T>. 这是我从Flink Operations Playground复制的一个示例:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

/**
 * A Kafka {@link DeserializationSchema} to deserialize {@link ClickEvent}s from JSON.
 *
 */
public class ClickEventDeserializationSchema implements DeserializationSchema<ClickEvent> {

    private static final long serialVersionUID = 1L;

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public ClickEvent deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, ClickEvent.class);
    }

    @Override
    public boolean isEndOfStream(ClickEvent nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ClickEvent> getProducedType() {
        return TypeInformation.of(ClickEvent.class);
    }
}
Run Code Online (Sandbox Code Playgroud)

对于 Kafka 生产者,您需要实现KafkaSerializationSchema<T>,并且您会在同一项目中找到相关示例。