I have a stream processor that processes messages from a Kafka input topic to an output topic. In addition, this processing has to run for multiple tenants. Let's call them tenant A and tenant B, although the application may need to handle more than ten tenants. The input and output topics follow the naming convention A-input, B-input, ... and A-output, B-output, ...

The function definition looks like this:
@Configuration
public class StreamProcessorConfig {

    @Bean
    public Function<KStream<String, InputType>, KStream<String, OutputType>> myfunctiondefinition() {
        return inputTypeStream -> inputTypeStream.map((String k, InputType v) -> {
            return KeyValue.pair(k, OutputType.createFrom(v));
        });
    }
}
My application.yaml now configures the stream application for tenant A:
tenant: A
spring.cloud.function.definition: myfunctiondefinition
spring.cloud.stream.kafka.streams.binder.functions.myfunctiondefinition:
  applicationId: ${spring.application.name}-myfunctiondefinition
spring.cloud.stream.bindings.myfunctiondefinition-in-0:
  destination: ${tenant}-input
spring.cloud.stream.bindings.myfunctiondefinition-out-0:
  destination: ${tenant}-output
How can I modify the configuration to add an instance for tenant B? Of course I could duplicate myfunctiondefinition() along with all its configuration keys, but I am looking for a quick and clean way to add tenants dynamically through configuration alone. Is that possible?

Note: Unfortunately, running a separate instance of the application for tenant B and the other tenants is not an option.
We found a solution to this problem by registering the function beans manually. Unfortunately, it was not as easy as we had hoped. FunctionDetectorCondition (https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/main/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/FunctionDetectorCondition.java) requires an AnnotatedBeanDefinition that serves as the template for the actual stream-processing beans. This can be read as a suggestion to Spring Cloud Stream: allow registering function definition templates that can be reused multiple times.

To get there, we initialize a factory bean instead of the stream processor function itself:
@Configuration
public class StreamProcessorConfig {

    @Bean
    public MyFunctionDefinitionFactory myFunctionDefinitionFactory() {
        return new MyFunctionDefinitionFactory();
    }
}
The factory creates the stream processor function:
public class MyFunctionDefinitionFactory {

    public Function<KStream<String, InputType>,
                    KStream<String, OutputType>> myfunctiondefinition() {
        return inputTypeStream -> inputTypeStream.map((String k, InputType v) -> {
            return KeyValue.pair(k, OutputType.createFrom(v));
        });
    }
}
Now we need a dummy bean that Spring Cloud Stream requires in order to apply its logic for creating the stream processor:
// Acts as a dummy bean for Spring Cloud Stream.
// It has to carry the same name as the original streaming function in the factory:
// in this case we named the method "myfunctiondefinition",
// so the dummy bean has to be called "Myfunctiondefinition".
public class Myfunctiondefinition implements Function<KStream<String, InputType>,
                                                       KStream<String, OutputType>> {

    // !!! This may need to change if Spring Cloud Stream changes its logic.
    // The method myfunctiondefinition() is needed because Spring Cloud Stream looks for
    // a method with the same name as the class in
    // FunctionDetectorCondition#pruneFunctionBeansForKafkaStreams.
    public Function<KStream<String, InputType>,
                    KStream<String, OutputType>> myfunctiondefinition() {
        return null;
    }

    // Needed to implement the interface: Spring Cloud Stream requires
    // the Function type to identify a stream processor candidate.
    @Override
    public KStream<String, OutputType> apply(KStream<String, InputType> input) {
        return null;
    }
}
Now everything is in place to register one bean per tenant. We do this in an ApplicationContextInitializer that creates the bean definitions via the factory method and iterates over the functions we define in the application.yaml configuration file.
public class StreamProcessorInitializer
        implements ApplicationContextInitializer<GenericWebApplicationContext> {

    @Override
    public void initialize(GenericWebApplicationContext context) {
        String functionDefinitions = context.getEnvironment()
                .getProperty("spring.cloud.function.definition");
        String splitter = context.getEnvironment()
                .getProperty("spring.cloud.function.definition.splitter");
        // Bean name of the factory registered in StreamProcessorConfig ("myFunctionDefinitionFactory")
        String factoryName = CaseFormat.UPPER_CAMEL
                .to(CaseFormat.LOWER_CAMEL, MyFunctionDefinitionFactory.class.getSimpleName());
        // Assumes the factory's myfunctiondefinition() is returned first by getMethods()
        String factoryMethodName =
                MyFunctionDefinitionFactory.class.getMethods()[0].getName();

        AnnotatedGenericBeanDefinition def =
                new AnnotatedGenericBeanDefinition(Myfunctiondefinition.class);
        def.setFactoryBeanName(factoryName);
        def.setFactoryMethodName(factoryMethodName);

        // Register one bean definition per configured function name
        Arrays.stream(functionDefinitions.split(splitter))
                .forEach(function -> context.registerBeanDefinition(function, def));
    }
}
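Note that the ApplicationContextInitializer itself still has to be registered with Spring Boot, otherwise it is never invoked. As a minimal sketch (the application class name is made up), it can be attached programmatically via SpringApplicationBuilder; declaring it in META-INF/spring.factories under org.springframework.context.ApplicationContextInitializer works as well:

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;

@SpringBootApplication
public class StreamProcessorApplication {

    public static void main(String[] args) {
        // Attach the initializer before the context is refreshed so the
        // per-function bean definitions are registered in time.
        new SpringApplicationBuilder(StreamProcessorApplication.class)
                .initializers(new StreamProcessorInitializer())
                .run(args);
    }
}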
Finally, we can define the functions in the application.yaml. This can be done via helm or kustomize to configure the tenant-specific environment:
#--------------------------------------------------------------------------------------------------------------------------------------
# streaming processor functions (going to be filled by helm)
#--------------------------------------------------------------------------------------------------------------------------------------
spring.cloud.function.definition: <name1>;<name2>;...
#--Note-- required as spring cloud streams has changed the splitter in the past
spring.cloud.function.definition.splitter: ;
# Properties per function (<name>)
spring.cloud.stream.kafka.streams.binder.functions.<name>.applicationId: ${tenant}-${spring.application.name}-<name>
# configuring dlq (if you have one)
spring.cloud.stream.kafka.streams.bindings.<name>-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.<name>-in-0.consumer.dlqName: ${tenant}-<name>-dlq
# configuring in- and output topics
spring.cloud.stream.bindings.<name>-in-0.destination: ${tenant}-<inputname>
spring.cloud.stream.bindings.<name>-out-0.destination: ${tenant}-<outputname>
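To make the template concrete, here is a hypothetical rendering for tenants A and B. The function names processorA and processorB are invented for illustration; in practice they and the topic names come from the helm/kustomize values, and the ${tenant} placeholder from the template is expanded per function because a single instance now serves several tenants:

spring.cloud.function.definition: processorA;processorB
spring.cloud.function.definition.splitter: ;
spring.cloud.stream.kafka.streams.binder.functions.processorA.applicationId: A-${spring.application.name}-processorA
spring.cloud.stream.bindings.processorA-in-0.destination: A-input
spring.cloud.stream.bindings.processorA-out-0.destination: A-output
spring.cloud.stream.kafka.streams.binder.functions.processorB.applicationId: B-${spring.application.name}-processorB
spring.cloud.stream.bindings.processorB-in-0.destination: B-input
spring.cloud.stream.bindings.processorB-out-0.destination: B-output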