JJ *_*kar 12 postgresql spring websocket rx-java spring-webflux
我正在尝试用反应异步postgres-async-driver替换PostgreSQL数据库轮询器并将新插入的行流式传输到Spring 5 Webflux Reactive websocket客户端,例如Josh Long 在这里演示并基于SébastienDeleuze的 spring-reactive-playground的很棒的示例.
我Publisher
获得第一个row
,但后来没有返回后续行.是问题,我Observable
,我Publisher
,或者我如何使用Postgres的,异步驱动程序Db
?
public Observable<WebSocketMessage> getObservableWSM(WebSocketSession session){
return
// com.github.pgasync.Db
db.queryRows(sql)
// ~RowMapper method
.map(row -> mapRowToDto(row))
// serialize dto to String for websocket
.map(dto -> { return objectMapper.writeValueAsString(dto); })
// finally, write to websocket session
.map(str -> { return session.textMessage((String) str);
});
}
Run Code Online (Sandbox Code Playgroud)
然后,Observable
我WebSocketHandler
使用RxReactiveStream.toPublisher
转换器连接到我:
@Bean
WebSocketHandler dbWebSocketHandler() {
return session -> {
Observable<WebSocketMessage> o = getObservableWSM(session);
return session.send(Flux.from(RxReactiveStreams.toPublisher(o)));
};
}
Run Code Online (Sandbox Code Playgroud)
这row
将从我的sql
声明中获取第一个,但没有额外的行.如何继续传输其他行?
理想情况下,我认为我希望PostgreSQL等效于MongoDB Tailable游标.
我创建了一个Postgres 触发器INSERT
,根据
以下示例触发我的表:
CREATE OR REPLACE FUNCTION table_update_notify() RETURNS trigger AS $$
DECLARE
id bigint;
BEGIN
IF TG_OP = 'INSERT' THEN
id = NEW.id;
ELSE
id = OLD.id;
END IF;
PERFORM pg_notify('my_trigger_name', json_build_object('table', TG_TABLE_NAME, 'id', id, 'type', TG_OP)::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
Run Code Online (Sandbox Code Playgroud)
然后我使用reactive-pg-client订阅了Postgres触发器。以下是他们的 Pub/Sub 示例中的代码:
@Bean
PgPool subscribedNotificationHandler() {
PgPool client = pgPool();
client.getConnection(asyncResult -> {
if (asyncResult.succeeded()) {
PgConnection connection = asyncResult.result();
connection.notificationHandler(notification -> {
notification.getPayload();
// do things with payload
});
connection.query("LISTEN my_trigger_name", ar -> {
log.info("Subscribed to channel");
});
}
});
return client;
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
2466 次 |
最近记录: |