我试图spel if statement在xml配置中使用但没有成功:
xml的一部分:
<bean id="ExportReader"
class="org.springframework.batch.item.file.MultiResourceItemReader"
scope="step">
<property name="resources" value="#{jobParameters['isIncremental'] eq 1? '${step3.index.incremental.folder}/#{stepExecutionContext['fileName']}/*.xml' : true }">
<property name="delegate" ref="staxPatentReader"></property>
<property name="strict" value="true"></property>
</bean>
Run Code Online (Sandbox Code Playgroud)
该${step3.index.incremental.folder}工作=>解析为C:/,但第二部分失败,所以我喜欢的资源C:/#{stepExecutionContext['fileName']}/*.xml
我想有一个问题是逃避单引号...
另一件事是这个xml工作正常:
<bean id="indexFolderPartitioner"class="com.mycompany.FolderPartitioner" scope="step">
<property name="folder" value="#{jobParameters['isIncremental'] eq 1? '${step3.index.incremental.folder}' : '${step3.index.full.folder}' }"></property>
</bean>
Run Code Online (Sandbox Code Playgroud) 我在 Spring Boot 应用程序中使用 Spring Rest 模板。
即使我传递了凭据,我总是收到 401 未经授权的错误。
我可以通过 Chrome REST Web 服务客户端访问此服务。
有没有一种简化的方法来访问SpringBoot中的REST模板。
下面是迄今为止导致 401 错误的代码片段
private DetailsBean invokeDetailsRestService(UserParam userParam){
ResponseEntity<DetailsBean> responseEntity = null;
String url = "https://dev.com/app/identifyuser/";
RestClientConfig restClientConfig =new RestClientConfig("user123","pass123");
responseEntity= restClientConfig.postForEntity(url, userParam, DetailsBean.class);
log.debug("User Details : {} ", responseEntity.getBody());
return responseEntity.getBody();
}
public ClientHttpRequestFactory getRequestFactory(String userName,String password){
CredentialsProvider credsProvider = new BasicCredentialsProvider();
credsProvider.setCredentials( new AuthScope(null, -1), new UsernamePasswordCredentials(userName,password) );
HttpClient httpClient = HttpClients.custom().setDefaultCredentialsProvider(credsProvider).build();
return new HttpComponentsClientHttpRequestFactory(httpClient);
}
Run Code Online (Sandbox Code Playgroud)
RestClientConfig 类
public RestClientConfig(String username, String password) …Run Code Online (Sandbox Code Playgroud) 我正在研究一个从Azure服务总线读取消息的应用程序。该应用程序是使用Spring Boot,Spring jms和Qpid jms客户端创建的。我能够从Queue正确读取消息,而没有任何问题。PFB我用来阅读消息的代码。
@Service
public class QueueReceiver {
@JmsListener(destination = "testing")
public void onMessage(String message) {
if (null != message) {
System.out.println("Received message from Queue: " + message);
}
}}
Run Code Online (Sandbox Code Playgroud)
问题是,对于不同的环境,我们有不同的目的地,例如testingfor dev,testing-qafor qa和testing-prodfor production,所有这些值都分别azure.queueName在不同的application-(ENV).properpties中提供。我想将这些目标动态传递给JmsListener注释中的目标。当我尝试使用
@Value("${azure.queueName}")
private String dest;
Run Code Online (Sandbox Code Playgroud)
并将dest传递给注释 @JmsListener(destination = dest)
我收到The value for annotation attribute JmsListener.destination must be a constant expression错误消息。仔细检查了此错误后,我发现我们无法将动态值传递给注释。请帮助我如何解决此问题或任何其他解决方案。
我有交换和队列。生产者不需要消费确认,但在某些情况下,由于缺乏其他数据,消费者可能无法在当前时刻处理消息。因此,我想将这些消息返回到队列末尾。这该怎么做?还是在我拒绝消息时自动完成?
流动:
所以存在消息排序问题,在一般情况下,我按顺序接收消息,因为大多数组件正确传递消息。我想解决潜在的情况,当 Message1 的 Producer 由于负载过重或其他原因无法立即将消息进行交换时。在这种情况下,Message2 将首先被消费,但数据库中将没有足够的信息来处理它。我希望此消息返回到队列,但请确保此 Message2 将转到队列的尾部。如果它会继续下去,如果我只使用一个队列,我就会陷入无限循环。
附带问题是,是否可以跟踪消费者尝试处理消息但返回消息的次数。如果有可能像我之前描述的那样将消息放到队列的尾部,但是由于某种原因 Message1 的生产者死了,并且没有 Message1,我想让 Message2 在重试几次或一段时间后死掉。
下面有一个类似的问题:
单个 Spring 的 KafkaConsumer 侦听器可以收听多个主题吗?
所以我现在明白我可以为 KafkaListener 注释的主题参数提供一个字符串数组,但是我想知道以下内容:
我想在这里了解几件事。我的要求是,我想将记录存储在db中,并希望将消息发送到队列,然后如果抛出某些异常,则我希望以相同的方法说,我不想发送消息,也不想提交db事务。现在我想到了使用Spring事务,但是由于使用了两个不同的资源,想到了使用JTA使用一些atomikos来同步资源-但是我再次阅读了RMQ不支持2PC或XA等。无论如何,我继续尝试并没有添加atomikos首先尝试了所有这样做是为了确保我的频道已处理完毕,并且@Transaction批注已处理完毕,请参见下面的示例代码-我没有在pom中添加任何特殊内容。
现在我的问题是这是如何工作的,它与2PC有什么不同-方法可能出什么问题,什么情况会破坏使用此方法的最终一致性。令人惊讶的是,为什么我不必使用第三方jta。如果一切都很好-在我看来这最终保证了我们在使用Spring Goodies的rmq和db时的一致性!对于微服务:)
如果这不是一个好的解决方案,那有什么替代方案-如果可能的话,为了最终的一致性,我想避免使用工人流程等。
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@GetMapping
@Transactional
public void sampleEndpoint(@RequestParam boolean throwException){
Customer a=new Customer();
a.setCustomerName("XYZ");
customerRepository.save(a);
rabbitTemplate.convertAndSend("txtest","Test");
if(throwException)
throw new RuntimeException();
}
Run Code Online (Sandbox Code Playgroud)
我在上面的示例中使用Spring Boot 1.5.7使用了postgres依赖关系
distributed-transactions spring-transactions spring-rabbit spring-amqp spring-rabbitmq
我使用以下代码进行消息转换器:
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, Queue queue,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queue.getName());
container.setMessageListener(listenerAdapter);
container.setMessageConverter(new Jackson2JsonMessageConverter());
return container;
}
Run Code Online (Sandbox Code Playgroud)
我的听众被宣布:
public void receiveMessage(List<Map<String, Object>> message) {
try {
System.out.println("Received <" + new String(message, "UTF-8") + ">");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
Run Code Online (Sandbox Code Playgroud)
但它总是尝试给出以下错误:
Failed to invoke target method 'receiveMessage' with argument type = [class [B], value = [{[B@40c2d9c5}]","at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408)
Run Code Online (Sandbox Code Playgroud)
它似乎尝试调用 byte[] 作为参数,而不是将 json 字符串转换为 List>。
我在使用 docker 容器部署我的应用程序 spring 启动应用程序时遇到技术问题。
org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: 无法加载 SSL 密钥库 /tmp/tomcat-docbase.4737956707529585395.8080/deployments/app/certs/kafka.truststore.jks
/deployments 是我在 dockerfile 中配置的工作目录
我发现它选择 tmp/tomcat docbase 很奇怪,因为在为其他信任库配置时,它进入了正确的位置。这是我的 application.yaml 里面的内容
spring:
kafka:
bootstrap-servers:localhost:9092
ssl:
truststore-location: /deployments/app/certs/kafka-truststore.jks
truststore-password: test
consumer:
group-id: consumerid
server:
ssl:
enabled: false
key-store: /deployments/app/certs/dp--dev.jks
key-store-password: changeit
trust-store: /deployments/app/certs/ol-truststore-dev.jks
trust-store-password: test
Run Code Online (Sandbox Code Playgroud)
它是我遗漏的东西还是与我正在使用的 kafka springframework lib 有关?
在 Spring 启动应用程序中为 kafka 使用者设置重试策略时,何时使用 ExponentialBackOffPolicy 与 FixedBackOffPolicy?
我将FixedBackOffPolicy视为BackOffPolicy 的一种实现,它在继续之前暂停一段固定的时间,而ExponentialBackOffPolicy 则视为 BackOffPolicy 的一种实现,它增加了给定集中每次重试尝试的回退时间。
除此之外, FixedBackOffPolicy 扩展了 StatelessBackOffPolicy 而 ExponentialBackOffPolicy 没有。在这方面,请帮助我理解,什么是更喜欢一个而不是另一个的合适用例?
我有一个监听 kafka 的 Spring-boot 应用程序。为了避免重复处理,我尝试进行手动提交。为此,我在阅读主题后立即引用了异步提交消息。但我陷入了如何实现消费者幂等性的困境,这样记录就不会被处理两次。
apache-kafka ×4
spring-boot ×4
spring-kafka ×4
spring ×3
rabbitmq ×2
spring-amqp ×2
amqp ×1
annotations ×1
docker ×1
java ×1
resttemplate ×1
spring-batch ×1
spring-el ×1
spring-jms ×1
spring-rest ×1
xml ×1