Car*_*ary 6 java project-reactor
Java 8 Streams 不允许重用。这就产生了一个关于如何在创建滑动窗口通量来计算像 x(i)*x(i-1) 这样的关系时重用流的难题。
以下代码基于移位运算符的思想。我使用 skip(1) 移动第一个流以创建第二个流。
Flux<Integer> primary = Flux.fromStream(IntStream.range(1, 10).boxed());
Flux<Integer> secondary = primary.skip(1);
primary.zipWith(secondary)
.map(t -> t.getT1() * t.getT2())
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
这是上述代码的可视化表示:
1 2 3 4 5 6 7 8 9 10
v v v v v v v v v v skip(1)
2 3 4 5 6 7 8 9 10
v v v v v v v v v v zipWith
1 2, 2 3, 3 4, 4 5, 5 6, 6 7, 7 8, 8 9, 9 10 <- sliding window of length 2
v v v v v v v v v v multiples
2 6 12 20 30 42 56 72 90
Run Code Online (Sandbox Code Playgroud)
不幸的是,此代码错误为:
java.lang.IllegalStateException: stream has already been operated upon or closed
Run Code Online (Sandbox Code Playgroud)
显而易见的解决方法是缓存元素并确保缓存大小大于或等于流大小:
Flux<Integer> primary = Flux.fromStream(IntStream.range(1, 10).boxed()).cache(10);
Run Code Online (Sandbox Code Playgroud)
或使用流替换:
Flux<Integer> primary = Flux.range(0, 10);
Run Code Online (Sandbox Code Playgroud)
第二种解决方案只是重新执行skip(1) 序列的原始序列。
然而,一个有效的解决方案只需要一个大小为 2 的缓冲区。 如果流恰好是一个大文件,这将是一个大问题:
Files.lines(Paths.get(megaFile));
Run Code Online (Sandbox Code Playgroud)
如何有效地缓冲流,以便对主要 Flux 的多个订阅不会导致所有内容都被读入内存或导致重新执行?
我终于找到了一个解决方案,尽管它不是面向缓冲区的。灵感是先解决滑动窗口为2的问题:
Flux<Integer> primary = Flux.fromStream(IntStream.range(0, 10).boxed());
primary.flatMap(num -> Flux.just(num, num))
.skip(1)
.buffer(2)
.filter(list -> list.size() == 2)
.map(list -> Arrays.toString(list.toArray()))
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
该过程的可视化表示如下:
1 2 3 4 5 6 7 8 9
V V V V V V V V V Flux.just(num, num)
1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
V V V V V V V V V skip(1)
1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
V V V V V V V V V bufffer(2)
1 2, 2 3, 3 4, 4 5, 5 6, 6 7, 7 8, 8 9, 9
V V V V V V V V V filter
1 2, 2 3, 3 4, 4 5, 5 6, 6 7, 7 8, 8 9
Run Code Online (Sandbox Code Playgroud)
这是输出:
[0, 1]
[1, 2]
[2, 3]
[3, 4]
[4, 5]
[5, 6]
[6, 7]
[7, 8]
[8, 9]
Run Code Online (Sandbox Code Playgroud)
然后我概括了上面的想法,为任意滑动窗口大小创建了一个解决方案:
public class SlidingWindow {
public static void main(String[] args) {
System.out.println("Different sliding windows for sequence 0 to 9:");
SlidingWindow flux = new SlidingWindow();
for (int windowSize = 1; windowSize < 5; windowSize++) {
flux.slidingWindow(windowSize, IntStream.range(0, 10).boxed())
.map(SlidingWindow::listToString)
.subscribe(System.out::print);
System.out.println();
}
//show stream difference: x(i)-x(i-1)
List<Integer> sequence = Arrays.asList(new Integer[]{10, 12, 11, 9, 13, 17, 21});
System.out.println("Show difference 'x(i)-x(i-1)' for " + listToString(sequence));
flux.slidingWindow(2, sequence.stream())
.doOnNext(SlidingWindow::printlist)
.map(list -> list.get(1) - list.get(0))
.subscribe(System.out::println);
System.out.println();
}
public <T> Flux<List<T>> slidingWindow(int windowSize, Stream<T> stream) {
if (windowSize > 0) {
Flux<List<T>> flux = Flux.fromStream(stream).map(ele -> Arrays.asList(ele));
for (int i = 1; i < windowSize; i++) {
flux = addDepth(flux);
}
return flux;
} else {
return Flux.empty();
}
}
protected <T> Flux<List<T>> addDepth(Flux<List<T>> flux) {
return flux.flatMap(list -> Flux.just(list, list))
.skip(1)
.buffer(2)
.filter(list -> list.size() == 2)
.map(list -> flatten(list));
}
protected <T> List<T> flatten(List<List<T>> list) {
LinkedList<T> newl = new LinkedList<>(list.get(1));
newl.addFirst(list.get(0).get(0));
return newl;
}
static String listToString(List list) {
return list.stream()
.map(i -> i.toString())
.collect(Collectors.joining(", ", "[ ", " ], "))
.toString();
}
static void printlist(List list) {
System.out.print(listToString(list));
}
}
Run Code Online (Sandbox Code Playgroud)
上述代码的输出如下:
Different sliding windows for sequence 0 to 9:
[ 0 ], [ 1 ], [ 2 ], [ 3 ], [ 4 ], [ 5 ], [ 6 ], [ 7 ], [ 8 ], [ 9 ],
[ 0, 1 ], [ 1, 2 ], [ 2, 3 ], [ 3, 4 ], [ 4, 5 ], [ 5, 6 ], [ 6, 7 ], [ 7, 8 ], [ 8, 9 ],
[ 0, 1, 2 ], [ 1, 2, 3 ], [ 2, 3, 4 ], [ 3, 4, 5 ], [ 4, 5, 6 ], [ 5, 6, 7 ], [ 6, 7, 8 ], [ 7, 8, 9 ],
[ 0, 1, 2, 3 ], [ 1, 2, 3, 4 ], [ 2, 3, 4, 5 ], [ 3, 4, 5, 6 ], [ 4, 5, 6, 7 ], [ 5, 6, 7, 8 ], [ 6, 7, 8, 9 ],
Show difference 'x(i)-x(i-1)' for [ 10, 12, 11, 9, 13, 17, 21 ],
[ 10, 12 ], 2
[ 12, 11 ], -1
[ 11, 9 ], -2
[ 9, 13 ], 4
[ 13, 17 ], 4
[ 17, 21 ], 4
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5325 次 |
| 最近记录: |