环境:Ubuntu x86_64(14.10),Oracle JDK 1.8u25
我尝试使用并行流Files.lines()
但我想要.skip()
第一行(它是带有标题的CSV文件).所以我试着这样做:
try (
final Stream<String> stream = Files.lines(thePath, StandardCharsets.UTF_8)
.skip(1L).parallel();
) {
// etc
}
Run Code Online (Sandbox Code Playgroud)
但是后来一列未能解析成一个int ...
所以我尝试了一些简单的代码.文件问题很简单:
$ cat info.csv
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
1422758875023;34;54;151;4375;4375;27486
$
Run Code Online (Sandbox Code Playgroud)
代码同样简单:
public static void main(final String... args)
{
final Path path = Paths.get("/home/fge/tmp/dd/info.csv");
Files.lines(path, StandardCharsets.UTF_8).skip(1L).parallel()
.forEach(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)
我系统地得到以下结果(好吧,我只运行了大约20次):
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
Run Code Online (Sandbox Code Playgroud)
我在这里错过了什么?
编辑似乎问题或误解比这更根深蒂固(下面的两个例子是由FreeNode的## java编写的):
public static void main(final String... args)
{
new BufferedReader(new StringReader("Hello\nWorld")).lines()
.skip(1L).parallel()
.forEach(System.out::println);
final Iterator<String> iter
= Arrays.asList("Hello", "World").iterator();
final Spliterator<String> spliterator
= Spliterators.spliteratorUnknownSize(iter, …
Run Code Online (Sandbox Code Playgroud) 由于我使用了很多流,其中一些处理大量数据,我认为预先分配基于集合的收集器大小是一个好主意,以防止随着集合的增长而进行昂贵的重新分配.所以我想出了这个,以及类似的其他集合类型:
public static <T> Collector<T, ?, Set<T>> toSetSized(int initialCapacity) {
return Collectors.toCollection(()-> new HashSet<>(initialCapacity));
}
Run Code Online (Sandbox Code Playgroud)
像这样使用
Set<Foo> fooSet = myFooStream.collect(toSetSized(100000));
Run Code Online (Sandbox Code Playgroud)
我担心的是,实现Collectors.toSet()
设置一个没有的Characteristics
枚举Collectors.toCollection()
:Characteristics.UNORDERED
.没有方便的变体Collectors.toCollection()
来设置超出默认值的所需特性,并且Collectors.toSet()
由于可见性问题我无法复制实现.所以,为了设置这个UNORDERED
特性我不得不这样做:
static<T> Collector<T,?,Set<T>> toSetSized(int initialCapacity){
return Collector.of(
() -> new HashSet<>(initialCapacity),
Set::add,
(c1, c2) -> {
c1.addAll(c2);
return c1;
},
new Collector.Characteristics[]{IDENTITY_FINISH, UNORDERED});
}
Run Code Online (Sandbox Code Playgroud)
所以这是我的问题:1.这是我唯一的选择,为简单的自定义toSet()
2 创建无序收集器.如果我希望这个理想地工作,是否有必要应用无序特征?我在这个论坛上读到了一个问题,在那里我了解到无序特征不再向后传播到Stream中.它仍然有用吗?
假设我有这个自定义收藏家:
public class CustomToListCollector<T> implements Collector<T, List<T>, List<T>> {
@Override
public Supplier<List<T>> supplier() {
return ArrayList::new;
}
@Override
public BiConsumer<List<T>, T> accumulator() {
return List::add;
}
@Override
public BinaryOperator<List<T>> combiner() {
return (l1, l2) -> {
l1.addAll(l2);
return l1;
};
}
@Override
public Function<List<T>, List<T>> finisher() {
return Function.identity();
}
@Override
public Set<java.util.stream.Collector.Characteristics> characteristics() {
return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
}
}
Run Code Online (Sandbox Code Playgroud)
这正是Collectors#toList实现的一个细微差别:还添加了UNORDERED特性.
我会假设运行此代码:
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
for (int i = 0; …
Run Code Online (Sandbox Code Playgroud) 我已经通过像前几次问题了在Java流相遇为了保鲜,这个答案Brian Goetz撰写,还有的javadoc的Stream.reduce(),和java.util.stream包的javadoc,但我仍然可以” t掌握以下内容:
采取这段代码:
public static void main(String... args) {
final String[] alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".split("");
System.out.println("Alphabet: ".concat(Arrays.toString(alphabet)));
System.out.println(new HashSet<>(Arrays.asList(alphabet))
.parallelStream()
.unordered()
.peek(System.out::println)
.reduce("", (a,b) -> a + b, (a,b) -> a + b));
}
Run Code Online (Sandbox Code Playgroud)
为什么减少总是*保留相遇顺序?
我已经解决了相关问题,例如如何确保 java8 流中的处理顺序?,我仍然不完全清楚输出元素的顺序。因此,请澄清我的以下疑问。
Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8 };
List<Integer> listOfIntegers =
new ArrayList<>(Arrays.asList(intArray));
listOfIntegers
.parallelStream()
.unordered()
.forEachOrdered(e -> System.out.print(e + " "));
Run Code Online (Sandbox Code Playgroud)
我认为至少在理论上(或根据 java 规范)它可以按比 1、2、3、4、5、6、7、8 的随机顺序打印。我说得对吗?
还有一个相关的问题——相遇顺序保留的决定是在什么执行点做出的?更准确地说 - 整个流管道 ORDER 特性的评估是否在执行开始之前通过源、中间操作和终端操作的特性完成?
当我最初问这个问题,2015年2月,在报道的行为所链接的问题是反直觉的,但那种规范允许(尽管在文档一些小的矛盾).
然而,Tagir Valeev在2015年6月问了一个新问题,我认为他清楚地证明了这个问题中报告的行为实际上是一个错误.Brain Goetz回答了他的问题,并承认,当一个终端操作触发时,不会阻止on 的特性的反向传播是一个错误,这个终端操作不会被迫尊重元素的遭遇顺序(例如) .此外,在他自己的回答的评论中,他分享了JDK的错误跟踪系统中发布的问题的链接.UNORDERED
Stream
skip()
forEach()
该问题的状态现已解决,其修复版本为9,这意味着该修复程序将在JDK9中可用.但是,它也被反向移植到JDK8 update 60,build 22.
所以从JDK8u60-b22开始,这个问题再也没有意义了,因为现在skip()
表现得像直觉一样,甚至在并行流上也是如此.
我原来的问题是......
最近我和一些同事讨论了这件事.我说skip()
在并行流上使用它是没用的,因为它似乎没有一个很好的用例.他们告诉我有关性能提升,FJ池处理,jvm可用的核心数量等等,但他们无法给我任何实际的使用示例.
skip()
并行流是否存在良好的用例?