Mig*_*boa 7 java java-8 java-stream
Spliterator从Stream管道中获取 a可能会返回StreamSpliterators.WrappingSpliterator的实例。例如,得到以下内容Spliterator:
Spliterator<String> source = new Random()
.ints(11, 0, 7) // size, origin, bound
.filter(nr -> nr % 2 != 0)
.mapToObj(Integer::toString)
.spliterator();
Run Code Online (Sandbox Code Playgroud)
鉴于上述情况Spliterator<String> source,当我们通过 的tryAdvance (Consumer<? super P_OUT> consumer)方法(Spliterator在本例中为StreamSpliterators.WrappingSpliterator的实例)单独遍历元素时,它将首先将项目累积到内部缓冲区中,然后再消费这些项目,如我们在StreamSpliterators.java中所见#298。从简单的角度来看,doAdvance()首先将项目插入到buffer,然后获取下一个项目并将其传递给consumer.accept (…)。
public boolean tryAdvance(Consumer<? super P_OUT> consumer) {
boolean hasNext = doAdvance();
if (hasNext)
consumer.accept(buffer.get(nextToConsume));
return hasNext;
}
Run Code Online (Sandbox Code Playgroud)
但是,我没有弄清楚 this 的需要buffer。
在这种情况下,为什么不简单地将 的consumer参数tryAdvance用作Sink管道的终端?
请记住,这是该方法Spliterator返回的publicStream.spliterator(),因此不能对调用者做出任何假设(只要它在合约内)。
该tryAdvance方法可能会为每个stream\xe2\x80\x99s 元素调用一次,并再次调用以检测流的结尾,实际上,即使在到达结尾之后,它也可能会被调用任意次数。并且不能保证调用者总是会经过同一个消费者。
要将消费者直接传递到源分割器而不进行缓冲,您必须编写一个将执行所有管道阶段的消费者,即调用映射函数并使用其结果或测试谓词,并且如果为负则不调用下游消费者等等。传递给源 spliterator 的使用者还将负责WrappingSpliterator以某种方式通知过滤器拒绝的值,因为在这种情况下源 spliterator\xe2\x80\x99stryAdvance方法仍然返回true,并且必须重复该操作。
正如尤金正确提到的那样,这是一种万能的实现,它不考虑有多少个或何种类型的管道阶段。组成这样的消费者的成本可能会很高,并且可能必须为每个tryAdvance调用重新应用,为每个流元素读取,例如,当传递到不同的消费者时tryAdvance或当相等检查不起作用时。请记住,消费者通常被实现为 lambda 表达式,并且 lambda 表达式生成的实例的同一性或相等性是未指定的。
因此,该tryAdvance实现通过在第一次调用时仅组合一个使用者实例来避免这些成本,该实例始终将元素存储到同一缓冲区中,如果没有被过滤器拒绝,该实例也会在第一次调用时分配。请注意,正常情况下,缓冲区只会保存一个元素。Afaik,flatMap这是唯一可以将更多元素推送到缓冲区的操作。但请注意,这种非惰性行为的存在flatMap也是需要这种缓冲策略的原因,至少在flatMap涉及时,以确保方法Spliterator分发的实现public将履行最多将一个元素传递给消费者在一次调用tryAdvance.
相比之下,当你调用时forEachRemaining,这些问题就不存在。整个操作期间只有一个Consumer实例,并且 \xe2\x80\x99 的非惰性flatMap也不重要,因为无论如何所有元素都会被消耗。因此,只要之前没有tryAdvance进行可能导致某些元素缓冲的调用,就会尝试非缓冲传输:
public void forEachRemaining(Consumer<? super P_OUT> consumer) {\n if (buffer == null && !finished) {\n Objects.requireNonNull(consumer);\n init();\n\n ph.wrapAndCopyInto((Sink<P_OUT>) consumer::accept, spliterator);\n finished = true;\n }\n else {\n do { } while (tryAdvance(consumer));\n }\n }\nRun Code Online (Sandbox Code Playgroud)\n\n正如你所看到的,只要buffer尚未初始化,即之前没有tryAdvance进行任何调用,consumer::accept 就绑定为Sink并进行完整的直接传输。
这是我在 Holger 的几篇文章中读到的内容,我将在这里总结一下;如果有某个完全相同的重复项(我会尝试找到一个)-我将关闭并删除我对该答案的回答。
首先,是为什么WrappingSpliterator首先需要 - 对于像 、 等有状态操作sorted-distinct但我认为你已经理解了这一点。我也假设flatMap- 因为它很渴望。
现在,当您调用spliterator, IFF 时,没有状态操作,没有真正的理由将其包装成WrappingSpliterator明显的,但目前尚未完成。stateful operations这可能会在未来的版本中更改 - 他们可以在您调用之前检测是否有spliterator;但他们现在不这样做,只是将每个操作视为有状态的,从而将其包装到WrappingSpliterator
我基本上同意 @Holger 的精彩回答,但我会以不同的方式强调口音。我认为您很难理解对缓冲区的需求,因为您对 Stream API 所允许的功能有非常简单的心理模型。如果将 Stream 视为map和的序列filter,则不需要额外的缓冲区,因为这些操作有 2 个重要的“好”属性:
然而,在一般情况下,这些情况并非如此。正如@Holger(以及我在我原来的答案中)提到的那样,flatMapJava 8 中已经存在违反规则#2 的情况,而在 Java 9 中,他们最终添加了takeWhileStream ,它实际上是在整个->上进行转换Stream,而不是在每个元素的基础上进行转换(并且这是 AFAIK 第一个中间衬衫电路操作)。
我不太同意@Holger的另一点是,我认为最根本的原因与他在第二段中提出的有点不同(即a),你可以在多次tryAdvance结束后调用Stream它,b )“不能保证调用者总是传递同一个消费者”)。我认为最重要的原因是Spliterator 功能上相同Stream必须支持短路和惰性(即不处理整体的能力Stream,否则它不能支持未绑定的流)。换句话说,即使 Spliterator API(很奇怪)要求您必须对Consumer给定 的所有方法的所有调用使用相同的对象Spliterator,您仍然需要tryAdvance并且该tryAdvance实现仍然必须使用一些缓冲区。forEachRemaining(Consumer<? super T> )如果你所拥有的一切只是让你无法实现任何类似findFirst或使用它的东西,那么你就无法停止处理数据takeWhile。Sink实际上,这就是 JDK 内部实现使用接口而不是Consumer(以及“wrap”代表的内容)的原因之一wrapAndCopyInto:Sink有额外的boolean cancellationRequested()方法。
总结一下:需要一个缓冲区,因为我们想要Spliterator:
Consumer方法无法报告处理/取消的后端请注意,这两个要求实际上有些矛盾。
示例和一些代码
在这里,我想提供一些代码示例,我认为在给定当前 API 协定(接口)的情况下,如果没有额外的缓冲区,这些代码是不可能实现的。这个例子是基于你的例子。
有一个简单的Collatz 整数序列,推测它最终总是会达到 1。据我所知,这个猜想尚未得到证明,但已针对许多整数进行了验证(至少对于整个 32 位 int 范围)。
因此,假设我们要解决的问题如下:从 1 到 1,000,000 范围内的随机起始数的 Collatz 序列流中,找到第一个在十进制表示中包含“123”的序列。
这是一个仅使用Stream(而不是Spliterator)的解决方案:
static String findGoodNumber() {
return new Random()
.ints(1, 1_000_000) // unbound!
.flatMap(nr -> collatzSequence(nr))
.mapToObj(Integer::toString)
.filter(s -> s.contains("123"))
.findFirst().get();
}
Run Code Online (Sandbox Code Playgroud)
其中collatzSequence是一个返回Stream包含 Collatz 序列直到第一个 1 的函数(对于挑剔者来说,当当前值大于此值时,让它也停止,Integer.MAX_VALUE /3这样我们就不会溢出)。
每个Stream返回的此类都受到collatzSequence约束。标准Random还将最终生成所提供范围内的每个数字。这意味着我们保证流中最终会有一些“好的”数字(例如123)并且findFirst是短路的,因此整个操作实际上将终止。然而,没有合理的 Stream API 实现可以预测这一点。
现在让我们假设由于某种奇怪的原因您想使用 middle 执行相同的操作Spliterator。即使您只有一个逻辑并且不需要不同的Consumers,您也不能使用forEachRemaining. 所以你必须做这样的事情:
static Spliterator<String> createCollatzRandomSpliterator() {
return new Random()
.ints(1, 1_000_000) // unbound!
.flatMap(nr -> collatzSequence(nr))
.mapToObj(Integer::toString)
.spliterator();
}
static String findGoodNumberWithSpliterator() {
Spliterator<String> source = createCollatzRandomSpliterator();
String[] res = new String[1]; // work around for "final" closure restriction
while (source.tryAdvance(s -> {
if (s.contains("123")) {
res[0] = s;
}
})) {
if (res[0] != null)
return res[0];
}
throw new IllegalStateException("Impossible");
}
Run Code Online (Sandbox Code Playgroud)
同样重要的是,对于某些起始数字,Collatz 序列将包含多个匹配数字。例如,41123and 123370(= 41123*3+1) 都包含“123”。这意味着我们真的不希望我们Consumer在第一个匹配的命中后被调用。但由于Consumer没有公开任何报告处理结束的方法,因此WrappingSpliterator不能仅将 our 传递Consumer给内部Spliterator. 唯一的解决方案是将内部的所有结果flatMap(以及所有后处理)累积到某个缓冲区中,然后一次迭代一个元素。
| 归档时间: |
|
| 查看次数: |
906 次 |
| 最近记录: |