相关疑难解决方法(0)

如何使用Kafka Stream手动提交?

有没有办法用Kafka Stream手动提交?

通常使用KafkaConsumer,我做类似下面的事情:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records){
       // process records
    }
   consumer.commitAsync();
}
Run Code Online (Sandbox Code Playgroud)

我手动调用提交的地方.我没有看到类似的API KStream.

apache-kafka apache-kafka-streams

12
推荐指数
1
解决办法
7160
查看次数

Kafka Streams和RPC:在map()运算符中调用REST服务被认为是反模式?

实现使用参考数据丰富存储在Kafka中的传入事件流的用例的简单方法是通过在map()运算符中调用为每个传入事件提供此参考数据的外部服务REST API.

eventStream.map((key, event) -> /* query the external service here, then return the enriched event */)
Run Code Online (Sandbox Code Playgroud)

另一种方法是将第二个事件流与参考数据一起存储,KTable然后将其存储在一个轻量级嵌入式"数据库"中,然后将主事件流与其连接.

KStream<String, Object> eventStream = builder.stream(..., "event-topic");
KTable<String, Object> referenceDataTable = builder.table(..., "reference-data-topic");
KTable<String, Object> enrichedEventStream = eventStream 
    .leftJoin(referenceDataTable , (event, referenceData) -> /* return the enriched event */)
    .map((key, enrichedEvent) -> new KeyValue<>(/* new key */, enrichedEvent)
    .to("enriched-event-topic", ...);
Run Code Online (Sandbox Code Playgroud)

"天真"的方法可以被视为反模式吗?可以KTable推荐" "方法作为首选方法吗?

Kafka每分钟可以轻松管理数百万条消息.从map()操作员调用的服务也应该能够处理高负载并且也具有高可用性.这些是服务实现的额外要求.但是,如果服务满足这些标准,可以采用"天真"的方法吗?

apache-kafka apache-kafka-streams

6
推荐指数
2
解决办法
1933
查看次数

kafka jdbc接收器连接器中的批量大小

我想通过jdbc接收器批量读取5000条记录,为此我在jdbc接收器配置文件中使用了batch.size:

name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
batch.size=5000
topics=postgres_users

connection.url=jdbc:postgresql://localhost:34771/postgres?user=foo&password=bar
file=test.sink.txt
auto.create=true
Run Code Online (Sandbox Code Playgroud)

但是,batch.size 不起作用,因为当新记录插入源数据库时,记录也会插入数据库。

如何实现批量插入5000个?

connector apache-kafka apache-kafka-connect confluent-platform

3
推荐指数
1
解决办法
7088
查看次数