小编Mar*_*nyi的帖子

Kotlin:伴侣对象和顶级对象之间的差异

在Kotlin中创建常量的一般模式似乎是使用伴随对象.但是,我也可以在文件级别定义一个常量.为什么不那么受欢迎?我错过了什么吗?

使用伴侣对象:

class Example {
    companion object {
        const val CONSTANT = "something"
}
Run Code Online (Sandbox Code Playgroud)

在顶层:

const val CONSTANT = "something"

class Example {
}
Run Code Online (Sandbox Code Playgroud)

kotlin

24
推荐指数
3
解决办法
5221
查看次数

使用 AWS 签署 Spring WebClient HTTP 请求

我想AWS对由 Spring 的响应式WebClient触发的 HTTP 请求进行签名。要签署请求,我需要访问以下内容:URL、HTTP 方法、查询参数、标头和请求正文字节。

我从编写ExchangeFilterFunction开始。由于ClientRequest接口,我可以访问我需要的所有内容,除了请求正文:

@Component
public class AwsSigningInterceptor implements ExchangeFilterFunction
{
    private final AwsHeaderSigner awsHeaderSigner;

    public AwsSigningInterceptor(AwsHeaderSigner awsHeaderSigner)
    {
        this.awsHeaderSigner = awsHeaderSigner;
    }

    @Override
    public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next)
    {
        Map<String, List<String>> signingHeaders = awsHeaderSigner.createSigningHeaders(request, new byte[]{}, "es", "us-west-2"); // should pass request body bytes in place of new byte[]{}

        ClientRequest.Builder requestBuilder = ClientRequest.from(request);

        signingHeaders.forEach((key, value) -> requestBuilder.header(key, value.toArray(new String[0])));

        return next.exchange(requestBuilder.build());
    }
}
Run Code Online (Sandbox Code Playgroud)

在旧的 Spring 版本中,我们将RestTemplateClientHttpRequestInterceptor …

java spring reactive-programming amazon-web-services spring-webflux

8
推荐指数
1
解决办法
1373
查看次数

Project Reactor 中的背压是如何工作的?

我一直在 Spring Reactor 中工作,并且之前进行了一些测试,这让我想知道 Fluxes 默认情况下如何处理背压。我知道 onBackpressureBuffer 等存在,并且我还读到RxJava 默认为无界,直到您定义是否缓冲、删除等。

那么,谁能为我澄清一下:Reactor 3 中 Flux 的默认背压行为是什么?

我尝试寻找答案,但没有找到任何明确的答案,只有背压的定义或上面链接的 RxJava 答案

spring reactor rx-java project-reactor rx-java2

8
推荐指数
1
解决办法
3279
查看次数


Project Reactor - 如何处理来自 Flux.interval 的溢出异常?

我正在使用 Spring Webflux 构建一个 spring boot 应用程序,我想让应用程序完全非阻塞。应用程序本身有一些 REST 端点和需要每隔几秒运行一次的批处理作业。对于批处理作业,我试图Flux.interval(Duration.ofMillis(1000))生成我忽略的长值并运行我的预定作业。

Flux.interval(Duration.ofMillis(1000))
    .flatMap(ignore -> doSomething())
    .subscribe();
Run Code Online (Sandbox Code Playgroud)

但是一段时间后我收到错误

reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 257 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)

有人能告诉我如何克服这个问题吗?

java project-reactor spring-webflux

6
推荐指数
1
解决办法
2283
查看次数

使用 Spring Reactor 时为什么要使用断路器和隔板?

请帮助我找到断路器和隔板模式在 Spring Reactor 应用程序中有用的原因。

由于 Reactor 中的操作将是非阻塞的,并且这两种模式旨在节省对资源(主要是线程)的潜在影响,因此在什么情况下我可以使 Spring Reactor 应用程序中的模式受益。我此时看到的唯一一件事是,如果请求量非常大,以至于将它们保留在内存中,那么在等待超时(而不是断路器启动并回落)时,我们会运行 OOM。

circuit-breaker project-reactor resilience4j

5
推荐指数
1
解决办法
1288
查看次数

将 List&lt;Mono&lt;String&gt;&gt; 转换为 Flux&lt;String&gt;

问题很少,但答案非常具体于某些代码。

一般来说,如何将 Mono Stream 转换为 Flux

List<Mono<String> listOfMono = stream()
.map( s -> { do something and return Mono<String> } )
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

如何将listOfMono对象转换为Flux<String>

java spring-boot project-reactor spring-webflux

5
推荐指数
1
解决办法
2063
查看次数

Kotlin 协程是否有 asyncAll 和/或awaitAll 运算符?

我有一个集合,我想在 Kotlin 中对其所有项目异步执行一些操作。

我可以通过两个地图操作轻松完成此操作:

suspend fun collectionAsync() = coroutineScope {

    val list = listOf("one", "two", "three")

    list.map { async { callRemoteService(it) } }.map { it.await() }.forEach { println(it) }
}

suspend fun callRemoteService(input: String): String
{
    delay(1000)
    return "response for $input"
}
Run Code Online (Sandbox Code Playgroud)

我想要的是这样的:

asyncAll(list, ::callRemoteService).awaitAll()
Run Code Online (Sandbox Code Playgroud)

我可能可以用扩展函数来实现它。我只是想知道是否有更惯用的方法来做到这一点。

编辑:我发现awaitAll已经存在。现在,我只需要一个 asyncAll。

list.map { async { callRemoteService(it) } }.awaitAll().forEach { println(it) }
Run Code Online (Sandbox Code Playgroud)

EDIT2:我写了我的 asyncAll 实现:

fun <T, V> CoroutineScope.asyncAll(
    items: Iterable<T>,
    function: suspend (T) -> V
): List<Deferred<V>>
{
    return items.map { …
Run Code Online (Sandbox Code Playgroud)

kotlin kotlin-coroutines

4
推荐指数
1
解决办法
5287
查看次数

.switchIfEmpty() 被急切地评估有什么意义?

即使我的流不为空,回退流也会始终创建?这样做的意图是什么?这是非常不习惯的。

另一方面, 。onErrorResume被懒惰地评估。

有人可以向我解释为什么。switchIsEmpty热切评价?

这是代码:

  public static void main(String[] args) {
    Mono<Integer> m = Mono.just(1);
    m.flatMap(a -> Mono.delay(Duration.ofMillis(5000)).flatMap(p -> Mono.empty()))
        .switchIfEmpty(getFallback())
        .doOnNext(a -> System.out.println(a))
        .block();
  }

  private static Mono<Integer> getFallback() {
    System.out.println("In Here");
    return Mono.just(5);
  }
Run Code Online (Sandbox Code Playgroud)

输出是:

In Here (printed immediately)
5 (after 5s)
Run Code Online (Sandbox Code Playgroud)

java reactive-programming project-reactor reactive

3
推荐指数
1
解决办法
2914
查看次数

Reactor 和 Webflux 中的 Mono 链超时算子到底测量了什么?

我正在使用 Spring Webflux 并试图理解 Monos 链的超时概念。

例如,有一系列 Mono 调用:

myService.firstOperation()
.then(myService.secondOperation())
...
.then(myService.nOperation())
.timeout(3000L)
Run Code Online (Sandbox Code Playgroud)

如何应用超时:

1) 对于一般操作(操作总时间)

2)对于ech操作(每个操作的时间不应超过超时时间)

3) 仅针对最后一次操作(nOperation)

我几乎可以肯定超时适用于最后一个发布者。如果是这样,如何将超时应用于操作的总和?

java spring reactive-programming project-reactor spring-webflux

3
推荐指数
1
解决办法
2695
查看次数