我希望写 Reactive Kafa 来收听新的消息。但我不知道该怎么做。就像阻塞 Kafka 中的@KafkaListener - 它正在等待新消息
Spring Boot Webflux 和 Reactor Kafka 上的代码演示:
public class KafkaConsumer {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "Kafka_Example";
private final ReceiverOptions<String, String> receiverOptions;
public KafkaConsumer(String bootstrapServers) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-consumer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
receiverOptions = ReceiverOptions.create(props);
}
public Disposable consumeMessages(String topic) {
ReceiverOptions<String, String> options = receiverOptions.subscription(Collections.singleton(topic))
.addAssignListener(partitions -> System.out.println("onPartitionsAssigned " + partitions)) …
Run Code Online (Sandbox Code Playgroud) 你能帮我解决下面的问题吗
我有一个实体类:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
@Id
private Long id;
private String name;
}
Run Code Online (Sandbox Code Playgroud)
我将其保存在 Postgresql 中,代码如下:
public Mono<User> save(User user){
return databaseClient().inTransaction(db -> {
return db
.insert()
.into(User.class)
.using(user)
//todo: return saved user
});
}
Run Code Online (Sandbox Code Playgroud)
我希望获得保存的用户,我该怎么做
我有一个像这样的超级实体类:
@Getter
@Setter
@NoArgsConstructor
public class GenericEntity {
@Id
private Long id;
@JsonIgnore
@CreatedBy
private Long createdBy;
@JsonIgnore
@CreatedDate
private Long createdDate;
@JsonIgnore
@LastModifiedBy
private Long updatedBy;
@JsonIgnore
@LastModifiedDate
private Long updatedDate;
@JsonIgnore
@Version
private Integer version = 0;
}
Run Code Online (Sandbox Code Playgroud)
Role 类从 GenericEntity 扩展,如下所示:
@Getter
@Setter
@NoArgsConstructor
public class Role extends GenericEntity {
private String name;
private String desc;
private Integer sort;
}
Run Code Online (Sandbox Code Playgroud)
之后我有像这样的 RoleRepo 接口:
@Repository
public interface RoleRepo extends ReactiveCrudRepository<Role, Long>;
Run Code Online (Sandbox Code Playgroud)
在 Router 函数中,我有 2 个处理程序方法
private Mono<ServerResponse> …
Run Code Online (Sandbox Code Playgroud)