Java 8流的无干扰要求

Fra*_*rra 5 java-8 java-stream

我是Java 8的初学者.

对于具有一致的Java流行为,非干扰非常重要.想象一下,我们处理大量数据流,并在此过程中更改源.结果将是不可预测的.这与流并行或顺序的处理模式无关.

可以修改源,直到调用语句终端操作.除此之外,在流执行完成之前不应修改源.因此,处理流源中的并发修改对于获得一致的流性能至关重要.

以上引文取自此处.

有人可以做一些简单的例子来阐明为什么改变流源会产生如此大的问题吗?

Eug*_*ene 8

那么oracle的例子在这里是不言自明的.首先是:

List<String> l = new ArrayList<>(Arrays.asList("one", "two"));
 Stream<String> sl = l.stream();
 l.add("three");
 String s = l.collect(Collectors.joining(" "));
Run Code Online (Sandbox Code Playgroud)

如果调用终端操作()之前l通过向其添加一个元素进行更改,则可以.但请注意,它由三个元素组成,而不是两个元素; 在您创建Stream via时.Collectors.joiningStreaml.stream()

另一方面这样做:

  List<String> list = new ArrayList<>();
  list.add("test");
  list.forEach(x -> list.add(x));
Run Code Online (Sandbox Code Playgroud)

将抛出一个,ConcurrentModificationException因为你无法改变源.

现在假设您有一个可以处理并发添加的基础源:

ConcurrentHashMap<String, Integer> cMap = new ConcurrentHashMap<>();
cMap.put("one", 1);
cMap.forEach((key, value) -> cMap.put(key + key, value + value));
System.out.println(cMap);
Run Code Online (Sandbox Code Playgroud)

输出应该在这里?当我运行它时,它是:

 {oneoneoneoneoneoneoneone=8, one=1, oneone=2, oneoneoneone=4}
Run Code Online (Sandbox Code Playgroud)

将键更改为zx(cMap.put("zx", 1)),结果现在是:

{zxzx=2, zx=1}
Run Code Online (Sandbox Code Playgroud)

结果不一致.

  • 我只是想到了`list.forEach(x - > list.add(x));`的歧义,如果它没有抛出`ConcurrentModificationException`,但通过与`ConcurrentHashMap进行比较来展示它是一个好主意`... (6认同)
  • 在没有其他线程进行更新的情况下,当容量增加时,CHM`forEach`将不会注意到新的状态.您可以尝试`ConcurrentHashMap <Integer,Integer> cMap = new ConcurrentHashMap <>(1000,1f); cMap.put(1,1); cMap.forEach((key,value) - > cMap.put(key + 1,0));`并播放初始容量.您将看到当容量增加时迭代停止(它向上舍入到下一个2的幂). (5认同)
  • 如果你想要超过65535次迭代,你必须使密钥适应CHM的`spread`函数的效果,例如`ConcurrentHashMap <Integer,Integer> cMap = new ConcurrentHashMap <>(200_000_000,1f); cMap.put(1,1); cMap.forEach((key,value) - > cMap.put(++ value ^(value >>> 16),value));`现在在我的机器上运行几分钟,已经消耗了几GB,不知道是否或什么时候会拯救...... (5认同)
  • @Didier L:[规范](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentMap.html#forEach-java.util.function.BiConsumer-)说这相当于循环`for((Map.Entry <K,V> entry:map.entrySet())action.accept(entry.getKey(),entry.getValue());`,这意味着行为应该[与*入口集迭代器一样弱*一致](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#entrySet--). (4认同)
  • @Eugene:嗯,你也可能在函数之间产生干扰,但是在Stream API中讨论过,通常会阻止"状态行为参数"和"行为参数中的副作用". (4认同)
  • 值得注意的是,`ConcurrentHashMap`允许这样做,因为它的迭代顺序是[弱一致的](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html #Weakly) - 虽然没有为`forEach()`明确指定 (3认同)
  • @Federico Peralta Schaffner:当您重写示例以使用`stream().forEach(...)`resp时,行为不会改变.`entrySet().stream().forEach(...)`而不是`forEach(...)`.这些只是干扰的简化示例.当您使用`CopyOnWriteArrayList`时,您没有干扰.甚至在[其类文档]中也有说明(https://docs.oracle.com/javase/8/docs/api/?java/util/concurrent/ CopyOnWriteArrayList.html)"*干扰是不可能的*". (3认同)
  • @Federico Peralta Schaffner:干扰是一个广义的术语,但OP提供了一个链接/引用,表明"*干扰数据源*"的意思,就像[stream的包文档](https:// docs. oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#NonInterference).请记住,Stream操作*是对源的单次迭代,取决于源的迭代策略. (2认同)
  • @Holger这是我缺少的确切信息:*Stream操作是对源的单次迭代,取决于源的迭代策略*.现在,考虑到这一点,一切都很有意义.感谢你和尤金! (2认同)