How do I ensure a constant Avro schema is generated and avoid the "Too many schema objects created for x" exception?

kos*_*tja 6 scala avro apache-kafka reactive-kafka confluent-schema-registry

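I am hitting a reproducible error when producing Avro messages with reactive-kafka and avro4s. Once the identityMapCapacity of the client (CachedSchemaRegistryClient) is reached, serialization fails with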

java.lang.IllegalStateException: Too many schema objects created for <myTopic>-value

This is unexpected, since all messages should have the same schema - they are serializations of the same case class.

val avroProducerSettings: ProducerSettings[String, GenericRecord] = 
  ProducerSettings(system, Serdes.String().serializer(), 
  avroSerde.serializer())
 .withBootstrapServers(settings.bootstrapServer)

val avroProdFlow: Flow[ProducerMessage.Message[String, GenericRecord, String],
                    ProducerMessage.Result[String, GenericRecord, String],
                    NotUsed] = Producer.flow(avroProducerSettings)

val avroQueue: SourceQueueWithComplete[Message[String, GenericRecord, String]] = 
  Source.queue(bufferSize, overflowStrategy)
  .via(avroProdFlow)
  .map(logResult)
  .to(Sink.ignore)
  .run()

...
avroQueue.offer(msg)

The serializer is a KafkaAvroSerializer, instantiated with new CachedSchemaRegistryClient(settings.schemaRegistry, 1000).

Generating the GenericRecord:

def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord =
  recordFormat.to(a)

val makeEdgeMessage: (Edge, String) => Message[String, GenericRecord, String] = { (edge, topic) =>
  val edgeAvro: GenericRecord = toAvro(edge)
  val record   = new ProducerRecord[String, GenericRecord](topic, edge.id, edgeAvro)
  ProducerMessage.Message(record, edge.id)
}

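The schema is created deep inside the code (io.confluent.kafka.serializers.AbstractKafkaAvroSerDe#getSchema, io.confluent.kafka.serializers.AbstractKafkaAvroSerializer#serializeImpl), where I have no influence on it, so I have no idea how to fix the leak. It looks to me as if the two Confluent projects do not work well together.

The issues I have found here, here and here do not seem to address my use case.

My two workarounds for now are:

  • not using the schema registry - clearly not a long-term solution
  • creating a custom SchemaRegistryClient that does not rely on object identity - feasible, but I would like to avoid creating more issues than the reimplementation solves

Is there a way to generate or cache a consistent schema per message/record type and use it with my setup?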

kos*_*tja 5

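Edit 2017.11.20

The problem in my case was that each GenericRecord instance carrying my message was serialized by a different RecordFormat instance, which contained a different Schema instance. The implicit resolution here generated a new instance every time.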

def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord = recordFormat.to(a)

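The solution was to pin the RecordFormat instance to a val and reuse it explicitly. Many thanks to https://github.com/heliocentrist for explaining the details.

Original response:

After waiting for a while (the GitHub issue also went unanswered) I had to implement my own SchemaRegistryClient. Over 90% of it is copied from the original CachedSchemaRegistryClient, just translated into Scala. Using a Scala mutable.Map fixed the memory leak. I have not performed any comprehensive tests, so use it at your own risk.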
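The effect described above can be reproduced without any Kafka or Avro dependencies. The following is only a sketch: `Schema`, `Format`, `Derived` and `Pinned` are hypothetical stand-ins (not avro4s or Avro classes), and `Edge` is reduced to the single `id` field visible in the question. It contrasts an implicit that is rebuilt on every resolution with one pinned to a `val`, and counts the distinct schema instances the way an identity-based cache would see them.

```scala
import java.util.{Collections, IdentityHashMap}

// Hypothetical stand-ins for avro4s' RecordFormat and Avro's Schema,
// used only so the sketch runs without extra dependencies.
final class Schema(val name: String)
final class Format[A](val schema: Schema)

final case class Edge(id: String)

object Derived {
  // Mimics implicit derivation: every resolution builds a fresh Format,
  // and with it a fresh Schema instance.
  implicit def derive: Format[Edge] = new Format[Edge](new Schema("Edge"))
}

object Pinned {
  // The format is created once; every resolution returns the same instance.
  implicit val edgeFormat: Format[Edge] = new Format[Edge](new Schema("Edge"))
}

object SchemaIdentity {
  def schemaOf[A](a: A)(implicit format: Format[A]): Schema = format.schema

  // Counts distinct Schema instances by reference, as an identity-keyed cache would.
  def distinctInstances(schemas: Seq[Schema]): Int = {
    val seen = Collections.newSetFromMap(new IdentityHashMap[Schema, java.lang.Boolean]())
    schemas.foreach(s => seen.add(s))
    seen.size
  }

  def derivedCount(n: Int): Int = {
    import Derived._
    distinctInstances((1 to n).map(i => schemaOf(Edge(i.toString))))
  }

  def pinnedCount(n: Int): Int = {
    import Pinned._
    distinctInstances((1 to n).map(i => schemaOf(Edge(i.toString))))
  }
}
```

In this model `derivedCount(n)` grows with the number of messages while `pinnedCount(n)` stays at one. With avro4s the corresponding change would presumably be a single declaration along the lines of `implicit val edgeFormat: RecordFormat[Edge] = RecordFormat[Edge]`, assuming a version in which the format can be summoned that way.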

import java.util

import io.confluent.kafka.schemaregistry.client.rest.entities.{ Config, SchemaString }
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest
import io.confluent.kafka.schemaregistry.client.rest.{ RestService, entities }
import io.confluent.kafka.schemaregistry.client.{ SchemaMetadata, SchemaRegistryClient }
import org.apache.avro.Schema

import scala.collection.mutable

class CachingSchemaRegistryClient(val restService: RestService, val identityMapCapacity: Int)
    extends SchemaRegistryClient {

  val schemaCache: mutable.Map[String, mutable.Map[Schema, Integer]] = mutable.Map()
  val idCache: mutable.Map[String, mutable.Map[Integer, Schema]] =
    mutable.Map(null.asInstanceOf[String] -> mutable.Map())
  val versionCache: mutable.Map[String, mutable.Map[Schema, Integer]] = mutable.Map()

  def this(baseUrl: String, identityMapCapacity: Int) {
    this(new RestService(baseUrl), identityMapCapacity)
  }

  def this(baseUrls: util.List[String], identityMapCapacity: Int) {
    this(new RestService(baseUrls), identityMapCapacity)
  }

  def registerAndGetId(subject: String, schema: Schema): Int =
    restService.registerSchema(schema.toString, subject)

  def getSchemaByIdFromRegistry(id: Int): Schema = {
    val restSchema: SchemaString = restService.getId(id)
    (new Schema.Parser).parse(restSchema.getSchemaString)
  }

  def getVersionFromRegistry(subject: String, schema: Schema): Int = {
    val response: entities.Schema = restService.lookUpSubjectVersion(schema.toString, subject)
    response.getVersion.intValue
  }

  override def getVersion(subject: String, schema: Schema): Int = synchronized {
    val schemaVersionMap: mutable.Map[Schema, Integer] =
      versionCache.getOrElseUpdate(subject, mutable.Map())

    val version: Integer = schemaVersionMap.getOrElse(
      schema, {
        if (schemaVersionMap.size >= identityMapCapacity) {
          throw new IllegalStateException(s"Too many schema objects created for $subject!")
        }

        val version = Integer.valueOf(getVersionFromRegistry(subject, schema))
        schemaVersionMap.put(schema, version)
        version
      }
    )
    version.intValue()
  }

  override def getAllSubjects: util.List[String] = restService.getAllSubjects()

  override def getByID(id: Int): Schema = synchronized { getBySubjectAndID(null, id) }

  override def getBySubjectAndID(subject: String, id: Int): Schema = synchronized {
    val idSchemaMap: mutable.Map[Integer, Schema] = idCache.getOrElseUpdate(subject, mutable.Map())
    idSchemaMap.getOrElseUpdate(id, getSchemaByIdFromRegistry(id))
  }

  override def getSchemaMetadata(subject: String, version: Int): SchemaMetadata = {
    val response = restService.getVersion(subject, version)
    val id       = response.getId.intValue
    val schema   = response.getSchema
    new SchemaMetadata(id, version, schema)
  }

  override def getLatestSchemaMetadata(subject: String): SchemaMetadata = synchronized {
    val response = restService.getLatestVersion(subject)
    val id       = response.getId.intValue
    val version  = response.getVersion.intValue
    val schema   = response.getSchema
    new SchemaMetadata(id, version, schema)
  }

  override def updateCompatibility(subject: String, compatibility: String): String = {
    val response: ConfigUpdateRequest = restService.updateCompatibility(compatibility, subject)
    response.getCompatibilityLevel
  }

  override def getCompatibility(subject: String): String = {
    val response: Config = restService.getConfig(subject)
    response.getCompatibilityLevel
  }

  override def testCompatibility(subject: String, schema: Schema): Boolean =
    restService.testCompatibility(schema.toString(), subject, "latest")

  override def register(subject: String, schema: Schema): Int = synchronized {
    val schemaIdMap: mutable.Map[Schema, Integer] =
      schemaCache.getOrElseUpdate(subject, mutable.Map())

    val id = schemaIdMap.getOrElse(
      schema, {
        if (schemaIdMap.size >= identityMapCapacity)
          throw new IllegalStateException(s"Too many schema objects created for $subject!")
        val id: Integer = Integer.valueOf(registerAndGetId(subject, schema))
        schemaIdMap.put(schema, id)
        idCache(null).put(id, schema)
        id
      }
    )
    id.intValue()
  }
}
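Why swapping the map type helps can be shown in isolation. This is a rough sketch, not part of the client above: `FakeSchema` is a hypothetical stand-in for org.apache.avro.Schema (a case class, so it compares structurally), and `CacheGrowth` is an illustrative name. It inserts structurally equal but distinct instances into an identity-keyed map and into an equality-keyed `mutable.Map`.

```scala
import java.util.IdentityHashMap
import scala.collection.mutable

// Hypothetical stand-in for org.apache.avro.Schema; case classes compare by content.
final case class FakeSchema(json: String)

object CacheGrowth {
  // Inserts n equal-but-distinct instances and returns (identity-map size, equality-map size).
  def sizesAfter(n: Int): (Int, Int) = {
    val byIdentity = new IdentityHashMap[FakeSchema, Integer]()
    val byEquality = mutable.Map.empty[FakeSchema, Integer]
    (1 to n).foreach { _ =>
      // A fresh instance per message, as happens when the format is re-derived each time.
      val schema = FakeSchema("""{"type":"record","name":"Edge"}""")
      byIdentity.put(schema, Integer.valueOf(1))
      byEquality.put(schema, Integer.valueOf(1))
    }
    (byIdentity.size, byEquality.size)
  }
}
```

The identity-keyed map gains one entry per instance and would eventually hit a capacity check like the one in `register`, while the equality-keyed map stays at a single entry. The trade-off is that every lookup now hashes and compares the schema by content rather than by reference.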