我正在使用Storm,它适用于很多用例.最近我看了一下Trident,它是Storm的高级抽象.它支持一次性处理,使状态处理更容易.
但现在我想知道..为什么我不能总是使用Trident而不是Storm?
到目前为止我读到的内容:
使用Trident而不是Storm时还有其他缺点吗?因为现在,我认为上面列出的缺点是微不足道的.
Trident无法实现哪些用例?
后果:
自从我问到我的公司决定先去三叉戟这个问题.当出现性能问题时,我们只会使用纯粹的Storm.可悲的是,这不是一个积极的决定,它只是成为默认行为(当时我不在身边).
他们的假设是,在大多数用例中,我们需要状态或仅一次处理,否则我们将在不久的将来需要它.我理解他们的推理是因为从Storm转到Trident或者回来并不是一个简单的转换,但在我个人看来,没有状态的流处理的概念并没有被所有人理解,这是使用Trident的主要原因.
我正在为我的Trident拓扑实现一个IBackingMap,以便将元组存储到ElasticSearch(我知道GitHub上已存在多个Trident/ElasticSearch集成实现,但我决定实现一个更适合我的任务的自定义实现).
所以我的实现是一个经典的工厂:
public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> {
// omitting here some other cool stuff...
private final Client client;
public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) {
return new StateFactory() {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName);
CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE);
MapState ms = OpaqueMap.build(cm);
return new SnapshottableMap(ms, new Values(GLOBAL_KEY));
}
};
}
public ElasticSearchBackingMap(String host, int port, String clusterName) …Run Code Online (Sandbox Code Playgroud) 我是风暴中的三叉戟的新手.我在TridentState上打破了我的头脑.至于我的理解三叉戟维护每个批处理的状态(即元数据)(批处理中的所有元组是否通过在数据库中维护事务ID完全处理)并且我不完全确定以下语句是做什么的
TridentState urlToTweeters =
topology.newStaticState(getUrlToTweetersState());
Run Code Online (Sandbox Code Playgroud)
任何人都可以解释在我们定义上述代码时实际发生了什么?
我正在学习Trident框架.有三叉戟几种方法Stream小号的聚集元组间歇内,包括这一个,其允许预成型件使用所述元组的状态的映射Aggregator的接口.但不幸的是,内置的对应物另外持久化地图状态,就像其他9个重载一样persistentAggregate(),仅Aggregator作为参数,不存在.
那么如何通过结合较低级别的Trident和Storm抽象和工具来实现所需的功能呢?探索API非常困难,因为几乎没有Javadoc文档.
换句话说,persistentAggregate()方法允许通过更新某些持久状态来结束流处理:
stream of tuples ---> persistent state
Run Code Online (Sandbox Code Playgroud)
我希望更新持久状态并发出不同的元组:
stream of tuples ------> stream of different tuples
with
persistent state
Run Code Online (Sandbox Code Playgroud)
Stream.aggregate(Fields, Aggregator, Fields) 不提供容错:
stream of tuples ------> stream of different tuples
with
simple in-memory state
Run Code Online (Sandbox Code Playgroud) 根据 prometheus storage.md ,建议不要使用 nfs 存储作为 prometheus 的持久卷。
但像 prometheus Operator 和 openshift 这样的解决方案展示了使用 nfs 作为 prometheus 持久卷的示例。
那么我在这里缺少什么?如果不推荐使用 nfs,那么为什么这些工具会分享使用 nfs 作为 prometheus 存储选项的示例?
有谁知道 Prometheus 的 NetApp/Trident 的 nfs 替代品是什么?
我有一个拓扑问题.我尝试解释工作流程...我有一个源每2分钟发出~500k元组,这些元组必须由一个喷口读取,并像一个对象一样繁琐处理(我认为是三叉戟中的批处理).之后,一个bolt/function /还有什么?...必须附加一个时间戳并将元组保存到Redis中.
我尝试使用一个函数实现Trident拓扑,该函数使用Jedis对象(Redis库for Java)将所有元组保存到Redis中,但是当我部署时,我会在此对象上收到NotSerializable异常.
我的问题是.如何实现一个在Redis上写这批元组的函数?在网上阅读我找不到任何从函数写入Redis的示例或在Trident中使用State对象的任何示例(可能我必须使用它...)
我的简单拓扑:
TridentTopology topology = new TridentTopology();
topology.newStream("myStream", new mySpout()).each(new Fields("field1", "field2"), new myFunction("redis_ip", "6379"));
Run Code Online (Sandbox Code Playgroud)
提前致谢