我是singleOrEmpty流媒体运营商的忠实粉丝.它不在std lib中,但我发现它非常有用.如果流只有一个值,则返回该值Optional.如果它没有值或多于一个值,则返回Optional.empty().
Optional<Int> value = someList.stream().{singleOrEmpty}
[] -> Optional.empty()
[1] -> Optional.of(1)
[1, 1] -> Optional.empty()
etc.
Run Code Online (Sandbox Code Playgroud)
我之前问了一个关于它的问题,而@ThomasJungblut 想出了这个伟大的实现:
public static <T> Optional<T> singleOrEmpty(Stream<T> stream) {
return stream.limit(2)
.map(Optional::ofNullable)
.reduce(Optional.empty(),
(a, b) -> a.isPresent() ^ b.isPresent() ? b : Optional.empty());
}
Run Code Online (Sandbox Code Playgroud)
唯一的问题是,你必须把它放在通话的开头
singleOrEmpty(someList.stream().filter(...).map(...))
Run Code Online (Sandbox Code Playgroud)
而不是顺序结束
someList.stream().filter().map().singleOrEmpty()
Run Code Online (Sandbox Code Playgroud)
这使得它比其他流机制更难阅读.
那么作为这个流处理的新手,有没有人有任何技巧可以singleOrEmpty在一系列流转换结束时建立一个短路机制?
我想将以下for语句转换为Java 8流(即Stream<Class<?>>).理想的解决方案将是很简单的,我可以很容易地适应它的各种遍历一个链表的情况(例如File.getParentFile(),Component.getParent()).
Class<?> clazz;
Object value;
value = ...;
for (clazz = value.getClass(); clazz != null; clazz = clazz.getSuperclass())
...
Run Code Online (Sandbox Code Playgroud)
我意识到创建流的几行代码不会比单个for语句简单.但是,流使for循环体更简单,因此需要流.
订购
流可能有也可能没有已定义的遭遇顺序.流是否具有遭遇顺序取决于源和中间操作.某些流源(例如List或数组)本质上是有序的,而其他流(例如HashSet)则不是.某些中间操作(例如sorted())可能会在其他无序流上强制执行遭遇顺序,而其他中间操作可能会呈现无序的有序流,例如BaseStream.unordered().此外,一些终端操作可以忽略遭遇顺序,例如forEach().
HashSet?unordered将在并行计算的每个流上的中间操作?我最近发现了一个错误
StreamSupport.intStream(/* a Spliterator.ofInt */, true)
.limit(20)
Run Code Online (Sandbox Code Playgroud)
被调用Spliterator.ofInt.tryAdvance超过20次。当我把它改成
StreamSupport.intStream(/* a Spliterator.ofInt */, true)
.sequential()
.limit(20)
Run Code Online (Sandbox Code Playgroud)
问题就消失了。为什么会出现这种情况?除了将并行流构建到 中之外,是否有任何方法可以在tryAdvance有副作用时实现对并行流的严格限制Spliterator?(这是为了测试一些返回无限流的方法,但测试需要达到最终结束,而不需要“循环 X 毫秒”构造的复杂性。)
我想在Java中将a转换Spliterator<T>为a List<T>。
这样做的最佳习惯用法是什么?我目前正在使用以下代码:
List<T> list = new ArrayList<>();
spliterator.forEachRemaining(list::add);
Run Code Online (Sandbox Code Playgroud)
有没有更简单/更快的方法?
我对我所有的研究感到有点困惑。我有一个名为 TabularResultSet 的自定义接口(为了示例,我已经淡化了它),它遍历本质上为表格的任何数据集。它有一个类似于迭代器的 next() 方法,它可以循环遍历 QueryResultSet、剪贴板中的选项卡式表格、CSV 等...
但是,我正在尝试创建一个 Spliterator 来包装我的 TabularResultSet 并轻松地将其转换为流。我无法想象一种安全的并行化方法,因为 TabularResultSet 可能会遍历 QueryResultSet,并且同时调用 next() 可能会造成严重破坏。我认为可以安全地完成并行化的唯一方法是让单个工作线程调用 next() ,并将数据传递给并行线程来处理它。
所以我认为并行化不是一个简单的选择。我如何在不并行化的情况下让这个东西流式传输?这是我到目前为止的工作...
public final class SpliteratorTest {
public static void main(String[] args) {
TabularResultSet rs = null; /* instantiate an implementation; */
Stream<TabularResultSet> rsStream = StreamSupport.stream(new TabularSpliterator(rs), false);
}
public static interface TabularResultSet {
public boolean next();
public List<Object> getData();
}
private static final class TabularSpliterator implements Spliterator<TabularResultSet> {
private final TabularResultSet rs;
public TabularSpliterator(TabularResultSet rs) {
this.rs = rs;
}
@Override …Run Code Online (Sandbox Code Playgroud) 我正在查看Spliterator的文档,根据它,Spliterator不是线程安全的:
尽管它们在并行算法中具有明显的实用性,但预计分裂器不是线程安全的.相反,使用分裂器的并行算法的实现应确保分裂器一次仅由一个线程使用.这通常很容易通过串行线程限制来实现,这通常是通过递归分解工作的典型并行算法的自然结果.
但是,在其进一步的文件中,其中陈述了对上述陈述的矛盾陈述:
源的结构干扰可以通过以下方式进行管理(大致降低合意性的顺序):
源管理并发修改.例如,java.util.concurrent.ConcurrentHashMap的键集是并发源.从源创建的Spliterator报告CONCURRENT的特征.
那么这是否意味着从线程安全的集合生成的Spliterator是线程安全的?这样对吗?
关于分裂者的问题乍一看并不简单.
在流中,.parallel()更改流处理的行为.但是我期望从顺序和并行流创建的分裂器是相同的.例如,在顺序流中,通常.trySplit()从不调用,而在并行流中,为了将拆分分裂器移交给另一个线程.
stream.spliterator()vs 之间的差异stream.parallel().spliterator():
它们可能具有不同的特征:
Stream.of(1L, 2L, 3L).limit(2); // ORDERED
Stream.of(1L, 2L, 3L).limit(2).parallel(); // SUBSIZED, SIZED, ORDERED
Run Code Online (Sandbox Code Playgroud)这里讨论了另一个无意义的流分裂器特征策略(并行似乎更好地计算):在java 8和java 9中理解深层分裂器特征
它们在拆分方面可能有不同的行为.trySplit():
Stream.of(1L, 2L, 3L); // NON NULL
Stream.of(1L, 2L, 3L).limit(2); // NULL
Stream.of(1L, 2L, 3L).limit(2).parallel(); // NON NULL
Run Code Online (Sandbox Code Playgroud)为什么最后两个有不同的行为?如果我愿意,为什么我不能拆分连续流?(例如,丢弃其中一个分割以进行快速处理可能很有用).
将分裂器转换为流时的重大影响:
spliterator = Stream.of(1L, 2L, 3L).limit(2).spliterator();
stream = StreamSupport.stream(spliterator, true); // No parallel processing!
Run Code Online (Sandbox Code Playgroud)在这种情况下,从顺序流创建了一个spliterator,它禁用了split(.trySplit()返回null)的能力.稍后,需要转换回流,该流不会受益于并行处理.丢人现眼.
最大的问题:作为一种解决方法,在调用之前始终将流转换为并行的主要影响是什么?.spliterator()
// Supports activation of parallel …Run Code Online (Sandbox Code Playgroud) 我正在使用的代码
package com.skimmer;
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;
import java.util.stream.Stream;
public class App {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// Simply creating some 'test' data
Stream<String> test = LongStream.range(0, 10000000L).mapToObj(i -> i + "-test");
Spliterator<String> spliterator = test.parallel().spliterator();
List<Callable<Long>> callableList = new ArrayList<Callable<Long>>();
// Creating a future for each split to process concurrently
int totalSplits = 0;
while ((spliterator = spliterator.trySplit()) …Run Code Online (Sandbox Code Playgroud) 这可能是非常基本的,但我不是 Java 人。这是我的处理代码,它只是打印和休眠:
private static void myProcessings(int value)
{
System.out.println("Processing " + value);
try
{
Thread.sleep(2000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Finished processing " + value);
}
Run Code Online (Sandbox Code Playgroud)
现在,这个并行流似乎并行工作:
IntStream iit = IntStream.rangeClosed(1,3);
iit.parallel().forEach(Main::myProcessings);
// output:
// Processing 2
// Processing 1
// Processing 3
// Finished processing 3
// Finished processing 2
// Finished processing 1
Run Code Online (Sandbox Code Playgroud)
但是这个(由迭代器制成)没有:
static class MyIter implements Iterator<Integer>
{
private int max;
private int current;
public MyIter(int maxVal)
{
max = maxVal;
current = …Run Code Online (Sandbox Code Playgroud) 它是如何工作的?怎么可以Consumer<? super Integer>演员IntConsumer?
default boolean tryAdvance(Consumer<? super Integer> action) {
if (action instanceof IntConsumer) {
return tryAdvance((IntConsumer) action);
}
else {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(),
"{0} calling Spliterator.OfInt.tryAdvance((IntConsumer) action::accept)");
return tryAdvance((IntConsumer) action::accept);
}
}
Run Code Online (Sandbox Code Playgroud) 我有一个1到10,000的数字存储在一个数组中long.按顺序添加它们将得到50,005,000的结果.
我编写了一个Spliterator,如果一个数组的大小超过1000,它将被拆分为另一个数组.这是我的代码.但是当我运行它时,添加的结果远远超过50,005,000.有人能告诉我我的代码有什么问题吗?
非常感谢.
import java.util.Arrays;
import java.util.Optional;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class SumSpliterator implements Spliterator<Long> {
private final long[] numbers;
private int currentPosition = 0;
public SumSpliterator(long[] numbers) {
super();
this.numbers = numbers;
}
@Override
public boolean tryAdvance(Consumer<? super Long> action) {
action.accept(numbers[currentPosition++]);
return currentPosition < numbers.length;
}
@Override
public long estimateSize() {
return numbers.length - currentPosition;
}
@Override
public int characteristics() {
return SUBSIZED;
}
@Override
public Spliterator<Long> trySplit() { …Run Code Online (Sandbox Code Playgroud)