小编lka*_*ris的帖子

r2dbc - 这可能是一个导致不可预测结果的错误

我们在生产中使用spring-data-r2dbc:1.3.2dev.miku:r2dbc-mysql:0.8.2.RELEASE遇到了一个奇怪的问题。

我们不明白根本原因是什么,也不知道它是否是可恢复的,或者是否真的会产生不可预测的结果(如日志所示)。

我们定期看到错误日志如下:

  • 记录器:dev.miku.r2dbc.mysql.client.ReactorNettyClient
  • 级别:错误
  • 消息:交换已取消,交换处于活动状态。这可能是一个导致不可预测结果的错误。

我们每天都会看到它几次,这让我们感到紧张,因为它明确指出:这可能是一个导致不可预测结果的错误

我们在日志中没有得到任何附加信息。

几个问题:

  1. 这真的很危险吗?还是我们可以将其视为警告?
  2. 我们如何调试它以了解其原因?
  3. 从源代码中我们可以看到,它假设交换请求如果未完成则无法取消,并认为这种情况是一个BUG。是否有意义?如果反应流由于“连接丢失”或任何其他原因被取消怎么办?

java project-reactor spring-webflux r2dbc r2dbc-mysql

15
推荐指数
0
解决办法
1268
查看次数

是否应该使用实体类作为请求主体

假设我必须保存一个实体,在本例中为 Book。我有下一个代码:

@RestController
@RequestMapping("books")
public class BookController {
    
    @Inject
    BookRepository bookRepository;
    
    @PostMapping
    public Book saveBook(@RequestBody Book book) {
        return bookRepository.save(book);
    }
}
Run Code Online (Sandbox Code Playgroud)

我的实体 Book 是一个持久性实体:

@Entity(name = "BOOK")
public class Book{

    @Id
    @Column(name = "book_id")
    private Integer id;

    @Column(name = "title")
    private String title;

    (get/sets...)
}
Run Code Online (Sandbox Code Playgroud)

问题是:在@RequestBody控制器层使用我的持久性实体是一种不好的做法吗?或者我应该创建一本书 DTO 并将其映射到服务层中的持久性类?什么更好,为什么?

java model-view-controller entity dto spring-boot

8
推荐指数
2
解决办法
4508
查看次数

WebFlux 控制器中的 Mono 或 Flux 请求主体

获取输入常规 java 有效负载的控制器与反应式有效负载的控制器之间有什么区别?例如,假设我有以下 2 个端点:

@RestController
public class MyController {
@PostMapping
public Flux<SomeObject> doThing(@RequestBody MyPayload playlod) {
// do things that return flux - reactive all the way from this controller
Run Code Online (Sandbox Code Playgroud)

和这个:

@RestController
public class MyController {
@PostMapping
public Flux<SomeObject> doThing(@RequestBody Mono<MyPayload> playlod) {
   
Run Code Online (Sandbox Code Playgroud)

从反应性的角度来看,我不明白这两种方法之间的区别。

reactive spring-webflux

8
推荐指数
1
解决办法
3590
查看次数

LeveledCompactionStrategy :调整 sstable_size_in_mb 有何影响?

为了提高读取性能,我尝试使用LCS来减少底层 SSTable 的数量,因此我按照一些文章的建议将sstable_size_in_mb设置为1280MB,这些文章指出160MB默认值是 Cassandra 核心团队很久以前就挑选出来的,相当不错。旧服务器现在只有 2GB RAM。但是,我担心的是sstable_size_in_mb具有较高值的​​影响

我的理解是LCS定期将L0中的所有SSTable与L1中的所有SSTable压缩在一起,然后替换L1的全部内容。因此,每次更换 L1 时,随着sstable_size_in_mb的值增大,对 CPU/RAM 和写入放大的硬件要求可能会更高。事实上,如果sstable_size_in_mb = 1280MB,那么 L1 中的 10 个 1280MB 表每次都必须与所有 L0 表合并。即使要替换的 SSTable 看起来较低(一个 L1 SSTable 与 10 个 L2 SSTable 合并,然后这 10 个 L2 SSTable 被替换),也许还会对更高级别产生影响。

问题 :

  1. 具有较高的sstable_size_in_mb值可以通过减少 CQL 表中涉及的 SSTable 数量来提高读取性能。但是, sstable_size_in_mb具有如此高的值(例如 1280MB)还有什么其他含义?

  2. 如果值较高,是否有任何相应的配置需要调整(垃圾收集器、块缓存等),以便为那些较大的 SSTable 的压缩提供更好的性能,并减少 GC 活动?

  3. 更主观的问题,您在部署中使用的sstable_size_in_mb的典型值是多少?

cassandra datastax-enterprise datastax cassandra-3.0

7
推荐指数
1
解决办法
442
查看次数

为什么会多次调用DB

我正在使用 Postgre SQL 来玩 R2DBC。我正在尝试的用例是通过 ID 以及语言、演员和类别获取电影。下面是架构

在此输入图像描述

这是ServiceImpl中对应的一段代码

@Override
public Mono<FilmModel> getById(Long id) { 
    Mono<Film> filmMono = filmRepository.findById(id).switchIfEmpty(Mono.error(DataFormatException::new)).subscribeOn(Schedulers.boundedElastic());
    Flux<Actor> actorFlux = filmMono.flatMapMany(this::getByActorId).subscribeOn(Schedulers.boundedElastic());
    Mono<String> language = filmMono.flatMap(film -> languageRepository.findById(film.getLanguageId())).map(Language::getName).subscribeOn(Schedulers.boundedElastic());
    Mono<String> category = filmMono.flatMap(film -> filmCategoryRepository
                    .findFirstByFilmId(film.getFilmId()))
            .flatMap(filmCategory -> categoryRepository.findById(filmCategory.getCategoryId()))
            .map(Category::getName).subscribeOn(Schedulers.boundedElastic());

    return Mono.zip(filmMono, actorFlux.collectList(), language, category)
            .map(tuple -> {
                FilmModel filmModel = GenericMapper.INSTANCE.filmToFilmModel(tuple.getT1());
                List<ActorModel> actors = tuple
                        .getT2()
                        .stream()
                        .map(act -> GenericMapper.INSTANCE.actorToActorModel(act))
                        .collect(Collectors.toList());
                filmModel.setActorModelList(actors);
                filmModel.setLanguage(tuple.getT3());
                filmModel.setCategory(tuple.getT4());
                return filmModel;
            });
         }
Run Code Online (Sandbox Code Playgroud)

日志显示 4 次调用拍摄

2021-12-16 21:21:20.026 DEBUG 32493 --- [ctor-tcp-nio-10] o.s.r2dbc.core.DefaultDatabaseClient     : Executing …
Run Code Online (Sandbox Code Playgroud)

project-reactor spring-webflux spring-data-r2dbc r2dbc r2dbc-postgresql

6
推荐指数
1
解决办法
1081
查看次数

Cassandra 中的动态架构更改

我有很多用户(150-2亿)。每个用户有 N(30-100) 个属性。该属性可以是整数、文本或时间戳类型。属性是未知的,所以我想动态地、即时地添加它们。

解决方案 1 - 通过更改表添加新列

   CREATE TABLE USER_PROFILE(
    UID uuid PRIMARY KEY,
    LAST_UPDATE_DATE TIMESTAMP,
    CREATION_DATE TIMESTAMP
   );
Run Code Online (Sandbox Code Playgroud)

对于每个新属性:

    ALTER TABLE USER_PROFILE ADD AGE INT;
    INSERT INTO USER_PROFILE ( UID, LAST_UPDATE_DATE, CREATION_DATE, AGE) VALUES ('01f63e8b-db53-44ef-924e-7a3ccfaeec28', 2021-01-12 07:34:19.121, 2021-01-12 07:34:19.121, 27);
Run Code Online (Sandbox Code Playgroud)

解决方案 2 - 固定架构:

CREATE TABLE USER_PROFILE(
    UID uuid,
    ATTRIBUTE_NAME TEXT,
    ATTRIBUTE_VALUE_TEXT TEXT,
    ATTRIBUTE_VALUE_TIMESTAMP TIMESTAMP,
    ATTRIBUTE_VALUE_INT INT,
    LAST_UPDATE_DATE TIMESTAMP,
    CREATION_DATE TIMESTAMP,
    PRIMARY KEY (UID, ATTRIBUTE_NAME)
);
Run Code Online (Sandbox Code Playgroud)

对于每个新属性:

INSERT INTO USER_PROFILE ( UID, ATTRIBUTE_NAME, ATTRIBUTE_VALUE_INT, LAST_UPDATE_DATE, CREATION_DATE) VALUES ('01f63e8b-db53-44ef-924e-7a3ccfaeec28', 'age', …
Run Code Online (Sandbox Code Playgroud)

database database-design cassandra nosql database-schema

4
推荐指数
1
解决办法
1583
查看次数

重试 Reactor 中的特定异常

我有微服务应用程序。为了进行协作,每个服务都使用异步消息传递。我知道,spring data jpa默认使用乐观锁。但是,如果这种锁定方法不是由用户调用,而是由另一个服务调用(在我的示例中,有验证服务,可以将对象的状态更新为有效或无效),我想处理异常并重试更新对象。我还必须在这个微服务上使用 webflux 堆栈。现在我有这样的代码:

public void updateStatus(String id, EventStatus status) {
    eventRepository.findById(id)
            .doOnNext(eventDocument -> {
                eventDocument.setStatus(status);
                eventRepository.save(eventDocument).subscribe();
            }).doOnError(OptimisticLockingFailureException.class, exception -> { //Retry in 2 sec if optimistic lock occurs on update
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
                updateStatus(id, status);
            })
            .subscribe();
}
Run Code Online (Sandbox Code Playgroud)

我不喜欢这里的递归。对此有更好的解决方案吗?

java spring spring-data-jpa project-reactor spring-webflux

3
推荐指数
1
解决办法
2081
查看次数

如何在 Spring 中有效地将排序后的通量分组为序列化组?

假设我有一个Flux包含许多(数十亿个字符串)的输入,如下所示:

  • 苹果
  • 应用
  • 圣经

这样的字符串有数十亿个,它们无法放入内存中,这就是我想使用反应式方法的原因。

流已排序。现在我想要的是通过前 3 个字符创建一系列有序字符串组:

  • 应用程序:苹果,应用程序
  • 围兜:圣经
  • 嘘:书

Flux最终会出现 HTTP 响应,这意味着所有“app”项目必须在“bib”项目开始之前输出。

如果不使用,Flux我可以使用有序属性并将项目收集到准备好的存储桶中(每个存储桶的字符串数量将适合内存) - 每当前缀发生变化时,我将刷新存储桶并开始收集新的前缀。有序流的一大优点是我知道一旦遇到新的前缀,旧的就不会再出现了。

但使用Flux我不知道如何做到这一点。将会.groupBy()返回FluxFlux但我认为在尝试将其序列化到 HTTP 响应输出流时这不会起作用。

spring flux project-reactor spring-webflux

1
推荐指数
1
解决办法
489
查看次数

Spring WebFlux - 如何使用 WebClient 将响应打印为字符串而不是对象

我有一个如下所示的 Mono:

    private void getNodeDetail() {
        Mono<String> mono = webClient.get()
                .uri("/alfresco/api/-default-/public/alfresco/versions/1/nodes/f37b52a8-de40-414b-b64d-a958137e89e2")
                .retrieve().bodyToMono(String.class);

        System.out.println(mono.subscribe());
        System.out.println(mono.block());
    }
Run Code Online (Sandbox Code Playgroud)

问题:第一个 sysout 向我展示了reactor.core.publisher.LambdaSubscriber@77114efe使用 block() 时的情况,它向我展示了我需要的内容(json 字符串)。但我想使用 Aysnc 方法。那么,如上所述,这是否意味着我的目标系统(在本例中为 Alfresco)不支持异步调用?subscribe()如果不是这种情况,如何使用,就像 一样以字符串格式在控制台上打印响应block()

spring alfresco spring-boot project-reactor spring-webflux

0
推荐指数
1
解决办法
2897
查看次数