关闭 R2DBC 池的连接

Ole*_*lek 6 java connection-pooling r2dbc

我正在尝试使用连接池配置与 PostgreSQL 数据库的 R2DBC 连接。我已遵循此文档:https://github.com/r2dbc/r2dbc-pool/blob/main/README.md

ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
        .option(DRIVER, "pool")
        .option(PROTOCOL, "postgresql") // driver identifier, PROTOCOL is delegated as DRIVER by the pool.
        .option(HOST, "192.168.1.200")
        .option(PORT, 5433)
        .option(DATABASE, "XXX")
        .option(USER, "XXX")
        .option(PASSWORD, "XXX")
        .build());

ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
        .maxIdleTime(Duration.ofMillis(1000))
        .initialSize(10)
        .maxSize(100)
        .build();
ConnectionPool pool = new ConnectionPool(configuration);

Mono<Connection> connection = pool.create();
return Flux.from(connection)
        .flatMap(conn -> conn
                .createStatement("SELECT id, name, description FROM assortment")
                .execute())
        .flatMap(result -> result
                .map((row, rowMetadata) ->
                        new AssortmentItem(
                                row.get("id", Long.class),
                                row.get("name", String.class),
                                row.get("description", String.class))
                ));
Run Code Online (Sandbox Code Playgroud)

但我不知道如何在查询后关闭连接,以将其返回到池中。我试过了:

connection.close();
Run Code Online (Sandbox Code Playgroud)

但这不起作用,请尝试使用资源。如果没有这个,经过 10 次调用后,我收到“io.r2dbc.postgresql.ExceptionFactory$PostgresqlNonTransientResourceException”,并显示消息“抱歉,已经有太多客户端”。

在我的示例中如何关闭连接并将其返回池?当最大池大小为 100 时,为什么我只能从池(初始池大小)调用 10 个查询?

Luk*_*der 6

直接使用R2DBC,带reactor

这是使用 Reactor 的一种方法,灵感来自OracleTestKit

pool.create()
    .flatMapMany(connection ->
        Flux.from(connection.createStatement("SELECT ...").execute())
            .flatMap(it -> it.map((r, m) -> new AssortmentItem(...)))
    )
    .concatWith(Mono.from(connection.close()).cast(AssortmentItem.class))
Run Code Online (Sandbox Code Playgroud)

其他示例在邮件列表的讨论中给出,例如通过Flux.usingWhen()

Flux.usingWhen(
        pool.create(),
        c -> c.createStatement("SELECT ...").execute(),
        c -> c.close()
    )
    .flatMap(it -> it.map((r, m) -> r.get("id", Long.class)));
Run Code Online (Sandbox Code Playgroud)

最终,这不是关于 R2DBC,而是关于使用反应流 API(例如,reactor)来正确排序execute()close()操作。

将 R2DBC 与 jOOQ 结合使用

许多第三方 API已经封装了 R2DBC。jOOQ 3.15 也会。这将使这个特定的任务变得更容易一些:

// Inject this into your beans, or whatever
DSLContext ctx = DSL.using(pool);

// jOOQ queries are publishers
Flux.from(ctx.resultQuery("SELECT id, name, description FROM assortment"))

// Use jOOQ's mapping to automatically map fields by reflection
    .map(r -> r.into(AssortItem.class));
Run Code Online (Sandbox Code Playgroud)

这不使用或依赖于 jOOQ 的 DSL(尽管如果您愿意,您也可以这样做)。我想 JDBI 和其他 API 也会提供类似的简化吗?

免责声明:我在 jOOQ 背后的公司工作。