Kafka Broker 为生成快速压缩消息的客户端抛出错误

ken*_*tor 1 apache-kafka

我刚刚使用 Confluence Platform 6.0.0(Apache Kafka 版本 2.6.0)完成了 Kafka 集群设置。Kafka 代理部署在 Kubernetes 中。只要不压缩,向新主题生成消息就可以正常工作。

然而,我只是尝试生成一个快速的压缩消息并返回一个错误。因此,我查看了代理日志并在代理中看到了以下异常:

[2020-11-24 13:14:37,834] ERROR (data-plane-kafka-request-handler-0:Logging) [ReplicaManager broker=1] Error processing append operation on partition customers-2
org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
        at org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:92)
        at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:261)
        at org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:340)
        at kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1(LogValidator.scala:401)
        at kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1$adapted(LogValidator.scala:394)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
        at kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:394)
        at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:106)
        at kafka.log.Log.$anonfun$append$2(Log.scala:1095)
        at kafka.log.Log.append(Log.scala:2340)
        at kafka.log.Log.appendAsLeader(Log.scala:1019)
        at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:984)
        at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:972)
        at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:883)
        at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
        at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
        at scala.collection.mutable.HashMap.map(HashMap.scala:34)
        at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:871)
        at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:571)
        at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:605)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
        at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:145)
        at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
        at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
        at org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:90)
        ... 24 more
Run Code Online (Sandbox Code Playgroud)

附加信息:在 Kubernetes 中,我配置容器没有将文件写入本地文件系统的权限。我不确定这是否相关,但也许这是成功初始化 snappy 类所必需的?

为什么 Kafka 无法处理快速压缩消息?

Ale*_*uma 5

您可以通过两种不同的方式解决这个问题:

  1. 设置环境变量KAFKA_OPTS='-Dorg.xerial.snappy.tempdir=/some/other/path/with/exec/permissions'kafka-run-class.sh这将由(& )获取,kafka-server-start以便您的代理将库解压到具有执行权限的合适文件系统。
  2. 重建您的 docker 映像,以便您使用 exec 权限挂载 /tmp ( mount -o remount,exec /tmp)

问题解释如下:

  1. https://issues.apache.org/jira/browse/KAFKA-8622
  2. https://docs.datastax.com/en/dse-trblshoot/doc/troubleshooting/snappytrbl.html