Mig*_*boa 7 java-8 java-stream
关于如何跳过从Files.lines获得的偶数行的问题,我遵循接受的答案filterEven()方法,基于Spliterator<T>接口实现我自己的方法,例如:
public static <T> Stream<T> filterEven(Stream<T> src) {
Spliterator<T> iter = src.spliterator();
AbstractSpliterator<T> res = new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED)
{
@Override
public boolean tryAdvance(Consumer<? super T> action) {
iter.tryAdvance(item -> {}); // discard
return iter.tryAdvance(action); // use
}
};
return StreamSupport.stream(res, false);
}
Run Code Online (Sandbox Code Playgroud)
我可以通过以下方式使用:
Stream<DomainObject> res = Files.lines(src)
filterEven(res)
.map(line -> toDomainObject(line))
Run Code Online (Sandbox Code Playgroud)
然而,测量这种方法对下一个使用filter()副作用的方法的性能时,我注意到下一个方法表现更好:
final int[] counter = {0};
final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
Stream<DomainObject> res = Files.lines(src)
.filter(line -> isEvenLine ())
.map(line -> toDomainObject(line))
Run Code Online (Sandbox Code Playgroud)
我用JMH测试了性能,我没有在基准测试中包含文件负载.我之前将它加载到一个数组中.然后每个基准测试首先创建一个Stream<String>from previous数组,然后过滤偶数行,然后应用a mapToInt()来提取int字段的值,最后是一个max()操作.这是它的基准之一(你可以在Program 这里检查整个,这里有大约186行的数据文件):
@Benchmark
public int maxTempFilterEven(DataSource src){
Stream<String> content = Arrays.stream(src.data)
.filter(s-> s.charAt(0) != '#') // Filter comments
.skip(1); // Skip line: Not available
return filterEven(content) // Filter daily info and skip hourly
.mapToInt(line -> parseInt(line.substring(14, 16)))
.max()
.getAsInt();
}
Run Code Online (Sandbox Code Playgroud)
我不明白为什么这种filter()方法比(~50ops/ms)具有更好的性能filterEven()(~80ops/ms)?
介绍
我想我知道原因,但不幸的是我不知道如何提高Spliterator基于 - 的解决方案的性能(至少不重写整个 Streams API 功能)。
旁注 1:在设计Stream API时,性能并不是最重要的设计目标。如果性能至关重要,那么很可能在不使用 Stream API 的情况下重写代码将使代码更快。(例如,Stream API 不可避免地会增加内存分配,从而增加 GC 压力)。另一方面,在大多数场景中,Stream API 提供了更好的高级 API,但代价是相对较小的性能下降。
第1部分或简短的理论答案
Stream旨在实现一种内部迭代作为消耗的主要手段,而外部迭代(即Spliterator基于)是一种“模拟”的附加手段。因此外部迭代涉及一些开销。惰性给外部迭代的效率增加了一些限制,并且需要支持flatMap使得在此过程中需要使用某种动态缓冲区。
旁注 2在某些情况下Spliterator,基于迭代可能与内部迭代一样快(即filter在本例中)。特别是当您Spliterator直接从包含数据的Stream. 要查看它,您可以修改测试以将第一个过滤器具体化为Strings 数组:
String[] filteredData = Arrays.stream(src.data)
.filter(s-> s.charAt(0) != '#') // Filter comments
.skip(1)
.toArray(String[]::new);
Run Code Online (Sandbox Code Playgroud)
然后比较maxTempFilter和的性能maxTempFilterEven以接受预过滤的String[] filteredData. 如果您想知道为什么会这样,您可能应该阅读这个长答案的其余部分或至少第 2 部分。
第2部分或更长的理论答案:
流被设计为主要由某些终端操作整体消耗。尽管支持逐个迭代元素,但并不是将其设计为消费流的主要方式。
请注意,使用“功能”Stream API(例如map、flatMap、filter、reduce),collect您不能在某个步骤说“我已经有足够的数据,停止迭代源并推送值”。您可以丢弃一些传入数据(正如filter所做的那样),但无法停止迭代。(take并且skip转换实际上是使用Spliterator内部实现的;并且anyMatch、allMatch、noneMatch、findFirst、findAny等使用非公共 API j.u.s.Sink.cancellationRequested,而且它们更容易,因为不能有多个终端操作)。如果管道中的所有转换都是同步的,您可以将它们组合成一个聚合函数 ( Consumer) 并在一个简单循环中调用它(可以选择将循环执行拆分到多个线程)。这就是我的基于状态的过滤器的简化版本所代表的内容(请参阅“向我显示一些代码”部分中的代码)。如果有的话,事情会变得有点复杂flatMap,但想法仍然相同。
Spliterator基于 的转换有着根本的不同,因为它向管道添加了异步消费者驱动的步骤。现在,是驱动迭代过程的因素Spliterator而不是源。如果您直接在源上Stream请求,它可能会返回一些仅迭代其内部数据结构的实现,这就是为什么具体化预过滤数据应该消除性能差异。但是,如果您为某些非空管道创建一个,除了要求源将元素一个接一个地推送到管道中,直到某个元素通过所有过滤器(另请参见显示中的第二个示例)之外,没有其他(简单)选择我一些代码部分)。源元素被逐一推送而不是分批推送这一事实是使s 变得懒惰这一基本决策的结果。需要一个缓冲区而不仅仅是一个元素是支持以下内容的结果:从源中推送一个元素可以产生许多元素SpliteratorStreamSpliteratorStreamflatMapSpliterator。
第3部分或显示一些代码
本部分试图为“理论”部分中描述的内容提供一些代码支持(包括到真实代码和模拟代码的链接)。
首先,您应该知道当前的 Streams API 实现将非终端(中间)操作累积到单个延迟管道中(请参阅jusAbstractPipeline及其子项,例如jusReferencePipeline。然后,当应用终端操作时,原始操作中的所有元素Stream被“推”过管道。
您所看到的结果是两件事的结果:
Spliterator内部有基于 - 的步骤时,流管道是不同的。OddLines并不是管道中的第一步带有状态过滤器的代码或多或少类似于以下简单的代码:
static int similarToFilter(String[] data)
{
final int[] counter = {0};
final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
int skip = 1;
boolean reduceEmpty = true;
int reduceState = 0;
for (String outerEl : data)
{
if (outerEl.charAt(0) != '#')
{
if (skip > 0)
skip--;
else
{
if (isEvenLine.test(outerEl))
{
int intEl = parseInt(outerEl.substring(14, 16));
if (reduceEmpty)
{
reduceState = intEl;
reduceEmpty = false;
}
else
{
reduceState = Math.max(reduceState, intEl);
}
}
}
}
}
return reduceState;
}
Run Code Online (Sandbox Code Playgroud)
请注意,这实际上是一个内部有一些计算(过滤/转换)的单个循环。
Spliterator另一方面,当您将 a 添加到管道中时,情况会发生显着变化,即使经过简化,与实际发生的情况相当相似的代码也会变得更大,例如:
interface Sp<T>
{
public boolean tryAdvance(Consumer<? super T> action);
}
static class ArraySp<T> implements Sp<T>
{
private final T[] array;
private int pos;
public ArraySp(T[] array)
{
this.array = array;
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
if (pos < array.length)
{
action.accept(array[pos]);
pos++;
return true;
}
else
{
return false;
}
}
}
static class WrappingSp<T> implements Sp<T>, Consumer<T>
{
private final Sp<T> sourceSp;
private final Predicate<T> filter;
private final ArrayList<T> buffer = new ArrayList<T>();
private int pos;
public WrappingSp(Sp<T> sourceSp, Predicate<T> filter)
{
this.sourceSp = sourceSp;
this.filter = filter;
}
@Override
public void accept(T t)
{
buffer.add(t);
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
while (true)
{
if (pos >= buffer.size())
{
pos = 0;
buffer.clear();
sourceSp.tryAdvance(this);
}
// failed to fill buffer
if (buffer.size() == 0)
return false;
T nextElem = buffer.get(pos);
pos++;
if (filter.test(nextElem))
{
action.accept(nextElem);
return true;
}
}
}
}
static class OddLineSp<T> implements Sp<T>, Consumer<T>
{
private Sp<T> sourceSp;
public OddLineSp(Sp<T> sourceSp)
{
this.sourceSp = sourceSp;
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
if (sourceSp == null)
return false;
sourceSp.tryAdvance(this);
if (!sourceSp.tryAdvance(action))
{
sourceSp = null;
}
return true;
}
@Override
public void accept(T t)
{
}
}
static class ReduceIntMax
{
boolean reduceEmpty = true;
int reduceState = 0;
public int getReduceState()
{
return reduceState;
}
public void accept(int t)
{
if (reduceEmpty)
{
reduceEmpty = false;
reduceState = t;
}
else
{
reduceState = Math.max(reduceState, t);
}
}
}
static int similarToSpliterator(String[] data)
{
ArraySp<String> src = new ArraySp<>(data);
int[] skip = new int[1];
skip[0] = 1;
WrappingSp<String> firstFilter = new WrappingSp<String>(src, (s) ->
{
if (s.charAt(0) == '#')
return false;
if (skip[0] != 0)
{
skip[0]--;
return false;
}
return true;
});
OddLineSp<String> oddLines = new OddLineSp<>(firstFilter);
final ReduceIntMax reduceIntMax = new ReduceIntMax();
while (oddLines.tryAdvance(s ->
{
int intValue = parseInt(s.substring(14, 16));
reduceIntMax.accept(intValue);
})) ; // do nothing in the loop body
return reduceIntMax.getReduceState();
}
Run Code Online (Sandbox Code Playgroud)
此代码较大,因为如果循环内没有一些重要的状态回调,则不可能(或至少很难)表示逻辑。这里的接口是和接口Sp的混合。j.u.s.Streamj.u.Spliterator
类ArraySp代表 的结果Arrays.stream。
类WrappingSp类似于jusStreamSpliterators.WrappingSpliterator,它在实际代码中表示Spliterator任何非空管道的接口实现,即Stream至少应用了一个中间操作(请参阅jusAbstractPipeline.spliterator 方法)。在我的代码中,我将其与StatelessOp子类合并,并将负责filter方法实现的逻辑放在那里。另外为了简单起见,我skip使用filter.
OddLineSp对应于您OddLines及其结果Stream
ReduceIntMax代表forReduceOps的终端操作Math.maxint
那么这个例子中重要的是什么?这里重要的是,由于您首先过滤原始流,因此您OddLineSp是从非空管道(即从WrappingSp. 如果您仔细查看WrappingSp,您会发现每次tryAdvance调用时,它都会将调用委托给sourceSp并将结果累积到buffer. flatMap而且,由于管道中没有元素,因此buffer将一一复制。即每次WrappingSp.tryAdvance被调用时,它都会调用ArraySp.tryAdvance,返回一个元素(通过回调),并将其进一步传递给consumer调用者提供的元素(除非该元素与过滤器不匹配,在这种情况下ArraySp.tryAdvance将被一次又一次地调用,但是仍然buffer一次不会填充多个元素)。
旁注3:如果你想看真正的代码,最有趣的地方是jusStreamSpliterators。WrappingSpliterator.tryAdvance它调用
jusStreamSpliterators。AbstractWrappingSpliterator.doAdvance它又调用jusStreamSpliterators。AbstractWrappingSpliterator.fillBuffer它又调用pusher在 jusStreamSpliterators 处初始化的函数。WrappingSpliterator.initPartialTraversalState
因此,影响性能的主要因素是复制到缓冲区。不幸的是,对于我们这些普通的 Java 开发人员来说,当前 Stream API 的实现几乎是封闭的,您无法使用继承或组合仅修改内部行为的某些方面。您可以使用一些基于反射的黑客技术来使复制到缓冲区对于您的特定情况更有效并获得一些性能(但牺牲了惰性),Stream但您无法完全避免这种复制,因此Spliterator基于 的代码无论如何都会变慢。
回到旁注 #2中的示例,Spliterator基于物化的测试工作得更快,因为之前的管道中filteredData没有,因此不会复制到中间缓冲区中。WrappingSpOddLineSp
| 归档时间: |
|
| 查看次数: |
448 次 |
| 最近记录: |