在我详细解释问题之前,我告诉你我目前的做法.
我有一个运行setInterval()的js脚本.每个间隔,我都会调用SQS从队列中获取消息.如果有消息,那我就处理它.
所以,它将无限运行,直到我杀死进程.
我之前也构建了一个节点服务器(使用nodejs.org中的示例)
所以,我想知道的是,...而不是定期运行setInterval.有没有办法,如果SQS中有新消息,那么它会触发事件并处理消息?
我在SQS队列中有一小组消息,即使发送到AWS端点的删除请求返回200响应,也不会删除这些消息.我的应用程序处理的消息很好,删除请求也很好.
我正在使用Java AWS SDK 1.3.6.
还有其他人遇到过这个问题吗?
我试图向另一个域发出ajax请求,它已经可以工作了,但现在我还有另一个问题......
这是我的代码:
function getChannelMessages(channel) {
jQuery.support.cors = true;
$.ajax(channel, {
cache : true,
type : "get",
data : _channels[channel].request,
global : false,
dataType : "jsonp text xml",
jsonp : false,
success : function jsonpCallback (response) {
console.log(response);
updateChannelRequest(channel);
//getChannelMessages(channel);
}
});
}
Run Code Online (Sandbox Code Playgroud)
正如我所说的,它已经工作,但问题是,服务器返回的XML(不是我的服务器,是从另一家公司另一个服务器 - Web服务 - 所以我不能改变什么,它返回),并作为JSONP期待一个JSON它失败并出现错误:
SyntaxError: syntax error
<?xml version="1.0"?><ReceiveMessageResponse xmlns="http://q ... />
Run Code Online (Sandbox Code Playgroud)
根据jQuery文档,添加jsonp text xml应该是魔术,将响应转换为简单文本,然后将其解析为XML,但它不起作用.
我已经能够使用YQL,但它每小时限制10,000个请求,我正在开发的系统每小时将有多达1000万个请求.出于同样的原因,我无法在我自己的服务器中"代理"这些请求......
FYI:我想从SQS最新的消息,所以如果有反正告诉它以将数据返回为JSON,它会更容易,更好,但我还没有找到文档中任何事情要么...
我们正在使用具有多个SQS队列订户的SNS主题实施AWS扇出模式.我想知道如果我成功地在SNS主题上发布消息会发生什么,但是由于某种原因它无法将其转发到队列中.SNS会重试,如果有的话,有没有办法控制它.
我找到了这个页面http://docs.aws.amazon.com/sns/latest/dg/DeliveryPolicies.html,它讨论了为SNS HTTP/HTTPS端点配置重试策略,但SQS上没有任何内容.
我试图围绕消息队列模型和我想在PHP应用程序中实现的作业:
我的目标是卸载需要发送到多个第三方API的消息/数据,因此访问它们不会减慢客户端的速度.因此,将数据发送到消息队列是理想的选择.
我考虑使用Gearman来保存MQ/Jobs,但我想使用像SQS或Rackspace Cloud Queues这样的Cloud Queue服务,所以我不必管理这些消息.
这是我认为我应该做的图表:
问题:
我的工作人员,将用PHP编写,他们都必须轮询云队列服务?特别是当你有很多工人时,这可能会变得昂贵.
我想也许有一个工人只是为了轮询队列,如果有消息,通知其他工人他们有工作,我只需要让这个1工人在线使用supervisord?这种轮询方法比使用可以通知的MQ更好吗?我应该如何每秒轮询一次MQ,或者每秒轮询一次?如果看到它放慢速度,那么增加投票工作人员?
我还考虑为所有消息设置一个队列,然后是工作人员监视,根据需要处理的位置将消息分发到其他云MQ,因为1个消息可能需要由2个差异工作者处理.
我还需要gearman管理我的工人supervisord吗?还是我可以用来上下旋转工人?
每当发送消息与轮询MQ时,向主工作人员发送通知是否更有效和更快?我假设我需要用来gearman通知我的主要工作者MQ有一条消息,所以它可以开始检查它.或者,如果我每秒有300条消息,这将产生300个作业来检查MQ?
基本上我怎么能尽可能有效和有效地检查MQ?
对我的架构的建议或更正?
我有一个AWS SQS队列,其权限策略如下所示:
{
"Version": "2012-10-17",
"Id": "arn:aws:sqs:us-east-1:123123123:default_staging/SQSDefaultPolicy",
"Statement": [
{
"Sid": "Sid123123123123",
"Effect": "Allow",
"Principal": {
"AWS": "123123123123"
},
"Action": "SQS:*",
"Resource": "arn:aws:sqs:us-east-1:123123123123123:default_staging"
}
]
}
Run Code Online (Sandbox Code Playgroud)
不幸的是,我无法default_staging从我在不同地区的AWS服务器上向队列添加消息.
如果我将权限策略设置为全开,我可以default_staging从其他区域添加消息.
如何调整我的政策以允许SQS:*我所有地区的行动?
亚马逊已经宣布他们新的FIFO SQS服务,我想在Laravel Queue中使用它来解决一些并发问题.
我创建了几个新队列并更改了配置.但是,我收到了一个MissingParameter错误
The request must contain the parameter MessageGroupId.
Run Code Online (Sandbox Code Playgroud)
所以我修改了文件 vendor/laravel/framework/src/Illuminate/Queue/SqsQueue.php
public function pushRaw($payload, $queue = null, array $options = [])
{
$response = $this->sqs->sendMessage(['QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload,
'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))]);
return $response->get('MessageId');
}
public function later($delay, $job, $data = '', $queue = null)
{
$payload = $this->createPayload($job, $data);
$delay = $this->getSeconds($delay);
return $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 'DelaySeconds' => $delay,
'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))
])->get('MessageId');
}
Run Code Online (Sandbox Code Playgroud)
我正在使用它APP_ENV …
我正在尝试传递然后将带有属性的消息检索到AWS SQS中.即使我可以通过管理控制台查看消息的属性,但我无法使用boto3获取它们,总是得到无.更改"AttributeNames"并没有什么不同.可以检索邮件正文.
import boto3
sqs = boto3.resource('sqs', region_name = "us-west-2")
queue = sqs.get_queue_by_name(QueueName='test')
queue.send_message(MessageBody = "LastEvaluatedKey",
MessageAttributes ={
'class_number':{
"StringValue":"Value value ",
"DataType":"String"
}
}
)
messages = queue.receive_messages(
MaxNumberOfMessages=1,
AttributeNames=['All']
)
for msg in messages:
print(msg.message_attributes) # returns None
print(msg.body) # returns correct value
Run Code Online (Sandbox Code Playgroud) 在下面的示例中,我将max和core pool size设置为1.但是没有消息正在处理中.当我启用调试日志时,我能够看到从SQS中提取的消息,但我猜它没有被处理/删除.但是,当我将核心和最大池大小增加到2时,似乎会处理消息.
编辑
我相信Spring可能为接收器分配一个线程,该接收器从队列中读取数据,因此无法将线程分配给正在处理消息的侦听器.当我将corepoolsize增加到2时,我看到消息正在从队列中读取.当我添加另一个监听器(用于死信队列)时,我遇到了同样的问题 - 由于没有处理消息,因此2个线程是不够的.当我将corepoolsize增加到3时,它开始处理消息.我假设在这种情况下,分配了1个线程来读取队列中的消息,并为2个侦听器分配了1个线程.
@Configuration
public class SqsListenerConfiguration {
@Bean
@ConfigurationProperties(prefix = "aws.configuration")
public ClientConfiguration clientConfiguration() {
return new ClientConfiguration();
}
@Bean
@Primary
public AWSCredentialsProvider awsCredentialsProvider() {
ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
try {
credentialsProvider.getCredentials();
System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load the credentials from the credential profiles file. " +
"Please make sure that your credentials file is at the correct " +
"location (~/.aws/credentials), and is in valid format.",
e); …Run Code Online (Sandbox Code Playgroud) 尝试使用SQS消息时,看到以下异常:
org.springframework.messaging.converter.MessageConversionException:
Cannot convert from [java.lang.String] to [com.example.demo.Foo] for GenericMessage [payload={}, headers={LogicalResourceId=my-queue, ApproximateReceiveCount=1, SentTimestamp=1529021258825, ReceiptHandle=xxxx, Visibility=org.springframework.cloud.aws.messaging.listener.QueueMessageVisibility@47ce6922, SenderId=xxxx, lookupDestination=my-queue, ApproximateFirstReceiveTimestamp=1529021264456, MessageId=xxxx}]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109)
at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMatch(AbstractMethodMessageHandler.java:515)
at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessageInternal(AbstractMethodMessageHandler.java:473)
at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:409)
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer.executeMessage(SimpleMessageListenerContainer.java:205)
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$MessageExecutor.run(SimpleMessageListenerContainer.java:342)
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable.run(SimpleMessageListenerContainer.java:397)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Run Code Online (Sandbox Code Playgroud)
Spring Boot的代码如下:
@Configuration
@EnableSqs
public class AmazonSqsConfiguration {
@Bean
public AmazonSQS amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder.standard()
.withRegion(Regions.US_WEST_2)
.build();
}
}
@Service
public class MyService {
// Throws MessageConversionException
@SqsListener("my-queue")
public void listen(Foo payload) {
} …Run Code Online (Sandbox Code Playgroud) amazon-sqs ×10
java ×2
php ×2
spring-cloud ×2
amazon-sns ×1
boto3 ×1
events ×1
gearman ×1
javascript ×1
jquery ×1
jsonp ×1
laravel ×1
node.js ×1
spring ×1
supervisord ×1
xml ×1