小编mme*_*sen的帖子

卡夫卡INVALID_FETCH_SESSION_EPOCH

我们正在将kafka代理设置与使用Spring云流kafka运行的kafka stream应用程序结合使用。尽管看起来运行良好,但在日志中确实出现了以下错误语句:

2019-02-21 22:37:20,253 INFO kafka-coordinator-heartbeat-thread | anomaly-timeline org.apache.kafka.clients.FetchSessionHandler [Consumer clientId=anomaly-timeline-56dc4481-3086-4359-a8e8-d2dae12272a2-StreamThread-1-consumer, groupId=anomaly-timeline] Node 2 was unable to process the fetch request with (sessionId=1290440723, epoch=2089): INVALID_FETCH_SESSION_EPOCH. 
Run Code Online (Sandbox Code Playgroud)

我搜索了互联网,但是关于此错误的信息不多。我猜想这可能与经纪人和使用者之间的时间设置有所不同,但是两台机器的时间服务器设置相同。

知道如何解决吗?

apache-kafka spring-cloud-stream apache-kafka-streams

10
推荐指数
2
解决办法
1万
查看次数

启用一次会导致流由于初始化事务状​​态时超时而关闭

我编写了一个简单的示例来测试连接功能。由于我有时会在结果主题中收到重复的消息,有时会在该主题中丢失消息,因此我在查明问题时认为启用恰好一次语义。然而,在通过以下方式执行此操作时:

props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
Run Code Online (Sandbox Code Playgroud)

我遇到超时,导致我的应用程序中的 kafka 流关闭:

2019-05-02 17:02:32.585  INFO 153056 --- [-StreamThread-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2019-05-02 17:02:32.585  INFO 153056 --- [-StreamThread-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
2019-05-02 17:02:32.593  INFO 153056 --- [-StreamThread-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=join-test-90a0aa93-dfd8-4d4f-894b-85a3c5634f72-StreamThread-1-0_0-producer, transactionalId=join-test-0_0] ProducerId set to -1 with epoch -1
2019-05-02 17:03:32.599 ERROR 153056 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [join-test-90a0aa93-dfd8-4d4f-894b-85a3c5634f72-StreamThread-1] Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance: {}

org.apache.kafka.common.errors.TimeoutException: Timeout expired while …
Run Code Online (Sandbox Code Playgroud)

apache-kafka-streams

5
推荐指数
0
解决办法
2152
查看次数

在 kafka 本地状态存储/变更日志中的保留时间

我使用 Kafka 和 Kafka Streams 作为 Spring Cloud Stream 的一部分。在我的 Kafka Streams 应用程序中流动的数据正在按特定时间窗口聚合和具体化:

Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> oneHour = Materialized.as("one-hour-store");
    oneHour.withLoggingEnabled(topicConfig);
    events
            .map(getStringSensorMeasurementKeyValueKeyValueMapper())
            .groupByKey()
            .windowedBy(TimeWindows.of(oneHourStore.getTimeUnit()))
            .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
                    (oneHour));
Run Code Online (Sandbox Code Playgroud)

按照设计,正在具体化的信息也由变更日志主题支持。

我们的应用程序还有一个 rest 端点,它将像这样查询 statestore:

 ReadOnlyWindowStore<String, Double> windowStore =  queryableStoreRegistry.getQueryableStoreType("one-hour-store", QueryableStoreTypes.windowStore());
 WindowStoreIterator<ErrorScore> iter = windowStore.fetch(key, from, to);
Run Code Online (Sandbox Code Playgroud)

查看创建的更改日志主题的设置,它显示:

min.insync.replicas 1
cleanup.policy delete
retention.ms 5259600000
retention.bytes -1
Run Code Online (Sandbox Code Playgroud)

我认为当地的州商店至少会将信息保留 61 天(~2 个月)。然而,似乎只有最后一天的数据保留在存储中。

什么可能导致数据这么快被删除?

使用解决方案更新 Kafka Streams 2.0.1 版不包含 Materialized.withRetention 方法。对于这个特定版本,我能够使用以下解决我问题的代码设置状态存储的保留时间:

TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
    timeWindows.until(retentionMs);
Run Code Online (Sandbox Code Playgroud)

使我的代码写成:

...

.groupByKey()
        .windowedBy(timeWindows)
        .reduce((aggValue, newValue) …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

3
推荐指数
1
解决办法
3290
查看次数