Tob*_*itt 5 java-8 java-stream
我有一个常见的Streams API问题,我想"有效地"解决.假设我有一个(可能非常大,可能是无限的)流.我想以某种方式预处理它,例如,过滤掉一些项目,并改变一些项目.让我们假设这个预处理是复杂的,时间和计算密集的,所以我不想做两次.
接下来,我想对项目序列执行两组不同的操作,并使用不同的流类型构造处理每个不同序列的远端.对于无限的流,这将是一个forEach,对于有限的一个,它可能是一个收集器或其他什么.
显然,我可能会将中间结果收集到一个列表中,然后从该列表中拖出两个单独的流,分别处理每个流.这对于有限的流来说是有效的,尽管a)它看起来"丑陋"而且b)对于非常大的流来说它可能是不切实际的,而对于无限流来说它是不可行的.
我想我可以用偷看作为一种"发球".然后,我可以对peek下游的结果执行一个处理链,并以某种方式强制消费者查看"其他"工作,但现在第二条路径不再是流.
我发现我可以创建一个BlockingQueue,使用peek将东西推入该队列,然后从队列中获取一个流.这似乎是一个好主意,实际上工作得很好,虽然我无法理解流是如何关闭的(它实际上是这样,但我看不出如何).这是示例代码:
List<Student> ls = Arrays.asList(
new Student("Fred", 2.3F)
// more students (and Student definition) elided ...
);
BlockingQueue<Student> pipe = new LinkedBlockingQueue<>();
ls.stream()
.peek(s -> {
try {
pipe.put(s);
} catch (InterruptedException ioe) {
ioe.printStackTrace();
}
})
.forEach(System.out::println);
new Thread(
new Runnable() {
public void run() {
Map<String, Double> map =
pipe.stream()
.collect(Collectors.groupingBy(s->s.getName(),
Collectors.averagingDouble(s->s.getGpa())));
map.forEach(
(k,v)->
System.out.println(
"Students called " + k
+ " average " + v));
}
}).start();
Run Code Online (Sandbox Code Playgroud)
所以,第一个问题是:有没有"更好"的方法来做到这一点?
第二个问题,BlockingQueue上的流如何关闭?
干杯,托比
有趣的问题。我先回答第二个问题,因为这是一个更简单的问题。
第二个问题,BlockingQueue 上的流到底是如何关闭的?
我认为“关闭”的意思是,流具有一定数量的元素,然后完成,忽略将来可能添加到队列中的任何元素。原因是队列上的流仅表示创建流时队列的当前内容。它不代表任何未来的元素,即其他线程将来可能添加的元素。
如果您想要一个代表队列当前和未来内容的流,那么您可以使用另一个答案中描述的技术。基本上都是用来Stream.generate()打电话的queue.take()。不过,我认为这不是您想要做的,所以我不会在这里进一步讨论。
现在谈谈你的更大问题。
您有一个对象源,您想要对其进行一些处理,包括过滤。然后,您想要获取结果并将其发送到两个不同的下游处理步骤。本质上你有一个生产者和两个消费者。
您必须处理的基本问题之一是如何处理不同处理步骤以不同速率发生的情况。假设我们已经解决了如何从队列中获取流而不使流过早终止的问题。如果生产者生成元素的速度比消费者处理此队列中的元素的速度快,则队列将累积元素,直到填满所有可用内存。
您还必须决定如何以不同的速率处理不同的消费者处理元素。如果一个消费者比另一个消费者慢得多,则可能需要缓冲任意数量的元素(这可能会填满内存),或者必须放慢较快消费者的速度以匹配较慢消费者的平均速率。
让我草拟一下你将如何进行的草图。不过我不知道你的实际要求,所以我不知道这是否能让你满意。需要注意的一件事是,在此类应用程序中使用并行流可能会出现问题,因为并行流不能很好地处理阻塞和负载平衡。
首先,我将从生产者的流处理元素开始,并将它们累积到ArrayBlockingQueue:
BlockingQueue<T> queue = new ArrayBlockingQueue<>(capacity);
producer.map(...)
.filter(...)
.forEach(queue::put);
Run Code Online (Sandbox Code Playgroud)
(注意putthrows InterruptedException,所以你不能只放在这里。你必须在这里放一个 try-catch 块,或者写一个辅助方法。但是如果被捕获queue::put该怎么办并不明显。)InterruptedException
如果队列已满,这将阻塞管道。要么在自己的线程中顺序运行,要么在专用线程池中并行运行,以避免阻塞公共池。
接下来,消费者:
while (true) {
// wait until the queue is full, or a timeout has expired,
// depending upon how frequently you want to continue
// processing elements emitted by the producer
List<T> list = new ArrayList<>();
queue.drainTo(list);
downstream1 = list.stream().filter(...).map(...).collect(...);
downstream2 = list.stream().filter(...).map(...).collect(...);
// deal with results downstream1 and downstream2
}
Run Code Online (Sandbox Code Playgroud)
这里的关键是从生产者到消费者的切换是通过该drainTo方法批量完成的,该方法将队列的元素添加到目的地并自动清空队列。这样,消费者就不必等待生产者完成其处理(如果是无限的,则不会发生这种情况)。此外,消费者正在处理已知数量的数据,并且不会在处理过程中发生阻塞。因此,如果有帮助的话,每个消费者流都可以并行运行。
在这里,我让消费者步调一致。如果您希望消费者以不同的速率运行,则必须构建额外的队列(或其他东西)来独立缓冲他们的工作负载。
如果消费者总体上比生产者慢,队列最终将填满并被阻塞,从而将生产者减慢到消费者可以接受的速度。如果消费者平均比生产者更快,那么也许您不需要担心消费者的相对处理速率。您可以让它们循环并获取生产者已设法放入队列的任何内容,甚至让它们阻塞直到有可用的东西。
我应该说,我所概述的是一种非常简单的多级流水线方法。如果您的应用程序对性能至关重要,您可能会发现自己在调整内存消耗、负载平衡、提高吞吐量和减少延迟方面做了很多工作。还有其他框架可能更适合您的应用程序。例如,您可以看看LMAX Disruptor,尽管我自己没有任何使用经验。
| 归档时间: |
|
| 查看次数: |
789 次 |
| 最近记录: |