Sha*_*s88 3 java apache-kafka apache-kafka-streams
我正在尝试 Kafka Streams。编写一个简单的应用程序,在其中计算重复消息。
信息:
2019-02-27-11:16:56 :: session:prod-111656 :: Msg => Hello World: 2491
2019-02-27-11:16:56 :: session:prod-111656 :: Msg => Hello World: 2492
等等。
我正在尝试将此类消息拆分为session:prod-xxxx. 用它作为钥匙。并将session:prod-xxxx+Hello World: xxxx其用作价值。然后按键分组,并查看每个会话中哪些消息重复。
这是代码:
KStream<String, String> textLines = builder.stream("RegularProducer");
KTable<String, Long> ktable = textLines.map(
    (String key, String value) -> {
        try {
            String[] parts = value.split("::");
            String sessionId = parts[1];
            String message = ((parts[2]).split("=>"))[1];
            message = sessionId+":"+message;
            return new KeyValue<String,String>(sessionId.trim().toLowerCase(), message.trim().toLowerCase());
        } catch (Exception e) {
            return new KeyValue<String,String>("Invalid-Message".trim().toLowerCase(), "Invalid Message".trim().toLowerCase());
        }
    })
    .groupBy((key,value) -> value)
    .count().filter(
            (String key, Long value) -> {
                return value > 1;
            }
    );
ktable.toStream().to("RegularProducerDuplicates", 
Produced.with(Serdes.String(), Serdes.Long()));
Topology topology = builder.build();
topology.describe();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
KTable 主题 RegularProducerDuplicates 被生成。但是当我使用 console-consumer 查看它时,它会因错误而崩溃。然后我在控制台消费者上使用 --skip-message-on-error 标志。现在我看到数千行这样的
session:prod-111656 : hello world: 994  [2019-02-28 16:25:18,081] ERROR Error processing message, skipping this message:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
谁能帮助我这里出了什么问题?
您的 Kafka Streams 应用程序没有问题并且工作正常。
错误在kafka-console-consumer(kafka.tools.ConsoleConsumer是实现脚本逻辑的类)。
它null在反序列化期间无法正确处理。当它null作为消息的值或键时,它会设置默认值(表示null字符串的字节数组)。如果您检查源代码,您可以找到以下功能
def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte]) {
  val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
  val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.
    getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
  output.write(convertedBytes)
}
您如何看到当它sourceBytes==null为反序列化获取为 null ( ) 的sourceBytes 时,它为此设置了默认值:
val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
在你的情况下是"null".getBytes(StandardCharsets.UTF_8)。然后,尝试使用org.apache.kafka.common.serialization.LongDeserializer(您的值反序列化器)进行反序列化。LongDeserializer在最开始检查字节数组的大小。现在它是 4( 的字节表示null)并抛出异常。
例如,如果您使用 StringDeserializer,它不会正确反序列化它,但至少不会抛出异常,因为它不会检查字节数组的长度。
长话短说:ConsoleConsumer 的格式化程序,负责打印,为漂亮的打印设置一些默认值,某些解串器(LongDeserializer,IntegerDeserializer)无法处理这些值
关于,为什么您的应用程序null为某些键生成值:
与KTable:filter具有不同的语义KStream::filter。根据 KTable 的 javadoc:
对于每个被丢弃的记录(即,不满足给定的谓词),一个墓碑记录被转发。
对于您的filter,当count <= 1它null为键传递值时。
| 归档时间: | 
 | 
| 查看次数: | 5082 次 | 
| 最近记录: |