标签: spring-cloud

Spring Cloud for AWS - 如何转换从 SQS 接收并由 S3 发送的消息

我已将 S3 配置为在每次上传文件时触发一个事件。该事件将发送至 SQS。现在我想使用 Spring Cloud for AWS 从队列中读取消息。有没有办法将有效负载转换为表示 S3 事件记录的已知类型?我尝试过

package com.test;
import java.util.Map;

import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.MessageMapping;

import com.amazonaws.services.s3.event.S3EventNotification.S3EventNotificationRecord;

public class SQSHandler {
    @MessageMapping("MediaQueue")
    private void receiveMessage(S3EventNotificationRecord message, @Headers Map<String, String> headers) {
        System.out.println("MYK MYK MYK");
    }
}
Run Code Online (Sandbox Code Playgroud)

但我收到错误

ord, message=GenericMessage [payload={这里是整个有效负载}] 位于 org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:115) 位于 org.springframework.messaging.handler.inspiration.HandlerMethodArgumentResolverComposite .resolveArgument(HandlerMethodArgumentResolverComposite.java:77)在org.springframework.messaging.handler.invocable.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137)在org.springframework.messaging.handler.invocable.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:106 )在 org.springframework.messaging.handler.inspiration.AbstractMethodMessageHandler.handleMatch(AbstractMethodMessageHandler.java:447) ... 8 更多

你知道有什么方法可以做这样的事情吗?

amazon-s3 amazon-sqs amazon-web-services spring-cloud

2
推荐指数
1
解决办法
2523
查看次数

FeignClient 带有客户端证书和 Docker

我的微服务需要使用双向 ssl。每个微服务都是一个 Spring Boot 应用程序,注释为:

@SpringBootApplication
@EnableFeignClients
@EnableDiscoveryClient
@EnableZuulProxy
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
Run Code Online (Sandbox Code Playgroud)

每个 yml 对 eureka/ribbon 都有类似的配置:

eureka:
    client:
        service-url:
          defaultZone: ${EUREKA_CLIENT_SERVICEURL_PROTOCOL:http}://${EUREKA_CLIENT_SERVICEURL_HOST:192.168.99.100}:${EUREKA_CLIENT_SERVICEURL_PORT:8761}/eureka/
    instance:
        secure-virtual-host-name: ${spring.application.name}
        prefer-ip-address: true
        non-secure-port-enabled: ${EUREKA_NON_SECURE_PORT_ENABLED:false}
        secure-port-enabled: ${EUREKA_SECURE_PORT_ENABLED:true}
        secure-port: ${server.port}
ribbon:
    IsSecure: true
    eureka:
        enabled: true
Run Code Online (Sandbox Code Playgroud)

每个微服务都有一个控制器,该控制器公开用于各种功能的 REST API。

当一个微服务需要调用另一个微服务端点时,我尝试通过创建该微服务的客户端接口来实现:

@FeignClient(name = "user", configuration = FeignConfiguration.class, url = "https://user")
public interface UserClient {
    @RequestMapping(method = RequestMethod.GET, value = "/test")
    String testUser();
}
Run Code Online (Sandbox Code Playgroud)

这是 Feign 配置:

@Configuration …
Run Code Online (Sandbox Code Playgroud)

spring docker spring-cloud netflix-eureka netflix-ribbon

2
推荐指数
1
解决办法
1万
查看次数

根路径上的 Zuul 路由

我想配置 zuul 将请求路由到 root/到主页。我试过:

root:
  path: /
  url: http://hostname/home/index.jsp
Run Code Online (Sandbox Code Playgroud)

root:
  path: /**
  url: http://hostname/home/index.jsp
Run Code Online (Sandbox Code Playgroud)

但它们都不起作用。我刚刚收到 404 NOT FOUND。我认为路径匹配配置应该与具有上下文的配置类似,例如/service/**,但事实并非如此。

spring-cloud netflix-zuul

2
推荐指数
1
解决办法
3930
查看次数

Eureka检测服务状态

语境

\n\n

我们使用 Spring Cloud Netflix 和 Eureka 作为服务发现,使用 Zuul 来代理服务并对其进行负载平衡。

\n\n

微服务是使用 NodeJS 实现的,并使用 NPM 模块eureka-js-client和中间的自定义层在 Eureka 注册,该自定义层处理所有微服务通用的配置和内容。

\n\n
\n\n

问题

\n\n

问题是尤里卡无法识别一项服务是否出现故障。这是一个问题,因为我们拥有一个具有自动部署功能的开发基础架构,每次都会在不同端口上重新部署和重新启动微服务,而无需重新启动 Eureka(和 Zuul)。

\n\n

因此,过了一段时间,我们就有了十个或更多的一个微服务实例,其中只有一个处于运行状态,但所有实例都被识别为 being UP

\n\n
\n\n

解决方法

\n\n
    \n
  1. 我尝试在客户端上设置heartbeatInterval较小的值,但这没有帮助。

  2. \n
  3. 我尝试在服务器上设置renewalThresholdUpdateIntervalMs较小的值,但这也没有帮助。

  4. \n
  5. 更多令人沮丧、无益的属性尝试\xe2\x80\xa6

  6. \n
\n\n
\n\n

问题

\n\n

如何配置 Eureka 驱逐实例或将DOWN在合理时间(不是 30 分钟左右)内未发送心跳的实例设置为状态?

\n\n
\n\n

代码片段

\n\n

服务器本身不包含可提及的代码(只是使用 Spring Cloud Starter 启动 Eureka 服务器的一些注释)。

\n\n

Eureka服务器的配置(我已经删除了所有不工作的尝试):

\n\n
server:\n  port: 8761\n\nspring:\n  cloud:\n    client:\n      hostname: localhost\n\neureka:\n  instance:\n …
Run Code Online (Sandbox Code Playgroud)

java spring node.js spring-cloud netflix-eureka

2
推荐指数
1
解决办法
8093
查看次数

Spring Cloud Consul健康检查配置

我正在将 Spring Boot 应用程序作为 Docker 容器运行。到目前为止,这工作得很好,但是当我尝试使用 Spring Cloud Consul 时,它也让我有些头痛。它可以很好地从 Consul KVS 读取配置,但健康检查似乎正在运行。

默认运行状况检查使用 docker 容器的主机名,例如http://users-microservice/health。显然,从领事访问时这不会解决。

没问题,文档提到您可以在 bootstrap.yml 文件中使用healthCheckPath来配置它。这就是我现在所拥有的:

spring:
    application:
        name: users-microservice
    cloud:
        consul:
        host: myserver.com
        port: 8500
        config:
            prefix: API-CONFIG
            profileSeparator: '__'
        discovery:
            tags: users-microservice
            healthCheckPath: http://myserver.com:${server.port}/status
            healthCheckInterval: 30s
Run Code Online (Sandbox Code Playgroud)

不幸的是,这个变量的使用方式似乎与我预期的非常不同。这就是领事试图达到的目标:

Get http://users:18090http//myserver.com:18090/status: dial tcp: unknown port tcp/18090http

我怎样才能解决这个问题?我应该设置一些未记录的配置参数吗?

spring-boot consul spring-cloud

2
推荐指数
1
解决办法
6424
查看次数

使用 Ribbon 进行负载平衡

我有一个关于 Ribbon 如何选择服务器的快速问题。

假设我有两个可以通过功能区选择的服务。Ribbon 如何知道要选择哪个服务?它是否检查这两项服务以确定哪一项过载程度较低?如果是,它是否调用/metrics以及考虑哪个指标?

非常感谢您的回答

spring load-balancing spring-boot spring-cloud netflix-ribbon

2
推荐指数
1
解决办法
1846
查看次数

Spring Cloud 的 XACML

我将使用 OAuth2 和 XACML(使用 AuthZForce、Balana、AT&T XACML 或类似的东西)来保护我的 Spring Cloud 应用程序。

我想使用 Spring-Cloud(-Netflix) 的微服务。为了使 XACML 可用,我认为我需要这个:

  1. 每个现有 API 服务的 PEP
  2. PDP 作为新服务,由 PEP 使用。因为Spring-Cloud(-Netflix)具有负载均衡功能(Eureka),所以我需要在Eureka上注册该服务并实现REST-API。
  3. 由于所有 PDP 应使用相同的策略,因此需要集中存储它们(策略提供者)

哪种框架最适合这种方法。

update 1 AuthZForce 应该可以(根据功能描述),但我不太确定如何(没有详细的文档或教程)。

authentication xacml spring-cloud spring-cloud-netflix authzforce

2
推荐指数
1
解决办法
1096
查看次数

从 zuul 过滤器发送重定向

我正在对我们的 zuul 过滤器集进行补充。这个附加过滤器将查看标头并决定将请求重定向到我将配置的已知位置。

这是过滤器的运行方法的主体

 @Override
    public Object run() {
        log.debug("Running the PreRouteTransMarkAndLoggingZuulFilter filter ");
        // retrieve redirect URL
        String redirectURL = filterConfigurationBean.getRedirectURL();
        if (redirectURL.isEmpty()) {
            return null;
        }
        // get the white list for allowed entries
        Set<String> whiteList = new HashSet<>(Arrays.asList(filterConfigurationBean.getWhiteList().split(",")));
        RequestContext ctx = RequestContext.getCurrentContext();
        // if request url is part of white list then allow
        String url = ctx.getRequest().getRequestURL().toString();
        if (checkWhiteList(url, whiteList)) {
            return null;
        }
        // get headers
        // check if an authorization header is present
        if (validHeader(ctx.getRequest())) …
Run Code Online (Sandbox Code Playgroud)

spring-cloud

2
推荐指数
1
解决办法
9664
查看次数

并行处理量简单队列服务(SQS)

我正在使用 Spring Cloud 来使用简单队列服务(SQS)。我有以下并行处理配置:

@Bean
public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setConcurrencyLimit(50);
    return simpleAsyncTaskExecutor;
}

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(
        SimpleAsyncTaskExecutor simpleAsyncTaskExecutor) {

    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAutoStartup(true);
    factory.setTaskExecutor(simpleAsyncTaskExecutor);
    factory.setWaitTimeOut(20);
    factory.setMaxNumberOfMessages(10);

    return factory;
}
Run Code Online (Sandbox Code Playgroud)

我需要在 50 个线程中处理 50 条消息(在 bean SimpleAsyncTaskExecutor 中配置),但仅并行处理 10 条消息(从 SQS 返回的 maxNumberOfMessages)

如何处理 50 条消息而不是 10 条?

java spring amazon-sqs spring-cloud

2
推荐指数
1
解决办法
6147
查看次数

Spring Cloud Stream kafka - 获取批量并错过心跳

我正在查看一个 Spring Boot 服务,它从 apache kafka 读取消息,通过 http 从另一个服务请求消息指示的记录,处理它们,将一些数据保存到数据库中,并将结果发布到另一个主题。

这是通过

@StreamListener(Some.INPUT)
@SendTo(Some.OUTPUT)
Run Code Online (Sandbox Code Playgroud)

这是在几个服务中完成的,通常工作得很好。唯一的属性集是

spring.cloud.stream.binder.consumer.concurrency=20
Run Code Online (Sandbox Code Playgroud)

主题本身有 20 个分区,应该适合。

在监控 kafka 的读取时,我们发现吞吐量非常低和奇怪的行为:

该应用程序一次最多读取 500 条消息,然后是 1-2 分钟的无内容。在此期间,消费者反复记录“缺少心跳,因为分区被重新平衡”,“重新分配分区”,有时甚至抛出异常说“提交失败,因为轮询间隔已过”

我们得出的结论是,这意味着消费者获取 500 条消息,需要很长时间来处理所有消息,错过了它的时间窗口,因此无法将 500 条消息中的任何一条提交给代理——代理重新分配分区并重新发送相同的消息再次。

在查看线程和文档后,我发现了“max.poll.records”属性,但在设置此属性的位置的建议中存在冲突。

有人说把它放在下面

spring.cloud.stream.bindings.consumer.<input>.configuration
Run Code Online (Sandbox Code Playgroud)

有人说

spring.cloud.stream.kafka.binders.consumer-properties
Run Code Online (Sandbox Code Playgroud)

我尝试将两者都设置为 1,但服务行为没有改变。

我如何正确处理这种情况,即消费者无法跟上默认设置所需的轮询间隔?

常见的yaml:

spring.cloud.stream.default.group=${spring.application.name}
Run Code Online (Sandbox Code Playgroud)

服务-yaml

spring:
  clould:
    stream:
      default:
        consumer.headerMode: embeddedHeaders
        producer.headerMode: embeddedHeaders
      bindings:
       someOutput:
         destination: outTopic
       someInput:
         destination: inTopic
           consumer:
             concurrency: 30
      kafka:
        bindings:
          consumer:
            someInput:
              configuarion:
                max.poll.records: 20 # ConsumerConfig ignores this
              consumer:
                enableDlq: true
                configuarion:
                  max.poll.records: 30 # ConsumerConfig ignores this …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-cloud spring-cloud-stream

2
推荐指数
1
解决办法
3652
查看次数