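I am using a single-node Kafka broker (0.10.2) and a single-node Zookeeper (3.4.9). I have one consumer server (single core, 1.5 GB RAM). Whenever I run the process with 5 or more threads, my consumer threads are killed after throwing these exceptions: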
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887)
Uncaught exception in kafka-coordinator-heartbeat-thread | topic1:
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
I googled the problem and applied the JVM parameters below, but the same exceptions still occur:
-XX:MaxDirectMemorySize=768m
-Xms512m
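One way to confirm that these flags actually reach the consumer JVM is a small check like the sketch below. Note that `-Xms` sets only the initial heap size; the maximum heap is governed by `-Xmx`, which is not set above. The class name `MemoryLimits` and its helper methods are hypothetical names chosen for illustration; the calls themselves are standard `java.lang.Runtime` and `java.lang.management` APIs.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper: reports the memory limits the running JVM really has.
final class MemoryLimits {

    // Maximum heap the JVM will attempt to use (controlled by -Xmx, not -Xms).
    static long maxHeapBytes() {
        return Runtime.getRuntime().maxMemory();
    }

    // Bytes currently held by direct (off-heap) NIO buffers,
    // or -1 if the JVM exposes no pool named "direct".
    static long directBufferUsedBytes() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        System.out.println("Max heap (MB): " + maxHeapBytes() / mb);
        System.out.println("Direct buffers in use (bytes): " + directBufferUsedBytes());
    }
}
```

Running this with the same flags as the consumer process shows whether the heap ceiling is what was intended.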
How can I resolve this issue? Is any other JVM parameter tuning required?
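For context on the memory budget, here is a rough back-of-the-envelope sketch. It assumes one `KafkaConsumer` per thread and one in-flight fetch per consumer, and it uses the documented 0.10.2 default of `fetch.max.bytes` (52428800 bytes, i.e. 50 MB). The class `FetchBudget`, its methods, and the override values are hypothetical and purely illustrative; the property names are real consumer configuration keys. It is only an estimate, not a diagnosis of the exceptions above.

```java
import java.util.Properties;

// Hypothetical helper: rough estimate of fetch memory across consumer threads.
final class FetchBudget {

    // Documented default of fetch.max.bytes for the 0.10.2 consumer (50 MB).
    static final long DEFAULT_FETCH_MAX_BYTES = 52_428_800L;

    // Rough worst case: every thread holds one full fetch response at once.
    static long roughFetchBudgetBytes(int consumerThreads, long fetchMaxBytes) {
        return consumerThreads * fetchMaxBytes;
    }

    // Illustrative overrides for a small machine; the values are assumptions,
    // not recommendations, and would be merged into the consumer's Properties.
    static Properties lowMemoryOverrides() {
        Properties props = new Properties();
        props.setProperty("fetch.max.bytes", String.valueOf(8 * 1024 * 1024));
        props.setProperty("max.partition.fetch.bytes", String.valueOf(512 * 1024));
        props.setProperty("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        long budget = roughFetchBudgetBytes(5, DEFAULT_FETCH_MAX_BYTES);
        System.out.println("Rough fetch memory for 5 threads (MB): " + budget / (1024 * 1024));
        System.out.println("Example overrides: " + lowMemoryOverrides());
    }
}
```

With the defaults, 5 threads come to roughly 250 MB of fetch data alone, which is a sizeable share of a 1.5 GB machine once the rest of the process is counted.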
My Kafka consumer code is:
import com.mongodb.DBObject
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.errors.WakeupException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.regex.Pattern
class KafkaPollingConsumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaPollingConsumer.class)
    private static final String TAG = "[KafkaPollingConsumer]"
    private final KafkaConsumer<String, byte[]> kafkaConsumer
    private Map<TopicPartition, OffsetAndMetadata> currentOffsetsMap = …