小编kyl*_*kyl的帖子

消费者陷入重新加入

我已经阅读了其他主题,并且通过使用新的组ID解决了这个问题,但是我想了解是什么原因引起的。

我有一个包含16个分区的主题,我已将session.timeout.ms = 30000设置为max.poll.interval.ms = 30000000。

我运行了我的程序,然后按Ctrl + C组合键,因此无法正常关闭。在我猜了16次之后,我陷入了这个重新加入的问题。session.timeout.ms是心跳超时,因此30秒后它应该使我的用户正常运行,而我的分区应该“释放”对吗?还是只听我的max.poll.interval.ms?

编辑:我仍然间歇性地收到此错误,并且发生这种情况时,我必须重新启动所有使用者。即使我的消费者运行良好,然后他们开始陷入重新加入的困境(没有添加/移除消费者),也会发生这种情况。这是一个错误日志,来自当我尝试将新使用者卡在该状态时与它连接之后:

https://pastebin.com/AXJeSHkp

2017-06-29 17:28:16,215 DEBUG [AbstractCoordinator] - [scheduler-1] - Sending JoinGroup ((type: JoinGroupRequest, groupId=ingestion-matching-kafka-consumer-group-dev1, sessionTimeout=30000, rebalanceTimeout=43200000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@b45e5583)) to coordinator kafka04-prod01.messagehub.services.us-south.bluemix.net:9093 (id: 2147483644 rack: null)

2017-06-29 17:37:21,261 DEBUG [NetworkClient] - [scheduler-1] - Node 2147483644 disconnected.
2017-06-29 17:37:21,263 DEBUG [ConsumerNetworkClient] - [scheduler-1] - Cancelled JOIN_GROUP request {api_key=11,api_version=1,correlation_id=19,client_id=ingestion-matching-kafka-consumer-dev1} with correlation id 19 due to node 2147483644 being disconnected
Run Code Online (Sandbox Code Playgroud)

这些是我认为相关的第一个和最后一个消息。这是我设置的相关超时:

session.timeout.ms=30000
max.poll.interval.ms=43200000    
request.timeout.ms=43205000 # the docs said to keep this higher than max.poll.interval.ms …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api message-hub

6
推荐指数
2
解决办法
3423
查看次数

什么是 ParallelStream 队列行为?

我正在使用parallelStream 并行上传一些文件,有些是大文件,有些是小文件。我注意到并非所有工人都被使用。

一开始一切都运行良好,所有线程都被使用(我将并行度选项设置为 16)。然后在某一时刻(一旦到达更大的文件),它只使用一个线程

简化代码:

files.parallelStream().forEach((file) -> {
    try (FileInputStream fileInputStream = new FileInputStream(file)) {
                IDocumentStorageAdaptor uploader = null;

                try {
                    logger.debug("Adaptors before taking: " + uploaderPool.size());
                    uploader = uploaderPool.take();
                    logger.debug("Took an adaptor!");
                    logger.debug("Adaptors after taking: " + uploaderPool.size());
                    uploader.addNewFile(file);
                } finally {
                    if (uploader != null) {
                        logger.debug("Adding one back!");
                        uploaderPool.put(uploader);
                        logger.debug("Adaptors after putting: " + uploaderPool.size());
                    }
                }
            } catch (InterruptedException | IOException e) {
                throw new UploadException(e);
            }
});
Run Code Online (Sandbox Code Playgroud)

uploaderPool 是一个 ArrayBlockingQueue。日志:

[ForkJoinPool.commonPool-worker-8] - Adaptors before taking: …
Run Code Online (Sandbox Code Playgroud)

java multithreading forkjoinpool

2
推荐指数
1
解决办法
774
查看次数