我试图了解aws athena服务和新发布的s3 select之间的区别(仍在预览中).那两个用户的用途有何不同?这似乎都有助于从s3中选择部分数据.
amazon-s3 amazon-web-services amazon-athena amazon-s3-select
我想我在这里遗漏了一些东西......我正在尝试创建简单的兔子列表器,它可以接受自定义对象作为消息类型.现在按照医生的说法
在1.6之前的版本中,转换JSON的类型信息必须在消息头中提供,或者需要自定义ClassMapper.从版本1.6开始,如果没有类型信息头,则可以从目标方法参数推断出类型.
我在仪表板中使用rabbit mq adm手动将消息放入队列,得到错误
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.example.Customer] for GenericMessage [payload=byte[21], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=customer, amqp_deliveryTag=1, amqp_consumerQueue=customer, amqp_redelivered=false, id=81e8a562-71aa-b430-df03-f60e6a37c5dc, amqp_consumerTag=amq.ctag-LQARUDrR6sUcn7FqAKKVDA, timestamp=1485635555742}]
Run Code Online (Sandbox Code Playgroud)
我的配置:
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("test");
connectionFactory.setPassword("test1234");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
@Bean
public AmqpAdmin amqpAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
return rabbitAdmin;
}
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
Run Code Online (Sandbox Code Playgroud)
还有问题是这个异常消息没有放回队列中. …
我理解如下
预取只是控制代理一次允许消费者处理多少条消息。当设置为 1 时,这意味着代理将发送 1 条消息,等待 ack,然后发送下一条。
但有关以下场景的问题:
假设预取为 200,我们有 2 个消费者空闲。经纪人收到了 150 条消息,我认为经纪人会随机选择一条消息并发送所有 150 条消息?我认为是的,它不会在消费者之间进行共享。
假设一个消费者有 100 条消息处于 unack 状态,其中一条处于空闲状态,再次预取为 200 条消息。现在我们又收到了 50 条消息,我认为代理会随机将这 50 条消息分配给任一消息?或者它不会向已经有 100 条消息尚未确认的消费者提供
如果预取是200,一个消费者得到200,监听器会阻塞该线程(springrabbitmq listner方法)发送ack直到所有200被处理吗?我认为它不会一一发送ack,而是会等到所有预取的消息都处理完毕。换句话说,如果预取为 200 并且代理发送 200 条消息,那么代理何时开始收到确认?
是否可以在事务中运行以下代码,以便如果业务处理中引发异常,我们可以回滚发送到队列的消息?
rabbitTemplate.convertAndSend("queue1", data);
//do some processing
rabbitTemplate.convertAndSend("queue2", data);
Run Code Online (Sandbox Code Playgroud)
需要这样做的是,如果在向队列 1 发送消息后出现问题,但我们无法向队列 2 发送消息,该怎么办?或者,如果在将消息发送到队列时出现网络问题或其他问题怎么办?
我正在尝试s3 sdk进行非常基本的测试并得到以下错误.
引起:java.lang.NoSuchFieldError:SIGNING_REGION at com.amazonaws.services.s3.AmazonS3Client.createRequest(AmazonS3Client.java:4227)at com.amazonaws.services.s3.AmazonS3Client.createRequest(AmazonS3Client.java:4203)at com .amazonaws.services.s3.AmazonS3Client.listBuckets(AmazonS3Client.java:929)at com.amazonaws.services.s3.AmazonS3Client.listBuckets(AmazonS3Client.java:936)
AWS-Java的SDK-S3
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.11.288</version>
</dependency
ClientConfiguration cf = new ClientConfiguration();
AWSCredentials credentials = new BasicAWSCredentials("<id>","<secret>");
AmazonS3 amazonS3Client=
AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(credentials)).withRegion(Regions.US_EAST_1).build();
List<Bucket> buckets = amazonS3Client.listBuckets();
Run Code Online (Sandbox Code Playgroud)
Maven依赖树:
O] --- maven-dependency-plugin:2.10:tree (default-cli) @ AwsSdkDemo ---
O] com.example:AwsSdkDemo:jar:0.0.1-SNAPSHOT
O] +- org.springframework.boot:spring-boot-starter-data-jpa:jar:1.5.10.RELEASE:compile
O] | +- org.springframework.boot:spring-boot-starter:jar:1.5.10.RELEASE:compile
O] | | +- org.springframework.boot:spring-boot:jar:1.5.10.RELEASE:compile
O] | | +- org.springframework.boot:spring-boot-autoconfigure:jar:1.5.10.RELEASE:compile
O] | | +- org.springframework.boot:spring-boot-starter-logging:jar:1.5.10.RELEASE:compile
O] | | | +- ch.qos.logback:logback-classic:jar:1.1.11:compile
O] | | | | \- ch.qos.logback:logback-core:jar:1.1.11:compile
O] | …
Run Code Online (Sandbox Code Playgroud) 根据文档,defaultRequeueRejected 的默认值为 true,但查看代码似乎是 false。我不确定我是否遗漏了什么,或者我们必须在 SimpleRabbitListenerContainerFactory.java 中更改它
编辑
示例代码,将消息放入测试队列后,我希望它在失败后保持在队列中,但它正在将其抛出。我想重试消息,所以我在容器工厂中配置了它,如果重试后失败,我希望它回到队列中。我确定我在这里缺少理解。
@SpringBootApplication
public class MsgRequeExampleApplication {
public static void main(String[] args) {
SpringApplication.run(MsgRequeExampleApplication.class, args);
}
@Bean(name = "myContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setMissingQueuesFatal(false);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(500);
factory.setAdviceChain(new Advice[] { org.springframework.amqp.rabbit.config.RetryInterceptorBuilder.stateless()
.maxAttempts(2).backOffPolicy(backOffPolicy).build() });
return factory;
}
@RabbitListener(queues = "test", containerFactory = "myContainerFactory")
public void processAdvisory(Message message) throws MyBusinessException {
try{
//Simulating exception while processing message
String nullString=null;
nullString.length();
}catch(Exception ex){
throw new …
Run Code Online (Sandbox Code Playgroud) 我想在这里了解几件事。我的要求是,我想将记录存储在db中,并希望将消息发送到队列,然后如果抛出某些异常,则我希望以相同的方法说,我不想发送消息,也不想提交db事务。现在我想到了使用Spring事务,但是由于使用了两个不同的资源,想到了使用JTA使用一些atomikos来同步资源-但是我再次阅读了RMQ不支持2PC或XA等。无论如何,我继续尝试并没有添加atomikos首先尝试了所有这样做是为了确保我的频道已处理完毕,并且@Transaction批注已处理完毕,请参见下面的示例代码-我没有在pom中添加任何特殊内容。
现在我的问题是这是如何工作的,它与2PC有什么不同-方法可能出什么问题,什么情况会破坏使用此方法的最终一致性。令人惊讶的是,为什么我不必使用第三方jta。如果一切都很好-在我看来这最终保证了我们在使用Spring Goodies的rmq和db时的一致性!对于微服务:)
如果这不是一个好的解决方案,那有什么替代方案-如果可能的话,为了最终的一致性,我想避免使用工人流程等。
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@GetMapping
@Transactional
public void sampleEndpoint(@RequestParam boolean throwException){
Customer a=new Customer();
a.setCustomerName("XYZ");
customerRepository.save(a);
rabbitTemplate.convertAndSend("txtest","Test");
if(throwException)
throw new RuntimeException();
}
Run Code Online (Sandbox Code Playgroud)
我在上面的示例中使用Spring Boot 1.5.7使用了postgres依赖关系
distributed-transactions spring-transactions spring-rabbit spring-amqp spring-rabbitmq
我在我的服务中使用rabbit,但由于限制我无法在本地下载。为此,我想在内存代理中使用,并且认为 qpic 可以工作。我有以下配置,在日志中我可以看到 qpid 代理启动正常,但是当 Spring Boot 尝试发送消息时,它无法连接。
@Bean
Broker broker() throws Exception {
org.apache.qpid.server.Broker broker = new org.apache.qpid.server.Broker();
BrokerOptions brokerOptions = new BrokerOptions();
brokerOptions.setConfigProperty("qpid.amqp_port", "5672");
brokerOptions.setConfigProperty("qpid.broker.defaultPreferenceStoreAttributes", "{\"type\": \"Noop\"}");
brokerOptions.setConfigProperty("qpid.vhost", "/");
brokerOptions.setConfigurationStoreType("Memory");
brokerOptions.setStartupLoggedToSystemOut(false);
broker.startup(brokerOptions);
return broker;
}
Run Code Online (Sandbox Code Playgroud)
在资源中我的初始配置如下:
{
"name": "Embedded Test Broker",
"modelVersion": "6.1",
"authenticationproviders" : [{
"name": "password",
"type": "Plain",
"secureOnlyMechanisms": [],
"users": [{"name": "guest", "password": "guest", "type": "managed"}]
}],
"ports": [{
"name": "AMQP",
"port": "${qpid.amqp_port}",
"authenticationProvider": "password",
"protocols": [ "AMQP_0_9_1" ],
"transports": [ "TCP" ],
"virtualhostaliases": [{ …
Run Code Online (Sandbox Code Playgroud) 配置: springCloudVersion = 'Finchley.SR1' springBootVersion = '2.0.2.RELEASE'
得到以下错误:
Caused by: java.lang.ClassNotFoundException: org.springframework.boot.actuate.metrics.CounterService
Run Code Online (Sandbox Code Playgroud)
这个版本的 spring-starter-actuator-2.0.2.RELEASE 中没有 CounterService 类??参考:https : //docs.spring.io/spring-boot/docs/2.0.2.RELEASE/api/
spring-amqp ×6
rabbitmq ×3
amazon-s3 ×2
spring-boot ×2
amqp ×1
aws-java-sdk ×1
aws-sdk ×1
qpid ×1