我编写了一个流实现,它在文件的行上执行四个简单的缩减(+和<).
起初我执行了四个流,但我决定编写自己的累加器和组合器,以便我可以在一个流中执行所有四个缩减.在小型数据集(10,000,000行)上,运行时间按预期减少到大约1/4,在我的硬件上运行14秒.
fileIn = new BufferedReader(new InputStreamReader(
            new URL(args[0].trim()).openStream()));
final Results results = fileIn.lines()
        .parallel()
        .skip(1)
        .map(User::parse)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Results::new, Results::accumulate, Results::combine);
Run Code Online (Sandbox Code Playgroud)
Results::accumulate并且分别Results::combine将用户正确地组合到结果和结果与结果中,并且此实现适用于小数据集.
我也试过使用.reduce(),结果很相似,但我试图.collect()减少短期对象的创建.
问题在于,当我使用具有10亿行的真实大小的数据时,我遇到的问题表明Java 8流无法完成任务.在JConsole中观察堆内存,以大致线性方式爬升到分配的12 GB,然后是OOM.
我的印象是收集器或减速器的性能可以与迭代解决方案相媲美,迭代解决方案应该受CPU和IO的限制而不是内存,因为减少步骤产生的结果不会增长,而是减少!
当我进行堆转储并将其放入jhat时,我看到大约7GB被字符串占用,并且这些字符串可以清楚地看作是输入文件的行.我觉得它们根本不应该在内存中,但是jhat显示了一个非常大的ForkJoin相关结构在内存中累积:
Static reference from java.util.concurrent.ForkJoinPool.common (from class java.util.concurrent.ForkJoinPool) :
--> java.util.concurrent.ForkJoinPool@0x786d41db0 (76 bytes) (field workQueues:)
--> [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598 (144 bytes) (Element 3 of [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598:)
--> java.util.concurrent.ForkJoinPool$WorkQueue@0x786d41ee8 (96 bytes) (field currentSteal:)
--> java.util.stream.SliceOps$SliceTask@0x7b4ac6cb0 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b379ad18 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b25bdb68 (130 bytes) …Run Code Online (Sandbox Code Playgroud) 我试图找出在 Spring 5 中使用组件的多个副本(通过泛型类型进行区分)的最简洁、最有效的方法。显然我错过了一个概念,因为根据我读到的内容,我觉得这个(下面)应该起作用,但它不起作用,所以我可能误解了泛型类型如何作为限定符工作。
我有这样的安排:
public class Beer {}
Run Code Online (Sandbox Code Playgroud)
public class Coffee {}
Run Code Online (Sandbox Code Playgroud)
@Component
public class Order<T> {
    @Autowired
    Shop<T> shopToSendOrderTo;
    void place(int quantity) {
        log.info("Inside the order: " + this);
        shopToSendOrderTo.sendOrder(quantity);
    }
}
Run Code Online (Sandbox Code Playgroud)
@Component
public class Shop<T> {
    void sendOrder(int quantity) {
        log.info("Inside the shop: " + this);
    }
}
Run Code Online (Sandbox Code Playgroud)
@Configuration
public class Manager implements InitializingBean {
    @Autowired
    Order<Beer> beerOrder;
    @Autowired …Run Code Online (Sandbox Code Playgroud)