Vertx 中同步发送 EventBus 消息

Dra*_*ban 0 java reactive-programming event-bus vert.x rx-java

我想通过 Vertx 中的 EventBus 同步发送多条消息。我想发送一条消息,等待它,然后再发送下一条消息。地址是一样的。如果是默认的话我该怎么做?还是必须使用executeBlocking代码?

这是我的代码。

public class EventBusSync {
    private Vertx vertx = Vertx.vertx();
    private static final String SERVICE_ADDRESS =  "service.worker";
    
  public void sentViaEvBus() {
    String message1 = "message1";
    String message2 = "message2";
    
    String reply1 = sendCommand(SERVICE_ADDRESS,message1);
    String reply2 = sendCommand(SERVICE_ADDRESS,message2);

  }

  private String sendCommand(String address, String command) {
   String message;
   vertx.eventBus().send(address,command, handler -> {
    if(handler.succeeded()) {
     log.info("success");
   } else {
     log.error("error",handler.cause());
     throw new RuntimeException("ERROR");
    }
    message = handler.result.body();
    });
 return message;
  }
 }
Run Code Online (Sandbox Code Playgroud)

所以在这里,如果发送第一个命令并且发生了一些事情,我想中断下一个事件总线的发送。

谢谢

tse*_*ont 6

使用CompleteFuture

  private String sendCommand(String address, String command) {
    CompletableFuture<String> completableFuture = new CompletableFuture<>();
    vertx.eventBus().<String>send(address, command, asyncResult -> {
      if (asyncResult.succeeded()) {
        completableFuture.complete(asyncResult.result().body());
      } else {
        completableFuture.completeExceptionally(asyncResult.cause());
      }
    });
    try {
      return completableFuture.get();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
Run Code Online (Sandbox Code Playgroud)

确保此代码不会在 Vert.x 事件循环上调用,因为get()在知道回复之前会阻塞。