手动创建复制插槽以在PostgreSQL 10中发布

Geo*_*rge 6 postgresql replication postgresql-10

我正在尝试从PostgreSQL数据库中获取某些表的更新流。获取所有更新的常规方法如下所示:

您创建一个逻辑复制插槽

pg_create_logical_replication_slot('my_slot', 'wal2json');
Run Code Online (Sandbox Code Playgroud)

并使用pg_recvlogical或进行特殊的SQL查询连接到它。这使您可以从json中的数据库中获取所有操作(如果您使用了wal2json插件或类似工具),然后对该数据执行任何操作。

但是在PostgreSQL 10中,我们具有发布/订阅机制,该机制仅允许我们复制选定的表。这非常方便,因为不会发送大量无用的数据。该过程如下所示:

首先,您创建出版物

CREATE PUBLICATION foo FOR TABLE herp, derp;
Run Code Online (Sandbox Code Playgroud)

然后,您从另一个数据库订阅该出版物

CREATE SUBSCRIPTION mysub CONNECTION <connection stuff> PUBLICATION foo;
Run Code Online (Sandbox Code Playgroud)

这将在后台的主数据库上创建一个复制插槽,并开始侦听更新并将它们提交到第二个数据库的相同表中。如果您的工作是复制一些表,但又想获取原始数据流,那就很好了。

如前所述,该CREATE SUBSCRIPTION查询正在幕后的master数据库上创建一个复制插槽,但是如何在没有订阅的情况下手动创建一个复制插槽,又如何创建另一个数据库呢?这里的文档说:

为此,请单独创建复制插槽(使用具有插件名称pgoutput的函数pg_create_logical_replication_slot)

根据文档,这是可能的,但是pg_create_logical_replication_slot只能创建一个常规的复制插槽。是pgoutput插件负责所有的魔法?如果是,那么将无法使用其他插件(如wal2json出版物)。

我在这里想念什么?

小智 9

创建逻辑复制槽和发布后,您可以通过以下方式创建订阅:

CREATE SUBSCRIPTION mysub
       CONNECTION <conn stuff>
       PUBLICATION foo
       WITH (slot_name=my_slot, create_slot=false);
Run Code Online (Sandbox Code Playgroud)

不确定这是否能回答您的问题。


krz*_*chu 8

我在 Postgres 中逻辑复制和逻辑解码的经验有限,所以如果下面有错误,请纠正我。话虽如此,这是我发现的:

  1. 发布支持由pgoutput插件提供。您可以通过特定于插件的 options使用它。可能是其他插件有可能添加支持,但不知道逻辑解码插件接口是否暴露了足够的细节。我针对wal2json插件版本对其进行了测试9e962ba,但它无法识别此选项。
  2. 复制槽是独立于发布创建的。在获取更改流时指定要用作过滤器的发布。可以查看一个发布的更改,然后查看另一个发布的更改并观察不同的更改集,尽管使用相同的复制槽(我没有找到它的文档,我正在使用 Postgres 兼容性在 Aurora 上进行测试,因此行为可能会有所不同)。
  3. 插件输出似乎包括开始和提交的所有条目,即使事务没有触及感兴趣的发布中包含的任何表。但是,它不包括对出版物中包含的其他表格的更改。

以下是如何在 Postgres 10+ 中使用它的示例:

-- Create publication
CREATE PUBLICATION cdc;

-- Create slot
SELECT pg_create_logical_replication_slot('test_slot_v1', 'pgoutput');

-- Create example table
CREATE TABLE replication_test_v1
(
  id integer NOT NULL PRIMARY KEY,
  name text
);

-- Add table to publication
ALTER PUBLICATION cdc ADD TABLE replication_test_v1;

-- Insert example data
INSERT INTO replication_test_v1(id, name) VALUES
  (1, 'Number 1')
;

-- Peak changes (does not consume changes)
SELECT pg_logical_slot_peek_binary_changes('test_slot_v1', NULL, NULL, 'publication_names', 'cdc', 'proto_version', '1');

-- Get changes (consumes changes)
SELECT pg_logical_slot_get_binary_changes('test_slot_v1', NULL, NULL, 'publication_names', 'cdc', 'proto_version', '1');
Run Code Online (Sandbox Code Playgroud)

要将 Postgres 的更改流式传输到其他系统,您可以考虑使用Debezium项目。它是一个用于更改数据捕获的开源分布式平台,其中提供了一个 PostgreSQL 连接器。在 0.10 版本中,他们添加了对pgoutput插件的支持。即使您的用例与项目提供的非常不同,您也可以查看他们的代码以了解它们如何与复制 API 交互。

  • 谢谢!我在这里唯一要添加的是相关的 CREATE SUBSCRIPTION 命令,即“WITH (slot_name = test_slot_v1, create_slot = false)”部分。 (2认同)