小编Mar*_*lik的帖子

Kafka Stream计算时间窗口未报告零值

我正在使用Kafka流来计算使用跳跃时间窗口在过去3分钟内发生的事件数:

public class ViewCountAggregator {

    void buildStream(KStreamBuilder builder) {      

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        KStream<String, String> views = builder.stream(stringSerde, stringSerde, "streams-view-count-input");
        KStream<String, Long> viewCount = views
            .groupBy((key, value) -> value)
            .count(TimeWindows.of(TimeUnit.MINUTES.toMillis(3)).advanceBy(TimeUnit.MINUTES.toMillis(1)))
            .toStream()
            .map((key, value) -> new KeyValue<>(key.key(), value));

        viewCount.to(stringSerde, longSerde, "streams-view-count-output");        
    }

    public static void main(String[] args) throws Exception {                   
        // some not so important initialization code
        ...  
    }

}
Run Code Online (Sandbox Code Playgroud)

运行使用者并将某些消息推送到输入主题时,随着时间的推移,它会收到以下更新:

single  1
single  1
single  1
five    1
five    4
five    5
five    4 …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

3
推荐指数
1
解决办法
910
查看次数

在C++ 11中释放动态分配的uv_timer_t(libuv)实例

我有一个需要将任务安排到libuv事件循环的函数.我的想法是创建一个0ms超时的计时器.我试过以下代码:

void myFunction() {
    ...
    uv_timer_t* timer = new uv_timer_t();
    uv_timer_init(uv_default_loop(), timer);
    uv_timer_start(timer, [&](uv_timer_t* timer, int status) {
        // Scheduled task
    }, 0, 0);
}
Run Code Online (Sandbox Code Playgroud)

这种方法运行良好但问题是,动态分配的计时器永远不会被释放.我已经尝试在回调中释放计时器,但这导致了分段错误:

void myFunction() {
    ...
    uv_timer_t* timer = new uv_timer_t();
    uv_timer_init(uv_default_loop(), timer);
    uv_timer_start(timer, [&](uv_timer_t* timer, int status) {
        // Scheduled task
        delete timer;
    }, 0, 0);
}
Run Code Online (Sandbox Code Playgroud)

我也尝试过调用uv_timer_stop(timer);uv_unref((uv_handle_t*) timer);在实际内存释放之前,但是分段错误仍然是remian.

c++ memory-leaks event-loop dynamic-allocation c++11

1
推荐指数
1
解决办法
1456
查看次数