小编tha*_*man的帖子

如何在 spring-kafka 中配置 kafka 消费者民意调查的频率

我正在尝试在我的 Spring Boot 项目中使用 spring-kafka 读取来自我的 kafka 的消息。我正在使用@KafkaListener,但问题是我的消费者始终在运行。一旦我从控制台生成一条消息,它就会在我的应用程序中弹出。我想定期进行轮询。我怎样才能实现这个目标?

@Service
public class KafkaReciever {

private static final Logger LOGGER =
        LoggerFactory.getLogger(KafkaReciever.class);

private CountDownLatch latch = new CountDownLatch(1);

public CountDownLatch getLatch() {
    return latch;
}

@KafkaListener(topics = "test")
public void receive(String payload) {
    LOGGER.info("received payload='{}'", payload);
    latch.countDown();
}
Run Code Online (Sandbox Code Playgroud)

}

这是我的消费者配置:

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();

    // list of host:port pairs used for establishing the initial connections to the Kafka cluster
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, …
Run Code Online (Sandbox Code Playgroud)

java spring-boot spring-kafka

6
推荐指数
1
解决办法
1万
查看次数

在ForkJoin池中等待(Java)

我在Java中使用Fork联接池进行多任务处理。现在我遇到了一种情况,对于每个任务,我需要点击一个网址,然后等待10分钟,然后再次点击另一个网址以读取数据。现在的问题是,在那10分钟内,我的CPU处于空闲状态,并且没有启动其他任务(比fork联接池中定义的任务更多)。

static ForkJoinPool pool = new ForkJoinPool(10);
public static void main(String[] args){
    List<String> list = new ArrayList<>();
    for(int i=1; i<=100; i++){
        list.add("Str"+i);
    }
    final Tasker task = new Tasker(list);
    pool.invoke(task);

public class Tasker extends RecursiveAction{

    private static final long serialVersionUID = 1L;
    List<String> myList;
    public Tasker(List<String> checkersList) {
        super();
        this.myList = checkersList;
    }
    @Override
    protected void compute() {
        if(myList.size()==1){
            System.out.println(myList.get(0) + "start");
            //Date start = new Date();
            try {

                    Thread.sleep(10*60*1000);

            } catch (Exception e) {
                // TODO Auto-generated catch block …
Run Code Online (Sandbox Code Playgroud)

java cpu multithreading fork-join threadpool

4
推荐指数
1
解决办法
1633
查看次数