Gnx*_*nxR 28 java java-8 java-stream
我Stream处理了几百万个元素.它背后的Map-Reduce算法需要几毫秒,因此任务完成大约需要20分钟.
Stream<MyData> myStream = readData();
MyResult result = myStream
.map(row -> process(row))
.peek(stat -> System.out.println("Hi, I processed another item"))
.reduce(MyStat::aggregate);
Run Code Online (Sandbox Code Playgroud)
我想要一种显示整体进度的方法,而不是每个元素打印一行(这导致每秒数千行,需要时间,并且不提供有关整体进度的任何有用信息).我想展示类似于:
5% (08s)
10% (14s)
15% (20s)
...
Run Code Online (Sandbox Code Playgroud)
最好(和/或最简单)的方法是什么?
Yas*_*jaj 16
首先,Streams并不是要实现这些任务(而不是传统的数据结构).如果你已经知道你的流将处理多少元素,你可以使用以下选项,我重申,这不是流的目标.
Stream<MyData> myStream = readData();
final AtomicInteger loader = new AtomicInteger();
int fivePercent = elementsCount / 20;
MyResult result = myStream
.map(row -> process(row))
.peek(stat -> {
if (loader.incrementAndGet() % fivePercent == 0) {
System.out.println(loader.get() + " elements on " + elementsCount + " treated");
System.out.println((5*(loader.get() / fivePercent)) + "%");
}
})
.reduce(MyStat::aggregate);
Run Code Online (Sandbox Code Playgroud)
正如其他人所指出的:这有一些警告.首先,流不应该用于这样的事情.
在更技术层面,人们可以进一步争论:
filter或等操作而失真flatMap但是,记住这一点,对您的应用案例可能合理的一种方法是:
您可以创建一个Function<T,T>传递给map流的一个.(至少,我更喜欢peek在流上使用,如另一个答案所示).此功能可以使用AtomicLong计数元素来跟踪进度.为了将单独的事物分开,这个进展可以只转发给a Consumer<Long>,它将负责演示
这里的"演示"是指将此进度打印到控制台,标准化或百分比,指的是在创建消费者的任何地方都可以知道的大小.但是,消费者也可以仅处理打印,例如,每10个元素,或者如果自上一个元素以来已经过了至少5秒,则仅打印消息.
import java.util.Iterator;
import java.util.Locale;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class StreamProgress
{
public static void main(String[] args)
{
int size = 250;
Stream<Integer> stream = readData(size);
LongConsumer progressConsumer = progress ->
{
// "Filter" the output here: Report only every 10th element
if (progress % 10 == 0)
{
double relative = (double) progress / (size - 1);
double percent = relative * 100;
System.out.printf(Locale.ENGLISH,
"Progress %8d, relative %2.5f, percent %3.2f\n",
progress, relative, percent);
}
};
Integer result = stream
.map(element -> process(element))
.map(progressMapper(progressConsumer))
.reduce(0, (a, b) -> a + b);
System.out.println("result " + result);
}
private static <T> Function<T, T> progressMapper(
LongConsumer progressConsumer)
{
AtomicLong counter = new AtomicLong(0);
return t ->
{
long n = counter.getAndIncrement();
progressConsumer.accept(n);
return t;
};
}
private static Integer process(Integer element)
{
return element * 2;
}
private static Stream<Integer> readData(int size)
{
Iterator<Integer> iterator = new Iterator<Integer>()
{
int n = 0;
@Override
public Integer next()
{
try
{
Thread.sleep(10);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
return n++;
}
@Override
public boolean hasNext()
{
return n < size;
}
};
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
iterator, Spliterator.ORDERED), false);
}
}
Run Code Online (Sandbox Code Playgroud)