我正在使用Kafka流来计算使用跳跃时间窗口在过去3分钟内发生的事件数:
public class ViewCountAggregator {
void buildStream(KStreamBuilder builder) {
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
KStream<String, String> views = builder.stream(stringSerde, stringSerde, "streams-view-count-input");
KStream<String, Long> viewCount = views
.groupBy((key, value) -> value)
.count(TimeWindows.of(TimeUnit.MINUTES.toMillis(3)).advanceBy(TimeUnit.MINUTES.toMillis(1)))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), value));
viewCount.to(stringSerde, longSerde, "streams-view-count-output");
}
public static void main(String[] args) throws Exception {
// some not so important initialization code
...
}
}
Run Code Online (Sandbox Code Playgroud)
运行使用者并将某些消息推送到输入主题时,随着时间的推移,它会收到以下更新:
single 1
single 1
single 1
five 1
five 4
five 5
five 4 …Run Code Online (Sandbox Code Playgroud) 我有一个需要将任务安排到libuv事件循环的函数.我的想法是创建一个0ms超时的计时器.我试过以下代码:
void myFunction() {
...
uv_timer_t* timer = new uv_timer_t();
uv_timer_init(uv_default_loop(), timer);
uv_timer_start(timer, [&](uv_timer_t* timer, int status) {
// Scheduled task
}, 0, 0);
}
Run Code Online (Sandbox Code Playgroud)
这种方法运行良好但问题是,动态分配的计时器永远不会被释放.我已经尝试在回调中释放计时器,但这导致了分段错误:
void myFunction() {
...
uv_timer_t* timer = new uv_timer_t();
uv_timer_init(uv_default_loop(), timer);
uv_timer_start(timer, [&](uv_timer_t* timer, int status) {
// Scheduled task
delete timer;
}, 0, 0);
}
Run Code Online (Sandbox Code Playgroud)
我也尝试过调用uv_timer_stop(timer);并uv_unref((uv_handle_t*) timer);在实际内存释放之前,但是分段错误仍然是remian.