小编use*_*182的帖子

如何编写Kafka使用者-单线程与多线程

我编写了一个单一的Kafka使用者(使用Spring Kafka),该使用者从单个主题中读取内容,并且是使用者组的一部分。消耗完一条消息后,它将执行所有下游操作,并移至下一个消息偏移。我将其打包为WAR文件,并且我的部署管道将其推送到单个实例。使用部署管道,我可以将该工件部署到部署池中的多个实例。

但是,当我希望多个消费者作为基础架构的一部分时,我无法理解以下内容:

  • 实际上,我可以在部署池中定义多个实例,并在所有这些实例上运行此WAR。这意味着,他们所有人都在听同一个话题,属于同一个消费者群体,并且实际上将划分自己。下游逻辑将按原样工作。这对于我的用例来说效果很好,但是,我不确定这是否是最佳方法吗?

  • 在线阅读时,我在这里这里遇到了各种资源,人们其中定义了一个消费者线程,但在内部却创建了多个工作线程。在某些示例中,我们可以定义执行下游逻辑的多个使用者线程。考虑这些方法并将它们映射到部署环境,我们可以达到相同的结果(如我上面的理论解决方案所能达到的),但是使用的机器数量更少。

我个人认为我的解决方案简单,可扩展,但可能不是最佳选择,而第二种方法可能是最佳选择,但想了解您的经验,建议或我应该考虑的其他指标/限制?另外,我正在考虑我的理论解决方案,实际上我可以作为Kafka的消费者使用简单的简单机器。

据我所知,我尚未发布任何代码,如果需要将此问题移至另一个论坛,请告诉我。如果您需要特定的代码示例,我也可以提供它们,但就我的问题而言,我认为它们并不重要。

java deployment multithreading apache-kafka spring-kafka

10
推荐指数
1
解决办法
7693
查看次数

使用AOP作用域代理在单例bean中自动装配原型bean

我能够测试自动装配原型bean,在单例bean中导致只创建一个原型bean.

作为解决方案,我读到我可以为原型bean定义AOP作用域代理或使用Spring的查找方法注入.

这是我试过的 -

PrototypeBean.java

@Component
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode =   ScopedProxyMode.INTERFACES)
public class PrototypeBean implements Prototype {

private String welcomeMessage;

public String getWelcomeMessage() {
    return welcomeMessage;
}

public void setWelcomeMessage(final String welcomeMessage) {
    this.welcomeMessage = welcomeMessage;
}
}
Run Code Online (Sandbox Code Playgroud)

SingletonBean.java

@Component
public class SingletonBean implements Singleton{
@Autowired private Prototype prototype;

public Prototype getPrototype() {
    return prototype;
}

public void greet() {
    System.out.println(prototype.getWelcomeMessage());
}
}
Run Code Online (Sandbox Code Playgroud)

测试类

public class AutowiredDependenciesDemo {

@Autowired private Singleton autowiredSingleton;
@Autowired ConfigurableApplicationContext context;

@Test
public void testPrototypeBeanWithAopScopedProxy(){ …
Run Code Online (Sandbox Code Playgroud)

java proxy spring dependency-injection spring-aop

2
推荐指数
1
解决办法
3029
查看次数

如何使用spring boot设置kafka使用者并发性

我正在编写一个基于Java的Kafka Consumer应用程序.我正在为我的应用程序使用kafka-clients,Spring Kafka和Spring启动.虽然Spring启动让我轻松编写Kafka使用者(没有真正编写ConcurrentKafkaListenerContainerFactory,ConsumerFactory等),但我希望能够为这些使用者定义/定制一些属性.但是,我找不到使用Spring启动的简单方法.例如:我有兴趣设置的一些属性是 -

ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG

我接过一看春天开机前定义的属性在这里.

此外,基于前面一个问题在这里,我想设置对消费者的并发性,但无法找到一个配置,为application.properties驱动的方式做到这一点使用Spring启动.

一种显而易见的方法是ConcurrentKafkaListenerContainerFactory, ConsumerFactory在我的Spring Context中再次定义类并从那里开始工作.我想了解是否有更简洁的方法,特别是因为我使用的是Spring Boot.

版本 -

  • kafka-clients - 0.10.0.0-SASL
  • spring-kafka - 1.1.0.RELEASE
  • 春季靴子 - 1.5.10.RELEASE

java apache-kafka spring-boot kafka-consumer-api spring-kafka

2
推荐指数
1
解决办法
6728
查看次数