sim*_*Pod 1 java apache-kafka apache-kafka-streams
我有使用 Kafka Streams 处理的记录(使用处理器 API)。假设该记录有city_id和一些其他字段。
在 Kafka Streams 应用程序中,我想将目标城市的当前温度添加到记录中。
Temperature<->City对存储在例如中。Postgres。
在 Java 应用程序中,我可以使用 JDBC 连接到 Postgres 并进行构建,new HashMap<CityId, Temperature>这样我就可以根据city_id. 就像是tempHM.get(record.city_id)。
有几个问题如何最好地处理它:
最初,我一直在内部执行此操作,AbstractProcessor::init()但这似乎是错误的,因为它是为每个线程初始化的,并且还在重新平衡时重新初始化。
因此,我在使用它构建流拓扑构建器和处理器之前移动了它。在所有处理器实例上仅独立提取一次数据。
这是正确有效的方法吗?它有效,但是...
HashMap<CityId, Temperature> tempHM = new HashMap<CityId, Temperature>;
// Connect to DB and initialize tempHM here
Topology topology = new Topology();
topology
.addSource(SOURCE, stringDerializer, protoDeserializer, "topic-in")
.addProcessor(TemperatureAppender.NAME, () -> new TemperatureAppender(tempHm), SOURCE)
.addSink(SINK, "topic-out", stringSerializer, protoSerializer, TemperatureAppender.NAME)
;
Run Code Online (Sandbox Code Playgroud)
例如,我想每 15 分钟刷新一次温度数据。我正在考虑使用 Hashmap 容器而不是 Hashmap,这样可以处理它:
abstract class ContextContainer<T> {
T context;
Date lastRefreshAt;
ContextContainer(Date now) {
refresh(now);
}
abstract void refresh(Date now);
abstract Duration getRefreshInterval();
T get() {
return context;
}
boolean isDueToRefresh(Date now) {
return lastRefreshAt == null
|| lastRefreshAt.getTime() + getRefreshInterval().toMillis() < now.getTime();
}
}
final class CityTemperatureContextContainer extends ContextContainer<HashMap> {
CityTemperatureContextContainer(Date now) {
super(now);
}
void refresh(Date now) {
if (!isDueToRefresh(now)) {
return;
}
HashMap context = new HashMap();
// Connect to DB and get data and fill hashmap
lastRefreshAt = now;
this.context = context;
}
Duration getRefreshInterval() {
return Duration.ofMinutes(15);
}
}
Run Code Online (Sandbox Code Playgroud)
这是用 SO 文本区域编写的一个简短概念,可能包含一些语法错误,但我希望要点很清楚
然后将其传递给处理器.addProcessor(TemperatureAppender.NAME, () -> new TemperatureAppender(cityTemperatureContextContainer), SOURCE)
在处理器中做
public void init(final ProcessorContext context) {
context.schedule(
Duration.ofMinutes(1),
PunctuationType.STREAM_TIME,
(timestamp) -> {
cityTemperatureContextContainer.refresh(new Date(timestamp));
tempHm = cityTemperatureContextContainer.get();
}
);
super.init(context);
}
Run Code Online (Sandbox Code Playgroud)
有没有更好的办法?主要问题是找到合适的概念,然后我就能实现它。不过,关于该主题的资源并不多。
在 Kafka Streams 应用程序中,我想将目标城市的当前温度添加到记录中。
Temperature<->City对存储在例如中。Postgres。在 Java 应用程序中,我可以使用 JDBC 连接到 Postgres 并构建新的,
HashMap<CityId, Temperature>这样我就可以根据city_id. 就像是tempHM.get(record.city_id)。
更好的替代方案是使用 Kafka Connect 将 Postgres 中的数据提取到 Kafka 主题中,使用KTableKafka Streams 将该主题读入您的应用程序中,然后将其KTable与您的其他流(记录流“以及city_id其他一些”)连接起来。字段”)。也就是说,您将执行KStream“KTable加入”操作。
思考:
### Architecture view
DB (here: Postgres) --Kafka Connect--> Kafka --> Kafka Streams Application
### Data view
Postgres Table ----------------------> Topic --> KTable
Run Code Online (Sandbox Code Playgroud)
适用于您的用例的示例连接器是https://www.confluence.io/hub/confluenceinc/kafka-connect-jdbc和https://www.confluence.io/hub/debezium/debezium-connector-postgresql。
上述基于 Kafka Connect 的设置的优点之一是,您不再需要直接从 Java 应用程序(使用 Kafka Streams)与 Postgres DB 进行通信。
另一个优点是,您不需要将上下文数据(您提到的每 15 分钟一次)从数据库“批量刷新”到 Java 应用程序中,因为应用程序将通过以下方式自动实时获取最新的数据库更改: DB->KConnect->Kafka->KStreams-应用程序流程。