vert.x:发布和使用事件总线中的消息

Cra*_*hax 1 concurrency reactive-programming vert.x

我写了以下代码:

public class VertxApp {

    public static void main(String[] args)  { // This is OK
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new ReceiveVerticle());  // line A
        vertx.deployVerticle(new SendVerticle());     // line B
    }
}

public class ReceiveVerticle extends AbstractVerticle{

    @Override
    public void start(Future<Void> startFuture) {
        vertx.eventBus().consumer("address", message -> {
            System.out.println("message received by receiver");
            System.out.println(message.body());
        });
    }
}

public class SendVerticle extends AbstractVerticle {

    @Override
    public void start(Future<Void> startFuture) throws InterruptedException {
        System.out.println("SendVerticle started!");
        int i = 0;

        for (i = 0; i < 5; i++) {
            System.out.println("Sender sends a message " + i );
            vertx.eventBus().publish("address", "message" + i);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

此代码不一致.有竞争条件.如果我多次运行代码,有时会消耗所有发送的5条消息,有时甚至都不会消耗它们.

你能解释为什么这里有竞争条件以及如何解决这个问题?

tse*_*ont 6

没有竞争条件,部署Verticle是一个异步操作,您的接收器Verticle可能会在发送者Verticle发送消息注册消费者.

要确保按顺序执行操作,请使用deploy带有handler参数的方法:

Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new ReceiveVerticle(), ar -> {
    if (ar.succeeded()) {
        vertx.deployVerticle(new SendVerticle());
    } else {
        // handle the problem -> ar.cause()
    }
});
Run Code Online (Sandbox Code Playgroud)