标签: spring-cloud-stream

Spring Cloud Kafka Stream无法创建Producer配置错误

我有两个带有Kafka-stream依赖项的Spring启动项目,它们在gradle和完全相同的配置中具有完全相同的依赖关系,但是启动时项目之一的日志错误如下所示

11:35:37.974 [restartedMain] INFO  o.a.k.c.admin.AdminClientConfig - AdminClientConfig values: 
    bootstrap.servers = [192.169.0.109:6667]
    client.id = client
    connections.max.idle.ms = 300000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 120000
    retries = 5
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null …
Run Code Online (Sandbox Code Playgroud)

gradle kotlin spring-boot spring-cloud-stream spring-kafka

7
推荐指数
1
解决办法
3237
查看次数

Spring Cloud Function - 不同消费者的单独路由表达式

我有一个服务,它从不同的消息队列接收不同的结构化消息。我们可以针对@StreamListener conditions每种消息类型选择如何处理该消息。举个例子:

\n\n

我们收到两种不同类型的消息,它们具有不同的标头字段和值,例如

\n\n

来自“订单”队列的传入:

\n\n
Order1: { Header: {catalog:groceries} }\nOrder2: { Header: {catalog:tools} }\n
Run Code Online (Sandbox Code Playgroud)\n\n

来自“发货”队列的传入:

\n\n
Shipment1: { Header: {region:Europe} }\nShipment2: { Header: {region:America} }\n
Run Code Online (Sandbox Code Playgroud)\n\n

每个队列都有一个绑定,根据该绑定,@StreamListener我可以按目录和区域以不同的方式处理消息

\n\n

例如

\n\n
@StreamListener(target = OrderSink.ORDER_CHANNEL, condition = "headers[\'catalog\'] == \'groceries\'")\npublic void onGroceriesOrder(GroceryOder order){\n...\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

那么问题是,如何使用新的 Spring Cloud Function 方法来实现这一点?

\n\n

在文档https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.2.RELEASE/reference/html/spring-cloud-stream.html#_event_routing中提到:

\n\n

Also, for SpEL, the root object of the evaluation context is Message so you can do evaluation on individual headers …

spring spring-cloud-stream spring-cloud-function

7
推荐指数
1
解决办法
4151
查看次数

如何在spring cloud stream和kafka中发送和接收相同的主题

我有一个spring-cloud-streamkafka绑定的应用程序.我想从同一个可执行文件(jar)中发送和接收来自同一主题的消息.我有我的频道定义,如下所示: - public interface ChannelDefinition { @Input("forum") public SubscriableChannel readMessage(); @Output("forum") public MessageChannel postMessage(); }

@StreamListener用来接收消息.我得到各种意想不到的错误.有时,我收到

  1. 没有为每个其他消息找到unknown.message.channel的调度程序
  2. 如果我将命令行kafka订阅者附加到上述论坛主题,它将收到所有其他消息.
  3. 我的应用程序接收所有其他消息,这是来自命令行订户的独占消息集.我确保我的应用程序在特定的组名下订阅.

是否有上述用例的工作示例?

spring spring-integration spring-boot spring-cloud-stream spring-cloud-dataflow

6
推荐指数
1
解决办法
2358
查看次数

使用Spring Cloud Stream将RabbitMQ消费者绑定到现有队列

我已经使用RabbitMQ的网络的用户界面的话题交换创造TX并绑定到交换两个队列TX.Q1TX.Q2,每个路由键绑定RK1RK2因此,和产生的交流几条消息.

现在我想使用Spring Cloud Stream创建一个消费者,它只接收来自Q1的消息.我尝试使用配置:

spring.cloud.stream.bindings.input.destination=TX
spring.cloud.stream.bindings.input.group=Q1
Run Code Online (Sandbox Code Playgroud)

以及使用@StreamListner(Sink.INPUT)消息的方法的注释.

结果我可以看到消费者创建了一个具有相同名称TX.Q1的队列(或绑定),但新队列/ bind的Routing-Key是#.
如何通过Spring Cloud Stream配置消费者,消费者将使用来自预定义队列的消息(仅限于使用rk1路由的消息).

java spring rabbitmq spring-cloud-stream

6
推荐指数
1
解决办法
2716
查看次数

Spring Cloud Stream Kafka消费者模式

对于具有多个分区的主题-

1)一个SpringBoot实例是否使用多个线程来处理(用StreamListener注释的方法)每个分区中的每个消息?

2)是否可以为每个分区配置多个线程,还是我必须将其从侦听器线程手动移交给工作池?

spring-cloud-stream

6
推荐指数
1
解决办法
982
查看次数

如何使用Spring Cloud Stream Kafka和每服务数据库实现微服务事件驱动架构

我正在尝试实现事件驱动的体系结构来处理分布式事务.每个服务都有自己的数据库,并使用Kafka发送消息以通知其他微服务有关操作.

一个例子:

 Order service -------> | Kafka |------->Payment Service
       |                                       |
Orders MariaDB DB                   Payment MariaDB Database
Run Code Online (Sandbox Code Playgroud)

订单收到订单请求.它必须将新订单存储在其数据库中并发布消息,以便付款服务意识到它必须为该项目收费:

私人订单业务订单业务;

@PostMapping
public Order createOrder(@RequestBody Order order){
    logger.debug("createOrder()");
    //a.- Save the order in the DB
    orderBusiness.createOrder(order);
    //b. Publish in the topic so that Payment Service charges for the item.
    try{
        orderSource.output().send(MessageBuilder.withPayload(order).build());
    }catch(Exception e){
        logger.error("{}", e);
    }
    return order;
}
Run Code Online (Sandbox Code Playgroud)

这些是我的疑惑:

  1. 步骤a.-(保存在订单DB中)和b.-(发布消息)应该在事务中以原子方式执行.我怎样才能做到这一点?
  2. 这与前一个相关:我发送消息:orderSource.output().send(MessageBuilder.withPayload(order).build()); 无论Kafka代理是否已关闭,此操作都是异步的,并且总是返回true.我如何知道该消息已经到达Kafka经纪人?

apache-kafka microservices spring-cloud spring-cloud-stream spring-kafka

6
推荐指数
1
解决办法
2838
查看次数

Spring AMQP对Spring Cloud Stream for Microservices Architecture有什么好处

我使用Spring Cloud方法来构建一些应该互相交互的微服务.对于微服务之间的消息传递,我打算使用RabbitMQ和Spring AMQP,但在看了Spring Cloud Stream之后,我感到很失落.在我看来,Spring Cloud Stream是下一个抽象层次(可能太强大,但你应该得到整体印象),它有许多非常有用的功能.所以我想知道为什么有人会使用Spring AMQP进行新的开发?当一个微服务向另一个微服务发送消息并收到回复时,你能否提供Spring Spring Stream的任何Spring AMQP优势?

谢谢.

spring-amqp spring-cloud-stream

6
推荐指数
1
解决办法
1211
查看次数

如何出于开发目的禁用Spring Cloud Stream绑定?

我需要禁用事件的发布和订阅才能进行开发,但是我无法为此找到一些配置属性/其他解决方案。我怎样才能做到这一点?

可能的解决方案:使用@EnableBinding某些属性集创建自动配置,并在禁用的情况下,用生成的无操作存根替换所有绑定接口。但是也许存在更简单的解决方案?

spring-boot spring-cloud-stream

6
推荐指数
1
解决办法
2753
查看次数

无法从链中的任何提供商加载 AWS 凭证 Spring Cloud Stream Kinesis Binder

如果不使用系统级别的默认配置,我无法连接到 Spring Cloud Stream Kinesis Binder (1.2.0.RELEASE) 中的 AWS kinesis。仅当系统已配置为使用默认配置文件并且使用 [default] 配置文件设置访问密钥 ID 和秘密访问密钥时,应用程序才能工作。否则,将无法通过抛出此异常来连接到 AWS 资源:

Caused by: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@3b2c8bda: Unable to load credentials from service endpoint, com.amazonaws.auth.profile.ProfileCredentialsProvider@688d619c: No AWS profile named 'default']
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:136)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1225)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:801)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:751)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:3768)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:3737)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeDescribeTable(AmazonDynamoDBClient.java:1836)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1804)
    at com.amazonaws.services.dynamodbv2.document.Table.describe(Table.java:137)
    at org.springframework.integration.aws.metadata.DynamoDbMetadataStore.afterPropertiesSet(DynamoDbMetadataStore.java:145)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1837)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1774) …
Run Code Online (Sandbox Code Playgroud)

aws-java-sdk spring-cloud-stream spring-integration-aws

6
推荐指数
1
解决办法
2万
查看次数

当没有 kafka 代理运行时,如何出于开发目的禁用 Spring Cloud 流?

我有多个 Spring Boot 应用程序使用 kafka 代理实现 Spring Cloud 流。我想知道是否可以停止或禁用 Spring Cloud 流或 kafka 代理连接以启用应用程序。

java spring apache-kafka spring-cloud spring-cloud-stream

6
推荐指数
1
解决办法
3726
查看次数