我是 Spring 项目 Reactor 的新手,我不完全确定如何执行某些操作:
我有我的管道管道返回记录。都好。
但我想计算这些记录,然后做一些事情(比如 if else),如果返回的记录 > X 则出错,否则继续。
知道 Count 返回 a Mono<Long>,那么之后我会丢失记录,我该怎么办?
我在想:
以某种方式使用flatMap并执行此平面图内的某些操作。不知何故,我发现 Flux 中有一种reduce方法可能会有所帮助。
关键是,我不知道如何继续。
java spring reactive-programming project-reactor spring-reactor
我在本地机器上玩Kafka,我添加了以下Topic配置:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 config retention.ms=60000
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 —config file.delete.delay.ms=40000
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --config segment.bytes=400000
Run Code Online (Sandbox Code Playgroud)
我的理解是,当段达到上面定义的段大小 (segment.bytes=400000) 加上段中的每条消息都比上面定义的保留时间 (retention.ms=60000) 旧时,段将被删除。
我注意到的是只有 35 个字节的段,其中只包含一条消息,一分钟后被删除(可能多一点)
我从哪里得到这些信息?来自 Linkedin 工程师关于删除过程如何工作的帖子:
保留将基于保留和段大小设置的组合(作为旁注,建议使用 log.retention.ms 和 log.segment.ms,而不是小时配置。这是出于遗留原因,但 ms 配置更一致)。当 Kafka 收到消息时,它们会被写入每个分区的当前打开的日志段。当达到 log.segment.bytes 或 log.segment.ms 限制时,该段将被轮换。一旦发生这种情况,日志段将关闭并打开一个新段。只有在日志段关闭后才能通过保留设置将其删除。一旦日志段关闭并且该段中的所有消息都早于 log.retention.ms 或总分区大小大于 log.retention.bytes,则该日志段将被清除。
链接:保留的工作原理
假设我在 Spring XD 中有一个自定义模块(我使用 Spring XD + Spring Integration + hibernate)。该模块基本上从数据库获取一些东西(假设我使用休眠实体存储它,所以我使用一个名为“DataFromDB”的对象)。DataFromDB 是一个列表,然后我从列表中获取每个元素,然后我想使用如下方式发送它:
String payload = convertDataFromDBToJson(record);
return MessageBuilder.createMessage(payload, message.getHeaders());
Run Code Online (Sandbox Code Playgroud)
有没有办法发送多条消息?
编辑:
我只是根据尝试复制该场景的评论创建了一个小示例。这就是我所拥有的:
我的变压器类:
public class TransformerClass {
public Collection<Message<?>> transformerMethod(Message<?> message) {
List<Message<?>> messages = new ArrayList<Message<?>>();
messages.add(new GenericMessage<>("foo"));
messages.add(new GenericMessage<>("bar"));
return messages;
}
Run Code Online (Sandbox Code Playgroud)
我的xml配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
<tx:annotation-driven />
<int:channel id="input" />
<bean id="transFormerClass" class="myModule.TransformerClass">
</bean>
<int:transformer input-channel="input" output-channel="output" ref="transFormerClass" method="transformerMethod"/>
<int:channel id="output"/>
</beans> …Run Code Online (Sandbox Code Playgroud) 我面临以下问题:
我正在使用一个简单的方法调用另一个服务org.springframework.web.client.RestTemplate
调用它时,我需要拦截请求,修改请求正文,并让它继续流程。到目前为止,我没有遇到任何问题,因为org.springframework.http.client.ClientHttpRequestInterceptor我可以对我的请求执行任何我想要的操作(在将其发送到我正在调用的服务之前将 RequestObjectA 转换为 requestObjectB)。
问题:如何修改响应体?
我看到调用时ClientHttpResponse execute = clientHttpRequestExecution.execute(httpRequest, body)我可以让身体做execute.getBody(),所以我可以修改它,但我没有找到一种方法以某种方式将其设置回来。
有什么办法可以将我修改后的身体设置为ClientHttpResponse?