当有多个消费者时,我无法收听 kafka 主题(我的案例 2 主题)。在下面的示例中,我有 2 个消费者工厂,它们将接受 2 个不同的 JSON 消息(一个是用户类型,另一个是事件类型)。两条消息都发布到不同的主题。在这里,当我尝试访问 topic1 中的事件消息时,我无法访问,但可以访问用户主题消息。
前任:
@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {
@Autowired
private Environment environment;
@Bean
public ConsumerFactory<String,User> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("user.consumer.group"));
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(User.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String , Event> consumerFactoryEvent(){
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers")); …Run Code Online (Sandbox Code Playgroud) 我试图在我的单元测试执行期间从 application-test.yml 读取一个属性,但相反,正在读取 application-dev.yml 中的属性。我没有 application.yml 文件。感谢帮助。
应用属性.java
@Component
@ConfigurationProperties(prefix="app")
public class AppProperties {
private String test;
public String getTest() {
return this.test;
}
public void setTest(String test) {
this.test = test;
}
}
Run Code Online (Sandbox Code Playgroud)
应用程序-dev.yml
spring:
profiles: dev
application:
name: testApplication
app:
test: 1
Run Code Online (Sandbox Code Playgroud)
应用程序-test.yml
spring:
profiles: test
application:
name: testApplication
app:
test: 2
Run Code Online (Sandbox Code Playgroud)
应用服务测试.java
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {AppProperties.class}, initializers= ConfigFileApplicationContextInitializer.class)
@EnableConfigurationProperties
@ActiveProfiles("test")
public class AppServiceTest{
@Autowired
AppProperties appProperties;
@Test
public void test(){
appProperties.getTest();
//This returns "1" instead of the …Run Code Online (Sandbox Code Playgroud) 使用 Spring Boot 2.1.5 Release,创建了以下示例 Spring Boot 微服务:
Maven项目结构:
MicroService
?
pom.xml
src
?
????main
?
????java
? ?
? ????com
? ????microservice
? ?
? ????MicroServiceApplication.java
?
????resources
?
????data.json
?
application.properties
Run Code Online (Sandbox Code Playgroud)
有以下 JSON 文件(在 src/main/resources/data.json 中):
{"firstName": "John", "lastName": "Doe"}
Run Code Online (Sandbox Code Playgroud)
微服务应用:
@SpringBootApplication
public class MicroServiceApplication {
@Bean
CommandLineRunner runner() {
return args -> {
String data = FilePathUtils.readFileToString("../src/main/resources/data.json", MicroServiceApplication.class);
System.out.println(data);
};
}
public static void main(String[] args) {
SpringApplication.run(MicroServiceApplication.class, args);
}
}
Run Code Online (Sandbox Code Playgroud)
抛出以下异常:
java.lang.IllegalStateException: Failed to execute CommandLineRunner …Run Code Online (Sandbox Code Playgroud) 如何将变量放入java文本块中?
喜欢
"""
{
...
"someKey": "someValue",
"date": "${LocalDate.now()}",
...
}
"""
Run Code Online (Sandbox Code Playgroud) 我在控制器中传递两个标头,如下所示
@Header("x-correlationId") String correlationId,
@Header(name = "x-consumedBy") String consumedBy
Run Code Online (Sandbox Code Playgroud)
其中x-correlationId是强制性的,x-consumedBy是可选的。我无法具体说明这一点。
在 Spring 中我们可以指定required=false.
告诉我们会发生什么。
它将两者视为强制性的。
如果我指定,@Nullable那么即使我传递该值,它也始终将该值视为 null
correlationId::12345:consumedBy:null
Run Code Online (Sandbox Code Playgroud) 我正在使用 EmbeddedKafka 通过以下注释配置来测试我的模块:
@ExtendWith(SpringExtension.class)
@SpringBootTest
@TestPropertySource(locations = "classpath:test.properties")
@EmbeddedKafka(partitions = 1,
topics = {"topic"},
brokerProperties = {
"auto.create.topics.enable=${topics.autoCreate:false}",
"delete.topic.enable=${topic.delete:true}",
"broker.id=2"})
Run Code Online (Sandbox Code Playgroud)
它在大多数情况下都有效。
但有时,它在创建 spring 上下文时失败,因为 zookeeper 会话超时:
java.lang.IllegalStateException:无法加载 ApplicationContext
...
在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 在 org.gradle.internal .concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55) 在 java.base/java.lang.Thread.run(Thread.java:834) 引起:org.springframework.beans.factory.BeanCreationException:创建 bean 时出错名称为“embeddedKafka”:init 方法调用失败;嵌套异常是 org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1745) at org.springframework.beans.factory.support 过期。 org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session 在 org.apache.zookeeper.KeeperException.create(KeeperException.java:130) at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1631) at kafka.zk 过期.KafkaZkClient.registerBroker(KafkaZkClient.scala:87) at kafka.server.KafkaServer.startup(KafkaServer.scala:257) at kafka.utils.TestUtils$.createServer(TestUtils.scala:132) at kafka.utils.TestUtils.createServer (TestUtils.scala) 在 org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:215) 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:215) at 1 beans.factory.support.AbstractAutowireCapableBeanFactory。initializeBean(AbstractAutowireCapableBeanFactory.java:1741) ... 82 …
apache-kafka spring-boot apache-zookeeper spring-kafka embedded-kafka
我正在使用 @KafkaListener 注释编写一个 kafka 消费者,我知道有一种方法可以使用 ConcurrentKafkaListenerContainerFactory 中的方法增加来自不同分区的并发 kafka 消费者的数量
e.g. factory.setConcurrency(3);
Run Code Online (Sandbox Code Playgroud)
setconcurrency 的 Javadoc 是这样说的:-
KafkaMessageListenerContainer 运行的最大并发数。来自同一分区内的消息将按顺序处理。
现在我的问题是
我有一个带有 144 个分区的 kafka 主题,我们的应用程序需要使用消息,并且 3 个应用程序实例正在并行运行。
我想知道如何决定需要设置的并发值
ConcurrentKafkaListenerContainerFactory.setconcurrency (<Value>)
Run Code Online (Sandbox Code Playgroud)
这样我们就可以在消费消息时实现高吞吐量。
我应该使用 144/3 = 48 作为并发系数还是有公式可以得出这个数字?
我有一个简单的健康控制器定义如下:
@RestController
@RequestMapping("/admin")
public class AdminController {
@Value("${spring.application.name}")
String serviceName;
@GetMapping("/health")
String getHealth() {
return serviceName + " up and running";
}
}
Run Code Online (Sandbox Code Playgroud)
以及测试它的测试类:
@WebMvcTest(RedisController.class)
class AdminControllerTest {
@Autowired
private MockMvc mockMvc;
@Test
public void healthShouldReturnDefaultMessage() throws Exception {
this.mockMvc.perform(get("/admin/health"))
.andDo(print())
.andExpect(status().isOk())
.andExpect(content().string(containsString("live-data-service up and running")));
}
}
Run Code Online (Sandbox Code Playgroud)
运行测试时,我收到以下错误:
***************************
APPLICATION FAILED TO START
***************************
Description:
Field configuration in com.XXXX.LiveDataServiceApplication required a bean of type 'com.XXXXX.AppConfiguration' that could not be found.
The injection point has the following annotations:
- @org.springframework.beans.factory.annotation.Autowired(required=true) …Run Code Online (Sandbox Code Playgroud) 是否可以在赋值时使用属性值来调用方法?
例如,我知道我可以这样做:
@Value("${name}")
private String name; // will have the value of the `name` property
Run Code Online (Sandbox Code Playgroud)
我想知道是否可以做这样的事情:
@Value("#{myMethod(${name})}")
private String myModifiedVariable; // will have the result of invoking myMethod
Run Code Online (Sandbox Code Playgroud) 我希望有一个函数返回我给它的参数的多个结果函数。
我已经尝试过谷歌搜索等,但没有任何运气
例如:
def function(x, i_add = False):
if i_add == True:
y = x+1
return x, (Y)?
Run Code Online (Sandbox Code Playgroud)
给出的例子我想要的结果是:
function(3) -> 3
function(3, True) -> 3, 4
function(3, False) -> 3
Run Code Online (Sandbox Code Playgroud)
我正在使用 python 2.7
java ×7
spring-boot ×6
apache-kafka ×3
spring-kafka ×2
function ×1
java-14 ×1
micronaut ×1
python ×1
return ×1
spring-el ×1