我正在使用单节点Kafka集群(版本0.10.2)和单节点zookeeper(3.4.9).我每个主题有大约310个主题为Avg 100分区的主题.在我运行命令以检查组中的使用者状态时,在Kafka代理上
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group
大多数情况下它会抛出一个错误:
Error: Executing consumer group command failed due to Request GROUP_COORDINATOR failed on brokers List(localhost:9092 (id: -1 rack: null))
Run Code Online (Sandbox Code Playgroud)
它剩下的时间显示正确的结果.
我用谷歌搜索它但无法找到此错误的确切结果.有人告诉我如何解决此错误?任何帮助,将不胜感激.
我正在使用单节点 Kafka(v 0.10.2)和单节点zookeeper(v 3.4.8),我的controller.log 文件充满了这个异常
java.io.IOException: Connection to 1 was disconnected before the response was read
at kafka.utils.NetworkClientBlockingOps$.$anonfun$blockingSendAndReceive$3(NetworkClientBlockingOps.scala:114)
at kafka.utils.NetworkClientBlockingOps$.$anonfun$blockingSendAndReceive$3$adapted(NetworkClientBlockingOps.scala:112)
at scala.Option.foreach(Option.scala:257)
at kafka.utils.NetworkClientBlockingOps$.$anonfun$blockingSendAndReceive$1(NetworkClientBlockingOps.scala:112)
at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:136)
at kafka.utils.NetworkClientBlockingOps$.pollContinuously$extension(NetworkClientBlockingOps.scala:142)
at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:192)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Run Code Online (Sandbox Code Playgroud)
我用谷歌搜索了这个异常,但无法找到这个异常的根本原因。有人可以告诉我为什么会发生这个错误以及如何防止它吗?
我在我的生产服务器中使用 Kafka0.10.0和 zookeeper 3.4.6。我有 20 个主题,每个主题有大约 50 个分区。我总共有 100 个消费者,每个消费者都订阅了不同的主题和分区。所有消费者都具有相同的 groupId。那么,如果为特定主题添加或删除消费者,那么附加到不同主题的消费者也会经历重新平衡吗?
我的消费者代码是:
public static void main(String[] args) {
String groupId = "prod"
String topicRegex = args[0]
String consumerTimeOut = "10000"
int n_threads = 1
if (args && args.size() > 1) {
ConfigLoader.init(args[1])
}
else {
ConfigLoader.init('development')
}
if(args && args.size() > 2 && args[2].isInteger()){
n_threads = (args[2]).toInteger()
}
ExecutorService executor = Executors.newFixedThreadPool(n_threads)
addShutdownHook(executor)
String zooKeeper = ConfigLoader.conf.zookeeper.hostName
List<Runnable> taskList = []
for(int i = 0; i < …Run Code Online (Sandbox Code Playgroud) 我正在使用单节点Kafka代理(0.10.2)和单节点Zookeeper代理(3.4.9)。我有一个消费服务器(单核和1.5 GB RAM)。每当我运行具有5个或更多线程的进程时,抛出这些异常后,我的使用者的线程就会被杀死
java.lang.OutOfMemoryError:Java堆空间位于org.apache.kafka.common.network.NetworkReceive的java.nio.ByteBuffer.allocate(ByteBuffer.java:335)处的java.nio.HeapByteBuffer。(HeapByteBuffer.java:57) org.apache.kafka.common.network上的.readFromReadableChannel(NetworkReceive.java:93).org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169.net.NetworkReceive.readFrom(NetworkReceive.java:71) )的org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)的org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)。 org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)的org.apache.kafka.clients.consumer.internals的common.network.Selector.poll(Selector.java:303)。org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)上的ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)位于org.apache.kafka.clients.consumer.internals.AbstractCoordinator $ HeartbeatThread。运行(AbstractCoordinator.java:887)
kafka-coordinator-heartbeat-thread中未捕获的异常 topic1:java.lang.OutOfMemoryError:直接缓冲存储器位于java.nio.DirectByteBuffer。(DirectByteBuffer.java:123)处的java.nio.Bits.reserveMemory(Bits.java:693)。java.nio.ByteBuffer.allocateDirect(ByteBuffer .java:311),位于sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241),位于sun.nio.ch.IOUtil.read(IOUtil.java:195),位于sun.nio.ch.SocketChannelImpl.read( org.apache.kafka.common.network的SocketChannelImpl.java:380)org.org.apache.kafka.common.network.NetworkReceive.readFromReadableReadableChannel(NetworkReceive.java:97)的org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110) org.apache.kafka.common上的.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)。
我用谷歌搜索并使用了下面提到的JVM参数,但是仍然发生了相同的异常
-XX:MaxDirectMemorySize = 768m
-Xms512m
如何解决此问题?是否需要其他任何javm参数调整?
我的卡夫卡消费者代码是
import com.mongodb.DBObject
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.errors.WakeupException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.regex.Pattern
class KafkaPollingConsumer implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(KafkaPollingConsumer.class)
private static final String TAG = "[KafkaPollingConsumer]"
private final KafkaConsumer<String, byte []> kafkaConsumer
private Map<TopicPartition,OffsetAndMetadata> currentOffsetsMap = …Run Code Online (Sandbox Code Playgroud) java multithreading jvm-arguments apache-kafka kafka-consumer-api
我是REST APIS的新手,我正在尝试构建本地内存缓存,并使用java和spring框架编写REST APIS以获取和检索数据,以下是我要实现的详细描述:
- Building the REST API to store key-value in local memory cache
- API must have 2 endpoints following specification below
• /cache/add (This must store unique key only (existing key must be ignored), This will return true if the element was successfully added )
•/cache/take (This method retrieves and removes the most recently added element from the cache and waits if necessary until an element becomes available)
Run Code Online (Sandbox Code Playgroud)
我不知道如何实现/ cache / take方法。任何建议将不胜感激。