使用流和减少消费者链的订单保证

Nam*_*man 6 java reduce consumer java-8 java-stream

因此,在当前场景中,我们有一组 API,如下所示:

Consumer<T> start();
Consumer<T> performDailyAggregates();
Consumer<T> performLastNDaysAggregates();
Consumer<T> repopulateScores();
Consumer<T> updateDataStore();
Run Code Online (Sandbox Code Playgroud)

通过这些,我们的调度程序之一执行任务,例如

private void performAllTasks(T data) {
    start().andThen(performDailyAggregates())
            .andThen(performLastNDaysAggregates())
            .andThen(repopulateScores())
            .andThen(updateDataStore())
            .accept(data);
}
Run Code Online (Sandbox Code Playgroud)

在回顾这一点时,我想到了一个更灵活的实现1来执行如下任务:

// NOOP in the context further stands for  'anything -> {}'
private void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    consumerList.reduce(NOOP, Consumer::andThen).accept(data);
}
Run Code Online (Sandbox Code Playgroud)

现在让我想到的一点是 Javadoc 清楚地指出

accumulator - 用于组合两个值的关联的、无干扰的、无状态的函数

接下来我在想如何确保 java8 流中的处理顺序?订购(处理顺序是一样的遭遇顺序)!

好的,流产生了 List将被排序,除非在以下实现parallel之前创建流reduce2

private void performAllTasks(List<Consumer<T>> consumerList, T data) {
    consumerList.stream().reduce(NOOP, Consumer::andThen).accept(data);
}
Run Code Online (Sandbox Code Playgroud)

问:这个假设2是否成立?是否可以保证始终按照原始代码拥有的顺序执行消费者?

Q. 是否有可能以某种方式暴露 向被调用者1以执行任务?

Hol*_*ger 9

正如Andreas 指出的Consumer::andThen是一个关联函数,虽然由此产生的消费者可能具有不同的内部结构,但它仍然是等价的。

但是让我们调试一下

public static void main(String[] args) {
    performAllTasks(IntStream.range(0, 10)
        .mapToObj(i -> new DebuggableConsumer(""+i)), new Object());
}
private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    Consumer<T> reduced = consumerList.reduce(Consumer::andThen).orElse(x -> {});
    reduced.accept(data);
    System.out.println(reduced);
}
static class DebuggableConsumer implements Consumer<Object> {
    private final Consumer<Object> first, second;
    private final boolean leaf;
    DebuggableConsumer(String name) {
        this(x -> System.out.println(name), x -> {}, true);
    }
    DebuggableConsumer(Consumer<Object> a, Consumer<Object> b, boolean l) {
        first = a; second = b;
        leaf = l;
    }
    public void accept(Object t) {
        first.accept(t);
        second.accept(t);
    }
    @Override public Consumer<Object> andThen(Consumer<? super Object> after) {
        return new DebuggableConsumer(this, after, false);
    }
    public @Override String toString() {
        if(leaf) return first.toString();
        return toString(new StringBuilder(200), 0, 0).toString();
    }
    private StringBuilder toString(StringBuilder sb, int preS, int preEnd) {
        int myHandle = sb.length()-2;
        sb.append(leaf? first: "combined").append('\n');
        if(!leaf) {
            int nPreS=sb.length();
            ((DebuggableConsumer)first).toString(
                sb.append(sb, preS, preEnd).append("\u2502 "), nPreS, sb.length());
            nPreS=sb.length();
            sb.append(sb, preS, preEnd);
            int lastItemHandle=sb.length();
            ((DebuggableConsumer)second).toString(sb.append("  "), nPreS, sb.length());
            sb.setCharAt(lastItemHandle, '\u2514');
        }
        if(myHandle>0) {
            sb.setCharAt(myHandle, '\u251c');
            sb.setCharAt(myHandle+1, '\u2500');
        }
        return sb;
    }
}
Run Code Online (Sandbox Code Playgroud)

将打印

public static void main(String[] args) {
    performAllTasks(IntStream.range(0, 10)
        .mapToObj(i -> new DebuggableConsumer(""+i)), new Object());
}
private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    Consumer<T> reduced = consumerList.reduce(Consumer::andThen).orElse(x -> {});
    reduced.accept(data);
    System.out.println(reduced);
}
static class DebuggableConsumer implements Consumer<Object> {
    private final Consumer<Object> first, second;
    private final boolean leaf;
    DebuggableConsumer(String name) {
        this(x -> System.out.println(name), x -> {}, true);
    }
    DebuggableConsumer(Consumer<Object> a, Consumer<Object> b, boolean l) {
        first = a; second = b;
        leaf = l;
    }
    public void accept(Object t) {
        first.accept(t);
        second.accept(t);
    }
    @Override public Consumer<Object> andThen(Consumer<? super Object> after) {
        return new DebuggableConsumer(this, after, false);
    }
    public @Override String toString() {
        if(leaf) return first.toString();
        return toString(new StringBuilder(200), 0, 0).toString();
    }
    private StringBuilder toString(StringBuilder sb, int preS, int preEnd) {
        int myHandle = sb.length()-2;
        sb.append(leaf? first: "combined").append('\n');
        if(!leaf) {
            int nPreS=sb.length();
            ((DebuggableConsumer)first).toString(
                sb.append(sb, preS, preEnd).append("\u2502 "), nPreS, sb.length());
            nPreS=sb.length();
            sb.append(sb, preS, preEnd);
            int lastItemHandle=sb.length();
            ((DebuggableConsumer)second).toString(sb.append("  "), nPreS, sb.length());
            sb.setCharAt(lastItemHandle, '\u2514');
        }
        if(myHandle>0) {
            sb.setCharAt(myHandle, '\u251c');
            sb.setCharAt(myHandle+1, '\u2500');
        }
        return sb;
    }
}
Run Code Online (Sandbox Code Playgroud)

而将减少代码更改为

private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    Consumer<T> reduced = consumerList.parallel().reduce(Consumer::andThen).orElse(x -> {});
    reduced.accept(data);
    System.out.println(reduced);
}
Run Code Online (Sandbox Code Playgroud)

在我的机器上打印

0
1
2
3
4
5
6
7
8
9
combined
??combined
? ??combined
? ? ??combined
? ? ? ??combined
? ? ? ? ??combined
? ? ? ? ? ??combined
? ? ? ? ? ? ??combined
? ? ? ? ? ? ? ??combined
? ? ? ? ? ? ? ? ??SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@378fd1ac
? ? ? ? ? ? ? ? ??SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@49097b5d
? ? ? ? ? ? ? ??SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6e2c634b
? ? ? ? ? ? ??SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@37a71e93
? ? ? ? ? ??SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7e6cbb7a
? ? ? ? ??SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7c3df479
? ? ? ??SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7106e68e
? ? ??SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7eda2dbb
? ??SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6576fe71
??SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@76fb509a
Run Code Online (Sandbox Code Playgroud)

说明了 Andreas 答案的要点,但也突出了一个完全不同的问题。您可以通过使用将其最大化,例如IntStream.range(0, 100)在示例代码中。

并行评估的结果实际上比顺序评估要好,因为顺序评估会创建一个不平衡的树。当接受任意的消费者流时,这可能是一个实际的性能问题,甚至StackOverflowError在尝试评估结果消费者时会导致问题。

对于任何非平凡数量的消费者,您实际上需要一个平衡的消费者树,但为此使用并行流并不是正确的解决方案,因为 a)Consumer::andThen是一种廉价的操作,并没有从并行评估中获得真正的好处,并且 b) 平衡将取决于不相关的属性,例如流源的性质和 CPU 内核的数量,这些属性决定了减少何时回退到顺序算法。

当然,最简单的解决方案是

private static <T> void performAllTasks(Stream<Consumer<T>> consumers, T data) {
    consumers.forEachOrdered(c -> c.accept(data));
}
Run Code Online (Sandbox Code Playgroud)

但是当你想构建一个复合物Consumer以供重复使用时,你可以使用

private static final int ITERATION_THRESHOLD = 16; // tune yourself

public static <T> Consumer<T> combineAllTasks(Stream<Consumer<T>> consumers) {
    List<Consumer<T>> consumerList = consumers.collect(Collectors.toList());
    if(consumerList.isEmpty()) return t -> {};
    if(consumerList.size() == 1) return consumerList.get(0);
    if(consumerList.size() < ITERATION_THRESHOLD)
        return balancedReduce(consumerList, Consumer::andThen, 0, consumerList.size());
    return t -> consumerList.forEach(c -> c.accept(t));
}
private static <T> T balancedReduce(List<T> l, BinaryOperator<T> f, int start, int end) {
    if(end-start>2) {
        int mid=(start+end)>>>1;
        return f.apply(balancedReduce(l, f, start, mid), balancedReduce(l, f, mid, end));
    }
    T t = l.get(start++);
    if(start<end) t = f.apply(t, l.get(start));
    assert start==end || start+1==end;
    return t;
}
Run Code Online (Sandbox Code Playgroud)

Consumer当消费者数量超过阈值时,该代码将仅使用循环提供单个。对于大量消费者来说,这是最简单、最有效的解决方案,事实上,对于较小的消费者,您可以放弃所有其他方法,但仍然可以获得合理的性能……

请注意,这仍然不会阻碍消费者流的并行处理,如果他们的构建真的从中受益的话。

  • @Eugene,Naman 好吧,当我发表最后一条评论时,我已经在写答案了。当然,如果以前遇到过这样的问题,这会有所帮助。比如说 [`AWTEventMulticaster`](https://docs.oracle.com/javase/8/docs/api/java/awt/AWTEventMulticaster.html),即使在 Java 1.1 中,也有太多的侦听器遇到这样的问题。然后,当 Java 8 是新的并且我使用归约来构建评估树时,我偶然发现了它,所以我已经有了平衡归约代码的变体,只需要针对这个答案进行调整和测试。 (3认同)