小编czg*_*jic的帖子

春云溪流kinesis Binder

我正在尝试实现一个 spring boot aws kinesis 消费者,它能够自动缩放,以便与原始实例共享负载(分割处理碎片)。

我能够做的:使用Kinesis binder 文档中提供的定义良好的自述文件和示例,我已经能够启动多个消费者,这些消费者实际上通过提供这些属性来划分分片以进行处理。

在生产者上,我通过应用程序属性提供partitionCount: 2。对于消费者,我提供了instanceIndex 和instanceCount。

在消费者1上我有instanceIndex = 0和instantCount = 2,在消费者2上我有instanceIndex = 1和instantCount = 2

这工作正常,我有两个处理其特定分片的 Spring Boot 应用程序。但在这种情况下,我必须为每个启动应用程序提供一个预配置的属性文件,该文件需要在加载时可用,以便它们分割负载。如果我只启动第一个消费者(非自动缩放),我只处理特定于索引 0 的分片,而不处理其他分片。

我想做但不确定是否可以部署一个消费者(处理所有分片)。如果我部署另一个实例,我希望该实例重新体验某些负载的第一个消费者,换句话说,如果我有 2 个分片和一个消费者,它将处理这两个实例,如果我随后部署另一个应用程序,我希望第一个消费者到目前为止,仅处理单个分片,将第二个分片留给第二个消费者。

我尝试通过不在消费者上指定instanceIndex或instanceCount而仅提供组名称来做到这一点,但这使得第二个消费者处于空闲状态,直到第一个消费者关闭。仅供参考,我还创建了自己的元数据和锁定表,以防止活页夹创建默认的元数据和锁定表。

配置:生产者-----------------

originator: KinesisProducer
server:
 port: 8090

    spring: 
      cloud: 
        stream: 
          bindings:
            output: 
              destination: <stream-name> 
              content-type: application/json
              producer: 
                headerMode: none
                partitionKeyExpression: headers.type
Run Code Online (Sandbox Code Playgroud)

消费者-------------------------------------

originator: KinesisSink
server:
 port: 8091

spring:
  cloud:
    stream:
      kinesis:
        bindings:
          input:
            consumer:
              listenerMode: batch
              recordsLimit: 10
              shardIteratorType: TRIM_HORIZON
        binder:
          checkpoint:
            table: <checkpoint-table>
          locks:
            table: <locking-table
      bindings: …
Run Code Online (Sandbox Code Playgroud)

spring spring-integration amazon-web-services spring-cloud-stream

5
推荐指数
1
解决办法
4223
查看次数