相关疑难解决方法(0)

流<Stream>:flatMap与reduce

如果我执行以下代码"连接"两个流

  • 首先通过flatMapping a Stream<Stream<Integer>>
  • 然后通过减少Stream<Stream<Integer>>使用Stream.concat()

在两种情况下我都获得了相同的正确结果,但过滤操作的数量是不同的.

public class FlatMapVsReduce {
    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

        Predicate<Integer> predicate1 = i -> {
            System.out.println("testing first condition with " + i);
            return i == 3;
        };

        Predicate<Integer> predicate2 = i -> {
            System.out.println("testing second condition with " + i);
            return i == 7;
        };

        System.out.println("Testing with flatMap");
        Integer result1 =
            Stream.of(list.stream().filter(predicate1),
                      list.stream().filter(predicate2))
                  .flatMap(Function.identity())
                  .peek(i -> System.out.println("peeking " …
Run Code Online (Sandbox Code Playgroud)

java java-8 java-stream

10
推荐指数
1
解决办法
1654
查看次数

并行flatMap总是顺序的

假设我有这个代码:

 Collections.singletonList(10)
            .parallelStream() // .stream() - nothing changes
            .flatMap(x -> Stream.iterate(0, i -> i + 1)
                    .limit(x)
                    .parallel()
                    .peek(m -> {
                        System.out.println(Thread.currentThread().getName());
                    }))
            .collect(Collectors.toSet());
Run Code Online (Sandbox Code Playgroud)

输出是相同的线程名称,因此这里没有任何好处parallel- 我的意思是,有一个线程可以完成所有工作.

里面flatMap有这个代码:

result.sequential().forEach(downstream);
Run Code Online (Sandbox Code Playgroud)

我理解强制sequential属性如果"外部"流将是并行的(它们可能会阻塞),"外部"将不得不等待"flatMap"完成,反过来(因为使用相同的公共池)但为什么总是强迫吗?

这是那些在以后的版本中可能发生变化的事情之一吗?

java java-8 java-stream java-9

9
推荐指数
1
解决办法
1683
查看次数

如何使用Java 8流迭代嵌套for循环引用父元素?

我想使用java8迭代嵌套列表streams,并在第一次匹配时提取列表的一些结果.不幸的是,如果子元素与过滤器匹配,我还必须从父内容中获取值.

我怎么能这样做?

// java7

Result result = new Result();

//find first match and pupulate the result object.
for (FirstNode first : response.getFirstNodes()) {
    for (SndNode snd : first.getSndNodes()) {
        if (snd.isValid()) {
            result.setKey(first.getKey());
            result.setContent(snd.getContent());
            return;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

// java8

 response.getFirstNodes().stream()
        .flatMap(first -> first.getSndNodes())
        .filter(snd -> snd.isValid())
        .findFirst()
        .ifPresent(???); //cannot access snd.getContent() here
Run Code Online (Sandbox Code Playgroud)

java optional java-8 java-stream

8
推荐指数
1
解决办法
3618
查看次数

我如何懒洋洋地连接流?

我正在尝试实现一个在其实现中使用自身的另一个实例的流.该流有一些常量元素(使用IntStream.concat),因此只要连接流懒惰地创建非常量部分,这应该有效.我认为使用StreamSupport.intStream重载使用IntStream.concat的供应商("创建一个延迟连接的流")应该足够懒,只能在需要元素时创建第二个分裂器,但是甚至创建流(不评估)它)溢出堆栈.我如何懒洋洋地连接流?


我正试图将这个答案的流媒体素数筛选器移植到Java中.此筛使用其自身的另一个实例(ps = postponed_sieve()在Python代码中).如果我将最初的四个常量元素(yield 2; yield 3; yield 5; yield 7;)分解为它们自己的流,那么很容易将生成器实现为一个分裂器:

/**
 * based on https://stackoverflow.com/a/10733621/3614835
 */
static class PrimeSpliterator extends Spliterators.AbstractIntSpliterator {
    private static final int CHARACTERISTICS = Spliterator.DISTINCT | Spliterator.IMMUTABLE | Spliterator.NONNULL | Spliterator.ORDERED | Spliterator.SORTED;
    private final Map<Integer, Supplier<IntStream>> sieve = new HashMap<>();
    private final PrimitiveIterator.OfInt postponedSieve = primes().iterator();
    private int p, q, c = 9;
    private Supplier<IntStream> s;
    PrimeSpliterator() {
        super(105097564 /* according to …
Run Code Online (Sandbox Code Playgroud)

java java-8 java-stream

7
推荐指数
1
解决办法
1162
查看次数

为什么 stream.spliterator() 的 tryAdvance 可能会将项目累积到缓冲区中?

SpliteratorStream管道中获取 a可能会返回StreamSpliterators.WrappingSpliterator的实例。例如,得到以下内容Spliterator

Spliterator<String> source = new Random()
            .ints(11, 0, 7) // size, origin, bound
            .filter(nr -> nr % 2 != 0)
            .mapToObj(Integer::toString)
            .spliterator();
Run Code Online (Sandbox Code Playgroud)

鉴于上述情况Spliterator<String> source,当我们通过 的tryAdvance (Consumer<? super P_OUT> consumer)方法(Spliterator在本例中为StreamSpliterators.WrappingSpliterator的实例)单独遍历元素时,它将首先将项目累积到内部缓冲区中,然后再消费这些项目,如我们在StreamSpliterators.java所见#298。从简单的角度来看,doAdvance()首先将项目插入到buffer,然后获取下一个项目并将其传递给consumer.accept (…)

public boolean tryAdvance(Consumer<? super P_OUT> consumer) {
    boolean hasNext = doAdvance();
    if (hasNext)
        consumer.accept(buffer.get(nextToConsume));
    return hasNext;
}
Run Code Online (Sandbox Code Playgroud)

但是,我没有弄清楚 this 的需要buffer

在这种情况下,为什么不简单地将 的 …

java java-8 java-stream

7
推荐指数
3
解决办法
906
查看次数

在Stream中"消耗"后的Spliterator状态

我认为我遇到了一个问题,我做了一个假设:如果一个分裂者的项目没有被一个流消耗,分裂者仍然可以前进到它.看来情况并非如此.

这里有一些代码来演示:

import java.util.Spliterator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * Created by dsmith on 7/21/15.
 */
public class SpliteratorTest {
    public static void main(String[] args) {
        System.out.println("Test 1");
        test1();

        System.out.println("Test 2");
        test2();
    }

    public static void test1() {
        final Spliterator<String> spliterator1 = Stream.of("a", "b", "c", "d", "e", "f").spliterator();

        StreamSupport.stream(spliterator1, false).
                limit(3).
                collect(Collectors.toList());

        System.out.println("spliterator1.estimateSize() = " + spliterator1.estimateSize());
    }

    public static void test2() {
        final Spliterator<String> spliterator1 = Stream.of("a", "b", "c", "d", "e", "f").spliterator();
        final Spliterator<String> spliterator2 …
Run Code Online (Sandbox Code Playgroud)

java java-8 java-stream

6
推荐指数
1
解决办法
121
查看次数

使用流来查找列表列表中的对象

我正在尝试编写一个方法,在列表列表中查找对象的索引并利用并行性.这是我的代码.

// returns [i, j] where lists.get(i).get(j) equals o, or null if o is not present.
public static int[] indices(List<? extends List<?>> lists, Object o) {
    return IntStream.range(0, lists.size())
                    .boxed()
                    .flatMap(i -> IntStream.range(0, lists.get(i).size()).mapToObj(j -> new int[]{i, j}))
                    .parallel()
                    .filter(a -> {
                        System.out.println(Arrays.toString(a));     // For testing only
                        return Objects.equals(o, lists.get(a[0]).get(a[1]));
                    })
                    .findAny()
                    .orElse(null);
}
Run Code Online (Sandbox Code Playgroud)

当我运行以下代码时

List<List<String>> lists = Arrays.asList(
        Arrays.asList("A", "B", "C"),
        Arrays.asList("D", "E", "F", "G"),
        Arrays.asList("H", "I"),
        Collections.nCopies(5, "J")
);
System.out.println("Indices are " + Arrays.toString(indices(lists, "J")));
Run Code Online (Sandbox Code Playgroud)

输出是这样的

[0, 0] …
Run Code Online (Sandbox Code Playgroud)

java java-8 java-stream

6
推荐指数
1
解决办法
367
查看次数

在Spliterator的.tryAdance()的实现中,是否有任何危险来执行.accept()多个元素?

javadoc Spliterator提到:

Spliterator可以单独遍历元素(tryAdvance())或顺序遍历元素(forEachRemaining()).

然后我们转到javadoc,tryAdvance()其中说:

如果存在剩余元素,则对其执行给定操作,返回true; else返回false.

也许我在某个地方误读,但对我来说似乎只要剩下一个或多个元素,Consumer作为一个参数应该只.accept()返回前的每个参数true,如果,比方说,我有两个参数立即可用,那么我不能:

action.accept(arg1);
action.accept(arg2);
return true;
Run Code Online (Sandbox Code Playgroud)

这个项目中,我重写了广泛的第一个分裂器,现在它读取:

// deque is a Deque<Iterator<T>>

@Override
public boolean tryAdvance(final Consumer<? super T> action)
{
    Iterator<T> iterator;
    T element;

    while (!deque.isEmpty()) {
        iterator = deque.removeFirst();
        while (iterator.hasNext()) {
            element = iterator.next();
            deque.add(fn.apply(element));
            action.accept(element);
        }
    }
    return false;
}
Run Code Online (Sandbox Code Playgroud)

简而言之,我action接受所有参数,然后返回false ...而测试虽然很简单,但仍然成功(链接).

请注意,.trySplit()始终返回null; 分裂器具有特征DISTINCT,ORDERED并且NONNULL …

java java-8 spliterator

6
推荐指数
1
解决办法
292
查看次数

如何在不阻塞的情况下将 CompletableFuture&lt;Stream&lt;T&gt;&gt; 转换为 Stream&lt;T&gt;

我正在使用Async Http Client 库(使用 Netty)向 RESTful API 发出异步 Http Get 请求。由于我想保留非阻塞行为,因此我将CompletableFuture<T>作为 Http Get 请求的结果返回的实例。因此,在 RESTful API 端点返回一个 Json 数组的地方,我返回一个CompletableFuture<T[]>.

然而,根据 Erik Meijer 对编程中的四种基本效果所做的分类,我认为这Stream<T>更适合于发出异步 Http Get 请求并返回 Json 数组的 Java 方法的结果。在这种情况下,我们可以将Stream<T>视为Observable<T>等价物,它是返回许多值的异步计算结果

所以,考虑到这resp持有响应,那么我可以得到CompletableFuture<Stream<T>>如下:

 CompletableFuture<T[]> resp = …
 return resp.thenApply(Arrays::stream);
Run Code Online (Sandbox Code Playgroud)

但是,我想知道如何将 the 转换CompletableFuture<Stream<T>> resp为 a Stream<T>,而无需等待计算完成(即我不想在get()调用时阻塞)?

我希望得到与以下表达式相同的结果,但不阻塞get()

return resp.thenApply(Arrays::stream).get();
Run Code Online (Sandbox Code Playgroud)

lambda asynchronous java-8 java-stream completable-future

6
推荐指数
1
解决办法
675
查看次数

Java流在终端点执行的操作顺序

我一直试图从官方Java文档中找到明确的合同,关于Java流的顺序,一旦调用终端操作,就处理元素并调用中间操作.

例如,让我们看看这些使用Java流版本和普通迭代版本的示例(两者都产生相同的结果).

例1:

    List<Integer> ints = Arrays.asList(1, 2, 3, 4, 5);
    Function<Integer, Integer> map1 = i -> i;
    Predicate<Integer> f1 = i -> i > 2;

    public int findFirstUsingStreams(List<Integer> ints){
        return ints.stream().map(map1).filter(f1).findFirst().orElse(-1);
    }

    public int findFirstUsingLoopV1(List<Integer> ints){
        for (int i : ints){
            int mappedI = map1.apply(i);
            if ( f1.test(mappedI) ) return mappedI;
        }
        return -1;
    }

    public int findFirstUsingLoopV2(List<Integer> ints){
        List<Integer> mappedInts = new ArrayList<>( ints.size() );

        for (int i : ints){
            int mappedI = map1.apply(i);
            mappedInts.add(mappedI);
        } …
Run Code Online (Sandbox Code Playgroud)

java java-8 java-stream

6
推荐指数
1
解决办法
306
查看次数