Vij*_*apu 8 java apache-kafka apache-kafka-streams
有没有人设法使用 IntelliJ IDEA 调试用 Java 8 编写的 kafkastreams 代码?我正在运行一个简单的 linesplit.java 代码,它从一个主题中获取流并将其拆分并将其发送到另一个主题,但我不知道在哪里保存调试指针以在每条消息流经 linesplit.java 时对其进行调试。
分割线
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
// ------- use the code below for Java 8 and uncomment the above ---
builder.stream("streams-input")
.flatMapValues(value -> Arrays.asList(value.toString().split("\\W+")))
.to("streams-output");
// -----------------------------------------------------------------
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
Run Code Online (Sandbox Code Playgroud)
你尝试过偷看吗?
您的示例如下(使用 peek 函数):
builder
.stream("streams-input")
.peek((k, v) -> log.info("Observed event: {}", v))
.flatMapValues(value -> Arrays.asList(value.toString().split("\\W+"))).
.peek((k, v) -> log.info("Transformed event: {}", v))
.to("streams-output");
Run Code Online (Sandbox Code Playgroud)
我没有运行代码,但这是我通常的做法。
| 归档时间: |
|
| 查看次数: |
2761 次 |
| 最近记录: |