我编写了一个单一的Kafka使用者(使用Spring Kafka),该使用者从单个主题中读取内容,并且是使用者组的一部分。消耗完一条消息后,它将执行所有下游操作,并移至下一个消息偏移。我将其打包为WAR文件,并且我的部署管道将其推送到单个实例。使用部署管道,我可以将该工件部署到部署池中的多个实例。
但是,当我希望多个消费者作为基础架构的一部分时,我无法理解以下内容:
实际上,我可以在部署池中定义多个实例,并在所有这些实例上运行此WAR。这意味着,他们所有人都在听同一个话题,属于同一个消费者群体,并且实际上将划分自己。下游逻辑将按原样工作。这对于我的用例来说效果很好,但是,我不确定这是否是最佳方法吗?
在线阅读时,我在这里和这里遇到了各种资源,人们在其中定义了一个消费者线程,但在内部却创建了多个工作线程。在某些示例中,我们可以定义执行下游逻辑的多个使用者线程。考虑这些方法并将它们映射到部署环境,我们可以达到相同的结果(如我上面的理论解决方案所能达到的),但是使用的机器数量更少。
我个人认为我的解决方案简单,可扩展,但可能不是最佳选择,而第二种方法可能是最佳选择,但想了解您的经验,建议或我应该考虑的其他指标/限制?另外,我正在考虑我的理论解决方案,实际上我可以作为Kafka的消费者使用简单的简单机器。
据我所知,我尚未发布任何代码,如果需要将此问题移至另一个论坛,请告诉我。如果您需要特定的代码示例,我也可以提供它们,但就我的问题而言,我认为它们并不重要。
我能够测试自动装配原型bean,在单例bean中导致只创建一个原型bean.
作为解决方案,我读到我可以为原型bean定义AOP作用域代理或使用Spring的查找方法注入.
这是我试过的 -
PrototypeBean.java
@Component
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode = ScopedProxyMode.INTERFACES)
public class PrototypeBean implements Prototype {
private String welcomeMessage;
public String getWelcomeMessage() {
return welcomeMessage;
}
public void setWelcomeMessage(final String welcomeMessage) {
this.welcomeMessage = welcomeMessage;
}
}
Run Code Online (Sandbox Code Playgroud)
SingletonBean.java
@Component
public class SingletonBean implements Singleton{
@Autowired private Prototype prototype;
public Prototype getPrototype() {
return prototype;
}
public void greet() {
System.out.println(prototype.getWelcomeMessage());
}
}
Run Code Online (Sandbox Code Playgroud)
测试类
public class AutowiredDependenciesDemo {
@Autowired private Singleton autowiredSingleton;
@Autowired ConfigurableApplicationContext context;
@Test
public void testPrototypeBeanWithAopScopedProxy(){ …
Run Code Online (Sandbox Code Playgroud) 我正在编写一个基于Java的Kafka Consumer应用程序.我正在为我的应用程序使用kafka-clients,Spring Kafka和Spring启动.虽然Spring启动让我轻松编写Kafka使用者(没有真正编写ConcurrentKafkaListenerContainerFactory,ConsumerFactory等),但我希望能够为这些使用者定义/定制一些属性.但是,我找不到使用Spring启动的简单方法.例如:我有兴趣设置的一些属性是 -
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
我接过一看春天开机前定义的属性在这里.
此外,基于前面一个问题在这里,我想设置对消费者的并发性,但无法找到一个配置,为application.properties驱动的方式做到这一点使用Spring启动.
一种显而易见的方法是ConcurrentKafkaListenerContainerFactory, ConsumerFactory
在我的Spring Context中再次定义类并从那里开始工作.我想了解是否有更简洁的方法,特别是因为我使用的是Spring Boot.
版本 -
java apache-kafka spring-boot kafka-consumer-api spring-kafka
java ×3
apache-kafka ×2
spring-kafka ×2
deployment ×1
proxy ×1
spring ×1
spring-aop ×1
spring-boot ×1