关于深化认识Java流spliterators的第一个问题后,这里为什么执行:约溪流另一个微妙的问题.flatMap()在Java中是如此低效(非懒惰)?
通常,流应该尽可能地保持懒惰,但.flatMap()方法不是.
例如:
stream.flatMap(this::getStreamWith10HeavyComputationElems).firstFirst() 在返回第一个繁重的计算结果之前,将消耗10个元素(10次重计算).
stream.flatMap(this::getStreamWith10HeavyComputationElems).limit(11).count() 在返回11之前将消耗20个元素(2x10重计算).
该问题是为什么Java使用非懒实施?
@Test
void flatMap_native() throws Exception {
AtomicInteger count = new AtomicInteger();
Stream<Long> stream = LongStream.range(0, 5).boxed()
.flatMap(num -> LongStream.range(0, 10).boxed()
.peek(x -> count.incrementAndGet()))
.limit(11);
assertThat(stream).hasSize(11);
assertThat(count).hasValue(20); //!why? - should be 11!
}
Run Code Online (Sandbox Code Playgroud)
作为解决方法,我创建了自己的flatMap实现,但与本机调用相比,它缺乏流畅性:flatMap(stream, mapper)vs native stream.flatMap(mapper).
public static <T, R> Stream<R> flatMap(Stream<? extends T> stream, Function<? super T, ? extends Stream<? extends R>> mapper) {
// Outside the class …Run Code Online (Sandbox Code Playgroud) 我试图将多个由大量数据支持的流合并为一个,然后缓冲它们。我可以毫无问题地将这些流折叠成一个项目流。但是,当我尝试缓冲/分块流时,它会尝试完全缓冲第一个流,这会立即填满我的内存。
我花了一段时间将问题缩小到最小测试用例,但下面有一些代码。
我可以重构一些东西,这样我就不会遇到这个问题,但是在不理解为什么会爆炸的情况下,我觉得使用流只是一个定时炸弹。
我从Java 8 Streams 上的 Buffer Operator那里获得了缓冲的灵感。
import java.util.*;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class BreakStreams
{
//@see https://stackoverflow.com/questions/47842871/buffer-operator-on-java-8-streams
/**
* Batch a stream into chunks
*/
public static <T> Stream<List<T>> buffer(Stream<T> stream, final long count)
{
final Iterator<T> streamIterator = stream.iterator();
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<List<T>>()
{
@Override public boolean hasNext()
{
return streamIterator.hasNext();
}
@Override public List<T> next()
{
List<T> intermediate = new ArrayList<>();
for (long v = 0; v < count …Run Code Online (Sandbox Code Playgroud) 考虑以下简单代码:
Stream.of(1)
.flatMap(x -> IntStream.range(0, 1024).boxed())
.parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
.forEach(x -> {
System.out.println("Thread: " + Thread.currentThread().getName());
});
Run Code Online (Sandbox Code Playgroud)
很长一段时间,我认为即使在flatMap. 但是上面的代码打印了所有的“Thread:main”,证明我的想法是错误的。
一种使其并行的简单方法flatMap是收集然后再次流式传输:
Stream.of(1)
.flatMap(x -> IntStream.range(0, 1024).boxed())
.parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
.collect(Collectors.toList())
.parallelStream()
.forEach(x -> {
System.out.println("Thread: " + Thread.currentThread().getName());
});
Run Code Online (Sandbox Code Playgroud)
我想知道是否有更好的方法,以及flatMap仅在调用之前并行化流的设计选择,而不是在调用之后并行化。
========关于问题的更多说明========
从一些答案来看,我的问题似乎没有完全表达出来。正如@Andreas 所说,如果我从 …
I have multiple streams of Student Object. I have to merge them in sorted order.
class Student {
private String branch;
private Integer rollNo;
private Date doj;
// Getters and Setters
}
Run Code Online (Sandbox Code Playgroud)
In another class, I have a method
private Stream<Student> mergeAndSort(Stream<Student> s1, Stream<Student> s2, Stream<Student> s3) {
return Stream.of(s1, s2, s3).sorted(...
// I tried this logic multiple times but is not working. How can I inject Student comparator here.
// I have to sort based on branch and then …Run Code Online (Sandbox Code Playgroud) 我有一个带有其他对象列表的对象,每个其他对象都有一个列表等。我需要在层次结构中找到层次结构中具有匹配某个值的属性的第一个(也是唯一的)最后一个元素。看到我现在的代码会更清楚:
@Override
public Poste findByNumeroAndMillesime(String numero, Millesime millesime) {
return millesime
.getDivisions()
.stream()
.filter(
division -> division
.getGroupes()
.stream()
.filter(
groupe -> groupe
.getClasses()
.stream()
.filter(
classe -> classe
.getSousClasses()
.stream()
.filter(
sousClasse -> sousClasse
.getPostes()
.stream()
.filter(poste -> numero.equals(poste.getNumero()))
.findFirst()
.get()
))));
}
Run Code Online (Sandbox Code Playgroud)
我需要返回与作为参数传递的数字相同的 Poste。
提前致谢。
出于某种原因,Java Stream 会生成更多的值(调用迭代器的 hasNext() 和 next() 方法。
这是合成示例。
我有一个迭代器形式的生成器:
@RequiredArgsConstructor
static class TestIterator implements Iterator<Integer> {
private final int bound;
private final Random rnd = new Random();
private int current = 0;
@Override public boolean hasNext() {
return current < bound;
}
@Override public Integer next() {
current = rnd.nextInt(20);
System.out.println("Generated: " + current);
return current;
}
}
Run Code Online (Sandbox Code Playgroud)
现在,我正在尝试创建一个由几个迭代器组成的扁平流
public static void main(String... args) {
List<Iterator<Integer>> iterators = asList(
new TestIterator(18),
new TestIterator(18),
new TestIterator(18));
Stream<Integer> streams = iterators.stream()
.map(iter …Run Code Online (Sandbox Code Playgroud) java-stream ×6
java ×4
java-8 ×3
compare ×1
foreach ×1
iterator ×1
merge ×1
sorting ×1
spliterator ×1