每当用户喜欢我们网站上的某些内容时,我们都会收集事件,我们计划做的是每小时提交内容的汇总收藏夹并更新数据库中的收藏夹总数。
我们正在评估 Kafka Streams。遵循字数统计示例。我们的拓扑很简单,生成到一个主题 A 并读取聚合数据并将其提交到另一个主题 B。然后每小时消耗来自主题 B 的事件并提交到数据库中。
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "favorite-streams");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
return new StreamsConfig(props);
}
@Bean
public KStream<String, String> kStream(StreamsBuilder kStreamBuilder) {
StreamsBuilder builder = streamBuilder();
KStream<String, String> source = builder.stream(topic);
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store")).toStream()
.to(topic + "-grouped", Produced.with(Serdes.String(), Serdes.Long()));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, kStreamsConfigs());
streams.start(); …
Run Code Online (Sandbox Code Playgroud) 我们想在一定间隔后(例如每5分钟)消耗一次记录.消费者属性是标准的:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(300000);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
return factory;
}
Run Code Online (Sandbox Code Playgroud)
即使我更改了属性,setPollTimeout
它在定义的间隔(5分钟)后也不会轮询,它会在30秒后继续轮询,这是我的日志:
2018-01-23 18:07:26.875 INFO 60905 --- [ 2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer : Consumed: san@1516710960000->1516711080000 2
2018-01-23 18:07:56.901 INFO 60905 --- [ 2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer : Consumed: san@1516710960000->1516711080000 4
Run Code Online (Sandbox Code Playgroud)
我们尝试使用窗口化聚合构建一个kafka流应用程序,并计划在y间隔后使用窗口x.
我可以在课堂上看到:KafkaMessageListenerContainer
,setConsumerTaskExecutor
设置:
if (containerProperties.getConsumerTaskExecutor() == null) {
SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
Run Code Online (Sandbox Code Playgroud)
但是,我们如何配置此(频率)线程池何时轮询记录.任何帮助赞赏.
假设我有100种类型的对象要创建,为此我暴露了一个工厂来创建这些对象.为了创建这些对象,我有100个if, else if
.
在这种情况下,要创建许多类型的对象(当然,您希望每个类创建一个对象),是否有更好的创建模式?