Here is my code:
import java.util.concurrent.TimeUnit;

public class Test {
    // Shared flag, read by t1 and t3 and written by t2 (not volatile)
    private static boolean running = true;

    public static void main(String[] args) {
        // t1: spins until it observes running == false
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "get running:" + running);
            while (running) {
            }
            System.out.println(Thread.currentThread().getName() + "end");
        }, "t1").start();

        // t2: clears the flag after two seconds
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            running = false;
            System.out.println(Thread.currentThread().getName() + "change running to:" + running);
        }, "t2").start();

        // t3: reads the flag, then sleeps for three seconds
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "get running:" + running);
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) { …
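
The snippet above is cut off, so the following is not the author's fix. It is a minimal sketch that assumes the concern is t1 never observing the write to `running` made by another thread. Under that assumption, one commonly used option is to declare the flag `volatile`. The class name `VolatileFlagSketch` and the method `stopsAfterFlagCleared` are hypothetical names introduced for illustration only.

```java
import java.util.concurrent.TimeUnit;

public class VolatileFlagSketch {
    // volatile: a write by one thread is visible to subsequent reads by other threads
    private static volatile boolean running = true;

    // Hypothetical helper: starts a spinning worker, clears the flag after delayMillis,
    // and reports whether the worker exited within joinTimeoutMillis.
    static boolean stopsAfterFlagCleared(long delayMillis, long joinTimeoutMillis) {
        running = true;
        Thread worker = new Thread(() -> {
            while (running) {
                // busy-wait, as in the original loop
            }
        }, "t1");
        worker.setDaemon(true); // do not keep the JVM alive if the worker never stops
        worker.start();
        try {
            TimeUnit.MILLISECONDS.sleep(delayMillis);
            running = false; // volatile write, visible to the worker's next read
            worker.join(joinTimeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + stopsAfterFlagCleared(200, 2000));
    }
}
```

Without `volatile`, the Java memory model does not require the spinning thread to ever see the updated value, so whether the original loop ends can vary between runs and JVMs.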

I am new to Spring Kafka. For some reason, I want to create two StreamsBuilderFactoryBean instances. As you can see, I defined two StreamsBuilderFactoryBean beans: one named "commonDSLBuilder" and the other named "propertyDSLBuilder", the latter configured with props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4). So "commonDSLBuilder" creates only one consumer, while "propertyDSLBuilder" creates four consumers.
@Configuration
@EnableKafka
public class KafkaStreamsConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    @Value("${spring.kafka.stream.application-id}")
    private String applicationId;

    @Bean(name = "commonDSLBuilder")
    public StreamsBuilderFactoryBean commonDSLBuilder() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        StreamsConfig streamsConfig = new StreamsConfig(props);
        StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig);
        streamsBuilder.setSingleton(Boolean.FALSE);
        return streamsBuilder;
    }

    @Bean(name = "propertyDSLBuilder")
    public StreamsBuilderFactoryBean propertyDSLBuilder() {
        Map<String, Object> …
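
The second bean is truncated above, so the following is only a rough sketch of the difference the question describes, not the author's configuration. It uses plain maps and the literal keys behind the `StreamsConfig` constants (`application.id`, `bootstrap.servers`, `num.stream.threads`) so that it runs without Kafka on the classpath. The class `StreamThreadConfigSketch`, its helper methods, and the application id `demo-app` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class StreamThreadConfigSketch {
    // Literal keys behind StreamsConfig.APPLICATION_ID_CONFIG,
    // BOOTSTRAP_SERVERS_CONFIG and NUM_STREAM_THREADS_CONFIG
    static final String APPLICATION_ID = "application.id";
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    static final String NUM_STREAM_THREADS = "num.stream.threads";

    // Properties shared by both builders (values are placeholders)
    static Map<String, Object> baseProps(String applicationId) {
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID, applicationId);
        props.put(BOOTSTRAP_SERVERS, "127.0.0.1:9092");
        return props;
    }

    // Copy of the base properties with an explicit stream thread count
    static Map<String, Object> withThreads(Map<String, Object> base, int threads) {
        Map<String, Object> props = new HashMap<>(base);
        props.put(NUM_STREAM_THREADS, threads);
        return props;
    }

    // Kafka Streams falls back to one stream thread when the key is unset
    static int streamThreads(Map<String, Object> props) {
        Object value = props.get(NUM_STREAM_THREADS);
        return value == null ? 1 : (Integer) value;
    }

    public static void main(String[] args) {
        Map<String, Object> common = baseProps("demo-app");
        Map<String, Object> property = withThreads(common, 4);
        System.out.println("commonDSLBuilder threads: " + streamThreads(common));
        System.out.println("propertyDSLBuilder threads: " + streamThreads(property));
    }
}
```

Each stream thread runs its own consumer, which is consistent with the one-versus-four consumer counts described in the question.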