Java流图中可重用的单实例包装器/对象

tso*_*akp 9 java java-8 java-stream

似乎这个问题应该已经有了答案,但我找不到重复的答案。

无论如何,我想知道社区对这样的Stream.map用例有何看法?

Wrapper wrapper = new Wrapper();
list.stream()
    .map( s -> {
        wrapper.setSource(s);
        return wrapper;
    } )
    .forEach( w -> processWrapper(w) );

public static class Source {
    private final String name;

    public Source(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

public static class Wrapper {
    private Source source = null;

    public void setSource(Source source) {
        this.source = source;
    }

    public String getName() {
        return source.getName();
    }
}

public void processWrapper(Wrapper wrapper) {
}
Run Code Online (Sandbox Code Playgroud)

我不是这种用法的忠实拥护者,map但是在处理大型流时,它可能有助于提高性能,并避免Wrapper为每个流创建不必要的内容Source

这肯定有其局限性,例如对并行流和终端操作几乎没有用collect

更新- 问题不是关于“怎么做”,而是“我可以这样吗”。例如,我可以有一个仅适用于Wrapper的代码,我想在其中调用它,forEach但又希望避免为每个Source元素创建一个新实例。

基准结果

使用可重复使用的包装器后,展示了大约8倍的改进-

基准(N)模式Cnt得分误差单位

BenchmarkTest.noReuse 10000000 avgt 5 870.253 ±122.495 ms / op

BenchmarkTest.withReuse 10000000 avgt 5 113.694 ± 2.528 ms / op

基准代码-

import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(value = 2, jvmArgs = {"-Xms2G", "-Xmx2G"})
public class BenchmarkTest {

    @Param({"10000000"})
    private int N;

    private List<Source> data;

    public static void main(String[] args) throws Exception {
        Options opt = new OptionsBuilder()
            .include(BenchmarkTest.class.getSimpleName())
            .forks(1)
            .build();
        new Runner(opt).run();
    }

    @Setup
    public void setup() {
        data = createData();
    }

    @Benchmark
    public void noReuse(Blackhole bh) {
        data.stream()
            .map( s -> new Wrapper1( s.getName() ) )
            .forEach( t -> processTarget(bh, t) );
    }

    @Benchmark
    public void withReuse(Blackhole bh) {
        Wrapper2 wrapper = new Wrapper2();
        data.stream()
            .map( s -> { wrapper.setSource(s); return wrapper; } )
            .forEach( w -> processTarget(bh, w) );
    }

    public void processTarget(Blackhole bh, Wrapper t) {
        bh.consume(t);
    }

    private List<Source> createData() {
        List<Source> data = new ArrayList<>();
        for (int i = 0; i < N; i++) {
            data.add( new Source("Number : " + i) );
        }
        return data;
    }

    public static class Source {
        private final String name;

        public Source(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    public interface Wrapper {
        public String getName();
    }

    public static class Wrapper1 implements Wrapper {
        private final String name;

        public Wrapper1(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    public static class Wrapper2 implements Wrapper {
        private Source source = null;

        public void setSource(Source source) {
            this.source = source;
        }

        public String getName() {
            return source.getName();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

完整基准测试报告-

# JMH version: 1.21
# VM version: JDK 1.8.0_191, Java HotSpot(TM) 64-Bit Server VM, 25.191-b12
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_191.jdk/Contents/Home/jre/bin/java
# VM options: -Xms2G -Xmx2G
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: BenchmarkTest.noReuse
# Parameters: (N = 10000000)

# Run progress: 0.00% complete, ETA 00:03:20
# Fork: 1 of 1
# Warmup Iteration   1: 1083.656 ms/op
# Warmup Iteration   2: 846.485 ms/op
# Warmup Iteration   3: 901.164 ms/op
# Warmup Iteration   4: 849.659 ms/op
# Warmup Iteration   5: 903.805 ms/op
Iteration   1: 847.008 ms/op
Iteration   2: 895.800 ms/op
Iteration   3: 892.642 ms/op
Iteration   4: 825.901 ms/op
Iteration   5: 889.914 ms/op


Result "BenchmartTest.noReuse":
  870.253 ±(99.9%) 122.495 ms/op [Average]
  (min, avg, max) = (825.901, 870.253, 895.800), stdev = 31.812
  CI (99.9%): [747.758, 992.748] (assumes normal distribution)


# JMH version: 1.21
# VM version: JDK 1.8.0_191, Java HotSpot(TM) 64-Bit Server VM, 25.191-b12
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_191.jdk/Contents/Home/jre/bin/java
# VM options: -Xms2G -Xmx2G
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: BenchmarkTest.withReuse
# Parameters: (N = 10000000)

# Run progress: 50.00% complete, ETA 00:01:58
# Fork: 1 of 1
# Warmup Iteration   1: 113.780 ms/op
# Warmup Iteration   2: 113.643 ms/op
# Warmup Iteration   3: 114.323 ms/op
# Warmup Iteration   4: 114.258 ms/op
# Warmup Iteration   5: 117.351 ms/op
Iteration   1: 114.526 ms/op
Iteration   2: 113.944 ms/op
Iteration   3: 113.943 ms/op
Iteration   4: 112.930 ms/op
Iteration   5: 113.124 ms/op


Result "BenchmarkTest.withReuse":
  113.694 ±(99.9%) 2.528 ms/op [Average]
  (min, avg, max) = (112.930, 113.694, 114.526), stdev = 0.657
  CI (99.9%): [111.165, 116.222] (assumes normal distribution)


# Run complete. Total time: 00:03:40

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark                     (N)  Mode  Cnt    Score     Error  Units
BenchmarkTest.noReuse    10000000  avgt    5  870.253 ± 122.495  ms/op
BenchmarkTest.withReuse  10000000  avgt    5  113.694 ±   2.528  ms/op
Run Code Online (Sandbox Code Playgroud)

Hol*_*ger 11

您的方法之所以起作用,是因为流管道仅包含无状态操作。在这样的星座中,顺序流评估可以一次处理一个元素,因此对包装实例的访问不会重叠,如此处所示。但是请注意,这不是保证的行为。

它绝对不适用于像sorted和这样的有状态操作distinct。它也可以不与减少操作的工作,因为他们总是不得不搁置了处理,其中包括至少两个元素reduceminmax。在这种情况下collect,取决于具体情况CollectorforEachOrdered由于需要缓冲,因此不适用于并行流。

请注意,即使您用于TheadLocal创建线程限制包装器,并行处理也存在问题,因为无法保证在一个工作线程中创建的对象仍位于该线程本地。辅助线程可以在处理其他无关的工作负载之前将部分结果移交给另一个线程。

所以这个共享的可变包装工程与一组特定的无状态操作,如mapfilterforEachfindFirst/Anyall/any/noneMatch,在特定实现的顺序执行。您没有获得API的灵活性,因为您必须限制自己,无法将流传递给期望a的任意代码,Stream也不能使用任意Collector实现。您还没有接口的封装,因为您要假设特定的实现行为。

换句话说,如果您想使用这样的可变包装器,则最好使用实现特定操作的循环。您确实已经具有这种手动实施的缺点,所以为什么不实施它便具有优势。


要考虑的另一个方面是,从重用这种可变的包装器中可以获得什么。它仅在类似循环的用法中起作用,在这种情况下,无论如何应用Escape Analysis后,临时对象可能会被优化掉。在这种情况下,重用对象会延长其寿命,实际上可能会降低性能。

当然,对象标定不是保证的行为。在某些情况下,例如长流管道超过了JVM的内联限制,这些对象就不会消失。但是,临时对象不一定昂贵。

答案已对此进行了解释。临时对象便宜地分配。垃圾回收的主要成本是由仍然存在的对象引起的。在为新分配腾出空间时,需要遍历这些元素,并移动它们。临时对象的负面影响是,它们可能会缩短垃圾收集之间的时间间隔。但这是分配速率和可用分配空间的函数,因此这确实是一个问题,可以通过向其添加更多RAM来解决。更多的RAM单元GC周期之间有更多的时间更多的死亡对象时,GC发生,这使得GC的净成本较小。

尽管如此,避免过度分配临时对象还是一个有效的问题。的存在IntStreamLongStream以及DoubleStream显示这一点。但是这些都是特殊的,因为使用原始类型是使用包装对象的可行替代方法,而没有重用可变包装器的缺点。它也有所不同,因为它适用于原始类型和包装器类型在语义上等效的问题。相反,您要解决操作需要包装器类型的问题。对于原始流也适用,当您需要解决问题的对象时,装箱是没有办法的,装箱将为不同的值创建不同的对象,而不共享可变对象。

因此,如果您同样遇到一个问题,即存在一个语义上等效的避免包装对象的替代方案,而又没有实质性问题,例如仅使用Comparator.comparingInt而不是Comparator.comparing在可行的情况下,您可能还是更喜欢它。但是只有这样。


简而言之,在大多数情况下,节省此类对象重用(如果有的话)将无法证明其缺点。在特殊情况下,这是有益和重要的,最好使用循环或完全控制下的任何其他构造,而不要使用Stream


Ale*_*lov 9

您可以使用一些方便的功能,也可以使用线程安全版本来并行处理。

Function<T,U> threadSafeReusableWrapper(Supplier<U> newWrapperInstanceFn, BiConsumer<U,T> wrapFn) {
   final ThreadLocal<T> wrapperStorage = ThreadLocal.withInitial(newWrapperInstanceFn);
   return item -> {
      T wrapper = wrapperStorage.get();
      wrapFn.consume(wrapper, item);
      return wrapper;
   }
}

Function<T,U> reusableWrapper(U wrapper, BiConsumer<U,T> wrapFn) {
   return item -> {
      wrapFn.consume(wrapper, item);
      return wrapper;
   };
}

list.stream()
    .map(reusableWrapper(new Wrapper(), Wrapper::setSource))
    .forEach( w -> processWrapper(w) );
list.stream()
    .map(threadSafeReusableWrapper(Wrapper::new, Wrapper::setSource))
     .parallel()
    .forEach( w -> processWrapper(w) );
Run Code Online (Sandbox Code Playgroud)

但是,我认为这不值得。这些包装纸是短命的,因此不太可能离开年轻一代,因此将很快被垃圾收集。虽然,我认为这个想法值得在微基准测试库JMH中进行验证