Tag*_*eev 17 java parallel-processing java-8 java-stream
通常,并不十分清楚并行流如何将输入分成块以及块连接的顺序.有没有办法可视化任何流源的整个过程,以更好地了解正在发生的事情?假设我创建了一个这样的流:
Stream<Integer> stream = IntStream.range(0, 100).boxed().parallel();
Run Code Online (Sandbox Code Playgroud)
我想看到一些树状的结构:
[0..99]
_____/ \_____
| |
[0..49] [50..99]
__/ \__ __/ \__
| | | |
[0..24] [25..49] [50..74] [75..99]
Run Code Online (Sandbox Code Playgroud)
这意味着整个输入范围[0..99]被拆分为范围,[0..49]而[50..99]范围又进一步分裂.当然这样的图应该反映Stream API的实际工作,所以如果我用这样的流执行一些实际操作,则应该以相同的方式执行拆分.
Tag*_*eev 18
当前流API实现使用收集器组合器以与先前拆分的方式完全相同的方式组合中间结果.另外,拆分策略取决于源和公共池并行水平,但不依赖于使用精确还原操作(同为reduce,collect,forEach,count,等).依靠这一点,创建可视化收集器并不是很困难:
public static Collector<Object, ?, List<String>> parallelVisualize() {
class Range {
private String first, last;
private Range left, right;
void accept(Object obj) {
if (first == null)
first = obj.toString();
else
last = obj.toString();
}
Range combine(Range that) {
Range p = new Range();
p.first = first == null ? that.first : first;
p.last = Stream
.of(that.last, that.first, this.last, this.first)
.filter(Objects::nonNull).findFirst().orElse(null);
p.left = this;
p.right = that;
return p;
}
String pad(String s, int left, int len) {
if (len == s.length())
return s;
char[] result = new char[len];
Arrays.fill(result, ' ');
s.getChars(0, s.length(), result, left);
return new String(result);
}
public List<String> finish() {
String cur = toString();
if (left == null) {
return Collections.singletonList(cur);
}
List<String> l = left.finish();
List<String> r = right.finish();
int len1 = l.get(0).length();
int len2 = r.get(0).length();
int totalLen = len1 + len2 + 1;
int leftAdd = 0;
if (cur.length() < totalLen) {
cur = pad(cur, (totalLen - cur.length()) / 2, totalLen);
} else {
leftAdd = (cur.length() - totalLen) / 2;
totalLen = cur.length();
}
List<String> result = new ArrayList<>();
result.add(cur);
char[] dashes = new char[totalLen];
Arrays.fill(dashes, ' ');
Arrays.fill(dashes, len1 / 2 + leftAdd + 1, len1 + len2 / 2 + 1
+ leftAdd, '_');
int mid = totalLen / 2;
dashes[mid] = '/';
dashes[mid + 1] = '\\';
result.add(new String(dashes));
Arrays.fill(dashes, ' ');
dashes[len1 / 2 + leftAdd] = '|';
dashes[len1 + len2 / 2 + 1 + leftAdd] = '|';
result.add(new String(dashes));
int maxSize = Math.max(l.size(), r.size());
for (int i = 0; i < maxSize; i++) {
String lstr = l.size() > i ? l.get(i) : String.format("%"
+ len1 + "s", "");
String rstr = r.size() > i ? r.get(i) : String.format("%"
+ len2 + "s", "");
result.add(pad(lstr + " " + rstr, leftAdd, totalLen));
}
return result;
}
public String toString() {
if (first == null)
return "(empty)";
else if (last == null)
return "[" + first + "]";
return "[" + first + ".." + last + "]";
}
}
return Collector.of(Range::new, Range::accept, Range::combine,
Range::finish);
}
Run Code Online (Sandbox Code Playgroud)
这是使用4核机器的这个收集器获得的一些有趣的结果(结果将在机器上具有不同数量的不同availableProcessors()).
拆分简单范围:
IntStream.range(0, 100)
.boxed().parallel().collect(parallelVisualize())
.forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)
甚至分成16个任务:
[0..99]
___________________________________/\________________________________
| |
[0..49] [50..99]
_________________/\______________ _________________/\________________
| | | |
[0..24] [25..49] [50..74] [75..99]
________/\_____ ________/\_______ ________/\_______ ________/\_______
| | | | | | | |
[0..11] [12..24] [25..36] [37..49] [50..61] [62..74] [75..86] [87..99]
___/\_ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___
| | | | | | | | | | | | | | | |
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]
Run Code Online (Sandbox Code Playgroud)
拆分两个流串联:
IntStream
.concat(IntStream.range(0, 10), IntStream.range(10, 100))
.boxed().parallel().collect(parallelVisualize())
.forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)
如您所见,首先拆分取消连接流:
[0..99]
_______________________________________________________________________/\_____
| |
[0..9] [10..99]
__/\__ ___________________________________/\__________________________________
| | | |
[0..4] [5..9] [10..54] [55..99]
_________________/\________________ _________________/\________________
| | | |
[10..31] [32..54] [55..76] [77..99]
________/\_______ ________/\_______ ________/\_______ ________/\_______
| | | | | | | |
[10..20] [21..31] [32..42] [43..54] [55..65] [66..76] [77..87] [88..99]
___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___
| | | | | | | | | | | | | | | |
[10..14] [15..20] [21..25] [26..31] [32..36] [37..42] [43..48] [49..54] [55..59] [60..65] [66..70] [71..76] [77..81] [82..87] [88..93] [94..99]
Run Code Online (Sandbox Code Playgroud)
在串联之前执行中间操作(boxed())的两个流连接的拆分:
Stream.concat(IntStream.range(0, 50).boxed().parallel(), IntStream.range(50, 100).boxed())
.collect(parallelVisualize())
.forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)
如果其中一个输入流在连接之前没有变为并行模式,则它根本拒绝拆分:
[0..99]
___/\_________________________________
| |
[0..49] [50..99]
_________________/\______________
| |
[0..24] [25..49]
________/\_____ ________/\_______
| | | |
[0..11] [12..24] [25..36] [37..49]
___/\_ ___/\___ ___/\___ ___/\___
| | | | | | | |
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49]
Run Code Online (Sandbox Code Playgroud)
拆分平面图:
Stream.of(0, 50)
.flatMap(start -> IntStream.range(start, start+50).boxed().parallel())
.parallel().collect(parallelVisualize())
.forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)
平面映射从不在嵌套流内并行化:
[0..99]
____/\__
| |
[0..49] [50..99]
Run Code Online (Sandbox Code Playgroud)
来自7000个元素的未知大小的迭代器的流(请参阅上面的答案):
StreamSupport
.stream(Spliterators.spliteratorUnknownSize(
IntStream.range(0, 7000).iterator(),
Spliterator.ORDERED), true)
.collect(parallelVisualize()).forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)
分裂真的很糟糕,每个人都在等待最大的部分[3072..6143]:
[0..6999]
_______________________/\___
| |
[0..1023] [1024..6999]
________________/\____
| |
[1024..3071] [3072..6999]
_________/\_____
| |
[3072..6143] [6144..6999]
___/\____
| |
[6144..6999] (empty)
Run Code Online (Sandbox Code Playgroud)
已知大小的迭代器源:
StreamSupport
.stream(Spliterators.spliterator(IntStream.range(0, 7000)
.iterator(), 7000, Spliterator.ORDERED), true)
.collect(parallelVisualize()).forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)
提供尺寸可以更好地解锁进一步的分裂:
[0..6999]
______________________________________________________________________________________________/\________
| |
[0..1023] [1024..6999]
_____/\__ ____________________________________________________________________/\________________________
| | | |
[0..511] [512..1023] [1024..3071] [3072..6999]
____________/\___________ ________________/\__________________________________________________
| | | |
[1024..2047] [2048..3071] [3072..6143] [6144..6999]
_____/\_____ _____/\_____ _________________________/\________________________ ___/\___________
| | | | | | | |
[1024..1535] [1536..2047] [2048..2559] [2560..3071] [3072..4607] [4608..6143] [6144..6999] (empty)
____________/\___________ ____________/\___________ _____/\_____
| | | | | |
[3072..3839] [3840..4607] [4608..5375] [5376..6143] [6144..6571] [6572..6999]
_____/\_____ _____/\_____ _____/\_____ _____/\_____
| | | | | | | |
[3072..3455] [3456..3839] [3840..4223] [4224..4607] [4608..4991] [4992..5375] [5376..5759] [5760..6143]
Run Code Online (Sandbox Code Playgroud)
这种收集器的进一步改进可以生成图形图像(如svg),跟踪处理每个节点的线程,显示每个组的元素数量等等.如果你愿意,可以使用它.
Hol*_*ger 10
我想通过一个解决方案来增强Tagir的优秀答案,该解决方案用于监视源端的分割,甚至是中间操作(当前流API实现强加了一些限制):
public static <E> Stream<E> proxy(Stream<E> src) {
Class<Stream<E>> sClass=(Class)Stream.class;
Class<Spliterator<E>> spClass=(Class)Spliterator.class;
return proxy(src, sClass, spClass, StreamSupport::stream);
}
public static IntStream proxy(IntStream src) {
return proxy(src, IntStream.class, Spliterator.OfInt.class, StreamSupport::intStream);
}
public static LongStream proxy(LongStream src) {
return proxy(src, LongStream.class, Spliterator.OfLong.class, StreamSupport::longStream);
}
public static DoubleStream proxy(DoubleStream src) {
return proxy(src, DoubleStream.class, Spliterator.OfDouble.class, StreamSupport::doubleStream);
}
static final Object EMPTY=new StringBuilder("empty");
static <E,S extends BaseStream<E,S>, Sp extends Spliterator<E>> S proxy(
S src, Class<S> sc, Class<Sp> spc, BiFunction<Sp,Boolean,S> f) {
final class Node<T> implements InvocationHandler,Runnable,
Consumer<Object>, IntConsumer, LongConsumer, DoubleConsumer {
final Class<? extends Spliterator> type;
Spliterator<T> src;
Object first=EMPTY, last=EMPTY;
Node<T> left, right;
Object currConsumer;
public Node(Spliterator<T> src, Class<? extends Spliterator> type) {
this.src = src;
this.type=type;
}
private void value(Object t) {
if(first==EMPTY) first=t;
last=t;
}
public void accept(Object t) {
value(t); ((Consumer)currConsumer).accept(t);
}
public void accept(int t) {
value(t); ((IntConsumer)currConsumer).accept(t);
}
public void accept(long t) {
value(t); ((LongConsumer)currConsumer).accept(t);
}
public void accept(double t) {
value(t); ((DoubleConsumer)currConsumer).accept(t);
}
public void run() {
System.out.println();
finish().forEach(System.out::println);
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Node<T> curr=this; while(curr.right!=null) curr=curr.right;
if(method.getName().equals("tryAdvance")||method.getName().equals("forEachRemaining")) {
curr.currConsumer=args[0];
args[0]=curr;
}
if(method.getName().equals("trySplit")) {
Spliterator s=curr.src.trySplit();
if(s==null) return null;
Node<T> pfx=new Node<>(s, type);
pfx.left=curr.left; curr.left=pfx;
curr.right=new Node<>(curr.src, type);
src=null;
return pfx.create();
}
return method.invoke(curr.src, args);
}
Object create() {
return Proxy.newProxyInstance(null, new Class<?>[]{type}, this);
}
String pad(String s, int left, int len) {
if (len == s.length())
return s;
char[] result = new char[len];
Arrays.fill(result, ' ');
s.getChars(0, s.length(), result, left);
return new String(result);
}
public List<String> finish() {
String cur = toString();
if (left == null) {
return Collections.singletonList(cur);
}
List<String> l = left.finish();
List<String> r = right.finish();
int len1 = l.get(0).length();
int len2 = r.get(0).length();
int totalLen = len1 + len2 + 1;
int leftAdd = 0;
if (cur.length() < totalLen) {
cur = pad(cur, (totalLen - cur.length()) / 2, totalLen);
} else {
leftAdd = (cur.length() - totalLen) / 2;
totalLen = cur.length();
}
List<String> result = new ArrayList<>();
result.add(cur);
char[] dashes = new char[totalLen];
Arrays.fill(dashes, ' ');
Arrays.fill(dashes, len1 / 2 + leftAdd + 1, len1 + len2 / 2 + 1
+ leftAdd, '_');
int mid = totalLen / 2;
dashes[mid] = '/';
dashes[mid + 1] = '\\';
result.add(new String(dashes));
Arrays.fill(dashes, ' ');
dashes[len1 / 2 + leftAdd] = '|';
dashes[len1 + len2 / 2 + 1 + leftAdd] = '|';
result.add(new String(dashes));
int maxSize = Math.max(l.size(), r.size());
for (int i = 0; i < maxSize; i++) {
String lstr = l.size() > i ? l.get(i) : String.format("%"
+ len1 + "s", "");
String rstr = r.size() > i ? r.get(i) : String.format("%"
+ len2 + "s", "");
result.add(pad(lstr + " " + rstr, leftAdd, totalLen));
}
return result;
}
private Object first() {
if(left==null) return first;
Object o=left.first();
if(o==EMPTY) o=right.first();
return o;
}
private Object last() {
if(right==null) return last;
Object o=right.last();
if(o==EMPTY) o=left.last();
return o;
}
public String toString() {
Object o=first(), p=last();
return o==EMPTY? "(empty)": "["+o+(o!=p? ".."+p+']': "]");
}
}
Node<E> n=new Node<>(src.spliterator(), spc);
Sp sp=(Sp)Proxy.newProxyInstance(null, new Class<?>[]{n.type}, n);
return f.apply(sp, true).onClose(n);
}
Run Code Online (Sandbox Code Playgroud)
它允许使用代理包装spliterator,该代理将监视拆分操作和遇到的对象.块处理的逻辑类似于Tagir,事实上,我复制了他的结果打印例程.
您可以传入流的源或已附加相同操作的流.(在后一种情况下,您应该尽早申请.parallel()流).正如Tagir所解释的,在大多数情况下,拆分行为取决于源和配置的并行性,因此,在大多数情况下,监视中间状态可能会更改值,但不会更改已处理的块:
try(IntStream is=proxy(IntStream.range(0, 100).parallel())) {
is.filter(i -> i/20%2==0)
.mapToObj(ix->"\""+ix+'"')
.forEach(s->{});
}
Run Code Online (Sandbox Code Playgroud)
将打印
[0..99]
___________________________________/\________________________________
| |
[0..49] [50..99]
_________________/\______________ _________________/\________________
| | | |
[0..24] [25..49] [50..74] [75..99]
________/\_____ ________/\_______ ________/\_______ ________/\_______
| | | | | | | |
[0..11] [12..24] [25..36] [37..49] [50..61] [62..74] [75..86] [87..99]
___/\_ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___
| | | | | | | | | | | | | | | |
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]
Run Code Online (Sandbox Code Playgroud)
而
try(Stream<String> s=proxy(IntStream.range(0, 100).parallel().filter(i -> i/20%2==0)
.mapToObj(ix->"\""+ix+'"'))) {
s.forEach(str->{});
}
Run Code Online (Sandbox Code Playgroud)
将打印
["0".."99"]
___________________________________________/\___________________________________________
| |
["0".."49"] ["50".."99"]
____________________/\______________________ ______________________/\___________________
| | | |
["0".."19"] ["40".."49"] ["50".."59"] ["80".."99"]
____________/\_________ ____________/\______ _______/\___________ ____________/\________
| | | | | | | |
["0".."11"] ["12".."19"] (empty) ["40".."49"] ["50".."59"] (empty) ["80".."86"] ["87".."99"]
_____/\___ _____/\_____ ___/\__ _____/\_____ _____/\_____ ___/\__ _____/\__ _____/\_____
| | | | | | | | | | | | | | | |
["0".."5"] ["6".."11"] ["12".."17"] ["18".."19"] (empty) (empty) ["40".."42"] ["43".."49"] ["50".."55"] ["56".."59"] (empty) (empty) ["80"] ["81".."86"] ["87".."92"] ["93".."99"]
Run Code Online (Sandbox Code Playgroud)
正如我们在这里看到的,我们正在监视结果,.filter(…).mapToObj(…)但是块明确地由源确定,可能根据过滤器的条件在下游产生空块.
请注意,我们可以将源监控与Tagir的收集器监控结合起来:
try(IntStream s=proxy(IntStream.range(0, 100))) {
s.parallel().filter(i -> i/20%2==0)
.boxed().collect(parallelVisualize())
.forEach(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)
这将打印(请注意collect首先打印输出):
[0..99]
________________________________/\_______________________________
| |
[0..49] [50..99]
________________/\______________ _______________/\_______________
| | | |
[0..19] [40..49] [50..59] [80..99]
________/\_____ ________/\______ _______/\_______ ________/\_____
| | | | | | | |
[0..11] [12..19] (empty) [40..49] [50..59] (empty) [80..86] [87..99]
___/\_ ___/\___ ___/\__ ___/\___ ___/\___ ___/\__ ___/\_ ___/\___
| | | | | | | | | | | | | | | |
[0..5] [6..11] [12..17] [18..19] (empty) (empty) [40..42] [43..49] [50..55] [56..59] (empty) (empty) [80] [81..86] [87..92] [93..99]
[0..99]
___________________________________/\________________________________
| |
[0..49] [50..99]
_________________/\______________ _________________/\________________
| | | |
[0..24] [25..49] [50..74] [75..99]
________/\_____ ________/\_______ ________/\_______ ________/\_______
| | | | | | | |
[0..11] [12..24] [25..36] [37..49] [50..61] [62..74] [75..86] [87..99]
___/\_ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___
| | | | | | | | | | | | | | | |
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]
Run Code Online (Sandbox Code Playgroud)
我们可以清楚地看到处理的块如何匹配,但是在过滤之后,一些块具有较少的元素,其中一些是完全空的.
这是展示的地方,两种监测方式可以产生显着差异:
try(DoubleStream is=proxy(DoubleStream.iterate(0, i->i+1)).parallel().limit(100)) {
is.boxed()
.collect(parallelVisualize())
.forEach(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)
[0.0..99.0]
___________________________________________________/\________________________________________________
| |
[0.0..49.0] [50.0..99.0]
_________________________/\______________________ _________________________/\________________________
| | | |
[0.0..24.0] [25.0..49.0] [50.0..74.0] [75.0..99.0]
____________/\_________ ____________/\___________ ____________/\___________ ____________/\___________
| | | | | | | |
[0.0..11.0] [12.0..24.0] [25.0..36.0] [37.0..49.0] [50.0..61.0] [62.0..74.0] [75.0..86.0] [87.0..99.0]
_____/\___ _____/\_____ _____/\_____ _____/\_____ _____/\_____ _____/\_____ _____/\_____ _____/\_____
| | | | | | | | | | | | | | | |
[0.0..5.0] [6.0..11.0] [12.0..17.0] [18.0..24.0] [25.0..30.0] [31.0..36.0] [37.0..42.0] [43.0..49.0] [50.0..55.0] [56.0..61.0] [62.0..67.0] [68.0..74.0] [75.0..80.0] [81.0..86.0] [87.0..92.0] [93.0..99.0]
[0.0..10239.0]
_____________________________/\_____
| |
[0.0..1023.0] [1024.0..10239.0]
____________________/\_______
| |
[1024.0..3071.0] [3072.0..10239.0]
____________/\______
| |
[3072.0..6143.0] [6144.0..10239.0]
___/\_______
| |
[6144.0..10239.0] (empty)
Run Code Online (Sandbox Code Playgroud)
这证明了Tagir已经解释过的,未知大小的流分裂得很差,甚至事实limit(…)提供了良好估计的可能性(实际上,无限+限制在理论上是可预测的),实现并没有利用它.
使用批量大小将源拆分为块1024,1024在每次拆分后增加,创建超出范围的块limit.我们还可以看到每次分离前缀的方式.
但是当我们查看终端分割输出时,我们可以看到这些多余的块之间已经被丢弃,并且第一个块的另一个分裂发生了.由于这个块是由第一个拆分中的默认实现填充的中间数组的后端,我们在源处没有注意到它,但我们可以在终端操作中看到该数组已被拆分(不出所料)很平衡.
所以我们需要两种监控方式来全面了解......
| 归档时间: |
|
| 查看次数: |
1281 次 |
| 最近记录: |