Nam*_*man 8 java concurrency data-stream java-8 java-9
我本来打算通过社区#DOC-1006738从Oracle相关的并发概念Flow.Publisher
和Flow.Subscriber
.在那里可以找到使用具有这两行代码的处理器转换数据流的示例代码,这让我有点困惑.
//Create Processor and Subscriber
MyFilterProcessor<String, String> filterProcessor =
new MyFilterProcessor<>(s -> s.equals("x"));
Run Code Online (Sandbox Code Playgroud)
问题1. MyFilterProcessor如何在<String, String>
这里输入?
对于我最初的想法,这些可能已经<String, Boolean>
取而代之,但那将无视下一行中订户定义的进一步定义: -
MyTransformProcessor<String, Integer> transformProcessor =
new MyTransformProcessor<>(s -> Integer.parseInt(s));
Run Code Online (Sandbox Code Playgroud)
另外请注意,除非我明确地将上述内容强制转换为(正确)
MyTransformProcessor<String, Integer>(s -> Integer.parseInt(s))
Run Code Online (Sandbox Code Playgroud)
我在parseInt
阅读时遇到错误,无法应用Object
.
- 为什么我需要在这里明确地投射RHS?-
虽然代码主要出现在共享链接中,但我使用的有用的构造函数定义是
public class MyTransformProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
private Function function;
MyTransformProcessor(Function<? super T, ? extends R> function) {
super();
this.function = function;
}
...
}
Run Code Online (Sandbox Code Playgroud)
以及相同的一个filterProcessor
: -
public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
private Function function;
MyFilterProcessor(Function<? super T, ? extends R> function) {
super();
this.function = function;
}
...
}
Run Code Online (Sandbox Code Playgroud)
题.现在进行这些更改(一个在解决问题1之后,另一个从附加注释中),如何正确实现样本?或者我只是错过了一些非常基本的东西?
我相信您的主要错误是MyFilterProcessor
作为(几乎)精确的MyTransformProcessor
.
由于作者没有发布该类的代码,我尝试根据以下内容猜测其行为:
... = new MyFilterProcessor<>(s -> s.equals("x"));
Run Code Online (Sandbox Code Playgroud)
顾名思义Filter
,该组件旨在仅接受然后重新发布某些值。此时,计算结果为 a boolean
(或 a Predicate<T>
)的函数在上下文中是完全可以接受的(因此s -> s.equals("x")
)。
页面末尾的初始数据流
String[] items = {"1", "x", "2", "x", "3", "x"};
Run Code Online (Sandbox Code Playgroud)
似乎证实了我的假设。作者只是想过滤掉"x"
值,并且这个任务被赋予MyFilterProcessor
必须在将其发布到管道的其余部分之前评估每种类型;并且输出类型必须与输入类型相同。
构造函数应该如下所示:
MyFilterProcessor(Predicate<? super R> predicate) { /* ... */ }
// or
MyFilterProcessor(Function<? super R, Boolean> function) { /* ... */ }
Run Code Online (Sandbox Code Playgroud)
据说onNext
只转发某些元素:
if (! predicate.test(item)) {
int max = submit(item); // get the estimated maximum lag
subscription.request(max);
}
Run Code Online (Sandbox Code Playgroud)
我对 的定义有两个想法MyFilterProcessor
:
public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<R, R>
as 的Flow.Processor
意思是接受和转发相同的类型。
我似乎T
在任何地方都不适合这种类型。这就是我被阻止的地方。
public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R>
但是,在 中onNext
,你必须转换<T>
为<R>
(丑陋,非常丑陋)
if (! predicate.test(item)) {
int max = submit( (R) item);
subscription.request(max);
}
Run Code Online (Sandbox Code Playgroud)
在这种情况下你会测试 a Predicate<? super T>
。
如果您愿意稍微重构一下,因为已经继承了您可以让该类实现SubmissionPublisher
的行为:Flow.Publisher
Flow.Subscriber
public class MyFilterProcessor<R> extends SubmissionPublisher<R> implements Flow.Subscriber<R>
Run Code Online (Sandbox Code Playgroud)
所以
MyFilterProcessor<String, String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));
// or, if you follow my example:
MyFilterProcessor<String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));
Run Code Online (Sandbox Code Playgroud)
终于起作用了。
如果打印其中的值MyFilterProcessor
,MySubscriber
您将得到以下输出:
Publishing Items...
FilterProcessor: Receiving: 1
FilterProcessor: Receiving: x
FilterProcessor: Receiving: 2
FilterProcessor: Receiving: x
FilterProcessor: Receiving: 3
FilterProcessor: Receiving: x
Got: 1
Got: 2
Got: 3
Run Code Online (Sandbox Code Playgroud)
这是预期的结果。
测试时,请记住在退出应用程序之前等待管道完成,因为SubmissionPublisher
会发出另一个Thread
.
另外,与文章相反,请具备改变的常识
private Function function;
// ...
submit((R) function.apply(item));
Run Code Online (Sandbox Code Playgroud)
到
private Function<? super T, ? extends R> function;
// ...
submit(function.apply(item));
Run Code Online (Sandbox Code Playgroud)
为什么我需要在这里显式转换 RHS?
我仍然很难理解你是如何得到这个cannot be applied to Object
错误的。你用的是哪个jdk号和IDE?
归档时间: |
|
查看次数: |
304 次 |
最近记录: |