我刚刚使用 Confluence Platform 6.0.0(Apache Kafka 版本 2.6.0)完成了 Kafka 集群设置。Kafka 代理部署在 Kubernetes 中。只要不压缩,向新主题生成消息就可以正常工作。
然而,我只是尝试生成一个快速的压缩消息并返回一个错误。因此,我查看了代理日志并在代理中看到了以下异常:
[2020-11-24 13:14:37,834] ERROR (data-plane-kafka-request-handler-0:Logging) [ReplicaManager broker=1] Error processing append operation on partition customers-2
org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
at org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:92)
at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:261)
at org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:340)
at kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1(LogValidator.scala:401)
at kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1$adapted(LogValidator.scala:394)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
at kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:394)
at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:106)
at kafka.log.Log.$anonfun$append$2(Log.scala:1095)
at kafka.log.Log.append(Log.scala:2340)
at kafka.log.Log.appendAsLeader(Log.scala:1019)
at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:984)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:972)
at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:883)
at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
at scala.collection.mutable.HashMap.map(HashMap.scala:34)
at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:871)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:571)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:605)
at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:145)
at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
at org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:90)
... 24 more
Run Code Online (Sandbox Code Playgroud)
附加信息:在 Kubernetes 中,我配置容器没有将文件写入本地文件系统的权限。我不确定这是否相关,但也许这是成功初始化 snappy 类所必需的?
为什么 Kafka 无法处理快速压缩消息?
您可以通过两种不同的方式解决这个问题:
KAFKA_OPTS='-Dorg.xerial.snappy.tempdir=/some/other/path/with/exec/permissions'。kafka-run-class.sh这将由(& )获取,kafka-server-start以便您的代理将库解压到具有执行权限的合适文件系统。mount -o remount,exec /tmp)问题解释如下:
| 归档时间: |
|
| 查看次数: |
4327 次 |
| 最近记录: |