小编abh*_*hra的帖子

Kafka 流 GlobalKTable 在 Tombstone - null 值 - 记录上抛出反序列化异常

我有一个基于 Spring 云流的 Kafka Streams 应用程序,我将全局 KTable 绑定到紧凑主题。当我将墓碑记录推送到主题(具有空值的非空键)时,我的 Kafka 流应用程序因反序列化异常而失败。失败是因为我的反序列化器不处理空记录。

从文档中,我认为 GlobalKTable 甚至不会“看到”空值记录。难道不是这样吗?我需要在反序列化器中处理空记录吗?

org.apache.kafka.common.errors.SerializationException: Unable to deserialize
Caused by: java.lang.IllegalArgumentException: argument "src" is null
    at com.fasterxml.jackson.databind.ObjectMapper._assertNotNull(ObjectMapper.java:4693)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3511)
    at common.domain.serdes.MySerde$MyDeserializer.deserialize(MySerde.java:47)
    at common.domain.serdes.MySerde$MyDeserializer.deserialize(MySerde.java:39)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:91)
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:240)
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:289)
Run Code Online (Sandbox Code Playgroud)

java spring-cloud-stream apache-kafka-streams spring-cloud-stream-binder-kafka

4
推荐指数
1
解决办法
852
查看次数

NiFi fetchFile处理器不允许动态属性

某些NiFi处理器不允许动态属性的原因是什么?我在其中一个工作流程中使用FetchFile处理器,并且需要在整个流程中传递一些数据才能在最后一步中使用它。但是,FetchFile通过不允许动态属性来破坏它。我想知道是否还有另一种方法?NiFi为什么在某些处理器上不允许动态属性?

我的流程就像

ExecuteScript-> EvaluateJSon->自定义处理器以写入文件-> FetchFile-> SendtoS3->标记工作流程已完成

我想发送一些元数据,以便可以将工作流程标记为完成。我将这些数据作为属性传递,但在FetchFile处断开。

apache-nifi

2
推荐指数
1
解决办法
1131
查看次数