我们在生产中使用spring-data-r2dbc:1.3.2并dev.miku:r2dbc-mysql:0.8.2.RELEASE遇到了一个奇怪的问题。
我们不明白根本原因是什么,也不知道它是否是可恢复的,或者是否真的会产生不可预测的结果(如日志所示)。
我们定期看到错误日志如下:
我们每天都会看到它几次,这让我们感到紧张,因为它明确指出:这可能是一个导致不可预测结果的错误
我们在日志中没有得到任何附加信息。
几个问题:
假设我必须保存一个实体,在本例中为 Book。我有下一个代码:
@RestController
@RequestMapping("books")
public class BookController {
@Inject
BookRepository bookRepository;
@PostMapping
public Book saveBook(@RequestBody Book book) {
return bookRepository.save(book);
}
}
Run Code Online (Sandbox Code Playgroud)
我的实体 Book 是一个持久性实体:
@Entity(name = "BOOK")
public class Book{
@Id
@Column(name = "book_id")
private Integer id;
@Column(name = "title")
private String title;
(get/sets...)
}
Run Code Online (Sandbox Code Playgroud)
问题是:在@RequestBody控制器层使用我的持久性实体是一种不好的做法吗?或者我应该创建一本书 DTO 并将其映射到服务层中的持久性类?什么更好,为什么?
获取输入常规 java 有效负载的控制器与反应式有效负载的控制器之间有什么区别?例如,假设我有以下 2 个端点:
@RestController
public class MyController {
@PostMapping
public Flux<SomeObject> doThing(@RequestBody MyPayload playlod) {
// do things that return flux - reactive all the way from this controller
Run Code Online (Sandbox Code Playgroud)
和这个:
@RestController
public class MyController {
@PostMapping
public Flux<SomeObject> doThing(@RequestBody Mono<MyPayload> playlod) {
Run Code Online (Sandbox Code Playgroud)
从反应性的角度来看,我不明白这两种方法之间的区别。
为了提高读取性能,我尝试使用LCS来减少底层 SSTable 的数量,因此我按照一些文章的建议将sstable_size_in_mb设置为1280MB,这些文章指出160MB默认值是 Cassandra 核心团队很久以前就挑选出来的,相当不错。旧服务器现在只有 2GB RAM。但是,我担心的是sstable_size_in_mb具有较高值的影响 。
我的理解是LCS定期将L0中的所有SSTable与L1中的所有SSTable压缩在一起,然后替换L1的全部内容。因此,每次更换 L1 时,随着sstable_size_in_mb的值增大,对 CPU/RAM 和写入放大的硬件要求可能会更高。事实上,如果sstable_size_in_mb = 1280MB,那么 L1 中的 10 个 1280MB 表每次都必须与所有 L0 表合并。即使要替换的 SSTable 看起来较低(一个 L1 SSTable 与 10 个 L2 SSTable 合并,然后这 10 个 L2 SSTable 被替换),也许还会对更高级别产生影响。
问题 :
具有较高的sstable_size_in_mb值可以通过减少 CQL 表中涉及的 SSTable 数量来提高读取性能。但是, sstable_size_in_mb具有如此高的值(例如 1280MB)还有什么其他含义?
如果值较高,是否有任何相应的配置需要调整(垃圾收集器、块缓存等),以便为那些较大的 SSTable 的压缩提供更好的性能,并减少 GC 活动?
更主观的问题,您在部署中使用的sstable_size_in_mb的典型值是多少?
我正在使用 Postgre SQL 来玩 R2DBC。我正在尝试的用例是通过 ID 以及语言、演员和类别获取电影。下面是架构
这是ServiceImpl中对应的一段代码
@Override
public Mono<FilmModel> getById(Long id) {
Mono<Film> filmMono = filmRepository.findById(id).switchIfEmpty(Mono.error(DataFormatException::new)).subscribeOn(Schedulers.boundedElastic());
Flux<Actor> actorFlux = filmMono.flatMapMany(this::getByActorId).subscribeOn(Schedulers.boundedElastic());
Mono<String> language = filmMono.flatMap(film -> languageRepository.findById(film.getLanguageId())).map(Language::getName).subscribeOn(Schedulers.boundedElastic());
Mono<String> category = filmMono.flatMap(film -> filmCategoryRepository
.findFirstByFilmId(film.getFilmId()))
.flatMap(filmCategory -> categoryRepository.findById(filmCategory.getCategoryId()))
.map(Category::getName).subscribeOn(Schedulers.boundedElastic());
return Mono.zip(filmMono, actorFlux.collectList(), language, category)
.map(tuple -> {
FilmModel filmModel = GenericMapper.INSTANCE.filmToFilmModel(tuple.getT1());
List<ActorModel> actors = tuple
.getT2()
.stream()
.map(act -> GenericMapper.INSTANCE.actorToActorModel(act))
.collect(Collectors.toList());
filmModel.setActorModelList(actors);
filmModel.setLanguage(tuple.getT3());
filmModel.setCategory(tuple.getT4());
return filmModel;
});
}
Run Code Online (Sandbox Code Playgroud)
日志显示 4 次调用拍摄
2021-12-16 21:21:20.026 DEBUG 32493 --- [ctor-tcp-nio-10] o.s.r2dbc.core.DefaultDatabaseClient : Executing …Run Code Online (Sandbox Code Playgroud) project-reactor spring-webflux spring-data-r2dbc r2dbc r2dbc-postgresql
我有很多用户(150-2亿)。每个用户有 N(30-100) 个属性。该属性可以是整数、文本或时间戳类型。属性是未知的,所以我想动态地、即时地添加它们。
解决方案 1 - 通过更改表添加新列
CREATE TABLE USER_PROFILE(
UID uuid PRIMARY KEY,
LAST_UPDATE_DATE TIMESTAMP,
CREATION_DATE TIMESTAMP
);
Run Code Online (Sandbox Code Playgroud)
对于每个新属性:
ALTER TABLE USER_PROFILE ADD AGE INT;
INSERT INTO USER_PROFILE ( UID, LAST_UPDATE_DATE, CREATION_DATE, AGE) VALUES ('01f63e8b-db53-44ef-924e-7a3ccfaeec28', 2021-01-12 07:34:19.121, 2021-01-12 07:34:19.121, 27);
Run Code Online (Sandbox Code Playgroud)
解决方案 2 - 固定架构:
CREATE TABLE USER_PROFILE(
UID uuid,
ATTRIBUTE_NAME TEXT,
ATTRIBUTE_VALUE_TEXT TEXT,
ATTRIBUTE_VALUE_TIMESTAMP TIMESTAMP,
ATTRIBUTE_VALUE_INT INT,
LAST_UPDATE_DATE TIMESTAMP,
CREATION_DATE TIMESTAMP,
PRIMARY KEY (UID, ATTRIBUTE_NAME)
);
Run Code Online (Sandbox Code Playgroud)
对于每个新属性:
INSERT INTO USER_PROFILE ( UID, ATTRIBUTE_NAME, ATTRIBUTE_VALUE_INT, LAST_UPDATE_DATE, CREATION_DATE) VALUES ('01f63e8b-db53-44ef-924e-7a3ccfaeec28', 'age', …Run Code Online (Sandbox Code Playgroud) 我有微服务应用程序。为了进行协作,每个服务都使用异步消息传递。我知道,spring data jpa默认使用乐观锁。但是,如果这种锁定方法不是由用户调用,而是由另一个服务调用(在我的示例中,有验证服务,可以将对象的状态更新为有效或无效),我想处理异常并重试更新对象。我还必须在这个微服务上使用 webflux 堆栈。现在我有这样的代码:
public void updateStatus(String id, EventStatus status) {
eventRepository.findById(id)
.doOnNext(eventDocument -> {
eventDocument.setStatus(status);
eventRepository.save(eventDocument).subscribe();
}).doOnError(OptimisticLockingFailureException.class, exception -> { //Retry in 2 sec if optimistic lock occurs on update
try {
Thread.sleep(2000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
updateStatus(id, status);
})
.subscribe();
}
Run Code Online (Sandbox Code Playgroud)
我不喜欢这里的递归。对此有更好的解决方案吗?
假设我有一个Flux包含许多(数十亿个字符串)的输入,如下所示:
这样的字符串有数十亿个,它们无法放入内存中,这就是我想使用反应式方法的原因。
流已排序。现在我想要的是通过前 3 个字符创建一系列有序字符串组:
这Flux最终会出现 HTTP 响应,这意味着所有“app”项目必须在“bib”项目开始之前输出。
如果不使用,Flux我可以使用有序属性并将项目收集到准备好的存储桶中(每个存储桶的字符串数量将适合内存) - 每当前缀发生变化时,我将刷新存储桶并开始收集新的前缀。有序流的一大优点是我知道一旦遇到新的前缀,旧的就不会再出现了。
但使用Flux我不知道如何做到这一点。将会.groupBy()返回Flux,Flux但我认为在尝试将其序列化到 HTTP 响应输出流时这不会起作用。
我有一个如下所示的 Mono:
private void getNodeDetail() {
Mono<String> mono = webClient.get()
.uri("/alfresco/api/-default-/public/alfresco/versions/1/nodes/f37b52a8-de40-414b-b64d-a958137e89e2")
.retrieve().bodyToMono(String.class);
System.out.println(mono.subscribe());
System.out.println(mono.block());
}
Run Code Online (Sandbox Code Playgroud)
问题:第一个 sysout 向我展示了reactor.core.publisher.LambdaSubscriber@77114efe使用 block() 时的情况,它向我展示了我需要的内容(json 字符串)。但我想使用 Aysnc 方法。那么,如上所述,这是否意味着我的目标系统(在本例中为 Alfresco)不支持异步调用?subscribe()如果不是这种情况,如何使用,就像 一样以字符串格式在控制台上打印响应block()?