rge*_*man 21 java parallel-processing java-8 java-stream
我有一Record
节课:
public class Record implements Comparable<Record>
{
private String myCategory1;
private int myCategory2;
private String myCategory3;
private String myCategory4;
private int myValue1;
private double myValue2;
public Record(String category1, int category2, String category3, String category4,
int value1, double value2)
{
myCategory1 = category1;
myCategory2 = category2;
myCategory3 = category3;
myCategory4 = category4;
myValue1 = value1;
myValue2 = value2;
}
// Getters here
}
Run Code Online (Sandbox Code Playgroud)
我创建了很多记录的大清单.仅第二和第五值,i / 10000
并且i
,将在后面使用的,由吸气剂getCategory2()
和getValue1()
分别.
List<Record> list = new ArrayList<>();
for (int i = 0; i < 115000; i++)
{
list.add(new Record("A", i / 10000, "B", "C", i, (double) i / 100 + 1));
}
Run Code Online (Sandbox Code Playgroud)
需要注意的是第一10,000条记录,有一个category2
的0
,那么接下来的1万人1
等,而value1
值0-114999顺序.
我创造了一个Stream
既是parallel
和sorted
.
Stream<Record> stream = list.stream()
.parallel()
.sorted(
//(r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
)
//.parallel()
;
Run Code Online (Sandbox Code Playgroud)
我有一个ForkJoinPool
维护8
线程,这是我在PC上的核心数量.
ForkJoinPool pool = new ForkJoinPool(8);
Run Code Online (Sandbox Code Playgroud)
我使用这里描述ForkJoinPool
ForkJoinPool
的技巧将流处理任务提交给我自己而不是常见的.
List<Record> output = pool.submit(() ->
stream.collect(Collectors.toList()
)).get();
Run Code Online (Sandbox Code Playgroud)
我期望并行sorted
操作会尊重流的遭遇顺序,并且它将是一个稳定的排序,因为Spliterator
返回的ArrayList
是ORDERED
.
但是,List
output
按顺序打印出结果元素的简单代码表明情况并非如此.
for (Record record : output)
{
System.out.println(record.getValue1());
}
Run Code Online (Sandbox Code Playgroud)
输出,浓缩:
0
1
2
3
...
69996
69997
69998
69999
71875 // discontinuity!
71876
71877
71878
...
79058
79059
79060
79061
70000 // discontinuity!
70001
70002
70003
...
71871
71872
71873
71874
79062 // discontinuity!
79063
79064
79065
79066
...
114996
114997
114998
114999
Run Code Online (Sandbox Code Playgroud)
的size()
的output
是115000
,所有的元素都出现在那里,只是在顺序稍有不同.
所以我写了一些检查代码,看看它是否sort
稳定.如果它稳定,则所有value1
值应保持有序.此代码验证订单,打印任何差异.
int prev = -1;
boolean verified = true;
for (Record record : output)
{
int curr = record.getValue1();
if (prev != -1)
{
if (prev + 1 != curr)
{
System.out.println("Warning: " + prev + " followed by " + curr + "!");
verified = false;
}
}
prev = curr;
}
System.out.println("Verified: " + verified);
Run Code Online (Sandbox Code Playgroud)
输出:
Warning: 69999 followed by 71875!
Warning: 79061 followed by 70000!
Warning: 71874 followed by 79062!
Warning: 99999 followed by 100625!
Warning: 107811 followed by 100000!
Warning: 100624 followed by 107812!
Verified: false
Run Code Online (Sandbox Code Playgroud)
如果我执行以下任何操作,此情况仍然存在:
替换为ForkJoinPool
a ThreadPoolExecutor
.
ThreadPoolExecutor pool = new ThreadPoolExecutor(8, 8, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
Run Code Online (Sandbox Code Playgroud)ForkJoinPool
通过Stream
直接处理使用common .
List<Record> output = stream.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)打电话parallel()
后拨打电话sorted
.
Stream<Record> stream = list.stream().sorted().parallel();
Run Code Online (Sandbox Code Playgroud)打电话parallelStream()
而不是stream().parallel()
.
Stream<Record> stream = list.parallelStream().sorted();
Run Code Online (Sandbox Code Playgroud)使用a排序Comparator
.请注意,此排序标准与我为Comparable
界面定义的"自然"顺序不同,尽管从开头按顺序开始,结果应该仍然相同.
Stream<Record> stream = list.stream().parallel().sorted(
(r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
);
Run Code Online (Sandbox Code Playgroud)我只能得到这个保存遭遇订单,如果我不这样做就下列操作之一Stream
:
parallel()
.sorted
.有趣的是,parallel()
没有一种保留了秩序.
在上述两种情况中,输出为:
Verified: true
Run Code Online (Sandbox Code Playgroud)
我的Java版本是1.8.0_05.这种异常也发生在Ideone上,它似乎运行Java 8u25.
更新
在撰写本文时,我已将JDK升级到最新版本1.8.0_45,问题仍然没有改变.
题
结果List
(output
)中的记录顺序是不按顺序的,因为排序在某种程度上不稳定,因为遇到的顺序没有保留,或者其他原因?
当我创建并行流并对其进行排序时,如何确保保存遭遇顺序?
Stu*_*rks 11
Arrays.parallelSort
在某些情况下看起来不稳定.好眼力.流并行排序是按照实现的Arrays.parallelSort
,因此它也会影响流.这是一个简化的例子:
public class StableSortBug {
static final int SIZE = 50_000;
static class Record implements Comparable<Record> {
final int sortVal;
final int seqNum;
Record(int i1, int i2) { sortVal = i1; seqNum = i2; }
@Override
public int compareTo(Record other) {
return Integer.compare(this.sortVal, other.sortVal);
}
}
static Record[] genArray() {
Record[] array = new Record[SIZE];
Arrays.setAll(array, i -> new Record(i / 10_000, i));
return array;
}
static boolean verify(Record[] array) {
return IntStream.range(1, array.length)
.allMatch(i -> array[i-1].seqNum + 1 == array[i].seqNum);
}
public static void main(String[] args) {
Record[] array = genArray();
System.out.println(verify(array));
Arrays.sort(array);
System.out.println(verify(array));
Arrays.parallelSort(array);
System.out.println(verify(array));
}
}
Run Code Online (Sandbox Code Playgroud)
在我的机器上(2核x 2线程),这将打印以下内容:
true
true
false
Run Code Online (Sandbox Code Playgroud)
当然,它应该打印true
三次.这是在当前的JDK 9开发版本上.如果它出现在迄今为止的所有JDK 8版本中,鉴于您的尝试,我不会感到惊讶.奇怪的是,减小大小或除数会改变行为.大小为20,000,除数为10,000是稳定的,大小为50,000,除数为1,000也是稳定的.似乎问题与比较相等和平行分割尺寸的足够大的值运行有关.
OpenJDK问题JDK-8076446涵盖了这个bug.
归档时间: |
|
查看次数: |
1562 次 |
最近记录: |