lev*_*tov 7 state stream trident apache-storm
我正在学习Trident框架.有三叉戟几种方法Stream小号的聚集元组间歇内,包括这一个,其允许预成型件使用所述元组的状态的映射Aggregator的接口.但不幸的是,内置的对应物另外持久化地图状态,就像其他9个重载一样persistentAggregate(),仅Aggregator作为参数,不存在.
那么如何通过结合较低级别的Trident和Storm抽象和工具来实现所需的功能呢?探索API非常困难,因为几乎没有Javadoc文档.
换句话说,persistentAggregate()方法允许通过更新某些持久状态来结束流处理:
stream of tuples ---> persistent state
Run Code Online (Sandbox Code Playgroud)
我希望更新持久状态并发出不同的元组:
stream of tuples ------> stream of different tuples
with
persistent state
Run Code Online (Sandbox Code Playgroud)
Stream.aggregate(Fields, Aggregator, Fields) 不提供容错:
stream of tuples ------> stream of different tuples
with
simple in-memory state
Run Code Online (Sandbox Code Playgroud)
您可以使用TridentState#newValuesStream()方法从状态创建新流。这将允许您检索聚合值的流。
为了便于说明,我们可以通过添加此方法和调试过滤器来改进Trident 文档中的示例:
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"),
new Values("how many apples can you eat"));
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.newValuesStream().each(new Fields("count"), new Debug());
Run Code Online (Sandbox Code Playgroud)
运行此拓扑将输出(到控制台)聚合计数。
希望能帮助到你
| 归档时间: |
|
| 查看次数: |
2529 次 |
| 最近记录: |