Vert.x事件循环 - 这是如何异步的?

use*_*101 17 java nio vert.x undertow

我正在使用Vert.x和基于事件循环的服务器相当新,而不是线程/连接模型.

public void start(Future<Void> fut) {
    vertx
        .createHttpServer()
        .requestHandler(r -> {
            LocalDateTime start = LocalDateTime.now();
            System.out.println("Request received - "+start.format(DateTimeFormatter.ISO_DATE_TIME));
            final MyModel model = new MyModel();
            try {

                for(int i=0;i<10000000;i++){
                    //some simple operation
                }

                model.data = start.format(DateTimeFormatter.ISO_DATE_TIME) +" - "+LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);

            } catch (Exception e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }

          r.response().end(
                  new Gson().toJson(model)
                 );
        })
        .listen(4568, result -> {
          if (result.succeeded()) {
            fut.complete();
          } else {
            fut.fail(result.cause());
          }
        });
    System.out.println("Server started ..");
  }
Run Code Online (Sandbox Code Playgroud)
  • 我只是想模拟一个长时间运行的请求处理程序来理解这个模型是如何工作的.
  • 我观察到的是所谓的事件循环被阻止,直到我的第一个请求完成.无论花费多少时间,后续请求都不会被执行,直到前一个请求完成.
  • 显然我在这里错过了一块,这就是我在这里的问题.

根据目前的答案编辑:

  1. 是不是接受所有被认为是异步的请求?如果只有当前一个连接被清除时才能接受新连接,它是如何异步的?
    • 假设典型请求需要100毫秒到1秒之间的任何时间(基于请求的种类和性质).所以这意味着,事件循环在前一个请求完成之前不能接受新连接(即使它在一秒钟内结束).如果我作为程序员必须仔细考虑所有这些并将这些请求处理程序推送到工作线程,那么它与线程/连接模型有何不同?
    • 我只是想了解传统的线程/连接服务器模型中这个模型是如何更好的?假设没有I/O操作或所有I/O操作都是异步处理的?当它无法并行启动所有并发请求并且必须等到前一个请求终止时,它如何解决c10k问题?
  2. 即使我决定将所有这些操作推送到工作线程(汇集),那么我回到同样的问题不是吗?线程间的上下文切换? 编辑和打顶这个问题以获得赏金

    • 不完全理解这个模型如何声称异步.
    • Vert.x有一个异步JDBC客户端(Asyncronous是关​​键字),我试图用RXJava进行调整.
    • 这是一个代码示例(相关部分)

    server.requestStream().toObservable().subscribe(req - > {

            LocalDateTime start = LocalDateTime.now();
            System.out.println("Request for " + req.absoluteURI() +" received - " +start.format(DateTimeFormatter.ISO_DATE_TIME));
            jdbc.getConnectionObservable().subscribe(
                    conn -> {
    
                        // Now chain some statements using flatmap composition
                        Observable<ResultSet> resa = conn.queryObservable("SELECT * FROM CALL_OPTION WHERE UNDERLYING='NIFTY'");
                        // Subscribe to the final result
                        resa.subscribe(resultSet -> {
    
                            req.response().end(resultSet.getRows().toString());
                            System.out.println("Request for " + req.absoluteURI() +" Ended - " +LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
                        }, err -> {
                            System.out.println("Database problem");
                            err.printStackTrace();
                        });
                    },
    
                    // Could not connect
                    err -> {
                        err.printStackTrace();
                    }
                    );
    
    });
    server.listen(4568);
    
    Run Code Online (Sandbox Code Playgroud)
    • 选择查询大约需要3秒才能返回完整的表转储.
    • 当我触发并发请求(只用2尝试)时,我看到第二个请求完全等待第一个请求完成.
    • 如果JDBC select是异步的,那么在等待select查询返回任何内容时,让框架处理第二个连接是不是一个公平的期望.

Dmi*_*kiy 31

Vert.x事件循环实际上是许多平台上存在的经典事件循环.当然,大多数解释和文档都可以在Node.js中找到,因为它是基于这种架构模式的最流行的框架.看看Node.js事件循环下的一个或多或少好的机制解释.Vert.x教程在"不要打电话给我们,我们会打电话给你"和"Verticles"之间有很好的解释.

编辑您的更新:

首先,当您使用事件循环时,主线程应该可以非常快速地处理所有请求.你不应该在这个循环中做任何长时间的工作.当然,您不应该等待对数据库的响应. - 异步调度调用 - 为结果分配回调(处理程序) - 将在工作线程,非事件循环线程中执行回调.例如,此回调将返回对套接字的响应.因此,您在事件循环中的操作应该只使用回调计划所有异步操作,并在不等待任何结果的情况下转到下一个请求.

假设典型请求需要100毫秒到1秒之间的任何时间(基于请求的种类和性质).

在这种情况下,您的请求有一些计算昂贵的部件或访问IO - 事件循环中的代码不应等待此操作的结果.

我只是想了解传统的线程/连接服务器模型中这个模型是如何更好的?假设没有I/O操作或所有I/O操作都是异步处理的?

当您有太多并发请求和传统编程模型时,您将根据每个请求创建线程.这个帖子会做什么?他们将主要等待IO操作(例如,数据库的结果).浪费资源.在我们的事件循环模型中,您有一个主线程,用于为长任务调度操作和预分配的工作线程数量.+这些工作者实际上都没有等待响应,他们只能在等待IO结果时执行另一个代码(它可以实现为当前正在进行的IO作业的回调或定期检查状态).我建议您通过Java NIO和Java NIO 2来了解如何在框架内实际实现此异步IO.绿色线程也是非常相关的概念,这将是很好理解.绿色线程和协同程序是一种阴影事件循环,它试图实现同样的事情 - 更少的线程,因为我们可以重用系统线程,而绿色线程等待某事.

当它无法并行启动所有并发请求并且必须等到前一个请求终止时,它如何解决c10k问题?

当然,我们不会在主线程中等待发送先前请求的响应.获取请求,安排长/ IO任务执行,下一个请求.

即使我决定将所有这些操作推送到工作线程(汇集),那么我回到同样的问题不是吗?线程间的上下文切换?

如果你把一切都做对了 - 没有.更重要的是,您将获得良好的数据位置和执行流预测.一个CPU核心将执行您的短事件循环并安排异步工作而无需上下文切换,仅此而已.其他核心调用数据库并返回响应,只有这一点.在回调之间切换或检查不同通道的IO状态实际上并不需要任何系统线程的上下文切换 - 它实际上在一个工作线程中工作.因此,我们每个核心有一个工作线程,这个系统线程等待/检查来自多个数据库连接的结果可用性.重温Java NIO概念,了解它如何以这种方式工作.(NIO的经典示例 - 代理服务器,可以接受许多并行连接(数千),代理请求到其他一些远程服务器,监听响应并将响应发送回客户端,所有这些都使用一个或两个线程)

关于你的代码,我为你做了一个示例项目,以证明一切都按预期工作:

public class MyFirstVerticle extends AbstractVerticle {

    @Override
    public void start(Future<Void> fut) {
        JDBCClient client = JDBCClient.createShared(vertx, new JsonObject()
                .put("url", "jdbc:hsqldb:mem:test?shutdown=true")
                .put("driver_class", "org.hsqldb.jdbcDriver")
                .put("max_pool_size", 30));


        client.getConnection(conn -> {
            if (conn.failed()) {throw new RuntimeException(conn.cause());}
            final SQLConnection connection = conn.result();

            // create a table
            connection.execute("create table test(id int primary key, name varchar(255))", create -> {
                if (create.failed()) {throw new RuntimeException(create.cause());}
            });
        });

        vertx
            .createHttpServer()
            .requestHandler(r -> {
                int requestId = new Random().nextInt();
                System.out.println("Request " + requestId + " received");
                    client.getConnection(conn -> {
                         if (conn.failed()) {throw new RuntimeException(conn.cause());}

                         final SQLConnection connection = conn.result();

                         connection.execute("insert into test values ('" + requestId + "', 'World')", insert -> {
                             // query some data with arguments
                             connection
                                 .queryWithParams("select * from test where id = ?", new JsonArray().add(requestId), rs -> {
                                     connection.close(done -> {if (done.failed()) {throw new RuntimeException(done.cause());}});
                                     System.out.println("Result " + requestId + " returned");
                                     r.response().end("Hello");
                                 });
                         });
                     });
            })
            .listen(8080, result -> {
                if (result.succeeded()) {
                    fut.complete();
                } else {
                    fut.fail(result.cause());
                }
            });
    }
}

@RunWith(VertxUnitRunner.class)
public class MyFirstVerticleTest {

  private Vertx vertx;

  @Before
  public void setUp(TestContext context) {
    vertx = Vertx.vertx();
    vertx.deployVerticle(MyFirstVerticle.class.getName(),
        context.asyncAssertSuccess());
  }

  @After
  public void tearDown(TestContext context) {
    vertx.close(context.asyncAssertSuccess());
  }

  @Test
  public void testMyApplication(TestContext context) {
      for (int i = 0; i < 10; i++) {
          final Async async = context.async();
          vertx.createHttpClient().getNow(8080, "localhost", "/",
                            response -> response.handler(body -> {
                                context.assertTrue(body.toString().contains("Hello"));
                                async.complete();
                            })
        );
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

输出:

Request 1412761034 received
Request -1781489277 received
Request 1008255692 received
Request -853002509 received
Request -919489429 received
Request 1902219940 received
Request -2141153291 received
Request 1144684415 received
Request -1409053630 received
Request -546435082 received
Result 1412761034 returned
Result -1781489277 returned
Result 1008255692 returned
Result -853002509 returned
Result -919489429 returned
Result 1902219940 returned
Result -2141153291 returned
Result 1144684415 returned
Result -1409053630 returned
Result -546435082 returned
Run Code Online (Sandbox Code Playgroud)

因此,我们接受对数据库的请求 - 调度请求,转到下一个请求,我们消耗所有这些请求,并且只有在完成所有数据库的操作时才为每个请求发送响应.

关于你的代码示例我看到两个可能的问题 - 首先,看起来你没有close()连接,这对于将它返回池是很重要的.二,如何配置池?如果只有一个空闲连接 - 此请求将序列化等待此连接.

我建议您为两个请求添加一些时间戳打印,以查找序列化的位置.你把一些东西用来阻止事件循环.或者......检查您是否在测试中并行发送请求.在获得之前的回复后不是下一个.


pra*_*sun 7

这是如何异步的?答案就在你的问题中

我观察到的是所谓的事件循环被阻止,直到我的第一个请求完成.无论花费多少时间,后续请求都不会被执行,直到前一个请求完成

我们的想法是使用与您长时间运行的任务阻止的相同线程,而不是为每个HTTP请求提供新的服务.

事件循环的目标是将上下文切换所涉及的时间从一个线程保存到另一个线程,并在任务使用IO /网络活动时利用理想的CPU时间.如果在处理您的请求时它必须进行其他IO /网络操作,例如:在此期间从远程MongoDB实例获取数据,您的线程将不会被阻塞,而是另一个请求将由同一个线程提供,这是理想的用例事件循环模型(考虑到您有并发请求到您的服务器).

如果您有长时间运行的任务不涉及网络/ IO操作,您应该考虑使用线程池,如果您阻止主事件循环线程本身,其他请求将被延迟.即对于长时间运行的任务,您可以支付上下文切换的代价,以便服务器能够做出响应.

编辑:服务器处理请求的方式可能有所不同:

1)为每个传入请求生成一个新线程(在此模型中,上下文切换会很高,每次产生一个新线程会产生额外的成本)

2)使用线程池为请求提供服务(同一组线程将用于服务请求,额外请求排队等待)

3)使用事件循环(所有请求的单个线程.可忽略的上下文切换.因为会有一些线程运行,例如:排队传入的请求)

首先,上下文切换也不错,需要保持应用服务器响应,但是,如果并发请求的数量过多(大约超过10k),过多的上下文切换可能会成为问题.如果您想更详细地了解我建议您阅读C10K文章

假设典型请求需要100毫秒到1秒之间的任何时间(基于请求的种类和性质).所以这意味着,事件循环在前一个请求完成之前不能接受新连接(即使它在一秒钟内结束).

如果您需要响应大量并发请求(超过10k),我会考虑将500ms以上作为更长时间运行的操作.其次,就像我说的那样,涉及一些线程/上下文切换,例如:排队传入的请求,但是,线程之间的上下文切换将大大减少,因为一次只有太少的线程.第三,如果在解决第一个请求时涉及网络/ IO操作,则在第一个请求解决之前有机会得到解决,这就是这个模型运行良好的地方.

如果我作为程序员必须仔细考虑所有这些并将这些请求处理程序推送到工作线程,那么它与线程/连接模型有何不同?

Vertx正试图为您提供最佳的线程和事件循环,因此,作为程序员,您可以调用如何在应用程序(即使用和不使用网络/ IO操作的长时间运行)的情况下使应用程序高效.

我只是想了解传统的线程/连接服务器模型中这个模型是如何更好的?假设没有I/O操作或所有I/O操作都是异步处理的?当它无法并行启动所有并发请求并且必须等到前一个请求终止时,它如何解决c10k问题?

以上解释应该回答这个问题.

即使我决定将所有这些操作推送到工作线程(汇集),那么我回到同样的问题不是吗?线程间的上下文切换?

就像我说的,两者都有优点和缺点,vertx为您提供了模型,根据您的使用情况,您必须选择适合您的场景的理想选择.


Pet*_*rey 3

在这些类型的处理引擎中,应该将长时间运行的任务转换为异步执行的操作,这是执行此操作的方法,以便关键线程可以尽快完成并返回执行另一个任务。即任何 IO 操作都会传递给框架,以便在 IO 完成时回调您。

该框架是异步的,因为它支持您生成和运行这些异步任务,但它不会将您的代码从同步更改为异步。