
Kafka consumer throws java.lang.OutOfMemoryError: Direct buffer memory

I am using a single-node Kafka broker (0.10.2) and a single-node Zookeeper (3.4.9). I have one consumer server (single core, 1.5 GB RAM). Whenever I run a process with 5 or more consumer threads, the consumer threads get killed after the following exceptions are thrown.
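The launcher itself is not shown below; roughly, it starts one polling Runnable per thread, along the lines of this minimal sketch (the broker address, group id and consumer settings here are placeholders, and the lambda just stands in for the KafkaPollingConsumer class shown further down):

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerLauncher {
    public static void main(String[] args) {
        int threads = 5; // the errors start once 5 or more consumer threads are running
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092"); // placeholder
                props.put("group.id", "topic1-group");            // placeholder
                props.put("key.deserializer",
                        "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer",
                        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(Collections.singletonList("topic1"));
                    while (true) {
                        // 0.10.2 client API: poll(long timeoutMs)
                        ConsumerRecords<String, byte[]> records = consumer.poll(1000L);
                        // record processing omitted
                    }
                }
            });
        }
    }
}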

  1. Exception 1

java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887)

  2. Exception 2

Uncaught exception in kafka-coordinator-heartbeat-thread | topic1:
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at ...

I googled around and used the JVM parameters listed below, but the same exceptions still occur:

-XX:MaxDirectMemorySize=768m

-Xms512m
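To make sure those flags actually reach the consumer JVM, I can dump the startup arguments at runtime; this check is not part of the original setup, just a quick sanity-check sketch:

import java.lang.management.ManagementFactory;

public class JvmArgsCheck {
    public static void main(String[] args) {
        // Arguments the JVM was started with, e.g. -Xms512m, -XX:MaxDirectMemorySize=768m
        System.out.println(ManagementFactory.getRuntimeMXBean().getInputArguments());
        // Maximum heap the JVM will try to use (reflects -Xmx; -Xms only sets the initial size)
        System.out.println("max heap = " + Runtime.getRuntime().maxMemory() + " bytes");
    }
}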

How can I fix this? Is any other JVM parameter tuning required?

My Kafka consumer code is:

import com.mongodb.DBObject
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.errors.WakeupException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.regex.Pattern

class KafkaPollingConsumer implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(KafkaPollingConsumer.class)
private static final String TAG = "[KafkaPollingConsumer]"
private final KafkaConsumer<String, byte []> kafkaConsumer
private Map<TopicPartition,OffsetAndMetadata> currentOffsetsMap = …

java multithreading jvm-arguments apache-kafka kafka-consumer-api

2 votes · 1 answer · 4704 views