HighGo Database
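Environment
Platform: Linux x86-64 Red Hat Enterprise Linux 7
Version: 14
Purpose
This article is part of a series on pglogical. It covers common operations in typical pglogical scenarios. The demonstrations continue from the environment built in the earlier installation and deployment article.
Details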
I. Common views
1. Node information
select * from pglogical.node;
2. Node interface information
select * from pglogical.node_interface;
3. Queue information
select * from pglogical.queue;
4. Replication set details
select * from pglogical.replication_set;
5. Sequences in replication sets
select * from pglogical.replication_set_seq;
6. Tables in replication sets
select * from pglogical.replication_set_table;
7. State of replicated sequences
select * from pglogical.sequence_state;
8. Subscription details
select * from pglogical.subscription;
9. Tables that can be added to logical replication
select * from pglogical.tables;
10. Synchronization status of subscriptions
Function: pglogical.show_subscription_status(subscription_name name)
select subscription_name,status from pglogical.show_subscription_status();
select subscription_name,status,provider_dsn from pglogical.show_subscription_status();
11. Synchronization status of a subscribed table
Function: pglogical.show_subscription_table(subscription_name name, relation regclass)
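As a minimal sketch of how these views can be put to use, the query below lists tables that do not yet belong to any replication set. It assumes that pglogical.tables exposes the columns nspname, relname and set_name, and that set_name is NULL for tables outside every set. Check the column names against the installed pglogical version before relying on it.

```sql
-- Assumption: pglogical.tables has columns (relid, nspname, relname, set_name)
-- and reports set_name as NULL for tables that are in no replication set.
select nspname, relname
  from pglogical.tables
 where set_name is null
 order by nspname, relname;
```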
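Note: The scenarios below were run in the environment built in the pglogical installation and configuration article, so the setup steps are omitted here. Please read this as a continuation of that article.
II. Adding a single ordinary table
For one-way logical replication, if the table structure already exists on both ends, simply add the table to the replication set.
Core function: pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean, columns text[], row_filter text)
On the publisher, create test table t04 and insert data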
[postgres@host1 ~]$ psql test1 test1
psql (14.6)
Type "help" for help.
test1=> create table t04(id int primary key,name text);
CREATE TABLE
test1=> insert into t04 select n,'aaaaaa' from generate_series(1,1000) n;
INSERT 0 1000
test1=>
On the subscriber, create the matching table structure
[postgres@host2 ~]$ psql test2 test2
psql (14.6)
Type "help" for help.
test2=> create table t04(id int primary key,name text);
CREATE TABLE
On the publisher, add table t04 to the replication set. The log shows the related logical replication process starting up
test1=> \c test1 postgres
You are now connected to database "test1" as user "postgres".
test1=# select pglogical.replication_set_add_table(set_name:='default',relation:='t04',synchronize_data:=true);
replication_set_add_table
---------------------------
t
(1 row)
test1=# 2023-01-03 16:33:56.738 CST [2252] LOG: logical decoding found consistent point at 0/198DF50
2023-01-03 16:33:56.738 CST [2252] DETAIL: There are no running transactions.
2023-01-03 16:33:56.738 CST [2252] STATEMENT: CREATE_REPLICATION_SLOT "pgl_test2_provider1_subscription1_81966cb4" LOGICAL pglogical_output
2023-01-03 16:33:56.738 CST [2252] LOG: exported logical decoding snapshot: "00000006-000003AF-1" with 0 transaction IDs
2023-01-03 16:33:56.738 CST [2252] STATEMENT: CREATE_REPLICATION_SLOT "pgl_test2_provider1_subscription1_81966cb4" LOGICAL pglogical_output
On the subscriber, the queries confirm that synchronization has completed, and the log shows the sync worker
test2=# 2023-01-03 16:33:57.137 CST [2114] LOG: starting sync of table public.t04 for subscriber subscription1
2023-01-03 16:33:58.144 CST [2114] LOG: finished sync of table public.t04 for subscriber subscription1
2023-01-03 16:33:58.144 CST [2114] LOG: sync worker [2114] at slot 2 generation 2 detaching cleanly
test2=# select count(*) from t04;
count
-------
1000
(1 row)
test2=# select * from pglogical.show_subscription_table('subscription1','t04');
nspname | relname | status
---------+---------+-------------
public | t04 | replicating
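For bidirectional logical replication, make sure no application traffic is reaching either end, then add the table to the replication set on both ends.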
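The columns and row_filter parameters in the signature above are not exercised in the walkthrough. The following is a sketch only: table t05 and its remark column are hypothetical and would need to exist on both ends beforehand. It assumes the column list should keep the primary key and that the row filter is an ordinary boolean expression over the table's columns.

```sql
-- Hypothetical table t05(id int primary key, name text, remark text);
-- it is not part of the walkthrough above.
select pglogical.replication_set_add_table(
    set_name         := 'default',
    relation         := 't05',
    synchronize_data := true,
    columns          := '{id,name}',   -- replicate only these columns (primary key kept)
    row_filter       := 'id > 500'     -- replicate only rows matching this expression
);
```

Leaving columns and row_filter at their defaults, as in the t04 example, replicates every column and every row.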
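III. Adding ordinary tables by schema
Core function: pglogical.replication_set_add_all_tables(set_name name, schema_names text[], synchronize_data boolean)
Add all tables in the test schema to the default replication set
On the publisher, create the test schema and the test tables test.test1, test.test2 and test.test3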
postgres=# \c test1 test1
test1=> create schema test;
CREATE SCHEMA
test1=> create table test.test1(id int primary key,name text);
CREATE TABLE
test1=> insert into test.test1 select n,'aaaaaa' from generate_series(1,1000) n;
INSERT 0 1000
test1=> create table test.test2(id int primary key,name text);
CREATE TABLE
test1=> insert into test.test2 select n,'aaaaaa' from generate_series(1,1000) n;
INSERT 0 1000
test1=> create table test.test3(id int primary key,name text);
CREATE TABLE
test1=> insert into test.test3 select n,'aaaaaa' from generate_series(1,1000) n;
INSERT 0 1000
On the subscriber, create the test schema and the matching table structures
test2=# \c test2 test2
You are now connected to database "test2" as user "test2".
test2=> create schema test;
CREATE SCHEMA
test2=> create table test.test1(id int primary key,name text);
CREATE TABLE
test2=> create table test.test2(id int primary key,name text);
CREATE TABLE
test2=> create table test.test3(id int primary key,name text);
CREATE TABLE
On the publisher, add the tables in the test schema to the default replication set
test1=# select pglogical.replication_set_add_all_tables(set_name :='default',schema_names := '{test}',synchronize_data := true);
replication_set_add_all_tables
--------------------------------
t
(1 row)
test1=# 2023-01-04 10:32:23.064 CST [1864] LOG: logical decoding found consistent point at 0/1A590B0
2023-01-04 10:32:23.064 CST [1864] DETAIL: There are no running transactions.
2023-01-04 10:32:23.064 CST [1864] STATEMENT: CREATE_REPLICATION_SLOT "pgl_test2_provider1_subscription1_f52a378d" LOGICAL pglogical_output
2023-01-04 10:32:23.064 CST [1864] LOG: exported logical decoding snapshot: "00000007-0000006B-1" with 0 transaction IDs
2023-01-04 10:32:23.064 CST [1864] STATEMENT: CREATE_REPLICATION_SLOT "pgl_test2_provider1_subscription1_f52a378d" LOGICAL pglogical_output
2023-01-04 10:32:24.171 CST [1868] LOG: logical decoding found consistent point at 0/1A59148
2023-01-04 10:32:24.171 CST [1868] DETAIL: There are no running transactions.
2023-01-04 10:32:24.171 CST [1868] STATEMENT: CREATE_REPLICATION_SLOT "pgl_test2_provider1_subscription1_59d23c9f" LOGICAL pglogical_output
2023-01-04 10:32:24.171 CST [1868] LOG: exported logical decoding snapshot: "00000007-00000071-1" with 0 transaction IDs
2023-01-04 10:32:24.171 CST [1868] STATEMENT: CREATE_REPLICATION_SLOT "pgl_test2_provider1_subscription1_59d23c9f" LOGICAL pglogical_output
2023-01-04 10:32:25.184 CST [1872] LOG: logical decoding found consistent point at 0/1A59180
2023-01-04 10:32:25.184 CST [1872] DETAIL: There are no running transactions.
2023-01-04 10:32:25.184 CST [1872] STATEMENT: CREATE_REPLICATION_SLOT "pgl_test2_provider1_subscription1_1ca0e353" LOGICAL pglogical_output
2023-01-04 10:32:25.184 CST [1872] LOG: exported logical decoding snapshot: "00000007-00000077-1" with 0 transaction IDs
2023-01-04 10:32:25.184 CST [1872] STATEMENT: CREATE_REPLICATION_SLOT "pgl_test2_provider1_subscription1_1ca0e353" LOGICAL pglogical_output
The three tables have now been added to the replication set
test1=# select * from pglogical.replication_set_table;
set_id | set_reloid | set_att_list | set_row_filter
-----------+------------+--------------+----------------
290045701 | t01 | |
290045701 | test.test1 | |
290045701 | test.test2 | |
290045701 | test.test3 | |
(4 rows)
Client log on the subscriber
test2=> 2023-01-04 10:32:24.011 CST [1807] LOG: starting sync of table test.test1 for subscriber subscription1
2023-01-04 10:32:25.147 CST [1807] LOG: finished sync of table test.test1 for subscriber subscription1
2023-01-04 10:32:25.147 CST [1807] LOG: sync worker [1807] at slot 2 generation 2 detaching cleanly
2023-01-04 10:32:25.149 CST [1809] LOG: starting sync of table test.test2 for subscriber subscription1
2023-01-04 10:32:26.158 CST [1809] LOG: finished sync of table test.test2 for subscriber subscription1
2023-01-04 10:32:26.158 CST [1809] LOG: sync worker [1809] at slot 2 generation 3 detaching cleanly
2023-01-04 10:32:26.161 CST [1811] LOG: starting sync of table test.test3 for subscriber subscription1
2023-01-04 10:32:27.170 CST [1811] LOG: finished sync of table test.test3 for subscriber subscription1
2023-01-04 10:32:27.170 CST [1811] LOG: sync worker [1811] at slot 2 generation 4 detaching cleanly
On the subscriber, query the subscription status and the status of each table
test2=# select pglogical.show_subscription_status('subscription1');
show_subscription_status
--------------------------------------------------------------------------------------------------------------------------------------------
------------------------
(subscription1,replicating,provider1,"host=192.168.164.51 port=5432 dbname=test1",pgl_test2_provider1_subscription1,"{default,default_inser
t_only,ddl_sql}",{all})
(1 row)
test2=# select * from pglogical.show_subscription_status();
subscription_name | status | provider_node | provider_dsn | slot_name |
replication_sets | forward_origins
-------------------+-------------+---------------+--------------------------------------------+-----------------------------------+---------
------------------------------+-----------------
subscription1 | replicating | provider1 | host=192.168.164.51 port=5432 dbname=test1 | pgl_test2_provider1_subscription1 | {default
,default_insert_only,ddl_sql} | {all}
test2=# select subscription_name,status from pglogical.show_subscription_status();
subscription_name | status
-------------------+-------------
subscription1 | replicating
(1 row)
test2=# select * from pglogical.show_subscription_table('subscription1','test.test1');
nspname | relname | status
---------+---------+-------------
test | test1 | replicating
(1 row)
test2=# select * from pglogical.show_subscription_table('subscription1','test.test2');
nspname | relname | status
---------+---------+-------------
test | test2 | replicating
(1 row)
test2=# select * from pglogical.show_subscription_table('subscription1','test.test3');
nspname | relname | status
---------+---------+-------------
test | test3 | replicating
(1 row)
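Instead of one call per table, the per-table status checks above could be folded into a single statement on the subscriber. This is a sketch that assumes pglogical.show_subscription_table can be called laterally for each table found in pg_class; the subscription name and schema come from the example above.

```sql
-- Report the sync status of every ordinary table in schema "test"
-- for subscription1 (run on the subscriber).
select s.nspname, s.relname, s.status
  from pg_class c
 cross join lateral
       pglogical.show_subscription_table('subscription1', c.oid::regclass) s
 where c.relnamespace = 'test'::regnamespace
   and c.relkind = 'r'
 order by s.relname;
```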
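IV. Removing a single ordinary table
For one-way logical replication, simply remove the target table from the replication set.
Core function: pglogical.replication_set_remove_table(set_name name, relation regclass)
Query which replication set each table belongs to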
test1=# select t.set_reloid,s.set_name from pglogical.replication_set_table t,pglogical.replication_set s where t.set_id=s.set_id;
set_reloid | set_name
------------+----------
t01 | default
t04 | default
(2 rows)
Remove the target table from the replication set
test1=# select pglogical.replication_set_remove_table(set_name:='default',relation:='t04');
replication_set_remove_table
------------------------------
t
(1 row)
The table is no longer in the replication set
test1=# select t.set_reloid,s.set_name from pglogical.replication_set_table t,pglogical.replication_set s where t.set_id=s.set_id;
set_reloid | set_name
------------+----------
t01 | default
(1 row)
On the publisher, insert another 1000 test rows
test1=> insert into t04 select n,'aaaaaa' from generate_series(1001,2000) n;
INSERT 0 1000
test1=> select count(*) from t04;
count
-------
2000
(1 row)
On the subscriber, the query shows that synchronization of the table has stopped
test2=> select count(*) from t04;
count
-------
1000
(1 row)
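For bidirectional logical replication, make sure no application traffic is reaching either end, then remove the table from the replication set on both ends. No example is shown here.
V. Removing tables by schema
pglogical does not currently provide a function for removing tables from a replication set by schema, so this section gives no walkthrough. It will be updated once a later version supports it.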
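Until such a function exists, one possible workaround is to loop over the set's member tables and call pglogical.replication_set_remove_table for each. The block below is an untested sketch, not a pglogical feature. It assumes the set name 'default' and the schema 'test' from section III, and collects the table list into an array first so that the removal calls do not run while the catalog table is still being scanned.

```sql
do $$
declare
    rel  regclass;
    rels regclass[];
begin
    -- Collect every table of schema "test" that is in the "default" set.
    select array_agg(t.set_reloid)
      into rels
      from pglogical.replication_set_table t
      join pglogical.replication_set s on s.set_id = t.set_id
      join pg_class c on c.oid = t.set_reloid::oid
     where s.set_name = 'default'
       and c.relnamespace = 'test'::regnamespace;

    -- Remove them one by one; coalesce guards against an empty result.
    foreach rel in array coalesce(rels, '{}'::regclass[]) loop
        perform pglogical.replication_set_remove_table('default', rel);
    end loop;
end
$$;
```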
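VI. Synchronizing partitioned tables
Unlike PostgreSQL's built-in logical replication, pglogical requires every child partition to be added to the replication set individually.
We create a new database and user on each end: user test3 with database test3 on the publisher, and user test4 with database test4 on the subscriber. The partitioned parent table is running_log, and its child partitions are running_log_202211, running_log_202212 and running_log_202301.
On the publisher, create the partitioned table and insert test data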
test1=# \c test3 test3
You are now connected to database "test3" as user "test3".
test3=> create table running_log
test3-> (
test3(> id serial,
test3(> commit_time timestamp(0) without time zone,
test3(> num int,
test3(> executor_name varchar(40),
test3(> primary key (id,commit_time)
test3(> ) PARTITION BY RANGE (commit_time);
CREATE TABLE
test3=>
test3=>
test3=>
test3=> CREATE TABLE running_log_202211 PARTITION OF running_log FOR VALUES FROM ('2022-11-01') TO ('2022-12-01');
CREATE TABLE
test3=> CREATE TABLE running_log_202212 PARTITION OF running_log FOR VALUES FROM ('2022-12-01') TO ('2023-01-01');
CREATE TABLE
test3=> CREATE TABLE running_log_202301 PARTITION OF running_log FOR VALUES FROM ('2023-01-01') TO ('2023-02-01');
CREATE TABLE
test3=> INSERT INTO running_log(id, commit_time, num,executor_name) VALUES (1, '2022-11-11', 1203,'方言');
INSERT 0 1
test3=> INSERT INTO running_log(id, commit_time, num,executor_name) VALUES (2, '2022-11-12', 1178,'方言');
INSERT 0 1
test3=> INSERT INTO running_log(id, commit_time, num,executor_name) VALUES (3, '2022-12-12', 1100,'莫言');
INSERT 0 1
test3=> INSERT INTO running_log(id, commit_time, num,executor_name) VALUES (4, '2022-12-13', 1202,'莫言');
INSERT 0 1
test3=> INSERT INTO running_log(id, commit_time, num,executor_name) VALUES (5, '2023-01-01', 1500,'休言');
INSERT 0 1
test3=> INSERT INTO running_log(id, commit_time, num,executor_name) VALUES (6, '2023-01-02', 1815,'休言');
INSERT 0 1
On the subscriber, create the partitioned table structure
test4=# \c test4 test4
You are now connected to database "test4" as user "test4".
test4=> create table running_log
test4-> (
test4(> id serial,
test4(> commit_time timestamp(0) without time zone,
test4(> num int,
test4(> executor_name varchar(40),
test4(> primary key (id,commit_time)
test4(> ) PARTITION BY RANGE (commit_time);
test4=> CREATE TABLE running_log_202211 PARTITION OF running_log FOR VALUES FROM ('2022-11-01') TO ('2022-12-01');
CREATE TABLE
test4=> CREATE TABLE running_log_202212 PARTITION OF running_log FOR VALUES FROM ('2022-12-01') TO ('2023-01-01');
CREATE TABLE
test4=> CREATE TABLE running_log_202301 PARTITION OF running_log FOR VALUES FROM ('2023-01-01') TO ('2023-02-01');
CREATE TABLE
On the publisher, create a replication set
test3=# select pglogical.create_replication_set('fenqu');
create_replication_set
------------------------
3067052044
(1 row)
Add the child partitions to the replication set
test3=# select pglogical.replication_set_add_table('fenqu','public.running_log_202211');
replication_set_add_table
---------------------------
t
(1 row)
test3=# select pglogical.replication_set_add_table('fenqu','public.running_log_202212');
replication_set_add_table
---------------------------
t
(1 row)
test3=# select pglogical.replication_set_add_table('fenqu','public.running_log_202301');
replication_set_add_table
---------------------------
t
(1 row)
The partitions now appear in the replication set
test3=# select * from pglogical.replication_set_table ;
set_id | set_reloid | set_att_list | set_row_filter
------------+--------------------+--------------+----------------
3067052044 | running_log_202211 | |
3067052044 | running_log_202212 | |
3067052044 | running_log_202301 | |
(3 rows)
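As an alternative to the three separate calls above, the child partitions could be looked up in the pg_inherits catalog and added in one statement. This is a sketch under the assumption that all partitions are direct children of running_log; sub-partitioned tables would need a recursive lookup. It should not be run after the individual calls, because the tables are then already members of the set.

```sql
-- Add every direct child partition of public.running_log to the "fenqu" set.
select i.inhrelid::regclass as partition_name,
       pglogical.replication_set_add_table('fenqu', i.inhrelid::regclass) as added
  from pg_inherits i
 where i.inhparent = 'public.running_log'::regclass;
```

Partitions created later are not picked up automatically with this approach, so the statement's logic would have to be applied again for each new partition.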
On the subscriber, create the node
test4=# select pglogical.create_node(node_name :='subscriber2',dsn :='host=192.168.164.52 port=5432 dbname=test4');
create_node
-------------
770182093
(1 row)
On the subscriber, create the subscription
test4=# SELECT pglogical.create_subscription(subscription_name := 'subscription2',replication_sets:=array['fenqu'],provider_dsn := 'host=192.168.164.51 port=5432 dbname=test3');
create_subscription
---------------------
1871150101
(1 row)
test4=# select count(*) from running_log;
count
-------
6
(1 row)
test4=# select subscription_name,status from pglogical.show_subscription_status();
subscription_name | status
-------------------+-------------
subscription2 | replicating
(1 row)
test4=# select * from pglogical.show_subscription_table('subscription2','running_log_202211');
nspname | relname | status
---------+--------------------+-------------
public | running_log_202211 | replicating
(1 row)
test4=# select * from pglogical.show_subscription_table('subscription2','running_log_202212');
nspname | relname | status
---------+--------------------+-------------
public | running_log_202212 | replicating
(1 row)
test4=# select * from pglogical.show_subscription_table('subscription2','running_log_202301');
nspname | relname | status
---------+--------------------+-------------
public | running_log_202301 | replicating
(1 row)
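VII. Redoing logical replication (refreshing a few tables or rebuilding the subscription)
Core functions: pglogical.alter_subscription_resynchronize_table(subscription_name name, relation regclass), pglogical.drop_subscription(subscription_name name, ifexists bool), pglogical.create_subscription(subscription_name name, provider_dsn text, replication_sets text[], synchronize_structure boolean, synchronize_data boolean, forward_origins text[], apply_delay interval)
Scenario 1: Logical replication has not been interrupted, but a few tables have diverged between publisher and subscriber because of manual changes, application-side controls or similar causes, and only those tables need refreshing. Use pglogical.alter_subscription_resynchronize_table(subscription_name name, relation regclass) for this.
No preparation is needed on the subscriber tables; call the function directly. Note that the function truncates the target table first and then copies it again, so the table is empty for a short time during the resynchronization.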
test4=# select count(*) from running_log;
count
-------
6
(1 row)
test4=# select pglogical.alter_subscription_resynchronize_table('subscription2','running_log_202211');
alter_subscription_resynchronize_table
----------------------------------------
t
(1 row)
test4=# select pglogical.alter_subscription_resynchronize_table('subscription2','running_log_202212');
alter_subscription_resynchronize_table
----------------------------------------
t
(1 row)
test4=# select pglogical.alter_subscription_resynchronize_table('subscription2','running_log_202301');
alter_subscription_resynchronize_table
----------------------------------------
t
(1 row)
test4=# select count(*) from running_log;
count
-------
6
(1 row)
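Scenario 2: Logical replication has been interrupted, a large number of tables are involved, and the data has diverged substantially. In this case, rebuild the subscription to refresh the data.
On the subscriber, drop the subscription and clear the table data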
test4=# select pglogical.drop_subscription('subscription2');
drop_subscription
-------------------
1
(1 row)
test4=# delete from running_log;
DELETE 6
test4=# select count(*) from running_log;
count
-------
0
(1 row)
On the publisher, the replication slot for subscription2 has been dropped automatically. Only the slot for subscription1 from the earlier test remains
test3=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmi
n | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase
-----------------------------------+------------------+-----------+--------+----------+-----------+--------+------------+------+------------
--+-------------+---------------------+------------+---------------+-----------
pgl_test2_provider1_subscription1 | pglogical_output | logical | 16387 | test1 | f | t | 16160 | | 88
1 | 0/1C83F98 | 0/1C83FD0 | reserved | | f
(1 row)
On the subscriber, recreate the subscription. The data is synchronized again
test4=# SELECT pglogical.create_subscription(subscription_name := 'subscription2',replication_sets:=array['fenqu'],provider_dsn := 'host=192.168.164.51 port=5432 dbname=test3');
create_subscription
---------------------
1871150101
(1 row)
test4=# select count(*) from running_log;
count
-------
6
(1 row)
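The rebuild in scenario 2 can be condensed into a short script for the subscriber. This is a sketch rather than the procedure used above: it swaps the DELETE for TRUNCATE (which on a partitioned parent also empties its partitions) and spells out the synchronize_structure and synchronize_data parameters from the signature listed at the start of this section. The values chosen assume the table structure already exists on the subscriber.

```sql
-- Run on the subscriber (database test4).
-- 1. Drop the subscription; ifexists avoids an error if it is already gone.
select pglogical.drop_subscription(subscription_name := 'subscription2',
                                   ifexists := true);

-- 2. Clear the stale data in the parent and all of its partitions.
truncate table running_log;

-- 3. Recreate the subscription and copy the data again.
select pglogical.create_subscription(
    subscription_name     := 'subscription2',
    provider_dsn          := 'host=192.168.164.51 port=5432 dbname=test3',
    replication_sets      := array['fenqu'],
    synchronize_structure := false,   -- tables already exist on the subscriber
    synchronize_data      := true     -- perform the initial data copy
);
```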