我有类似的东西:
interface Options {
fun load(conf: JsonObject)
}
object BasicOptions : Options { }
object PersonOptions : Options { }
object CarOptions : Options { }
Run Code Online (Sandbox Code Playgroud)
然后我想获取所有Objects实现 Optionsinterface并调用 load 的内容forEach。
fun main(args: Array) {
configFuture.whenComplete { config ->
options.forEach { it.load(config) }
}
}
Run Code Online (Sandbox Code Playgroud) New Spring在Spring文档中有一些WebSocketClient示例。
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute("ws://localhost:8080/echo"), session -> {... }).blockMillis(5000);
Run Code Online (Sandbox Code Playgroud)
但这很短,也不清楚:
有人可以提供更复杂的例子吗?
UPD。我试图做类似的事情:
public Flux<String> getStreaming() {
WebSocketClient client = new ReactorNettyWebSocketClient();
EmitterProcessor<String> output = EmitterProcessor.create();
Flux<String> input = Flux.just("{ event: 'subscribe', channel: 'examplpe' }");
Mono<Void> sessionMono = client.execute(URI.create("ws://api.example.com/"),
session -> session
.send(input.map(session::textMessage))
.thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).subscribeWith(output).then())
.then());
return output.doOnSubscribe(s -> sessionMono.subscribe());
}
Run Code Online (Sandbox Code Playgroud)
但这仅返回一条消息。就像我没有订阅。
我有一个 Flux,对于每个对象,我应该对第三方 REST 进行 API 调用(大约 1000 次调用)。为了防止每秒出现许多请求,我使用:
Flux<Calls> callsIntervalFlux=
Flux.interval(Duration.ofMillis(100))
.zipWith(callsFlux, (i, call) -> call);
// and now Calls emits every 10ms, and REST API is not overloaded
Run Code Online (Sandbox Code Playgroud)
问题是,有时应用程序会因异常而失败:
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
Caused by: reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
Run Code Online (Sandbox Code Playgroud)
有没有我可以添加的逻辑来防止错误,或者只是跳过这个勾号?