我正在阅读有关Java流和发现新事物的内容.我发现的新事物之一是peek()功能.几乎所有我读过的内容都说它应该用来调试你的Streams.
如果我有一个Stream,每个帐户都有一个用户名,密码字段以及login()和loggedIn()方法,该怎么办?
我也有
Consumer<Account> login = account -> account.login();
Run Code Online (Sandbox Code Playgroud)
和
Predicate<Account> loggedIn = account -> account.loggedIn();
Run Code Online (Sandbox Code Playgroud)
为什么会这么糟糕?
List<Account> accounts; //assume it's been setup
List<Account> loggedInAccount =
accounts.stream()
.peek(login)
.filter(loggedIn)
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
现在据我所知,这完全符合它的目的.它;
做这样的事情的缺点是什么?有什么理由我不应该继续吗?最后,如果不是这个解决方案呢?
其原始版本使用.filter()方法如下;
.filter(account -> {
account.login();
return account.loggedIn();
})
Run Code Online (Sandbox Code Playgroud) 遇到RuntimeException流处理时,流处理是否应该中止?应该先完成吗?是否应重新抛出异常Stream.close()?异常是否被重新抛出或被包裹?Java Stream和java.util.stream包的JavaDoc 无话可说.
我发现的所有关于Stackoverflow的问题似乎都集中在如何从功能界面中包装已检查的异常以便编译代码.事实上,互联网上的博客文章和类似文章都集中在同一个警告上.这不是我关心的问题.
我根据自己的经验知道,一旦抛出一个顺序流的处理就会中止,RuntimeException并且这个异常会按原样重新抛出.仅当客户端线程抛出异常时,对于并行流才是相同的.
但是,此处的示例代码演示了如果在并行流处理期间由"工作线程"(=与调用终端操作的线程不同的线程)抛出异常,则此异常将永远丢失并且流处理完成.
示例代码将首先IntStream并行运行.然后是"正常" Stream并行.
这个例子将表明,
1)IntStream如果遇到RuntimeException,则中止并行处理没有问题.异常被重新抛出,包装在另一个RuntimeException中.
2)Stream不好玩.实际上,客户端线程永远不会看到抛出的RuntimeException的痕迹.流不仅完成处理; 将处理更多元素而不是limit()指定的元素!
在示例代码中,IntStream使用IntStream.range()生成."正常" Stream没有"范围"的概念,而是由1:s组成,但调用Stream.limit()将流限制为10亿个元素.
这是另一个转折点.生成IntStream的示例代码执行如下操作:
IntStream.range(0, 1_000_000_000).parallel().forEach(..)
Run Code Online (Sandbox Code Playgroud)
将其更改为生成的流,就像代码中的第二个示例一样:
IntStream.generate(() -> 1).limit(1_000_000_000).parallel().forEach(..)
Run Code Online (Sandbox Code Playgroud)
此IntStream的结果是相同的:异常被包装并重新抛出,处理中止.但是,第二个流现在也将包装并重新抛出异常,而不是处理超出限制的元素!因此:更改第一个流的生成方式会对第二个流的行为产生副作用.对我来说,这很奇怪.
ForkJoinPool.invoke()的 JavaDoc 并ForkJoinTask表示异常被重新抛出,这就是我对并行流的期望.
当处理并行流中的元素时,我遇到了这个"问题" Collection.stream().parallel()(我还没有验证它的行为,Collection.parallelStream()但它应该是相同的).发生的事情是"工作线程"崩溃然后静静地离开,而所有其他线程成功完成了流.我的应用程序使用默认的异常处理程序将异常写入日志文件.但是甚至没有创建这个日志文件.线程和他的例外根本就消失了.由于我需要在捕获运行时异常时立即中止,因此一种替代方法是编写将此异常泄漏给其他工作程序的代码,使其在任何其他线程抛出异常时不愿意继续.当然,这并不能保证流实现只是继续生成尝试完成流的新线程.所以我可能最终不会使用并行流,而是使用线程池/执行器进行"正常"并发编程.
这表明运行时异常丢失的问题不会与使用流Stream.generate()或流生成的流隔离Stream.limit().最重要的是,我很想知道...是预期的行为?
java parallel-processing multithreading runtimeexception java-stream