Does commitOffsets on the high-level consumer block?

Mat*_*son 10 apache-kafka kafka-consumer-api

In the Java client (http://kafka.apache.org/documentation.html#highlevelconsumerapi), does commitOffsets on the high-level consumer block until the offsets have been committed successfully, or is it fire-and-forget?

Dra*_*kes 6

Does commitOffsets on the high-level consumer block until the offsets are successfully committed?

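It looks like commitOffsets() loops over every partition the consumer owns and, for each one whose consumed offset has changed, calls updatePersistentPath, which writes the data via zkClient.writeData(path, getBytes(data)). So it appears that commitOffsets() does block until every changed offset has been written. Note that, in the code below, a failed write is caught and logged as a warning rather than rethrown, so the method returning does not by itself tell the caller that every write succeeded.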

Here is the source code for commitOffsets() (ref):

public void commitOffsets() {
    if (zkClient == null) {
        logger.error("zk client is null. Cannot commit offsets");
        return;
    }
    for (Entry<String, Pool<Partition, PartitionTopicInfo>> e : topicRegistry.entrySet()) {
        ZkGroupTopicDirs topicDirs = new ZkGroupTopicDirs(config.getGroupId(), e.getKey());
        for (PartitionTopicInfo info : e.getValue().values()) {
            final long lastChanged = info.getConsumedOffsetChanged().get();
            if (lastChanged == 0) {
                logger.trace("consume offset not changed");
                continue;
            }
            final long newOffset = info.getConsumedOffset();
            //path: /consumers/<group>/offsets/<topic>/<brokerid-partition>
            final String path = topicDirs.consumerOffsetDir + "/" + info.partition.getName();
            try {
                ZkUtils.updatePersistentPath(zkClient, path, "" + newOffset);
            } catch (Throwable t) {
                logger.warn("exception during commitOffsets, path=" + path + ",offset=" + newOffset, t);
            } finally {
                info.resetComsumedOffsetChanged(lastChanged);
                if (logger.isDebugEnabled()) {
                    logger.debug("Committed [" + path + "] for topic " + info);
                }
            }
        }
    }
}
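
To make the blocking behaviour easier to see, here is a minimal, self-contained sketch of the same loop shape. It is not Kafka code: `OffsetStore`, `CommitLoopSketch`, and `commitAll` are hypothetical names, and the store is a stand-in for a synchronous ZooKeeper write. Unlike the real method, which returns void and only logs failures, this sketch returns the paths that failed so the effect can be observed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a synchronous ZooKeeper write; not a Kafka or ZkClient API.
interface OffsetStore {
    void write(String path, long offset) throws Exception;
}

class CommitLoopSketch {
    // One synchronous write per changed offset. The call returns only after
    // every write has been attempted; failures are caught, not rethrown.
    static List<String> commitAll(Map<String, Long> changedOffsets, OffsetStore store) {
        List<String> failedPaths = new ArrayList<>();
        for (Map.Entry<String, Long> e : changedOffsets.entrySet()) {
            try {
                store.write(e.getKey(), e.getValue());
            } catch (Exception ex) {
                // The original only logs a warning here; we record the path instead.
                failedPaths.add(e.getKey());
            }
        }
        return failedPaths;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        offsets.put("/consumers/g/offsets/t/0-0", 42L);
        offsets.put("/consumers/g/offsets/t/0-1", 7L);

        Map<String, Long> committed = new LinkedHashMap<>();
        OffsetStore store = (path, offset) -> {
            if (path.endsWith("0-1")) {
                throw new IllegalStateException("simulated write failure");
            }
            committed.put(path, offset);
        };

        List<String> failed = commitAll(offsets, store);
        System.out.println("committed=" + committed + " failed=" + failed);
    }
}
```

The point of the sketch is that the caller is blocked for the duration of every write, yet a clean return still leaves it to the caller (or the logs) to find out whether any individual write failed.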

And for updatePersistentPath(...) (ref):

public static void updatePersistentPath(ZkClient zkClient, String path, String data) {
    try {
        zkClient.writeData(path, getBytes(data));
    } catch (ZkNoNodeException e) {
        createParentPath(zkClient, path);
        try {
            zkClient.createPersistent(path, getBytes(data));
        } catch (ZkNodeExistsException e2) {
            zkClient.writeData(path, getBytes(data));
        }
    }
}
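
The write, then create, then write-again sequence in updatePersistentPath is an "update or create" pattern that tolerates another writer creating the node in between. A rough in-memory sketch of that control flow follows. `UpsertSketch`, `Store`, `writeExisting`, and `createNew` are hypothetical names, a `ConcurrentHashMap` stands in for ZooKeeper, and boolean return values replace the `ZkNoNodeException` / `ZkNodeExistsException` signalling used in the real code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class UpsertSketch {
    // Hypothetical in-memory stand-in for a ZooKeeper node tree.
    static final class Store {
        final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

        // Like writing to an existing node: returns false if the path is absent.
        boolean writeExisting(String path, String data) {
            return nodes.replace(path, data) != null;
        }

        // Like creating a node: returns false if the path already exists.
        boolean createNew(String path, String data) {
            return nodes.putIfAbsent(path, data) == null;
        }
    }

    // Try to update; if the node is missing, create it; if someone else
    // created it in the meantime, fall back to a plain update.
    static void upsert(Store store, String path, String data) {
        if (store.writeExisting(path, data)) {
            return;
        }
        if (store.createNew(path, data)) {
            return;
        }
        store.writeExisting(path, data);
    }

    public static void main(String[] args) {
        Store store = new Store();
        upsert(store, "/consumers/g/offsets/t/0-0", "42"); // node absent: created
        upsert(store, "/consumers/g/offsets/t/0-0", "43"); // node present: updated
        System.out.println(store.nodes);
    }
}
```

Every step in this sequence is a synchronous call, which is consistent with the reading above that the commit does not return until the write has been attempted.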