我有很多很多FETCH_SESSION_ID_NOT_FOUND
INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=2] Node 1 was unable to process the fetch request with (sessionId=1229568311, epoch=511): FETCH_SESSION_ID_NOT_FOUND. (org.apache.kafka.clients.FetchSessionHandler)
INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=5] Node 1 was unable to process the fetch request with (sessionId=136816338, epoch=504): FETCH_SESSION_ID_NOT_FOUND. (org.apache.kafka.clients.FetchSessionHandler)
INFO [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=2] Node 0 was unable to process the fetch request with (sessionId=311282207, epoch=569): FETCH_SESSION_ID_NOT_FOUND. (org.apache.kafka.clients.FetchSessionHandler)
...
Run Code Online (Sandbox Code Playgroud)
我阅读了Kafka:连续获取 FETCH_SESSION_ID_NOT_FOUND和如何检查 Kafka 集群中使用的增量获取会话缓存槽的实际数量?
目前,我们刚刚从Burrow获得了有关滞后的指标。
我的问题:
1、谁能解释一下为什么我有这么多FETCH_SESSION_ID_NOT_FOUND?这是什么意思?我之前没有得到它们。
有些消费者发送了太多请求?还是领导人总是连任?
我不知道。有人可以给我更多细节吗?
2、如果是因为某些消费者发送的请求过多,如何识别这些消费者?
谢谢
我计划使用Debezium来制作活动,Kafka并且我需要强有力的交付和订购保证。通过在生产者配置中使用enable.idempotence=true参数,我可以获得这些保证。
我的问题是:
根据 Kafka 文档;
新的 Java Consumer 现在支持来自后台线程的心跳。有一个新的配置 max.poll.interval.ms 控制消费者主动离开组之前轮询调用之间的最长时间(默认为 5 分钟)。配置 request.timeout.ms 的值必须始终大于 max.poll.interval.ms 因为这是消费者重新平衡时 JoinGroup 请求可以在服务器上阻塞的最长时间,因此我们更改了其默认值到 5 分钟以上。
但我无法理解使 max.poll.interval.ms 大于 request.timeout.ms 的可能结果。
任何人都可以解释这部分文件:
因为这是消费者重新平衡时 JoinGroup 请求可以在服务器上阻塞的最长时间
现在我在我的项目中使用这些消费者参数:
kafka.consumer.session.timeout.ms=30000
kafka.consumer.heartbeat.interval.ms=10000
kafka.consumer.request.timeout.ms=31000
kafka.consumer.max.poll.interval.ms=259200000
kafka.consumer.max.partition.fetch.bytes=10242880
Run Code Online (Sandbox Code Playgroud)
这些用法会产生什么负面影响?
注意:我的 kafka-clients 版本是 1.1.0
从 Kafka 2.0.0 版开始,此规则不再有效。来自 Kafka 文档 2.0.0 中的显着变化:
同样作为 KIP-266 的一部分,request.timeout.ms 的默认值已更改为 30 秒。先前的值略高于 5 分钟,以考虑重新平衡所需的最长时间。现在我们将重新平衡中的 JoinGroup 请求视为一种特殊情况,并使用从 max.poll.interval.ms 派生的值作为请求超时。所有其他请求类型使用 request.timeout.ms 定义的超时
有时,Hibernate 在持久操作期间似乎随机执行这样的查询:
select currval('MY_TABLE_NAME_id_seq');
Run Code Online (Sandbox Code Playgroud)
实体:
@Entity
@Table(name = "MY_TABLE_NAME")
public class MyEntity {
@Id
@Column(name = "ID", unique = true, nullable = false)
@GeneratedValue(strategy=GenerationType.IDENTITY)
private Long id;
}
Run Code Online (Sandbox Code Playgroud)
代码:
@Transactional
public void persistMyEntity(String name) {
MyEntity entity= new MyEntity (name);
sessionFactory.getCurrentSession().persist(entity);
}
Run Code Online (Sandbox Code Playgroud)
生成的sql:
insert into MY_TABLE_NAME(name) values ('xyz');
select currval('MY_TABLE_NAME_id_seq');
Run Code Online (Sandbox Code Playgroud)
但通常select currval不会被执行。对此有什么解释吗?
顺便说一句,我的问题与此非常相似,但问题中的解决方案对我不起作用。
笔记:
My_TABLE_NAME ddl sql:
CREATE TABLE my_table_name (
id bigserial NOT NULL,
name character varying(256) NOT NULL,
CONSTRAINT my_table_name_id PRIMARY KEY (id)
);
Run Code Online (Sandbox Code Playgroud)
休眠特性: …