什么是风暴中的三叉戟状态?

Ezh*_*hil 7 trident apache-storm

我是风暴中的三叉戟的新手.我在TridentState上打破了我的头脑.至于我的理解三叉戟维护每个批处理的状态(即元数据)(批处理中的所有元组是否通过在数据库中维护事务ID完全处理)并且我不完全确定以下语句是做什么的

TridentState urlToTweeters =
   topology.newStaticState(getUrlToTweetersState());
Run Code Online (Sandbox Code Playgroud)

任何人都可以解释在我们定义上述代码时实际发生了什么?

bop*_*cat 9

我希望回答永远不会太晚,至少其他人可能会觉得我的答案很有用:)

因此,topology.newStaticState()是Trident对可查询数据存储的抽象.参数newStaticState()应该是一个实现 - 基于方法的契约 - storm.trident.state.StateFactory.反过来,工厂应该实现makeState()返回实例的方法storm.trident.state.State.但是,如果你打算查询你的状态,你应该返回一个istance storm.trident.state.map.ReadOnlyMapState,因为plain storm.trident.state.State没有查询实际数据源的方法(如果你试图使用任何东西,你实际上会得到一个类转换异常ReadOnlyMapState).

那么,让我们试一试吧!

虚拟状态实现:

public static class ExampleStaticState implements ReadOnlyMapState<String> {

    private final Map<String, String> dataSourceStub;

    public ExampleStaticState() {
        dataSourceStub = new HashMap<>();
        dataSourceStub.put("tuple-00", "Trident");
        dataSourceStub.put("tuple-01", "definitely");
        dataSourceStub.put("tuple-02", "lacks");
        dataSourceStub.put("tuple-03", "documentation");
    }

    @Override
    public List<String> multiGet(List<List<Object>> keys) {

        System.out.println("DEBUG: MultiGet, keys is " + keys);

        List<String> result = new ArrayList<>();

        for (List<Object> inputTuple : keys) {
            result.add(dataSourceStub.get(inputTuple.get(0)));
        }

        return result;
    }

    @Override
    public void beginCommit(Long txid) {
        // never gets executed...
        System.out.println("DEBUG: Begin commit, txid=" + txid);
    }

    @Override
    public void commit(Long txid) {
        // never gets executed...
        System.out.println("DEBUG: Commit, txid=" + txid);
    }
}
Run Code Online (Sandbox Code Playgroud)

一个工厂:

public static class ExampleStaticStateFactory implements StateFactory {
    @Override
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
        return new ExampleStaticState();
    }
}
Run Code Online (Sandbox Code Playgroud)

简单psvm(又名public static void main):

public static void main(String... args) {
    TridentTopology tridentTopology = new TridentTopology();
    FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{
            "foo"
    }));
    TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory());
    tridentTopology
            .newStream("spout", spout)
            .stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar"))
            .each(new Fields("foo", "bar"), new Debug())
            ;

    Config conf = new Config();
    conf.setNumWorkers(6);

    LocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology("tridentTopology", conf, tridentTopology.build());

    spout.feed(Arrays.asList(new Values[]{
            new Values("tuple-00"),
            new Values("tuple-01"),
            new Values("tuple-02"),
            new Values("tuple-03")
    }));

    localCluster.shutdown();
}
Run Code Online (Sandbox Code Playgroud)

最后,输出:

DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]]
DEBUG: [tuple-00, Trident]
DEBUG: [tuple-01, definitely]
DEBUG: [tuple-02, lacks]
DEBUG: [tuple-03, documentation]
Run Code Online (Sandbox Code Playgroud)

你看,stateQuery()从输入批处理获取值并将它们映射到'数据存储'中找到的值.

深入潜水,您可以查看MapGet类的来源(其实例用于在拓扑中查询的人)并在那里找到以下内容:

public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
    @Override
    public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
        return map.multiGet((List) keys);
    }    

    @Override
    public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
        collector.emit(new Values(result));
    }    
}
Run Code Online (Sandbox Code Playgroud)

因此,它只需调用multiGet()您的ReadOnlyMapState实现方法,然后发出数据存储中的值,将它们添加到已存在的元组中.你可以(虽然它可能不是最好的事情)创建自己的BaseQueryFunction<ReadOnlyMapState, Object>做更复杂的实现.


lor*_*can 0

Storm wiki 上有关于 Trident 状态的详细文档。您的问题的简单答案是这urlToTweeters是一个可以查询的状态对象。我假设上面的陈述来自trident 教程,转载如下:

TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState());
TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState());
topology.newDRPCStream("reach")
  .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
  /* At this point we have the tweeters for each url passed in args */
  .shuffle()        
  .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
  .parallelismHint(200)
  .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
  .groupBy(new Fields("follower"))
  .aggregate(new One(), new Fields("one"))
  .parallelismHint(20)
  .aggregate(new Count(), new Fields("reach"));
Run Code Online (Sandbox Code Playgroud)

在此示例中,urlToTweeters将存储 URL 到 Tweeters 的映射,并且reach在下一行定义的 DRPC 查询(将 URL 作为其参数)最终将产生到达范围。但在途中(用内联注释标记)您会看到每个 url 的推特流,即urlToTweeters.