小编Dea*_*ool的帖子

Spring Boot Kafka中多个消费者如何监听多个主题?

当有多个消费者时,我无法收听 kafka 主题(我的案例 2 主题)。在下面的示例中,我有 2 个消费者工厂,它们将接受 2 个不同的 JSON 消息(一个是用户类型,另一个是事件类型)。两条消息都发布到不同的主题。在这里,当我尝试访问 topic1 中的事件消息时,我无法访问,但可以访问用户主题消息。

前任:

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {      
@Autowired
private Environment environment;

@Bean
public ConsumerFactory<String,User> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
    config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("user.consumer.group"));
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
            new JsonDeserializer<>(User.class));

}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String , Event> consumerFactoryEvent(){
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers")); …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot

7
推荐指数
2
解决办法
2万
查看次数

Spring Boot 2.1:未从 application-test.yml 加载属性

我试图在我的单元测试执行期间从 application-test.yml 读取一个属性,但相反,正在读取 application-dev.yml 中的属性。我没有 application.yml 文件。感谢帮助。

应用属性.java

@Component
@ConfigurationProperties(prefix="app")
public class AppProperties {

    private String test;    

    public String getTest() {
        return this.test;
    }

    public void setTest(String test) {
        this.test = test;
    }
}
Run Code Online (Sandbox Code Playgroud)

应用程序-dev.yml

spring:
  profiles: dev
  application:
    name: testApplication
app:
  test: 1
Run Code Online (Sandbox Code Playgroud)

应用程序-test.yml

spring:
  profiles: test
  application:
    name: testApplication
app:
  test: 2
Run Code Online (Sandbox Code Playgroud)

应用服务测试.java

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {AppProperties.class}, initializers= ConfigFileApplicationContextInitializer.class)
@EnableConfigurationProperties
@ActiveProfiles("test")
public class AppServiceTest{

@Autowired
AppProperties appProperties;

@Test
public void test(){    
    appProperties.getTest();  
    //This returns "1" instead of the …
Run Code Online (Sandbox Code Playgroud)

java spring-boot

7
推荐指数
1
解决办法
5067
查看次数

如何从 Spring Boot 中的资源读取 JSON 文件

使用 Spring Boot 2.1.5 Release,创建了以下示例 Spring Boot 微服务:

Maven项目结构:

MicroService
    ?
    pom.xml
    src
    ?
    ????main
        ? 
        ????java
        ?   ? 
        ?   ????com
        ?       ????microservice
        ?           ?
        ?           ????MicroServiceApplication.java  
        ?  
        ????resources
            ?
            ????data.json
                ?                    
                application.properties
Run Code Online (Sandbox Code Playgroud)

有以下 JSON 文件(在 src/main/resources/data.json 中):

{"firstName": "John", "lastName": "Doe"}
Run Code Online (Sandbox Code Playgroud)

微服务应用:

@SpringBootApplication
public class MicroServiceApplication {

    @Bean
    CommandLineRunner runner() {
        return args -> {
            String data = FilePathUtils.readFileToString("../src/main/resources/data.json", MicroServiceApplication.class);
            System.out.println(data);
        };
    }

    public static void main(String[] args) {
        SpringApplication.run(MicroServiceApplication.class, args);
    }
}
Run Code Online (Sandbox Code Playgroud)

抛出以下异常:

  java.lang.IllegalStateException: Failed to execute CommandLineRunner …
Run Code Online (Sandbox Code Playgroud)

java spring-boot spring-micrometer

7
推荐指数
3
解决办法
4万
查看次数

如何在java文本块中为变量值添加占位符

如何将变量放入java文本块中?
喜欢

"""
{
  ...
  "someKey": "someValue",
  "date": "${LocalDate.now()}",
  ...
}
"""
Run Code Online (Sandbox Code Playgroud)

java java-14 java-text-blocks

7
推荐指数
1
解决办法
2194
查看次数

如何指定 micronaut 中标头参数是强制的还是可选的

我在控制器中传递两个标头,如下所示

@Header("x-correlationId") String correlationId,
@Header(name = "x-consumedBy") String consumedBy
Run Code Online (Sandbox Code Playgroud)

其中x-correlationId是强制性的,x-consumedBy是可选的。我无法具体说明这一点。

在 Spring 中我们可以指定required=false.

告诉我们会发生什么。
它将两者视为强制性的。

如果我指定,@Nullable那么即使我传递该值,它也始终将该值视为 null

correlationId::12345:consumedBy:null
Run Code Online (Sandbox Code Playgroud)

java micronaut

7
推荐指数
1
解决办法
1894
查看次数

ZooKeeper 会话在测试中过期

我正在使用 EmbeddedKafka 通过以下注释配置来测试我的模块:

@ExtendWith(SpringExtension.class)
@SpringBootTest
@TestPropertySource(locations = "classpath:test.properties")
@EmbeddedKafka(partitions = 1,
    topics = {"topic"},
    brokerProperties = {
        "auto.create.topics.enable=${topics.autoCreate:false}",
        "delete.topic.enable=${topic.delete:true}",
        "broker.id=2"})
Run Code Online (Sandbox Code Playgroud)

它在大多数情况下都有效。

有时,它在创建 spring 上下文时失败,因为 zookeeper 会话超时:

java.lang.IllegalStateException:无法加载 ApplicationContext

...

在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 在 org.gradle.internal .concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55) 在 java.base/java.lang.Thread.run(Thread.java:834) 引起:org.springframework.beans.factory.BeanCreationException:创建 bean 时出错名称为“embeddedKafka”:init 方法调用失败;嵌套异常是 org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1745) at org.springframework.beans.factory.support 过期。 org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session 在 org.apache.zookeeper.KeeperException.create(KeeperException.java:130) at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1631) at kafka.zk 过期.KafkaZkClient.registerBroker(KafkaZkClient.scala:87) at kafka.server.KafkaServer.startup(KafkaServer.scala:257) at kafka.utils.TestUtils$.createServer(TestUtils.scala:132) at kafka.utils.TestUtils.createServer (TestUtils.scala) 在 org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:215) 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:215) at 1 beans.factory.support.AbstractAutowireCapableBeanFactory。initializeBean(AbstractAutowireCapableBeanFactory.java:1741) ... 82 …

apache-kafka spring-boot apache-zookeeper spring-kafka embedded-kafka

6
推荐指数
0
解决办法
705
查看次数

如何决定spring kafka设置的并发数?

我正在使用 @KafkaListener 注释编写一个 kafka 消费者,我知道有一种方法可以使用 ConcurrentKafkaListenerContainerFactory 中的方法增加来自不同分区的并发 kafka 消费者的数量

e.g. factory.setConcurrency(3);
Run Code Online (Sandbox Code Playgroud)

setconcurrency 的 Javadoc 是这样说的:-

KafkaMessageListenerContainer 运行的最大并发数。来自同一分区内的消息将按顺序处理。

现在我的问题是

我有一个带有 144 个分区的 kafka 主题,我们的应用程序需要使用消息,并且 3 个应用程序实例正在并行运行。

我想知道如何决定需要设置的并发值

ConcurrentKafkaListenerContainerFactory.setconcurrency (<Value>) 
Run Code Online (Sandbox Code Playgroud)

这样我们就可以在消费消息时实现高吞吐量。

我应该使用 144/3 = 48 作为并发系数还是有公式可以得出这个数字?

apache-kafka kafka-consumer-api spring-kafka

6
推荐指数
1
解决办法
1万
查看次数

Spring Boot @WebMvcTest 与 @SpringBootTest

我有一个简单的健康控制器定义如下:

@RestController
@RequestMapping("/admin")
public class AdminController {

    @Value("${spring.application.name}")
    String serviceName;

    @GetMapping("/health")
    String getHealth() {
        return serviceName + " up and running";
    }
}
Run Code Online (Sandbox Code Playgroud)

以及测试它的测试类:

@WebMvcTest(RedisController.class)
class AdminControllerTest {

    @Autowired
    private MockMvc mockMvc;

    @Test
    public void healthShouldReturnDefaultMessage() throws Exception {
        this.mockMvc.perform(get("/admin/health"))
                .andDo(print())
                .andExpect(status().isOk())
                .andExpect(content().string(containsString("live-data-service up and running")));
    }
}
Run Code Online (Sandbox Code Playgroud)

运行测试时,我收到以下错误:

***************************
APPLICATION FAILED TO START
***************************

Description:

Field configuration in com.XXXX.LiveDataServiceApplication required a bean of type 'com.XXXXX.AppConfiguration' that could not be found.

The injection point has the following annotations:
    - @org.springframework.beans.factory.annotation.Autowired(required=true) …
Run Code Online (Sandbox Code Playgroud)

java spring-test-mvc spring-boot

6
推荐指数
2
解决办法
6184
查看次数

使用带有属性的 spring SPEL 调用方法

是否可以在赋值时使用属性值来调用方法?

例如,我知道我可以这样做:

@Value("${name}")
private String name; // will have the value of the `name` property
Run Code Online (Sandbox Code Playgroud)

我想知道是否可以做这样的事情:

@Value("#{myMethod(${name})}")
private String myModifiedVariable; // will have the result of invoking myMethod
Run Code Online (Sandbox Code Playgroud)

java spring-el spring-boot

6
推荐指数
1
解决办法
8843
查看次数

如何在函数中返回可选参数

我希望有一个函数返回我给它的参数的多个结果函数。

我已经尝试过谷歌搜索等,但没有任何运气

例如:

def function(x, i_add = False):
    if i_add == True:
        y = x+1

    return x, (Y)?
Run Code Online (Sandbox Code Playgroud)

给出的例子我想要的结果是:

function(3) -> 3
function(3, True) -> 3, 4
function(3, False) -> 3
Run Code Online (Sandbox Code Playgroud)

我正在使用 python 2.7

python return function

5
推荐指数
1
解决办法
4062
查看次数