Kafka Stream 自定义状态存储

Maa*_*mon 1 apache-kafka-streams

我一直在准备关于国有商店的文档,但我仍然不清楚它是否符合我的目的。我想使用一些分布式图形数据库作为其他外部应用程序可以使用的状态存储。这可能吗?这涉及什么工作?任何人都可以向我指出需要扩展才能实现该功能的类/代码吗?

Nis*_*yal 5

您可以使用处理器 API 实现自定义状态存储,如下所述: https:
//docs.confluence.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores

  • 您的自定义状态存储必须实现 StateStore。
  • 您必须有一个界面来表示商店中可用的操作。
  • 您必须提供 StoreBuilder 的实现来创建商店的实例。
  • 建议您提供一个限制只读操作访问的接口。这可以防止此 API 的用户在带外改变正在运行的 Kafka Streams 应用程序的状态。

实施将如下所示:

public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V> {
  // implementation of the actual store
}

// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V> {
  void write(K Key, V value);
}

// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V> {
  V read(K key);
}

public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<K,V>> {
  // implementation of the supplier for MyCustomStore
}
Run Code Online (Sandbox Code Playgroud)

为了使其可查询;

  • 提供 QueryableStoreType 的实现。
  • 提供一个包装类,该类可以访问存储的所有底层实例并用于查询。

例子 :

public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>> {

  // Only accept StateStores that are of type MyCustomStore
  public boolean accepts(final StateStore stateStore) {
    return stateStore instanceOf MyCustomStore;
  }

  public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName) {
      return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
  }

}
Run Code Online (Sandbox Code Playgroud)