我们有以下问题:
我们想听某些Kafka主题并建立它的“历史记录”-因此,对于指定的键,提取一些数据,将其添加到该键的现有列表中(如果不存在,则创建一个新的键),然后将其放入另一个主题,它只有一个分区,并且高度压缩。另一个应用程序可以仅收听该主题并更新其历史记录列表。
我在想它如何适合Kafka流库。我们当然可以使用聚合:
msgReceived.map((key, word) -> new KeyValue<>(key, word))
.groupBy((k,v) -> k, stringSerde, stringSerde)
.aggregate(String::new,
(k, v, stockTransactionCollector) -> stockTransactionCollector + "|" + v,
stringSerde, "summaries2")
.to(stringSerde, stringSerde, "transaction-summary50");
Run Code Online (Sandbox Code Playgroud)
这会创建一个由Kafka支持的本地商店,并将其用作历史记录表。
我担心的是,如果我们决定扩展此类应用程序,则每个正在运行的实例都会创建一个新的主题${applicationId}-${storeName}-changelog(我假设每个应用程序都有不同的主题applicationId)。每个实例开始消耗输入主题,获取不同的键集并构建状态的不同子集。如果Kafka决定重新平衡,则某些实例将开始错过本地存储中的某些历史状态,因为它们会使用一组全新的分区。
问题是,如果我只是为每个正在运行的实例设置相同的applicationId,那么它最终是否应从同一实例(每个正在运行的实例具有相同的本地状态)重播所有数据?
我的配置:
spring.datasource.url=jdbc:h2:mem:db;SCHEMA=public;DB_CLOSE_DELAY=-1;
Run Code Online (Sandbox Code Playgroud)
JPA 生成以下查询:
select itement0_.id as id1_0_, itement0_.brand as brand2_0_, itement0_._item_id as item3_0_, itement0_.product_group as product_4_0_ from public.item itement0_ where itement0_.item_id=? [42102-200]
Run Code Online (Sandbox Code Playgroud)
爆炸了
org.h2.jdbc.JdbcSQLSyntaxErrorException: Table "ITEM" not found; SQL statement
Run Code Online (Sandbox Code Playgroud)
据我所知,问题是 h2 期望双引号中的公共模式,因此以下查询在 h2 控制台中工作正常:
select itement0_.id as id1_0_, itement0_.brand as brand2_0_, itement0_._item_id as item3_0_, itement0_.product_group as product_4_0_ from "public".item itement0_ where itement0_.item_id=? [42102-200]
Run Code Online (Sandbox Code Playgroud)
我的实体:
@Getter
@Entity(name = "item")
@Table(name="item", schema = "public")
@AllArgsConstructor
@NoArgsConstructor(force = true)
public class ItemEntity {
@Id
@GeneratedValue(generator="UUID")
private final UUID id;
@NotNull
@Column(unique …Run Code Online (Sandbox Code Playgroud)