使用通过Flow API实现的处理器转换数据流

Nam*_*man 8 java concurrency data-stream java-8 java-9

我本来打算通过社区#DOC-1006738从Oracle相关的并发概念Flow.PublisherFlow.Subscriber.在那里可以找到使用具有这两行代码的处理器转换数据流示例代码,这让我有点困惑.

//Create Processor and Subscriber  
MyFilterProcessor<String, String> filterProcessor = 
                                      new MyFilterProcessor<>(s -> s.equals("x")); 
Run Code Online (Sandbox Code Playgroud)

问题1. MyFilterProcessor如何在<String, String>这里输入?

对于我最初的想法,这些可能已经<String, Boolean>取而代之,但那将无视下一行中订户定义的进一步定义: -

MyTransformProcessor<String, Integer> transformProcessor = 
                              new MyTransformProcessor<>(s -> Integer.parseInt(s));  
Run Code Online (Sandbox Code Playgroud)

另外请注意,除非我明确地将上述内容强制转换为(正确)

MyTransformProcessor<String, Integer>(s -> Integer.parseInt(s))
Run Code Online (Sandbox Code Playgroud)

我在parseInt阅读时遇到错误,无法应用Object.

- 为什么我需要在这里明确地投射RHS?-


虽然代码主要出现在共享链接中,但我使用的有用的构造函数定义是

public class MyTransformProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {       
    private Function function;
    MyTransformProcessor(Function<? super T, ? extends R> function) {  
        super();  
        this.function = function;  
    } 
    ...
} 
Run Code Online (Sandbox Code Playgroud)

以及相同的一个filterProcessor: -

public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private Function function;
    MyFilterProcessor(Function<? super T, ? extends R> function) {
        super();
        this.function = function;
    }
    ...
}
Run Code Online (Sandbox Code Playgroud)

题.现在进行这些更改(一个在解决问题1之后,另一个从附加注释中),如何正确实现样本?或者我只是错过了一些非常基本的东西?

Mar*_*cak 3

我相信您的主要错误是MyFilterProcessor作为(几乎)精确的MyTransformProcessor.

由于作者没有发布该类的代码,我尝试根据以下内容猜测其行为:

... = new MyFilterProcessor<>(s -> s.equals("x"));
Run Code Online (Sandbox Code Playgroud)

顾名思义Filter,该组件旨在仅接受然后重新发布某些值。此时,计算结果为 a boolean(或 a Predicate<T>)的函数在上下文中是完全可以接受的(因此s -> s.equals("x"))。

页面末尾的初始数据流

String[] items = {"1", "x", "2", "x", "3", "x"};  
Run Code Online (Sandbox Code Playgroud)

似乎证实了我的假设。作者只是想过滤掉"x"值,并且这个任务被赋予MyFilterProcessor必须在将其发布到管道的其余部分之前评估每种类型;并且输出类型必须与输入类型相同。


构造函数应该如下所示:

MyFilterProcessor(Predicate<? super R> predicate) { /* ... */ }
// or
MyFilterProcessor(Function<? super R, Boolean> function) { /* ... */ }
Run Code Online (Sandbox Code Playgroud)

据说onNext只转发某些元素:

if (! predicate.test(item)) {
    int max = submit(item); // get the estimated maximum lag
    subscription.request(max);
}
Run Code Online (Sandbox Code Playgroud)

我对 的定义有两个想法MyFilterProcessor

  • 1)public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<R, R>

as 的Flow.Processor意思是接受和转发相同的类型。

我似乎T在任何地方都不适合这种类型。这就是我被阻止的地方。

  • 2)public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R>

但是,在 中onNext,你必须转换<T><R> (丑陋,非常丑陋)

if (! predicate.test(item)) {
    int max = submit( (R) item);
    subscription.request(max);
}
Run Code Online (Sandbox Code Playgroud)

在这种情况下你会测试 a Predicate<? super T>

如果您愿意稍微重构一下,因为已经继承了您可以让该类实现SubmissionPublisher的行为:Flow.PublisherFlow.Subscriber

public class MyFilterProcessor<R> extends SubmissionPublisher<R> implements Flow.Subscriber<R>
Run Code Online (Sandbox Code Playgroud)

所以

MyFilterProcessor<String, String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));
// or, if you follow my example:
MyFilterProcessor<String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));
Run Code Online (Sandbox Code Playgroud)

终于起作用了。


如果打印其中的值MyFilterProcessorMySubscriber您将得到以下输出:

Publishing Items...
FilterProcessor: Receiving: 1
FilterProcessor: Receiving: x
FilterProcessor: Receiving: 2
FilterProcessor: Receiving: x
FilterProcessor: Receiving: 3
FilterProcessor: Receiving: x
Got: 1
Got: 2
Got: 3
Run Code Online (Sandbox Code Playgroud)

这是预期的结果。


测试时,请记住在退出应用程序之前等待管道完成,因为SubmissionPublisher会发出另一个Thread.

另外,与文章相反,请具备改变的常识

private Function function; 
// ...
submit((R) function.apply(item));  
Run Code Online (Sandbox Code Playgroud)

private Function<? super T, ? extends R> function;
// ...
submit(function.apply(item));
Run Code Online (Sandbox Code Playgroud)

为什么我需要在这里显式转换 RHS?

我仍然很难理解你是如何得到这个cannot be applied to Object错误的。你用的是哪个号和IDE?