我有一个与 devise gem 集成的用户表。该表具有某些字段,如 current_sign_in_ip、last_sign_in_ip、current_sign_in_at 等。Devise 在用户登录时修改这些字段。我希望将这些字段移动到另一个表并通过 devise 或其他方式填充它们。有什么方法可以实现这个目标吗?
我有一个使用以下属性定义的 Kafka 消费者:
session.timeout.ms = 60000
heartbeat.interval.ms = 6000
Run Code Online (Sandbox Code Playgroud)
我们注意到大约 2000 条消息的滞后,并看到消费者多次使用相同的消息(通过我们的应用程序日志)。另外,注意到一些消息需要大约 10 秒才能完全处理。我们怀疑消费者没有正确提交偏移量(或重复提交相同的旧偏移量),因此消费者接收到了相同的消息。
为了解决这个问题,我们引入了更多的属性:
auto.commit.interval.ms=20000 //To ensure that commit is happening only after processing of message is completed
max.poll.records=10 //To make the consumer pick only 10 messages in one go
And, we set the concurrency to 1.
Run Code Online (Sandbox Code Playgroud)
这解决了我们的问题。滞后开始减少并最终变为 0。
但是,我仍然不清楚为什么会首先出现问题。据我了解,默认情况下:
enable.auto.commit = true
auto.commit.interval.ms=5000
Run Code Online (Sandbox Code Playgroud)
因此,理想情况下,消费者应该每 5 秒提交一次。如果在此时间范围内未完全处理消息,会发生什么情况?消费者正在提交什么偏移量?问题是否是由于轮询记录大小过大(默认为 500)导致的
另外,关于 poll() 方法,我读到:
poll() 调用在后台以 set auto.commit.interval.ms 发出。
那么,最初如果 poll() 更早地每 5 秒发生一次(默认为 auto.commit.interval),为什么它不提交最新的偏移量?因为消费者还没有完成处理吗?然后,它应该在接下来的第 5 秒提交该偏移量。
有人可以回答这些查询并解释为什么会出现原始问题吗?
spring apache-kafka spring-boot kafka-consumer-api spring-kafka