I'm trying to create a leftJoin in Kafka Streams. It works fine for roughly the first 10 records, then crashes with a NullPointerException raised by code like this:
private static KafkaStreams getKafkaStreams() {
    StreamsConfig streamsConfig = new StreamsConfig(getProperties());
    KStreamBuilder builder = new KStreamBuilder();
    KTable<String, Verkaeufer> umsatzTable = builder.table(Serdes.String(), EventstreamSerde.Verkaeufer(), CommonUtilsConstants.TOPIC_VERKAEUFER_STAMMDATEN);
    KStream<String, String> verkaeuferStream = builder.stream(CommonUtilsConstants.TOPIC_ANZAHL_UMSATZ_PER_VERKAEUFER);
    KStream<String, String> tuttiStream = verkaeuferStream.leftJoin(umsatzTable,
            (tutti, verkaeufer) -> ("Vorname=" + verkaeufer.getVorname().toString() + ",Nachname=" + verkaeufer.getNachname().toString() + "," + tutti.toString()),
            Serdes.String(), Serdes.String());
    tuttiStream.to(Serdes.String(), Serdes.String(), CommonUtilsConstants.TOPIC_TUTTI);
    return new KafkaStreams(builder, streamsConfig);
}
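For context on the crash: in a Kafka Streams leftJoin, the table-side value passed to the joiner is null whenever the stream record has no matching table entry, so the lambda above dereferences null. The following sketch isolates that join logic as a plain function with a null guard; the Verkaeufer class here is a minimal hypothetical stand-in, not the original class:

```java
// Sketch: the joiner logic as a plain function, guarding against the null
// table-side value that a leftJoin passes when no matching record exists.
public class LeftJoinSketch {

    // Minimal hypothetical stand-in for the original Verkaeufer class.
    static class Verkaeufer {
        final String vorname, nachname;
        Verkaeufer(String vorname, String nachname) { this.vorname = vorname; this.nachname = nachname; }
        String getVorname() { return vorname; }
        String getNachname() { return nachname; }
    }

    // Same shape as the ValueJoiner lambda, but null-safe on the table side.
    static String join(String tutti, Verkaeufer verkaeufer) {
        if (verkaeufer == null) {
            return "Vorname=?,Nachname=?," + tutti;
        }
        return "Vorname=" + verkaeufer.getVorname() + ",Nachname=" + verkaeufer.getNachname() + "," + tutti;
    }

    public static void main(String[] args) {
        System.out.println(join("umsatz=42", new Verkaeufer("Max", "Muster")));
        System.out.println(join("umsatz=42", null)); // no matching table record: no NPE
    }
}
```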
The StreamsConfig looks like this:
private static Properties getProperties() {
    Properties props = new Properties();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CommonUtilsConstants.BOOTSTRAP_SERVER_CONFIGURATION);
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, CommonUtilsConstants.GID_TUTTI);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); …

I want to configure microstream's EmbeddedStorageManager as a bean in a Spring Boot application (2.5.0):
@Configuration
public class MicrostreamConfig {

    @Value("${microstream.store.location}")
    String location;

    @Bean
    DataRoot dataRoot() {
        DataRoot dataRoot = new DataRoot();
        dataRoot.setProjectList(new ArrayList<>());
        return dataRoot;
    }

    @Bean
    public EmbeddedStorageManager storageManager() {
        EmbeddedStorageManager storageManager = EmbeddedStorage.start(
                dataRoot(),         // root object
                Paths.get(location) // storage directory
        );
        return storageManager;
    }
}
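As a side note, an equivalent variant of that configuration (a sketch with two assumptions of mine, not from the original post) injects the DataRoot bean as a method parameter instead of calling dataRoot() directly, and declares destroyMethod = "shutdown" so Spring stops the storage engine cleanly when the context closes:

```java
// Sketch under the same assumptions as the original config; the
// destroyMethod and parameter injection are editorial suggestions.
@Configuration
public class MicrostreamConfig {

    @Value("${microstream.store.location}")
    String location;

    @Bean
    DataRoot dataRoot() {
        DataRoot dataRoot = new DataRoot();
        dataRoot.setProjectList(new ArrayList<>());
        return dataRoot;
    }

    // Spring injects the DataRoot bean and calls shutdown() on context close.
    @Bean(destroyMethod = "shutdown")
    public EmbeddedStorageManager storageManager(DataRoot dataRoot) {
        return EmbeddedStorage.start(dataRoot, Paths.get(location));
    }
}
```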
and inject it into a repository class:
@Component
public class DataRepository {

    @Autowired
    private DataRoot dataRoot;

    @Autowired
    private EmbeddedStorageManager storageManager;

    public void addProject(Project project) {
        dataRoot.getProjectList().add(project);
        storageManager.storeAll(dataRoot.getProjectList());
    }

    public List<Project> getProjectList() {
        return …

I tried ksqldb and put together a docker-compose.yml like this:
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-enterprise-kafka:5.3.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_MAX_PARTITION_FETCH_BYTES: 10000000
  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - …
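The compose file above is truncated before the ksqlDB service itself. For orientation only, a typical ksqlDB server service in this style of Confluent 5.3.x stack looks roughly like the following; the service name, image, ports, and settings are assumptions taken from Confluent's standard examples, not recovered from the original file:

```yaml
  # Hypothetical continuation sketch -- not the poster's actual file.
  ksql-server:
    image: confluentinc/cp-ksql-server:5.3.1
    hostname: ksql-server
    container_name: ksql-server
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_BOOTSTRAP_SERVERS: broker:29092          # broker's internal listener
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
```

Note that clients inside the compose network use the PLAINTEXT listener (broker:29092), while clients on the host use the advertised PLAINTEXT_HOST listener (localhost:9092).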