标签: trident

风暴与三叉戟:何时不使用三叉戟?

我正在使用Storm,它适用于很多用例.最近我看了一下Trident,它是Storm的高级抽象.它支持一次性处理,使状态处理更容易.

但现在我想知道..为什么我不能总是使用Trident而不是Storm?

到目前为止我读到的内容:

  • Trident批量处理消息,因此吞吐时间可能更长.
  • Trident尚无法在拓扑中处理循环.

使用Trident而不是Storm时还有其他缺点吗?因为现在,我认为上面列出的缺点是微不足道的.

Trident无法实现哪些用例?


后果:

自从我问到我的公司决定先去三叉戟这个问题.当出现性能问题时,我们只会使用纯粹的Storm.可悲的是,这不是一个积极的决定,它只是成为默认行为(当时我不在身边).

他们的假设是,在大多数用例中,我们需要状态或仅一次处理,否则我们将在不久的将来需要它.我理解他们的推理是因为从Storm转到Trident或者回来并不是一个简单的转换,但在我个人看来,没有状态的流处理的概念并没有被所有人理解,这是使用Trident的主要原因.

trident apache-storm

39
推荐指数
2
解决办法
2万
查看次数

如何在Storm Trident拓扑中关闭由IBackingMap实现打开的数据库连接?

我正在为我的Trident拓扑实现一个IBackingMap,以便将元组存储到ElasticSearch(我知道GitHub上已存在多个Trident/ElasticSearch集成实现,但我决定实现一个更适合我的任务的自定义实现).

所以我的实现是一个经典的工厂:

public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> {

    // omitting here some other cool stuff...
    private final Client client;

    public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) {

        return new StateFactory() {

            @Override
            public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

                ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName);
                CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE);
                MapState ms = OpaqueMap.build(cm);
                return new SnapshottableMap(ms, new Values(GLOBAL_KEY));
            }
        };
    }

    public ElasticSearchBackingMap(String host, int port, String clusterName) …
Run Code Online (Sandbox Code Playgroud)

trident apache-storm

10
推荐指数
1
解决办法
343
查看次数

什么是风暴中的三叉戟状态?

我是风暴中的三叉戟的新手.我在TridentState上打破了我的头脑.至于我的理解三叉戟维护每个批处理的状态(即元数据)(批处理中的所有元组是否通过在数据库中维护事务ID完全处理)并且我不完全确定以下语句是做什么的

TridentState urlToTweeters =
   topology.newStaticState(getUrlToTweetersState());
Run Code Online (Sandbox Code Playgroud)

任何人都可以解释在我们定义上述代码时实际发生了什么?

trident apache-storm

7
推荐指数
2
解决办法
3218
查看次数

如何在Trident中映射具有持久状态的元组?

我正在学习Trident框架.有三叉戟几种方法Stream小号的聚集元组间歇内,包括这一个,其允许预成型件使用所述元组的状态的映射Aggregator的接口.但不幸的是,内置的对应物另外持久化地图状态,就像其他9个重载一样persistentAggregate(),仅Aggregator作为参数,不存在.

那么如何通过结合较低级别的Trident和Storm抽象和工具来实现所需的功能呢?探索API非常困难,因为几乎没有Javadoc文档.

换句话说,persistentAggregate()方法允许通过更新某些持久状态来结束流处理:

stream of tuples ---> persistent state
Run Code Online (Sandbox Code Playgroud)

我希望更新持久状态并发出不同的元组:

stream of tuples ------> stream of different tuples
                  with
            persistent state
Run Code Online (Sandbox Code Playgroud)

Stream.aggregate(Fields, Aggregator, Fields) 不提供容错:

stream of tuples ------> stream of different tuples
                  with
          simple in-memory state
Run Code Online (Sandbox Code Playgroud)

state stream trident apache-storm

7
推荐指数
1
解决办法
2529
查看次数

Prometheus 和 nfs 存储

根据 prometheus storage.md ,建议不要使用 nfs 存储作为 prometheus 的持久卷。

但像 prometheus Operator 和 openshift 这样的解决方案展示了使用 nfs 作为 prometheus 持久卷的示例。

那么我在这里缺少什么?如果不推荐使用 nfs,那么为什么这些工具会分享使用 nfs 作为 prometheus 存储选项的示例?

有谁知道 Prometheus 的 NetApp/Trident 的 nfs 替代品是什么?

netapp trident kubernetes prometheus

4
推荐指数
1
解决办法
5553
查看次数

在Redis上写入的三叉戟或Storm拓扑

我有一个拓扑问题.我尝试解释工作流程...我有一个源每2分钟发出~500k元组,这些元组必须由一个喷口读取,并像一个对象一样繁琐处理(我认为是三叉戟中的批处理).之后,一个bolt/function /还有什么?...必须附加一个时间戳并将元组保存到Redis中.

我尝试使用一个函数实现Trident拓扑,该函数使用Jedis对象(Redis库for Java)将所有元组保存到Redis中,但是当我部署时,我会在此对象上收到NotSerializable异常.

我的问题是.如何实现一个在Redis上写这批元组的函数?在网上阅读我找不到任何从函数写入Redis的示例或在Trident中使用State对象的任何示例(可能我必须使用它...)

我的简单拓扑:

TridentTopology topology = new TridentTopology();
topology.newStream("myStream", new mySpout()).each(new Fields("field1", "field2"), new myFunction("redis_ip", "6379"));
Run Code Online (Sandbox Code Playgroud)

提前致谢

redis trident apache-storm

3
推荐指数
1
解决办法
3965
查看次数

标签 统计

trident ×6

apache-storm ×5

kubernetes ×1

netapp ×1

prometheus ×1

redis ×1

state ×1

stream ×1