小编Ren*_*upt的帖子

未能刷新状态存储

我正在尝试在 Kafka Streams 中创建一个 leftJoin ,它可以正常工作大约 10 条记录,然后它会因NullPointerException使用此类代码引起的异常而崩溃:

private static KafkaStreams getKafkaStreams() {
    StreamsConfig streamsConfig = new StreamsConfig(getProperties());
    KStreamBuilder builder = new KStreamBuilder();

    KTable<String, Verkaeufer> umsatzTable = builder.table(Serdes.String(), EventstreamSerde.Verkaeufer(), CommonUtilsConstants.TOPIC_VERKAEUFER_STAMMDATEN);
    KStream<String, String> verkaeuferStream = builder.stream(CommonUtilsConstants.TOPIC_ANZAHL_UMSATZ_PER_VERKAEUFER);

    KStream<String, String> tuttiStream = verkaeuferStream.leftJoin(umsatzTable,
            (tutti, verkaeufer) -> ("Vorname=" + verkaeufer.getVorname().toString() +",Nachname=" +verkaeufer.getNachname().toString() +"," +tutti.toString()), Serdes.String(), Serdes.String());

    tuttiStream.to(Serdes.String(), Serdes.String(), CommonUtilsConstants.TOPIC_TUTTI);

    return new KafkaStreams(builder, streamsConfig);
}
Run Code Online (Sandbox Code Playgroud)

StreamsConfig 看起来像这样:

private static Properties getProperties() {
    Properties props = new Properties();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CommonUtilsConstants.BOOTSTRAP_SERVER_CONFIGURATION);
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, CommonUtilsConstants.GID_TUTTI);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); …
Run Code Online (Sandbox Code Playgroud)

apache-kafka-streams

3
推荐指数
1
解决办法
2329
查看次数

spring bean 配置中的 microstream EmbeddedStorageManager

我想将 microstream 的 EmbeddedStorageManager 配置为 Spring Boot 应用程序(2.5.0)中的 bean。

@Configuration
public class MicrostreamConfig {

    @Value("${microstream.store.location}")
    String location;

    @Bean
    DataRoot dataRoot() {
        DataRoot dataRoot = new DataRoot();
        dataRoot.setProjectList(new ArrayList<>());
        return dataRoot;
    }
    
    @Bean
    public EmbeddedStorageManager storageManager() {

        EmbeddedStorageManager storageManager = EmbeddedStorage.start(
                dataRoot(),          // root object
                Paths.get(location) // storage directory
        );
        return storageManager;
    }
}
Run Code Online (Sandbox Code Playgroud)

并将其注入存储库类中

@Component
public class DataRepository {

    @Autowired
    private DataRoot dataRoot;

    @Autowired
    private EmbeddedStorageManager storageManager;

    public void addProject(Project project) {
        dataRoot.getProjectList().add(project);
        storageManager.storeAll(dataRoot.getProjectList());
    }

    public List<Project> getProjectList() {
        return …
Run Code Online (Sandbox Code Playgroud)

java spring-boot microstream

2
推荐指数
1
解决办法
523
查看次数

ksqldb - select * from stream' 结果在 io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor 找不到

我尝试了 ksqldb 并制作了一个这样的 docker-compose.yml:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.3.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_MAX_PARTITION_FETCH_BYTES: 10000000

  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - …
Run Code Online (Sandbox Code Playgroud)

ksqldb

0
推荐指数
1
解决办法
327
查看次数