小编Ale*_*zel的帖子

科特林。我可以获得实现特定接口的所有对象吗

我有类似的东西:

interface Options {
    fun load(conf: JsonObject)
}

object BasicOptions : Options { }
object PersonOptions : Options { }
object CarOptions : Options { }
Run Code Online (Sandbox Code Playgroud)

然后我想获取所有Objects实现 Optionsinterface并调用 load 的内容forEach

fun main(args: Array) {
    configFuture.whenComplete { config ->
            options.forEach { it.load(config) }
    }
}
Run Code Online (Sandbox Code Playgroud)

kotlin

5
推荐指数
1
解决办法
4949
查看次数

使用示例ReactorNettyWebSocketClient

New Spring在Spring文档中有一些WebSocketClient示例。

WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute("ws://localhost:8080/echo"), session -> {... }).blockMillis(5000);
Run Code Online (Sandbox Code Playgroud)

但这很短,也不清楚:

  1. 如何向服务器(订阅频道)发送消息?
  2. 然后处理传入的流并发出Flux消息?
  3. 连接中断时重新连接到服务器?

有人可以提供更复杂的例子吗?

UPD。我试图做类似的事情:

public Flux<String> getStreaming() {

    WebSocketClient client = new ReactorNettyWebSocketClient();
    EmitterProcessor<String> output = EmitterProcessor.create();
    Flux<String> input = Flux.just("{ event: 'subscribe', channel: 'examplpe' }");

    Mono<Void> sessionMono = client.execute(URI.create("ws://api.example.com/"),
            session -> session
                    .send(input.map(session::textMessage))
                    .thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).subscribeWith(output).then())
                    .then());

    return output.doOnSubscribe(s -> sessionMono.subscribe());
}
Run Code Online (Sandbox Code Playgroud)

但这仅返回一条消息。就像我没有订阅。

spring-boot spring-websocket spring-webflux

4
推荐指数
1
解决办法
1014
查看次数

Spring WebFlux(反应器)。zipWith 时出错 - 由于缺少请求而无法发出滴答声

我有一个 Flux,对于每个对象,我应该对第三方 REST 进行 API 调用(大约 1000 次调用)。为了防止每秒出现许多请求,我使用:

    Flux<Calls> callsIntervalFlux=
            Flux.interval(Duration.ofMillis(100))
                    .zipWith(callsFlux, (i, call) -> call);

// and now Calls emits every 10ms, and REST API is not overloaded
Run Code Online (Sandbox Code Playgroud)

问题是,有时应用程序会因异常而失败:

reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
Caused by: reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
Run Code Online (Sandbox Code Playgroud)

有没有我可以添加的逻辑来防止错误,或者只是跳过这个勾号?

project-reactor

3
推荐指数
1
解决办法
2113
查看次数