我有一个spring-cloud-streamkafka绑定的应用程序.我想从同一个可执行文件(jar)中发送和接收来自同一主题的消息.我有我的频道定义,如下所示: -
public interface ChannelDefinition {
@Input("forum")
public SubscriableChannel readMessage();
@Output("forum")
public MessageChannel postMessage();
}
我@StreamListener用来接收消息.我得到各种意想不到的错误.有时,我收到
是否有上述用例的工作示例?
spring spring-integration spring-boot spring-cloud-stream spring-cloud-dataflow
对于具有多个分区的主题-
1)一个SpringBoot实例是否使用多个线程来处理(用StreamListener注释的方法)每个分区中的每个消息?
2)是否可以为每个分区配置多个线程,还是我必须将其从侦听器线程手动移交给工作池?
我正在尝试实现事件驱动的体系结构来处理分布式事务.每个服务都有自己的数据库,并使用Kafka发送消息以通知其他微服务有关操作.
一个例子:
Order service -------> | Kafka |------->Payment Service
| |
Orders MariaDB DB Payment MariaDB Database
Run Code Online (Sandbox Code Playgroud)
订单收到订单请求.它必须将新订单存储在其数据库中并发布消息,以便付款服务意识到它必须为该项目收费:
私人订单业务订单业务;
@PostMapping
public Order createOrder(@RequestBody Order order){
logger.debug("createOrder()");
//a.- Save the order in the DB
orderBusiness.createOrder(order);
//b. Publish in the topic so that Payment Service charges for the item.
try{
orderSource.output().send(MessageBuilder.withPayload(order).build());
}catch(Exception e){
logger.error("{}", e);
}
return order;
}
Run Code Online (Sandbox Code Playgroud)
这些是我的疑惑:
apache-kafka microservices spring-cloud spring-cloud-stream spring-kafka
我需要禁用事件的发布和订阅才能进行开发,但是我无法为此找到一些配置属性/其他解决方案。我怎样才能做到这一点?
可能的解决方案:使用@EnableBinding某些属性集创建自动配置,并在禁用的情况下,用生成的无操作存根替换所有绑定接口。但是也许存在更简单的解决方案?
我正在构建一个微服务组件,它将默认使用由其他 (SCS) 组件生成的 Spring Cloud Stream (SCS) Kafka 消息。
但是我还需要从使用融合 API 的其他组件中使用 Kafka 消息。
我有一个示例存储库,显示了我正在尝试做什么。
https://github.com/donalthurley/KafkaConsumeScsAndConfluent
这是下面带有 SCS 输入绑定和融合输入绑定的应用程序配置。
spring:
application:
name: kafka
kafka:
consumer:
properties.schema.registry.url: http://192.168.99.100:8081
cloud:
stream:
kafka:
binder:
brokers: PLAINTEXT://192.168.99.100:9092
# configuration:
# specific:
# avro:
# reader: true
# key:
# deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value:
# deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
bindings:
inputConfluent:
contentType: application/*+avro
destination: confluent-destination
group: input-confluent-group
inputScs:
contentType: application/*+avro
destination: scs-destination
group: input-scs-group
Run Code Online (Sandbox Code Playgroud)
通过上述配置,我使用 SCS 默认配置创建了两个使用者。例如,类 org.apache.kafka.common.serialization.ByteArrayDeserializer 是两个输入绑定的值反序列化器。
如果我删除上述配置中的注释,我会从我的 Confluent 客户端发送配置的两个消费者。例如,类 io.confluent.kafka.serializers.KafkaAvroDeserializer 是两个输入绑定的值反序列化器。
我理解因为配置在 Kafka binder …
我无法http | log按照入门流指南部署和执行基本流
:
wget https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/2.1.0.RELEASE/spring-cloud-dataflow-server/docker-compose.yml;
export DATAFLOW_VERSION=2.1.0.RELEASE;
export SKIPPER_VERSION=2.0.2.RELEASE;
docker-compose up;
open http://localhost:9393/dashboard/#/streams/create;
echo "Then, create a stream via text input: `http | log`"
echo "Then, deploy the stream. The deployment fails with exit code 137."
Run Code Online (Sandbox Code Playgroud)
我得到绿色的“成功部署的流定义”。状态显示为“正在部署”,但从未完全部署。ERROR日志、UI 或网络请求中没有消息。
这是docker-compose up我部署流后的控制台输出。
skipper | 2019-05-24 22:31:53.363 INFO 1 --- [io-7577-exec-10] o.s.s.support.LifecycleObjectSupport : started UPGRADE UPGRADE_DEPLOY_TARGET_APPS_SUCCEED UPGRADE_DEPLOY_TARGET_APPS UPGRADE_START UPGRADE_DELETE_SOURCE_APPS UPGRADE_CHECK_TARGET_APPS UPGRADE_WAIT_TARGET_APPS UPGRADE_CANCEL UPGRADE_DEPLOY_TARGET_APPS_FAILED UPGRADE_CHECK_CHOICE UPGRADE_EXIT INSTALL INSTALL_INSTALL INSTALL_EXIT ERROR DELETE DELETE_DELETE DELETE_EXIT ROLLBACK ROLLBACK_START …Run Code Online (Sandbox Code Playgroud) 实现发件箱模式的通常方法是将消息有效负载存储在发件箱表中,并有一个单独的进程(消息中继)查询待处理的消息,并将它们发布到消息代理中,在我的例子中是 Kafka。
发件箱表的状态可能如下所示。
OUTBOX TABLE
---------------------------------
|ID | STATE | TOPIC | PAYLOAD |
---------------------------------
| 1 | PROCESSED | user |
| 2 | PENDING | user |
| 3 | PENDING | billing |
----------------------------------
Run Code Online (Sandbox Code Playgroud)
My Message Relay 是一个 Spring Boot/Cloud Stream 应用程序,它定期 ( @Scheduled) 查找 PENDING 记录,将它们发布到 Kafka 并将记录更新为 PROCESSED 状态。
第一个问题是:如果我启动 Message Relay 的多个实例,所有实例都会查询 Outbox 表,并且可能在某些时候不同的实例将获得相同的 PENDING 注册表以发布到 Kafka,从而生成重复的消息。我怎样才能防止这种情况?
另一种情况:假设只有一个消息中继。它获取一个 PENDING 记录,将其发布到主题,但在将记录更新为 PROCESSED 之前崩溃。当它再次启动时,它会找到相同的 PENDING 记录并再次发布它。有没有办法避免这种重复,或者唯一的方法是设计一个幂等系统。
背景
{
"markdownPercentage": 20,
"currency": "SEK",
"startDate": "2019-07-25"
}
Run Code Online (Sandbox Code Playgroud)
{
"markdownPercentage": 20,
"currency": "SEK",
"startDate": "2019-07-25"
}
Run Code Online (Sandbox Code Playgroud)
问题
当应用程序尝试将消息反序列化到对象时,它会引发以下异常
public class Markdown {
@JsonProperty("markdownPercentage")
@NotNull
private Integer markdownPercentage = 0;
@JsonProperty("currency")
@NotNull
private String currency = "";
@JsonFormat(
shape = Shape.STRING,
pattern = "yyyy-MM-dd"
)
@JsonProperty("startDate")
@NotNull
private ZonedDateTime startDate;
// Constructors, Getters, Setters etc.
} …Run Code Online (Sandbox Code Playgroud) 我有一个使用 Spring Integration DSL 流和 Kafka 绑定器的简单 Spring Cloud Stream 项目。一切正常,但来自 Kafka 的消息头值以byte[].
这意味着我的 SI@Header参数需要是 类型byte[]。哪个有效,但最好将它们作为字符串(我关心的所有入站标头都是字符串值)。
我已经将 Kafka 客户端配置为使用 StringSerializer/StringDeserializer。我假设我还需要以某种方式告诉 Spring Kafka 哪些标头映射为字符串以及使用什么字符编码。
我显然在这里遗漏了一些东西。有小费吗?
spring spring-integration spring-cloud-stream spring-kafka spring-integration-dsl
我们有多个应用程序消费者监听同一个 kafka 主题,并且生产者在向主题发送消息时设置消息标头,以便特定实例可以评估标头并处理消息。例如
@StreamListener(target=ITestSink.CHANNEL_NAME,condition="headers['franchiseName'] == 'sydney'")
public void fullfillOrder(@Payload TestObj message) {
log.info("sydney order request received message is {}",message.getName());
}
Run Code Online (Sandbox Code Playgroud)
在 Spring Cloud Stream 3.0.0 中,@StreamListener 已被弃用,我无法在 Function 中找到条件属性的等效项。
有什么建议吗?
java spring-boot spring-cloud-stream apache-kafka-streams spring-cloud-stream-binder-kafka
spring ×4
spring-boot ×4
apache-kafka ×3
java ×3
spring-cloud ×2
spring-kafka ×2
jackson ×1
json ×1