我正在尝试在我的 Spring Boot 项目中使用 spring-kafka 读取来自我的 kafka 的消息。我正在使用@KafkaListener,但问题是我的消费者始终在运行。一旦我从控制台生成一条消息,它就会在我的应用程序中弹出。我想定期进行轮询。我怎样才能实现这个目标?
@Service
public class KafkaReciever {
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaReciever.class);
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "test")
public void receive(String payload) {
LOGGER.info("received payload='{}'", payload);
latch.countDown();
}
Run Code Online (Sandbox Code Playgroud)
}
这是我的消费者配置:
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kafka cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, …Run Code Online (Sandbox Code Playgroud) 我在Java中使用Fork联接池进行多任务处理。现在我遇到了一种情况,对于每个任务,我需要点击一个网址,然后等待10分钟,然后再次点击另一个网址以读取数据。现在的问题是,在那10分钟内,我的CPU处于空闲状态,并且没有启动其他任务(比fork联接池中定义的任务更多)。
static ForkJoinPool pool = new ForkJoinPool(10);
public static void main(String[] args){
List<String> list = new ArrayList<>();
for(int i=1; i<=100; i++){
list.add("Str"+i);
}
final Tasker task = new Tasker(list);
pool.invoke(task);
public class Tasker extends RecursiveAction{
private static final long serialVersionUID = 1L;
List<String> myList;
public Tasker(List<String> checkersList) {
super();
this.myList = checkersList;
}
@Override
protected void compute() {
if(myList.size()==1){
System.out.println(myList.get(0) + "start");
//Date start = new Date();
try {
Thread.sleep(10*60*1000);
} catch (Exception e) {
// TODO Auto-generated catch block …Run Code Online (Sandbox Code Playgroud)