为什么我会收到此编译错误:“无法找到 kstream.Consumed 的隐式值”以及如何修复它?

Nic*_*oiu 2 scala jsonschema avro apache-kafka apache-kafka-streams

我们有这些依赖关系:

libraryDependencies += "org.apache.kafka"       %% "kafka-streams-scala"         % kafkaVersion
libraryDependencies += "io.confluent"           % "kafka-streams-avro-serde"     % confluentVersion
libraryDependencies += "io.confluent"           % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback"         % "logback-classic"              % "1.2.3"
libraryDependencies += "com.typesafe"           % "config"                       % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s"    %% "avro4s-core"                 % "3.0.4"
Run Code Online (Sandbox Code Playgroud)

我们使用代码生成器从 AVRO 模式文件生成 Scala 案例类。一个这样生成的案例类具有 Either 值作为其字段之一。在 AVRO 模式中,这是用 type=[t1,t2] 表示的,因此生成看起来不错,这是一个总和类型:可以是类型 t1 或类型 t2。

问题变成从主题到案例类(二进制 -> Avro Map -> 案例类)的反序列化路径中缺少什么。

基本上我目前收到此错误:

could not find implicit value for parameter consumed: org.apache.kafka.streams.scala.kstream.Consumed[String, custom.UserEvent]
[error]       .stream[String, UserEvent]("schma.avsc")
Run Code Online (Sandbox Code Playgroud)

第一个想法是 kafka-streams-avro-serde,但可能这个库只确保 AVRO Map 的 Serde[GenericRecord],而不是案例类。因此,其他依赖项之一是帮助 AVRO GenericRecord 到案例类映射并返回。我们还有一些手写代码,可以根据模式生成案例类,这些代码似乎可以直接与 Spray json 一起使用。

我认为在 (binary <-> Avro GenericRecord <-> case class 实例) 转换中,存在差距,并且可能是在 case 类中存在 Either 字段的事实?

我现在正在尝试创建 Serde[UserEvent] 实例。因此,在我的理解中,这将涉及 UserEvent 和 AVRO GenericRecord 之间的转换,类似于 Map,然后是 AVRO Record 和二进制之间的转换 - 这可能由 kafka-streams-avro-serde 依赖项覆盖,就像应该有一个 Serde[GenericRecord]或类似的。

明智的导入,我们有这样的导入隐式:


import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Consumed
Run Code Online (Sandbox Code Playgroud)

Nic*_*oiu 6

事实上缺少一个进口。现在可以编译了。以下是进口:

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
Run Code Online (Sandbox Code Playgroud)