有没有办法用Kafka Stream手动提交?
通常使用KafkaConsumer,我做类似下面的事情:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
// process records
}
consumer.commitAsync();
}
Run Code Online (Sandbox Code Playgroud)
我手动调用提交的地方.我没有看到类似的API KStream.
实现使用参考数据丰富存储在Kafka中的传入事件流的用例的简单方法是通过在map()运算符中调用为每个传入事件提供此参考数据的外部服务REST API.
eventStream.map((key, event) -> /* query the external service here, then return the enriched event */)
Run Code Online (Sandbox Code Playgroud)
另一种方法是将第二个事件流与参考数据一起存储,KTable然后将其存储在一个轻量级嵌入式"数据库"中,然后将主事件流与其连接.
KStream<String, Object> eventStream = builder.stream(..., "event-topic");
KTable<String, Object> referenceDataTable = builder.table(..., "reference-data-topic");
KTable<String, Object> enrichedEventStream = eventStream
.leftJoin(referenceDataTable , (event, referenceData) -> /* return the enriched event */)
.map((key, enrichedEvent) -> new KeyValue<>(/* new key */, enrichedEvent)
.to("enriched-event-topic", ...);
Run Code Online (Sandbox Code Playgroud)
"天真"的方法可以被视为反模式吗?可以KTable推荐" "方法作为首选方法吗?
Kafka每分钟可以轻松管理数百万条消息.从map()操作员调用的服务也应该能够处理高负载并且也具有高可用性.这些是服务实现的额外要求.但是,如果服务满足这些标准,可以采用"天真"的方法吗?
我想通过jdbc接收器批量读取5000条记录,为此我在jdbc接收器配置文件中使用了batch.size:
name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
batch.size=5000
topics=postgres_users
connection.url=jdbc:postgresql://localhost:34771/postgres?user=foo&password=bar
file=test.sink.txt
auto.create=true
Run Code Online (Sandbox Code Playgroud)
但是,batch.size 不起作用,因为当新记录插入源数据库时,记录也会插入数据库。
如何实现批量插入5000个?
connector apache-kafka apache-kafka-connect confluent-platform