我有两个带有Kafka-stream依赖项的Spring启动项目,它们在gradle和完全相同的配置中具有完全相同的依赖关系,但是启动时项目之一的日志错误如下所示
11:35:37.974 [restartedMain] INFO o.a.k.c.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [192.169.0.109:6667]
client.id = client
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null …Run Code Online (Sandbox Code Playgroud) 我有一个服务,它从不同的消息队列接收不同的结构化消息。我们可以针对@StreamListener conditions每种消息类型选择如何处理该消息。举个例子:
我们收到两种不同类型的消息,它们具有不同的标头字段和值,例如
\n\n来自“订单”队列的传入:
\n\nOrder1: { Header: {catalog:groceries} }\nOrder2: { Header: {catalog:tools} }\nRun Code Online (Sandbox Code Playgroud)\n\n来自“发货”队列的传入:
\n\nShipment1: { Header: {region:Europe} }\nShipment2: { Header: {region:America} }\nRun Code Online (Sandbox Code Playgroud)\n\n每个队列都有一个绑定,根据该绑定,@StreamListener我可以按目录和区域以不同的方式处理消息
例如
\n\n@StreamListener(target = OrderSink.ORDER_CHANNEL, condition = "headers[\'catalog\'] == \'groceries\'")\npublic void onGroceriesOrder(GroceryOder order){\n...\n}\nRun Code Online (Sandbox Code Playgroud)\n\n那么问题是,如何使用新的 Spring Cloud Function 方法来实现这一点?
\n\n\n\nAlso, for SpEL, the root object of the evaluation context is Message so you can do evaluation on individual headers …
我有一个spring-cloud-streamkafka绑定的应用程序.我想从同一个可执行文件(jar)中发送和接收来自同一主题的消息.我有我的频道定义,如下所示: -
public interface ChannelDefinition {
@Input("forum")
public SubscriableChannel readMessage();
@Output("forum")
public MessageChannel postMessage();
}
我@StreamListener用来接收消息.我得到各种意想不到的错误.有时,我收到
是否有上述用例的工作示例?
spring spring-integration spring-boot spring-cloud-stream spring-cloud-dataflow
我已经使用RabbitMQ的网络的用户界面的话题交换创造TX并绑定到交换两个队列TX.Q1和TX.Q2,每个路由键绑定RK1和RK2因此,和产生的交流几条消息.
现在我想使用Spring Cloud Stream创建一个消费者,它只接收来自Q1的消息.我尝试使用配置:
spring.cloud.stream.bindings.input.destination=TX
spring.cloud.stream.bindings.input.group=Q1
Run Code Online (Sandbox Code Playgroud)
以及使用@StreamListner(Sink.INPUT)消息的方法的注释.
结果我可以看到消费者创建了一个具有相同名称TX.Q1的队列(或绑定),但新队列/ bind的Routing-Key是#.
如何通过Spring Cloud Stream配置消费者,消费者将使用来自预定义队列的消息(仅限于使用rk1路由的消息).
对于具有多个分区的主题-
1)一个SpringBoot实例是否使用多个线程来处理(用StreamListener注释的方法)每个分区中的每个消息?
2)是否可以为每个分区配置多个线程,还是我必须将其从侦听器线程手动移交给工作池?
我正在尝试实现事件驱动的体系结构来处理分布式事务.每个服务都有自己的数据库,并使用Kafka发送消息以通知其他微服务有关操作.
一个例子:
Order service -------> | Kafka |------->Payment Service
| |
Orders MariaDB DB Payment MariaDB Database
Run Code Online (Sandbox Code Playgroud)
订单收到订单请求.它必须将新订单存储在其数据库中并发布消息,以便付款服务意识到它必须为该项目收费:
私人订单业务订单业务;
@PostMapping
public Order createOrder(@RequestBody Order order){
logger.debug("createOrder()");
//a.- Save the order in the DB
orderBusiness.createOrder(order);
//b. Publish in the topic so that Payment Service charges for the item.
try{
orderSource.output().send(MessageBuilder.withPayload(order).build());
}catch(Exception e){
logger.error("{}", e);
}
return order;
}
Run Code Online (Sandbox Code Playgroud)
这些是我的疑惑:
apache-kafka microservices spring-cloud spring-cloud-stream spring-kafka
我使用Spring Cloud方法来构建一些应该互相交互的微服务.对于微服务之间的消息传递,我打算使用RabbitMQ和Spring AMQP,但在看了Spring Cloud Stream之后,我感到很失落.在我看来,Spring Cloud Stream是下一个抽象层次(可能太强大,但你应该得到整体印象),它有许多非常有用的功能.所以我想知道为什么有人会使用Spring AMQP进行新的开发?当一个微服务向另一个微服务发送消息并收到回复时,你能否提供Spring Spring Stream的任何Spring AMQP优势?
谢谢.
我需要禁用事件的发布和订阅才能进行开发,但是我无法为此找到一些配置属性/其他解决方案。我怎样才能做到这一点?
可能的解决方案:使用@EnableBinding某些属性集创建自动配置,并在禁用的情况下,用生成的无操作存根替换所有绑定接口。但是也许存在更简单的解决方案?
如果不使用系统级别的默认配置,我无法连接到 Spring Cloud Stream Kinesis Binder (1.2.0.RELEASE) 中的 AWS kinesis。仅当系统已配置为使用默认配置文件并且使用 [default] 配置文件设置访问密钥 ID 和秘密访问密钥时,应用程序才能工作。否则,将无法通过抛出此异常来连接到 AWS 资源:
Caused by: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@3b2c8bda: Unable to load credentials from service endpoint, com.amazonaws.auth.profile.ProfileCredentialsProvider@688d619c: No AWS profile named 'default']
at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:136)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1225)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:801)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:751)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:3768)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:3737)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeDescribeTable(AmazonDynamoDBClient.java:1836)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1804)
at com.amazonaws.services.dynamodbv2.document.Table.describe(Table.java:137)
at org.springframework.integration.aws.metadata.DynamoDbMetadataStore.afterPropertiesSet(DynamoDbMetadataStore.java:145)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1837)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1774) …Run Code Online (Sandbox Code Playgroud) 我有多个 Spring Boot 应用程序使用 kafka 代理实现 Spring Cloud 流。我想知道是否可以停止或禁用 Spring Cloud 流或 kafka 代理连接以启用应用程序。
spring ×4
spring-boot ×3
apache-kafka ×2
java ×2
spring-cloud ×2
spring-kafka ×2
aws-java-sdk ×1
gradle ×1
kotlin ×1
rabbitmq ×1
spring-amqp ×1