如果我执行以下代码"连接"两个流
Stream<Stream<Integer>>Stream<Stream<Integer>>使用Stream.concat()在两种情况下我都获得了相同的正确结果,但过滤操作的数量是不同的.
public class FlatMapVsReduce {
public static void main(String[] args) {
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
Predicate<Integer> predicate1 = i -> {
System.out.println("testing first condition with " + i);
return i == 3;
};
Predicate<Integer> predicate2 = i -> {
System.out.println("testing second condition with " + i);
return i == 7;
};
System.out.println("Testing with flatMap");
Integer result1 =
Stream.of(list.stream().filter(predicate1),
list.stream().filter(predicate2))
.flatMap(Function.identity())
.peek(i -> System.out.println("peeking " …Run Code Online (Sandbox Code Playgroud) 假设我有这个代码:
Collections.singletonList(10)
.parallelStream() // .stream() - nothing changes
.flatMap(x -> Stream.iterate(0, i -> i + 1)
.limit(x)
.parallel()
.peek(m -> {
System.out.println(Thread.currentThread().getName());
}))
.collect(Collectors.toSet());
Run Code Online (Sandbox Code Playgroud)
输出是相同的线程名称,因此这里没有任何好处parallel- 我的意思是,有一个线程可以完成所有工作.
里面flatMap有这个代码:
result.sequential().forEach(downstream);
Run Code Online (Sandbox Code Playgroud)
我理解强制sequential属性如果"外部"流将是并行的(它们可能会阻塞),"外部"将不得不等待"flatMap"完成,反过来(因为使用相同的公共池)但为什么总是强迫吗?
这是那些在以后的版本中可能发生变化的事情之一吗?
我想使用java8迭代嵌套列表streams,并在第一次匹配时提取列表的一些结果.不幸的是,如果子元素与过滤器匹配,我还必须从父内容中获取值.
我怎么能这样做?
// java7
Result result = new Result();
//find first match and pupulate the result object.
for (FirstNode first : response.getFirstNodes()) {
for (SndNode snd : first.getSndNodes()) {
if (snd.isValid()) {
result.setKey(first.getKey());
result.setContent(snd.getContent());
return;
}
}
}
Run Code Online (Sandbox Code Playgroud)
// java8
response.getFirstNodes().stream()
.flatMap(first -> first.getSndNodes())
.filter(snd -> snd.isValid())
.findFirst()
.ifPresent(???); //cannot access snd.getContent() here
Run Code Online (Sandbox Code Playgroud) 我正在尝试实现一个在其实现中使用自身的另一个实例的流.该流有一些常量元素(使用IntStream.concat),因此只要连接流懒惰地创建非常量部分,这应该有效.我认为使用StreamSupport.intStream重载使用IntStream.concat的供应商("创建一个延迟连接的流")应该足够懒,只能在需要元素时创建第二个分裂器,但是甚至创建流(不评估)它)溢出堆栈.我如何懒洋洋地连接流?
我正试图将这个答案的流媒体素数筛选器移植到Java中.此筛使用其自身的另一个实例(ps = postponed_sieve()在Python代码中).如果我将最初的四个常量元素(yield 2; yield 3; yield 5; yield 7;)分解为它们自己的流,那么很容易将生成器实现为一个分裂器:
/**
* based on https://stackoverflow.com/a/10733621/3614835
*/
static class PrimeSpliterator extends Spliterators.AbstractIntSpliterator {
private static final int CHARACTERISTICS = Spliterator.DISTINCT | Spliterator.IMMUTABLE | Spliterator.NONNULL | Spliterator.ORDERED | Spliterator.SORTED;
private final Map<Integer, Supplier<IntStream>> sieve = new HashMap<>();
private final PrimitiveIterator.OfInt postponedSieve = primes().iterator();
private int p, q, c = 9;
private Supplier<IntStream> s;
PrimeSpliterator() {
super(105097564 /* according to …Run Code Online (Sandbox Code Playgroud) Spliterator从Stream管道中获取 a可能会返回StreamSpliterators.WrappingSpliterator的实例。例如,得到以下内容Spliterator:
Spliterator<String> source = new Random()
.ints(11, 0, 7) // size, origin, bound
.filter(nr -> nr % 2 != 0)
.mapToObj(Integer::toString)
.spliterator();
Run Code Online (Sandbox Code Playgroud)
鉴于上述情况Spliterator<String> source,当我们通过 的tryAdvance (Consumer<? super P_OUT> consumer)方法(Spliterator在本例中为StreamSpliterators.WrappingSpliterator的实例)单独遍历元素时,它将首先将项目累积到内部缓冲区中,然后再消费这些项目,如我们在StreamSpliterators.java中所见#298。从简单的角度来看,doAdvance()首先将项目插入到buffer,然后获取下一个项目并将其传递给consumer.accept (…)。
public boolean tryAdvance(Consumer<? super P_OUT> consumer) {
boolean hasNext = doAdvance();
if (hasNext)
consumer.accept(buffer.get(nextToConsume));
return hasNext;
}
Run Code Online (Sandbox Code Playgroud)
但是,我没有弄清楚 this 的需要buffer。
在这种情况下,为什么不简单地将 的 …
我认为我遇到了一个问题,我做了一个假设:如果一个分裂者的项目没有被一个流消耗,分裂者仍然可以前进到它.看来情况并非如此.
这里有一些代码来演示:
import java.util.Spliterator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* Created by dsmith on 7/21/15.
*/
public class SpliteratorTest {
public static void main(String[] args) {
System.out.println("Test 1");
test1();
System.out.println("Test 2");
test2();
}
public static void test1() {
final Spliterator<String> spliterator1 = Stream.of("a", "b", "c", "d", "e", "f").spliterator();
StreamSupport.stream(spliterator1, false).
limit(3).
collect(Collectors.toList());
System.out.println("spliterator1.estimateSize() = " + spliterator1.estimateSize());
}
public static void test2() {
final Spliterator<String> spliterator1 = Stream.of("a", "b", "c", "d", "e", "f").spliterator();
final Spliterator<String> spliterator2 …Run Code Online (Sandbox Code Playgroud) 我正在尝试编写一个方法,在列表列表中查找对象的索引并利用并行性.这是我的代码.
// returns [i, j] where lists.get(i).get(j) equals o, or null if o is not present.
public static int[] indices(List<? extends List<?>> lists, Object o) {
return IntStream.range(0, lists.size())
.boxed()
.flatMap(i -> IntStream.range(0, lists.get(i).size()).mapToObj(j -> new int[]{i, j}))
.parallel()
.filter(a -> {
System.out.println(Arrays.toString(a)); // For testing only
return Objects.equals(o, lists.get(a[0]).get(a[1]));
})
.findAny()
.orElse(null);
}
Run Code Online (Sandbox Code Playgroud)
当我运行以下代码时
List<List<String>> lists = Arrays.asList(
Arrays.asList("A", "B", "C"),
Arrays.asList("D", "E", "F", "G"),
Arrays.asList("H", "I"),
Collections.nCopies(5, "J")
);
System.out.println("Indices are " + Arrays.toString(indices(lists, "J")));
Run Code Online (Sandbox Code Playgroud)
输出是这样的
[0, 0] …Run Code Online (Sandbox Code Playgroud) javadoc Spliterator提到:
Spliterator可以单独遍历元素(tryAdvance())或顺序遍历元素(forEachRemaining()).
然后我们转到javadoc,tryAdvance()其中说:
如果存在剩余元素,则对其执行给定操作,返回true; else返回false.
也许我在某个地方误读,但对我来说似乎只要剩下一个或多个元素,Consumer作为一个参数应该只.accept()返回前的每个参数true,如果,比方说,我有两个参数立即可用,那么我不能:
action.accept(arg1);
action.accept(arg2);
return true;
Run Code Online (Sandbox Code Playgroud)
在这个项目中,我重写了广泛的第一个分裂器,现在它读取:
// deque is a Deque<Iterator<T>>
@Override
public boolean tryAdvance(final Consumer<? super T> action)
{
Iterator<T> iterator;
T element;
while (!deque.isEmpty()) {
iterator = deque.removeFirst();
while (iterator.hasNext()) {
element = iterator.next();
deque.add(fn.apply(element));
action.accept(element);
}
}
return false;
}
Run Code Online (Sandbox Code Playgroud)
简而言之,我action接受所有参数,然后返回false ...而测试虽然很简单,但仍然成功(链接).
请注意,.trySplit()始终返回null; 分裂器具有特征DISTINCT,ORDERED并且NONNULL …
我正在使用Async Http Client 库(使用 Netty)向 RESTful API 发出异步 Http Get 请求。由于我想保留非阻塞行为,因此我将CompletableFuture<T>作为 Http Get 请求的结果返回的实例。因此,在 RESTful API 端点返回一个 Json 数组的地方,我返回一个CompletableFuture<T[]>.
然而,根据 Erik Meijer 对编程中的四种基本效果所做的分类,我认为这Stream<T>更适合于发出异步 Http Get 请求并返回 Json 数组的 Java 方法的结果。在这种情况下,我们可以将Stream<T>视为Observable<T>等价物,它是返回许多值的异步计算的结果。
所以,考虑到这resp持有响应,那么我可以得到CompletableFuture<Stream<T>>如下:
CompletableFuture<T[]> resp = …
return resp.thenApply(Arrays::stream);
Run Code Online (Sandbox Code Playgroud)
但是,我想知道如何将 the 转换CompletableFuture<Stream<T>> resp为 a Stream<T>,而无需等待计算完成(即我不想在get()调用时阻塞)?
我希望得到与以下表达式相同的结果,但不阻塞get():
return resp.thenApply(Arrays::stream).get();
Run Code Online (Sandbox Code Playgroud) 我一直试图从官方Java文档中找到明确的合同,关于Java流的顺序,一旦调用终端操作,就处理元素并调用中间操作.
例如,让我们看看这些使用Java流版本和普通迭代版本的示例(两者都产生相同的结果).
例1:
List<Integer> ints = Arrays.asList(1, 2, 3, 4, 5);
Function<Integer, Integer> map1 = i -> i;
Predicate<Integer> f1 = i -> i > 2;
public int findFirstUsingStreams(List<Integer> ints){
return ints.stream().map(map1).filter(f1).findFirst().orElse(-1);
}
public int findFirstUsingLoopV1(List<Integer> ints){
for (int i : ints){
int mappedI = map1.apply(i);
if ( f1.test(mappedI) ) return mappedI;
}
return -1;
}
public int findFirstUsingLoopV2(List<Integer> ints){
List<Integer> mappedInts = new ArrayList<>( ints.size() );
for (int i : ints){
int mappedI = map1.apply(i);
mappedInts.add(mappedI);
} …Run Code Online (Sandbox Code Playgroud) java-8 ×10
java ×9
java-stream ×9
asynchronous ×1
java-9 ×1
lambda ×1
optional ×1
spliterator ×1