Spring R2dbc:有没有办法从postgresql数据库获取恒定流并处理它们?

Spa*_*cus 5 spring-webflux spring-data-r2dbc r2dbc r2dbc-postgresql

我想将 postgresql 中的表中新创建的记录作为实时/连续流获取。可以使用spring r2dbc吗?如果是这样,我有什么选择?

谢谢

Koz*_*łek 1

您需要使用pg_notify并开始在其上列出。您想要看到的任何更改都应该包含在简单的触发器中,该触发器将向pg_notify.

我的github上有一个这样的例子,长话短说:

准备函数和触发器:

CREATE OR REPLACE FUNCTION notify_member_saved()
    RETURNS TRIGGER
AS $$
BEGIN
    PERFORM pg_notify('MEMBER_SAVED',  row_to_json(NEW)::text);
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER member_saved_trigger
    AFTER INSERT OR UPDATE
    ON members
    FOR EACH ROW
EXECUTE PROCEDURE notify_member_saved();
Run Code Online (Sandbox Code Playgroud)

在java代码中准备监听器


@Service
@RequiredArgsConstructor
@Slf4j
class NotificationService {


    private final ConnectionFactory connectionFactory;
    private final Set<NotificationTopic> watchedTopics = Collections.synchronizedSet(new HashSet<>());

    @Qualifier("postgres-event-mapper")
    private final ObjectMapper objectMapper;

    private PostgresqlConnection connection;


    @PreDestroy
    private void preDestroy() {
        this.getConnection().close().subscribe();
    }

    private PostgresqlConnection getConnection() {
        if(connection == null) {
            synchronized(NotificationService.class) {
                if(connection == null) {
                    try {
                        connection = Mono.from(connectionFactory.create())
                                .cast(Wrapped.class)
                                .map(Wrapped::unwrap)
                                .cast(PostgresqlConnection.class)
                                .toFuture().get();
                    } catch(InterruptedException e) {
                        throw new RuntimeException(e);
                    } catch(ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }
        return this.connection;
    }

    public <T> Flux<T> listen(final NotificationTopic topic, final Class<T> clazz) {

        if(!watchedTopics.contains(topic)) {
            executeListenStatement(topic);
        }

        return getConnection().getNotifications()
                .log("notifications")
                .filter(notification -> topic.name().equals(notification.getName()) && notification.getParameter() != null)
                .handle((notification, sink) -> {
                    final String json = notification.getParameter();
                    if(!StringUtils.isBlank(json)) {
                        try {
                            sink.next(objectMapper.readValue(json, clazz));
                        } catch(JsonProcessingException e) {
                            log.error(String.format("Problem deserializing an instance of [%s] " +
                                    "with the following json: %s ", clazz.getSimpleName(), json), e);
                            Mono.error(new DeserializationException(topic, e));
                        }
                    }
                });
    }

    private void executeListenStatement(final NotificationTopic topic) {
        getConnection().createStatement(String.format("LISTEN \"%s\"", topic)).execute()
                .doOnComplete(() -> watchedTopics.add(topic))
                .subscribe();
    }

    public void unlisten(final NotificationTopic topic) {
        if(watchedTopics.contains(topic)) {
            executeUnlistenStatement(topic);
        }
    }

    private void executeUnlistenStatement(final NotificationTopic topic) {
        getConnection().createStatement(String.format("UNLISTEN \"%s\"", topic)).execute()
                .doOnComplete(() -> watchedTopics.remove(topic))
                .subscribe();
    }
}
Run Code Online (Sandbox Code Playgroud)

从控制器开始列出

@GetMapping("/events")
    public Flux<ServerSentEvent<Object>> listenToEvents() {

        return Flux.merge(listenToDeletedItems(), listenToSavedItems())
                .map(o -> ServerSentEvent.builder()
                        .retry(Duration.ofSeconds(4L))
                        .event(o.getClass().getName())
                        .data(o).build()
                );

    }

    @GetMapping("/unevents")
    public Mono<ResponseEntity<Void>> unlistenToEvents() {
        unlistenToDeletedItems();
        unlistenToSavedItems();
        return Mono.just(
                ResponseEntity
                        .status(HttpStatus.I_AM_A_TEAPOT)
                        .body(null)
        );
    }

    private Flux<Member> listenToSavedItems() {
        return this.notificationService.listen(MEMBER_SAVED, Member.class);
    }


    private void unlistenToSavedItems() {
        this.notificationService.unlisten(MEMBER_SAVED);
    }
Run Code Online (Sandbox Code Playgroud)

但请记住,如果出现问题,那么您会pg_notify在一段时间内丢失事件,因此这适用于非任务关键型解决方案。