我已将 S3 配置为在每次上传文件时触发一个事件。该事件将发送至 SQS。现在我想使用 Spring Cloud for AWS 从队列中读取消息。有没有办法将有效负载转换为表示 S3 事件记录的已知类型?我尝试过
package com.test;
import java.util.Map;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.MessageMapping;
import com.amazonaws.services.s3.event.S3EventNotification.S3EventNotificationRecord;
public class SQSHandler {
@MessageMapping("MediaQueue")
private void receiveMessage(S3EventNotificationRecord message, @Headers Map<String, String> headers) {
System.out.println("MYK MYK MYK");
}
}
Run Code Online (Sandbox Code Playgroud)
但我收到错误
ord, message=GenericMessage [payload={这里是整个有效负载}] 位于 org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:115) 位于 org.springframework.messaging.handler.inspiration.HandlerMethodArgumentResolverComposite .resolveArgument(HandlerMethodArgumentResolverComposite.java:77)在org.springframework.messaging.handler.invocable.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137)在org.springframework.messaging.handler.invocable.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:106 )在 org.springframework.messaging.handler.inspiration.AbstractMethodMessageHandler.handleMatch(AbstractMethodMessageHandler.java:447) ... 8 更多
你知道有什么方法可以做这样的事情吗?
我的微服务需要使用双向 ssl。每个微服务都是一个 Spring Boot 应用程序,注释为:
@SpringBootApplication
@EnableFeignClients
@EnableDiscoveryClient
@EnableZuulProxy
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
Run Code Online (Sandbox Code Playgroud)
每个 yml 对 eureka/ribbon 都有类似的配置:
eureka:
client:
service-url:
defaultZone: ${EUREKA_CLIENT_SERVICEURL_PROTOCOL:http}://${EUREKA_CLIENT_SERVICEURL_HOST:192.168.99.100}:${EUREKA_CLIENT_SERVICEURL_PORT:8761}/eureka/
instance:
secure-virtual-host-name: ${spring.application.name}
prefer-ip-address: true
non-secure-port-enabled: ${EUREKA_NON_SECURE_PORT_ENABLED:false}
secure-port-enabled: ${EUREKA_SECURE_PORT_ENABLED:true}
secure-port: ${server.port}
ribbon:
IsSecure: true
eureka:
enabled: true
Run Code Online (Sandbox Code Playgroud)
每个微服务都有一个控制器,该控制器公开用于各种功能的 REST API。
当一个微服务需要调用另一个微服务端点时,我尝试通过创建该微服务的客户端接口来实现:
@FeignClient(name = "user", configuration = FeignConfiguration.class, url = "https://user")
public interface UserClient {
@RequestMapping(method = RequestMethod.GET, value = "/test")
String testUser();
}
Run Code Online (Sandbox Code Playgroud)
这是 Feign 配置:
@Configuration …Run Code Online (Sandbox Code Playgroud) 我想配置 zuul 将请求路由到 root/到主页。我试过:
root:
path: /
url: http://hostname/home/index.jsp
Run Code Online (Sandbox Code Playgroud)
和
root:
path: /**
url: http://hostname/home/index.jsp
Run Code Online (Sandbox Code Playgroud)
但它们都不起作用。我刚刚收到 404 NOT FOUND。我认为路径匹配配置应该与具有上下文的配置类似,例如/service/**,但事实并非如此。
语境
\n\n我们使用 Spring Cloud Netflix 和 Eureka 作为服务发现,使用 Zuul 来代理服务并对其进行负载平衡。
\n\n微服务是使用 NodeJS 实现的,并使用 NPM 模块eureka-js-client和中间的自定义层在 Eureka 注册,该自定义层处理所有微服务通用的配置和内容。
\n\n问题
\n\n问题是尤里卡无法识别一项服务是否出现故障。这是一个问题,因为我们拥有一个具有自动部署功能的开发基础架构,每次都会在不同端口上重新部署和重新启动微服务,而无需重新启动 Eureka(和 Zuul)。
\n\n因此,过了一段时间,我们就有了十个或更多的一个微服务实例,其中只有一个处于运行状态,但所有实例都被识别为 being UP。
解决方法
\n\n我尝试在客户端上设置heartbeatInterval较小的值,但这没有帮助。
我尝试在服务器上设置renewalThresholdUpdateIntervalMs较小的值,但这也没有帮助。
更多令人沮丧、无益的属性尝试\xe2\x80\xa6
问题
\n\n如何配置 Eureka 驱逐实例或将DOWN在合理时间(不是 30 分钟左右)内未发送心跳的实例设置为状态?
代码片段
\n\n服务器本身不包含可提及的代码(只是使用 Spring Cloud Starter 启动 Eureka 服务器的一些注释)。
\n\nEureka服务器的配置(我已经删除了所有不工作的尝试):
\n\nserver:\n port: 8761\n\nspring:\n cloud:\n client:\n hostname: localhost\n\neureka:\n instance:\n …Run Code Online (Sandbox Code Playgroud) 我正在将 Spring Boot 应用程序作为 Docker 容器运行。到目前为止,这工作得很好,但是当我尝试使用 Spring Cloud Consul 时,它也让我有些头痛。它可以很好地从 Consul KVS 读取配置,但健康检查似乎正在运行。
默认运行状况检查使用 docker 容器的主机名,例如http://users-microservice/health。显然,从领事访问时这不会解决。
没问题,文档提到您可以在 bootstrap.yml 文件中使用healthCheckPath来配置它。这就是我现在所拥有的:
spring:
application:
name: users-microservice
cloud:
consul:
host: myserver.com
port: 8500
config:
prefix: API-CONFIG
profileSeparator: '__'
discovery:
tags: users-microservice
healthCheckPath: http://myserver.com:${server.port}/status
healthCheckInterval: 30s
Run Code Online (Sandbox Code Playgroud)
不幸的是,这个变量的使用方式似乎与我预期的非常不同。这就是领事试图达到的目标:
Get http://users:18090http//myserver.com:18090/status: dial tcp: unknown port tcp/18090http
我怎样才能解决这个问题?我应该设置一些未记录的配置参数吗?
我有一个关于 Ribbon 如何选择服务器的快速问题。
假设我有两个可以通过功能区选择的服务。Ribbon 如何知道要选择哪个服务?它是否检查这两项服务以确定哪一项过载程度较低?如果是,它是否调用/metrics以及考虑哪个指标?
非常感谢您的回答
spring load-balancing spring-boot spring-cloud netflix-ribbon
我将使用 OAuth2 和 XACML(使用 AuthZForce、Balana、AT&T XACML 或类似的东西)来保护我的 Spring Cloud 应用程序。
我想使用 Spring-Cloud(-Netflix) 的微服务。为了使 XACML 可用,我认为我需要这个:
哪种框架最适合这种方法。
update 1 AuthZForce 应该可以(根据功能描述),但我不太确定如何(没有详细的文档或教程)。
authentication xacml spring-cloud spring-cloud-netflix authzforce
我正在对我们的 zuul 过滤器集进行补充。这个附加过滤器将查看标头并决定将请求重定向到我将配置的已知位置。
这是过滤器的运行方法的主体
@Override
public Object run() {
log.debug("Running the PreRouteTransMarkAndLoggingZuulFilter filter ");
// retrieve redirect URL
String redirectURL = filterConfigurationBean.getRedirectURL();
if (redirectURL.isEmpty()) {
return null;
}
// get the white list for allowed entries
Set<String> whiteList = new HashSet<>(Arrays.asList(filterConfigurationBean.getWhiteList().split(",")));
RequestContext ctx = RequestContext.getCurrentContext();
// if request url is part of white list then allow
String url = ctx.getRequest().getRequestURL().toString();
if (checkWhiteList(url, whiteList)) {
return null;
}
// get headers
// check if an authorization header is present
if (validHeader(ctx.getRequest())) …Run Code Online (Sandbox Code Playgroud) 我正在使用 Spring Cloud 来使用简单队列服务(SQS)。我有以下并行处理配置:
@Bean
public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setConcurrencyLimit(50);
return simpleAsyncTaskExecutor;
}
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor) {
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAutoStartup(true);
factory.setTaskExecutor(simpleAsyncTaskExecutor);
factory.setWaitTimeOut(20);
factory.setMaxNumberOfMessages(10);
return factory;
}
Run Code Online (Sandbox Code Playgroud)
我需要在 50 个线程中处理 50 条消息(在 bean SimpleAsyncTaskExecutor 中配置),但仅并行处理 10 条消息(从 SQS 返回的 maxNumberOfMessages)
如何处理 50 条消息而不是 10 条?
我正在查看一个 Spring Boot 服务,它从 apache kafka 读取消息,通过 http 从另一个服务请求消息指示的记录,处理它们,将一些数据保存到数据库中,并将结果发布到另一个主题。
这是通过
@StreamListener(Some.INPUT)
@SendTo(Some.OUTPUT)
Run Code Online (Sandbox Code Playgroud)
这是在几个服务中完成的,通常工作得很好。唯一的属性集是
spring.cloud.stream.binder.consumer.concurrency=20
Run Code Online (Sandbox Code Playgroud)
主题本身有 20 个分区,应该适合。
在监控 kafka 的读取时,我们发现吞吐量非常低和奇怪的行为:
该应用程序一次最多读取 500 条消息,然后是 1-2 分钟的无内容。在此期间,消费者反复记录“缺少心跳,因为分区被重新平衡”,“重新分配分区”,有时甚至抛出异常说“提交失败,因为轮询间隔已过”
我们得出的结论是,这意味着消费者获取 500 条消息,需要很长时间来处理所有消息,错过了它的时间窗口,因此无法将 500 条消息中的任何一条提交给代理——代理重新分配分区并重新发送相同的消息再次。
在查看线程和文档后,我发现了“max.poll.records”属性,但在设置此属性的位置的建议中存在冲突。
有人说把它放在下面
spring.cloud.stream.bindings.consumer.<input>.configuration
Run Code Online (Sandbox Code Playgroud)
有人说
spring.cloud.stream.kafka.binders.consumer-properties
Run Code Online (Sandbox Code Playgroud)
我尝试将两者都设置为 1,但服务行为没有改变。
我如何正确处理这种情况,即消费者无法跟上默认设置所需的轮询间隔?
常见的yaml:
spring.cloud.stream.default.group=${spring.application.name}
Run Code Online (Sandbox Code Playgroud)
服务-yaml
spring:
clould:
stream:
default:
consumer.headerMode: embeddedHeaders
producer.headerMode: embeddedHeaders
bindings:
someOutput:
destination: outTopic
someInput:
destination: inTopic
consumer:
concurrency: 30
kafka:
bindings:
consumer:
someInput:
configuarion:
max.poll.records: 20 # ConsumerConfig ignores this
consumer:
enableDlq: true
configuarion:
max.poll.records: 30 # ConsumerConfig ignores this …Run Code Online (Sandbox Code Playgroud) spring-cloud ×10
spring ×4
amazon-sqs ×2
java ×2
spring-boot ×2
amazon-s3 ×1
apache-kafka ×1
authzforce ×1
consul ×1
docker ×1
netflix-zuul ×1
node.js ×1
xacml ×1