小编Col*_*b1a的帖子

如何计算 Flux 中的项目数,如果计数大于 X,则返回错误,否则继续使用 Pipeline

我是 Spring 项目 Reactor 的新手,我不完全确定如何执行某些操作:

我有我的管道管道返回记录。都好。

但我想计算这些记录,然后做一些事情(比如 if else),如果返回的记录 > X 则出错,否则继续。

知道 Count 返回 a Mono<Long>,那么之后我会丢失记录,我该怎么办?

我在想:

以某种方式使用flatMap并执行此平面图内的某些操作。不知何故,我发现 Flux 中有一种reduce方法可能会有所帮助。

关键是,我不知道如何继续。

java spring reactive-programming project-reactor spring-reactor

8
推荐指数
1
解决办法
1万
查看次数

Kafka 甚至在达到段大小之前删除段

我在本地机器上玩Kafka,我添加了以下Topic配置:

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 config retention.ms=60000
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 —config file.delete.delay.ms=40000
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --config segment.bytes=400000
Run Code Online (Sandbox Code Playgroud)

我的理解是,当段达到上面定义的段大小 (segment.bytes=400000) 加上段中的每条消息都比上面定义的保留时间 (retention.ms=60000) 旧时,段将被删除。

我注意到的是只有 35 个字节的段,其中只包含一条消息,一分钟后被删除(可能多一点)

我从哪里得到这些信息?来自 Linkedin 工程师关于删除过程如何工作的帖子:

保留将基于保留和段大小设置的组合(作为旁注,建议使用 log.retention.ms 和 log.segment.ms,而不是小时配置。这是出于遗留原因,但 ms 配置更一致)。当 Kafka 收到消息时,它们会被写入每个分区的当前打开的日志段。当达到 log.segment.bytes 或 log.segment.ms 限制时,该段将被轮换。一旦发生这种情况,日志段将关闭并打开一个新段。只有在日志段关闭后才能通过保留设置将其删除。一旦日志段关闭并且该段中的所有消息都早于 log.retention.ms 或总分区大小大于 log.retention.bytes,则该日志段将被清除。

链接:保留的工作原理

apache-kafka

6
推荐指数
2
解决办法
8556
查看次数

Spring:使用循环发送多条消息

假设我在 Spring XD 中有一个自定义模块(我使用 Spring XD + Spring Integration + hibernate)。该模块基本上从数据库获取一些东西(假设我使用休眠实体存储它,所以我使用一个名为“DataFromDB”的对象)。DataFromDB 是一个列表,然后我从列表中获取每个元素,然后我想使用如下方式发送它:

String payload = convertDataFromDBToJson(record);
return MessageBuilder.createMessage(payload, message.getHeaders());
Run Code Online (Sandbox Code Playgroud)
  • 问题是每次我必须发送消息时我都必须返回消息。
  • 所以我想循环 DataFromDB 列表并将每个元素作为消息发送。

有没有办法发送多条消息?

编辑:

我只是根据尝试复制该场景的评论创建了一个小示例。这就是我所拥有的:

我的变压器类:

public class TransformerClass {
public Collection<Message<?>> transformerMethod(Message<?> message) {
    List<Message<?>> messages = new ArrayList<Message<?>>();
    messages.add(new GenericMessage<>("foo"));
    messages.add(new GenericMessage<>("bar"));
    return messages;
}
Run Code Online (Sandbox Code Playgroud)

我的xml配置:

    <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/tx  http://www.springframework.org/schema/tx/spring-tx.xsd">

    <tx:annotation-driven />
    <int:channel id="input" />

    <bean id="transFormerClass" class="myModule.TransformerClass">
    </bean>

    <int:transformer input-channel="input" output-channel="output" ref="transFormerClass" method="transformerMethod"/>

    <int:channel id="output"/>

</beans> …
Run Code Online (Sandbox Code Playgroud)

java spring spring-integration spring-xd spring-messaging

1
推荐指数
1
解决办法
3471
查看次数

实现ClientHttpRequestInterceptor时修改Response Body

我面临以下问题:

我正在使用一个简单的方法调用另一个服务org.springframework.web.client.RestTemplate

调用它时,我需要拦截请求,修改请求正文,并让它继续流程。到目前为止,我没有遇到任何问题,因为org.springframework.http.client.ClientHttpRequestInterceptor我可以对我的请求执行任何我想要的操作(在将其发送到我正在调用的服务之前将 RequestObjectA 转换为 requestObjectB)。

问题:如何修改响应体?

我看到调用时ClientHttpResponse execute = clientHttpRequestExecution.execute(httpRequest, body)我可以让身体做execute.getBody(),所以我可以修改它,但我没有找到一种方法以某种方式将其设置回来。

有什么办法可以将我修改后的身体设置为ClientHttpResponse

java spring-mvc resttemplate spring-resttemplate

1
推荐指数
1
解决办法
4423
查看次数