gok*_*ari 5 java apache-kafka apache-kafka-streams
当我未设置processing.gurantee时,这意味着流将以其默认值(at_least_once)启动,此代码可以成功记录并向相关主题发送加入的消息。
如果在同一流应用程序上启用了fully_once配置,则某些数据将无法成功通过联接。即使有第一个peek块的日志,我也看不到一些第二个peek日志和一些我需要的消息。
我确定kstream和ktable都必须具有不为null的值。双方都定期收到消息。
流配置:
processing.guarantee =精确_一次
plication.factor = 3(这会增加内部主题的复制因子)
Kafka(3名经纪人)详细信息:
问题是,一次加工保证设置到底会导致这种情况吗?
final KStream<String, UserProfile> userProfileStream = builder.stream(TOPIC_USER_PROFILE);
final KTable<String, Device> deviceKTable = builder.table(TOPIC_DEVICE);
userProfileStream
.peek((genericId, userProfile) ->
log.debug("[{}] Processing user profile: {}", openUserId, userProfile)
)
.join(
deviceKTable,
(userProfile, device) -> {
userProfile.setDevice(device);
return userProfile;
},
Joined.with(Serdes.String(), userProfileSerde, deviceSerde)
)
.peek((genericId, userProfile) ->
log.debug("[{}] Updated user profile: {}", genericId, userProfile)
)
.to(TOPIC_UPDATED_USER_PROFILE, Produced.with(Serdes.String(), userProfileSerde));
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
85 次 |
| 最近记录: |