注册流"完成"钩子

Luk*_*der 27 java java-8 java-stream

使用Java 8 StreamAPI,我想注册一个"完成钩子",类似于:

Stream<String> stream = Stream.of("a", "b", "c");

// additional filters / mappings that I don't control
stream.onComplete((Completion c) -> {
    // This is what I'd like to do:
    closeResources();

    // This might also be useful:
    Optional<Throwable> exception = c.exception();
    exception.ifPresent(e -> throw new ExceptionWrapper(e));
});
Run Code Online (Sandbox Code Playgroud)

我为什么要那么做的原因是因为我想包装在一个资源Stream的API客户端,消费,我想的是Stream,一旦它被消耗自动清理资源.如果可能,那么客户可以致电:

Collected collectedInOneGo =
Utility.something()
       .niceLookingSQLDSL()
       .moreDSLFeatures()
       .stream()
       .filter(a -> true)
       .map(c -> c)
       .collect(collector);
Run Code Online (Sandbox Code Playgroud)

而不是目前所需要的:

try (Stream<X> meh = Utility.something()
                            .niceLookingSQLDSL()
                            .moreDSLFeatures()
                            .stream()) {

    Collected collectedWithUglySyntacticDissonance =
    meh.filter(a -> true)
       .map(c -> c)
       .collect(collector);
}
Run Code Online (Sandbox Code Playgroud)

理想情况下,我想进入java.util.stream.ReferencePipeline各种方法,例如:

@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    try {

        // Existing loop
        do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
    }

    // These would be nice:
    catch (Throwable t) {
        completion.onFailure(t);
    }
    finally {
        completion.onSuccess();
    }
}
Run Code Online (Sandbox Code Playgroud)

使用现有的JDK 8 API有一种简单的方法吗?

Hol*_*ger 12

最简单的解决方案是将流包装在另一个流中并将其平面映射到自身:

// example stream
Stream<String> original=Stream.of("bla").onClose(()->System.out.println("close action"));

// this is the trick
Stream<String> autoClosed=Stream.of(original).flatMap(Function.identity());

//example op
int sum=autoClosed.mapToInt(String::length).sum();
System.out.println(sum);
Run Code Online (Sandbox Code Playgroud)

它起作用的原因在于flatMap操作:

每个映射的流在其内容放入此流后关闭.

但是当前的实现并不像使用时那样懒惰flatMap.这已在Java 10中修复.


我的建议是在try(…)需要关闭返回的流时继续使用标准解决方案和文档.毕竟,在终端操作之后关闭资源的流是不安全的,因为没有保证客户端实际上将调用终端操作.改变它的想法并放弃流即时是一种有效的用途,而close()当文档指定它是必需的时,不调用该方法则不是.

  • 是的,因为实际用例无论如何都会通过`StreamSupport`,使用惰性工厂方法是一种选择. (4认同)

Tag*_*eev 11

拦截终端操作除了flatMap基于解决方案(由@Holger提出)的任何解决方案都会对以下代码脆弱:

Stream<String> stream = getAutoCloseableStream();
if(stream.iterator().hasNext()) {
    // do something if stream is non-empty
}
Run Code Online (Sandbox Code Playgroud)

这种用法在规范中绝对合法.不要忘记它iterator()并且spliterator()是终端流操作,但在执行后您仍然需要访问流源.放弃IteratorSpliterator处于任何状态都是完全有效的,所以你无法知道它是否会被进一步使用.

您可以考虑advicing用户不要使用iterator()spliterator(),但对于这个代码?

Stream<String> stream = getAutoCloseableStream();
Stream.concat(stream, Stream.of("xyz")).findFirst();
Run Code Online (Sandbox Code Playgroud)

这内部spliterator().tryAdvance()用于第一个流,然后放弃它(尽管在close()显式调用结果流时关闭).您需要让用户不要使用Stream.concat.而据我了解内部库中的您正在使用iterator()/ spliterator()漂亮的时候,所以你需要重新审视这些地方可能出现的问题.而且,当然还有很多其他库也使用iterator()/ spliterator()并且可能在此之后短路:所有这些库都会与您的功能不兼容.

为什么flatMap基于解决方案在这里工作?因为在的第一个呼叫hasNext()或者tryAdvance()它转储整个流内容到中间缓冲器和关闭所述原始流源.因此,根据流的大小,您可能会浪费很多中间内存,甚至可能OutOfMemoryError.

您也可以考虑将PhantomReferences 保留在Stream对象上并监视对象ReferenceQueue.在这种情况下,完成将由垃圾收集器触发(这也有一些缺点).

总之,我的建议是继续尝试资源.

  • `flatMap`不一定非常渴望实现这种行为,实现知道它何时从一个子流切换到下一个子流,因此,此时可以关闭旧子流并且最多有一个待处理的子流.在"Stream"实现中进行内部清理操作并不奇怪."flatMap"提供这种保证的原因在于,提供子流的代码不可能确保正确的清理(类似于OP问题的问题). (4认同)
  • 我并没有建议添加关闭外部流的要求,这确实为时已晚,只是为了使`flatMap`变得懒惰,声称这对大多数终端操作都是可能的.但是,如果流用户调用`iterator()`或`spliterator()`,那么除了回归渴望兼容行为之外别无选择. (3认同)
  • @Holger,是的,不一定如此。我正在描述当前的实现。尽管可能现在为时已晚,但由于某些人可能已经依赖于当前行为,因此对此进行更改。如果外部流不持有要关闭的资源,但内部流却持有呢?当前,您不必使用try-with-resources。如果实现发生更改,则可能会导致资源泄漏。 (2认同)

Lou*_*man 6

Java 8已经开创了需要关闭的流如何运行的先例.在他们的Javadoc中,它提到:

Streams有一个BaseStream.close()方法并实现AutoCloseable,但几乎所有的流实例实际上都不需要在使用后关闭.通常,只有源为IO通道的流(例如Files.lines(Path,Charset)返回的流)才需要关闭.大多数流都由集合,数组或生成函数支持,不需要特殊的资源管理.(如果流确实需要关闭,则可以在try-with-resources语句中将其声明为资源.)

所以Java 8的建议是在try-with-resources中打开这些流.一旦你这样做,Stream 提供了一种方法来添加一个关闭钩子,几乎完全如你所描述的那样:onClose(Runnable)它接受一个lambda告诉它该做什么并返回一个Stream也会在关闭时执行该操作.

这就是API设计和文档建议你做的事情的方式.

  • @LukasEder听路易; 他做得对.你提出的问题不是出于"未被注意"的情况; 事实上,它被广泛讨论.规则很简单:获取资源的人会释放资源.期待流可以关闭资源,当资源完成时(a)不可靠和(b)错误.如果要关闭它,请在TWR块中获取它. (8认同)
  • 我可以理解这个观点,但是JDK似乎已经非常刻意地决定了这种行为 - 他们清楚地考虑过这种情况,这就是他们提出的API - 而API看起来很适合这种方法.我不认为JDK如何处理这些用例是不明确的.你写的是由你自己决定的.我不认为这里有足够的理由忽略JDK似乎已经决定采用的方法来处理这个用例. (4认同)
  • @LukasEder或许,只是没有问题,只有一个选择"我们愿意容忍多少摩擦,以换取将这种抽象的使用扩展到其最佳位置之外." 如果我们说"根本不能关闭流,那么基于IO的流就会丢失",你会感到不快.但是将一个成语延伸到其最佳位置之后会涉及妥协.(当然,在扩展的范围和如何妥协方面存在差异,但最终我认为真正的问题是(隐含的)假设必须犯错误.) (4认同)
  • @LukasEder我所说的几乎就是JDK开创了这样做的先例,你应该坚持这一点.坦率地说,他们比......更聪明. (3认同)
  • @LukasEder,就像AutoCloseable一样,即使在Java-7中也有一堆不必关闭的类(如`ByteArrayOutputStream`或`StringReader`).它在他们的文档中明确指定(例如"关闭`ByteArrayOutputStream`没有效果").所以这不是API变更,而是澄清. (3认同)