Posts by Abh*_*nyu

Error: Executing consumer group command failed due to Request GROUP_COORDINATOR failed on brokers List(localhost:9092 (id: -1 rack: null))

I am using a single-node Kafka cluster (version 0.10.2) and a single-node ZooKeeper (3.4.9). I have about 310 topics, with roughly 100 partitions per topic on average. When I run this command on the Kafka broker to check the status of consumers in a group:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group

most of the time it throws this error:

Error: Executing consumer group command failed due to Request GROUP_COORDINATOR failed on brokers List(localhost:9092 (id: -1 rack: null))

The rest of the time it shows the correct result.

I googled it but could not find anything conclusive about this error. Can someone tell me how to fix it? Any help would be appreciated.
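The failure is intermittent, which suggests the coordinator lookup sometimes times out under this partition count; until the root cause is addressed, a bounded retry around the flaky call is a common stop-gap. A generic sketch in plain Java (the simulated operation and the retry parameters are illustrative, not from the post):

```java
import java.util.concurrent.Callable;

public class Retry {
    // Invoke the given operation up to maxAttempts times, backing off
    // between attempts; rethrow the last failure if every attempt fails.
    static <T> T withRetry(Callable<T> op, int maxAttempts, long backoffMs)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e;
                Thread.sleep(backoffMs * attempt); // linear backoff
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the flaky coordinator lookup: fails twice, then succeeds.
        int[] calls = {0};
        String result = withRetry(() -> {
            if (++calls[0] < 3) throw new RuntimeException("GROUP_COORDINATOR failed");
            return "describe ok after " + calls[0] + " attempts";
        }, 5, 10);
        System.out.println(result); // prints "describe ok after 3 attempts"
    }
}
```

The same wrapper shape works for shelling out to `kafka-consumer-groups.sh` from a script, since the tool exits non-zero when the coordinator request fails.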

apache-kafka

7
Votes
1
Answer
9725
Views

Kafka connection errors in controller.log

I am using a single-node Kafka (v 0.10.2) and a single-node ZooKeeper (v 3.4.8), and my controller.log file is full of this exception:

java.io.IOException: Connection to 1 was disconnected before the response was read
    at kafka.utils.NetworkClientBlockingOps$.$anonfun$blockingSendAndReceive$3(NetworkClientBlockingOps.scala:114)
    at kafka.utils.NetworkClientBlockingOps$.$anonfun$blockingSendAndReceive$3$adapted(NetworkClientBlockingOps.scala:112)
    at scala.Option.foreach(Option.scala:257)
    at kafka.utils.NetworkClientBlockingOps$.$anonfun$blockingSendAndReceive$1(NetworkClientBlockingOps.scala:112)
    at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:136)
    at kafka.utils.NetworkClientBlockingOps$.pollContinuously$extension(NetworkClientBlockingOps.scala:142)
    at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:192)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

I googled this exception but could not find its root cause. Can someone tell me why this error happens and how to prevent it?
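The trace shows the controller's RequestSendThread losing its channel to broker 1 before a response arrived; typical triggers are broker restarts, long GC pauses, or a socket timeout too short for a broker carrying this many partitions. If slow responses are the cause, one broker-side knob to try is the controller socket timeout; a hedged server.properties fragment (the value is illustrative, the default is 30000 ms):

```properties
# Raise only if brokers are known to respond slowly under a large
# topic/partition count; default is 30000 ms.
controller.socket.timeout.ms=60000
```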

apache-kafka

5
Votes
1
Answer
6808
Views

Kafka consumer group id and consumer rebalancing issue

I am using Kafka 0.10.0 and ZooKeeper 3.4.6 on my production server. I have 20 topics, each with approximately 50 partitions. I have 100 consumers in total, each subscribed to different topics and partitions. All consumers share the same groupId. So if a consumer is added or removed for a particular topic, will the consumers attached to other topics also go through a rebalance?

My consumer code is:

public static void main(String[] args) {
        String groupId = "prod"
        String topicRegex = args[0]
        String consumerTimeOut = "10000"
        int n_threads = 1
        if (args && args.size() > 1) {
            ConfigLoader.init(args[1])
        }
        else {
            ConfigLoader.init('development')
        }
        if(args && args.size() > 2 && args[2].isInteger()){
            n_threads = (args[2]).toInteger()
        }

        ExecutorService executor = Executors.newFixedThreadPool(n_threads)
        addShutdownHook(executor)
        String zooKeeper = ConfigLoader.conf.zookeeper.hostName
        List<Runnable> taskList = []
        for(int i = 0; i < …
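For context on the rebalancing question above: the Java consumer tracks membership per groupId, not per topic, so any member joining or leaving bumps the group generation and forces every member to rejoin, including those subscribed to unrelated topics. A toy model of that group-wide generation bump (pure Java, no Kafka dependency; all names are invented for illustration):

```java
import java.util.HashSet;
import java.util.Set;

public class GroupSimulation {
    // Toy model of a consumer group: one generation counter shared by
    // every member, regardless of which topics each member subscribes to.
    static class Group {
        int generation = 0;
        final Set<String> members = new HashSet<>();

        // Any membership change rebalances the WHOLE group.
        void join(String member)  { members.add(member);    generation++; }
        void leave(String member) { members.remove(member); generation++; }
    }

    public static void main(String[] args) {
        Group group = new Group();
        group.join("consumer-topicA");   // subscribes only to topic A
        group.join("consumer-topicB");   // subscribes only to topic B
        int before = group.generation;

        group.join("another-consumer-topicA"); // only topic A is logically affected...
        // ...but the generation (and thus a rebalance) applies to every member:
        System.out.println("generation bumped for all members: "
                + (group.generation == before + 1)); // prints "... true"
    }
}
```

This is why splitting unrelated topics into separate groupIds is the usual way to keep a membership change in one topic from pausing consumers of the others.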

apache-kafka kafka-consumer-api apache-kafka-connect

3
Votes
1
Answer
1340
Views

Kafka consumer throws java.lang.OutOfMemoryError: Direct buffer memory

I am using a single-node Kafka broker (0.10.2) and a single-node ZooKeeper (3.4.9). I have one consumer server (single core, 1.5 GB RAM). Whenever I run the process with 5 or more threads, my consumer's threads get killed after throwing these exceptions:

  1. Exception 1

java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887)

  2. Exception 2

Uncaught exception in kafka-coordinator-heartbeat-thread | topic1:
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)

I googled around and used the JVM parameters mentioned below, but the same exceptions still occurred:

-XX:MaxDirectMemorySize=768m

-Xms512m

How do I fix this problem? Is any other JVM parameter tuning needed?
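One back-of-envelope check on those flags: the 0.10.x consumer can buffer up to max.partition.fetch.bytes (1 MiB by default) per partition per fetch, so buffer demand scales with threads × partitions and can easily blow past -XX:MaxDirectMemorySize=768m. A rough estimate in plain Java (the partition count per consumer is an assumed figure, not from the post):

```java
public class FetchMemoryEstimate {
    public static void main(String[] args) {
        long maxPartitionFetchBytes = 1024 * 1024; // 0.10.x consumer default
        int threads = 5;                  // one KafkaConsumer per thread, as in the post
        int partitionsPerConsumer = 300;  // illustrative assumption, not from the post

        // Worst case: every assigned partition has a full fetch buffered at once.
        long worstCaseBytes = maxPartitionFetchBytes * threads * (long) partitionsPerConsumer;
        System.out.printf("worst-case fetch buffers: ~%d MiB%n",
                worstCaseBytes / (1024 * 1024)); // prints "~1500 MiB"
        // ~1500 MiB cannot fit in MaxDirectMemorySize=768m (nor in 1.5 GB of RAM),
        // so lowering max.partition.fetch.bytes or the thread count is the lever.
    }
}
```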

My Kafka consumer code is:

import com.mongodb.DBObject
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.errors.WakeupException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.regex.Pattern

class KafkaPollingConsumer implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(KafkaPollingConsumer.class)
private static final String TAG = "[KafkaPollingConsumer]"
private final KafkaConsumer<String, byte []> kafkaConsumer
private Map<TopicPartition,OffsetAndMetadata> currentOffsetsMap = …

java multithreading jvm-arguments apache-kafka kafka-consumer-api

2
Votes
1
Answer
4704
Views

HTTP request that waits for a task to complete

I am new to REST APIs. I am trying to build a local in-memory cache and write REST APIs with Java and the Spring framework to store and retrieve data. Below is a detailed description of what I want to achieve:

- Build the REST API to store key-values in a local in-memory cache
- The API must have 2 endpoints following the specification below
     • /cache/add (this must store unique keys only (existing keys must be ignored); it returns true if the element was successfully added)
     • /cache/take (this method retrieves and removes the most recently added element from the cache, waiting if necessary until an element becomes available)

I don't know how to implement the /cache/take method. Any suggestions would be appreciated.
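Since /cache/take must both return the most recently added element and block until one exists, java.util.concurrent.LinkedBlockingDeque covers the whole contract: add pushes unseen keys onto the head, take blocks on takeFirst(). A minimal sketch of the cache itself, leaving out the Spring controller wiring (class and method names are illustrative):

```java
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;

public class KeyValueCache {
    // Deque head = most recently added key; take() blocks here when empty.
    private final BlockingDeque<String> keys = new LinkedBlockingDeque<>();
    private final Map<String, String> values = new ConcurrentHashMap<>();

    /** /cache/add: store unique keys only; return true when actually added. */
    public boolean add(String key, String value) {
        if (values.putIfAbsent(key, value) != null) {
            return false;           // existing key: ignored, per the spec
        }
        keys.addFirst(key);         // newest element goes to the head
        return true;
    }

    /** /cache/take: remove and return the most recent entry, waiting if empty. */
    public String take() throws InterruptedException {
        String key = keys.takeFirst(); // blocks until an element is available
        return values.remove(key);
    }
}
```

A Spring @RestController would simply delegate to these two methods; the servlet thread serving /cache/take then parks inside takeFirst() until some /cache/add call supplies an element (for high concurrency you would move this onto an async DeferredResult, but the blocking form matches the spec as written).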

java spring spring-boot

0
Votes
1
Answer
45
Views