Ole*_*lek 6 java connection-pooling r2dbc
我正在尝试使用连接池配置与 PostgreSQL 数据库的 R2DBC 连接。我已遵循此文档:https://github.com/r2dbc/r2dbc-pool/blob/main/README.md。
ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
.option(DRIVER, "pool")
.option(PROTOCOL, "postgresql") // driver identifier, PROTOCOL is delegated as DRIVER by the pool.
.option(HOST, "192.168.1.200")
.option(PORT, 5433)
.option(DATABASE, "XXX")
.option(USER, "XXX")
.option(PASSWORD, "XXX")
.build());
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
.maxIdleTime(Duration.ofMillis(1000))
.initialSize(10)
.maxSize(100)
.build();
ConnectionPool pool = new ConnectionPool(configuration);
Mono<Connection> connection = pool.create();
return Flux.from(connection)
.flatMap(conn -> conn
.createStatement("SELECT id, name, description FROM assortment")
.execute())
.flatMap(result -> result
.map((row, rowMetadata) ->
new AssortmentItem(
row.get("id", Long.class),
row.get("name", String.class),
row.get("description", String.class))
));
Run Code Online (Sandbox Code Playgroud)
但我不知道如何在查询后关闭连接,以将其返回到池中。我试过了:
connection.close();
Run Code Online (Sandbox Code Playgroud)
但这不起作用,请尝试使用资源。如果没有这个,经过 10 次调用后,我收到“io.r2dbc.postgresql.ExceptionFactory$PostgresqlNonTransientResourceException”,并显示消息“抱歉,已经有太多客户端”。
在我的示例中如何关闭连接并将其返回池?当最大池大小为 100 时,为什么我只能从池(初始池大小)调用 10 个查询?
这是使用 Reactor 的一种方法,灵感来自OracleTestKit:
pool.create()
.flatMapMany(connection ->
Flux.from(connection.createStatement("SELECT ...").execute())
.flatMap(it -> it.map((r, m) -> new AssortmentItem(...)))
)
.concatWith(Mono.from(connection.close()).cast(AssortmentItem.class))
Run Code Online (Sandbox Code Playgroud)
其他示例在邮件列表的讨论中给出,例如通过Flux.usingWhen():
Flux.usingWhen(
pool.create(),
c -> c.createStatement("SELECT ...").execute(),
c -> c.close()
)
.flatMap(it -> it.map((r, m) -> r.get("id", Long.class)));
Run Code Online (Sandbox Code Playgroud)
最终,这不是关于 R2DBC,而是关于使用反应流 API(例如,reactor)来正确排序execute()和close()操作。
许多第三方 API已经封装了 R2DBC。jOOQ 3.15 也会。这将使这个特定的任务变得更容易一些:
// Inject this into your beans, or whatever
DSLContext ctx = DSL.using(pool);
// jOOQ queries are publishers
Flux.from(ctx.resultQuery("SELECT id, name, description FROM assortment"))
// Use jOOQ's mapping to automatically map fields by reflection
.map(r -> r.into(AssortItem.class));
Run Code Online (Sandbox Code Playgroud)
这不使用或依赖于 jOOQ 的 DSL(尽管如果您愿意,您也可以这样做)。我想 JDBI 和其他 API 也会提供类似的简化吗?
免责声明:我在 jOOQ 背后的公司工作。
| 归档时间: |
|
| 查看次数: |
5422 次 |
| 最近记录: |