我有一个InputStreamand OutputStream(没有套接字)。
我有一个基于流的代码,可以执行一些映射/过滤/分组/处理。
我的主要目标是在超出时终止流maxDuration:
void fillStreamMap(BufferedReader reader) {
final Instant end = Instant.now().plusNanos(TimeUnit.NANOSECONDS.convert(maxDuration));
this.map = reader.lines()
.takeWhile(e -> checkTimeout(end))
.map(this::jsonToBuyerEventInput)
.filter(Objects::nonNull)
.filter(getFilter()::apply)
.limit(super.maxEvent)
.collect(Collectors.groupingBy(BuyerEventInput::getBuyer));
}
boolean checkTimeout(Instant end){
return Instant.now().getEpochSecond() <= end.getEpochSecond();
}
Run Code Online (Sandbox Code Playgroud)
我正在使用takeWhile这是一个非常有用的函数,但它会检查终止条件是否有即将发生的事件。
因此,如果没有发送数据,它不会检查条件,因为该函数是为了将 aPredicate作为参数而构建的。
有什么办法可以实现这个目标吗?