
Common pglogical Operations (Part 1)

鲁丰
2023-12-01

HighGo Database
Contents
Environment
Purpose
Details

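Environment
Platform: Linux x86-64, Red Hat Enterprise Linux 7
Version: 14
Purpose
This article is part of a series on pglogical. It covers common operations in typical pglogical scenarios. The demonstrations continue from the environment built in the earlier installation and deployment article.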

Details

I. Common views
1. Node information

select * from pglogical.node;

2. Node interface information

select * from pglogical.node_interface;

3. Queue information (queued messages such as replicated DDL)

select * from pglogical.queue;

4. Replication set details

select * from pglogical.replication_set;

5. Sequences in replication sets

select * from pglogical.replication_set_seq;

6. Tables in replication sets

select * from pglogical.replication_set_table;

7. State of replicated sequences

select * from pglogical.sequence_state;

8. Subscription details

select * from pglogical.subscription;

9. Tables that can be added to logical replication

select * from pglogical.tables;
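
To narrow this view down to tables that are not yet in any replication set, a filter on set_name can help. This is a minimal sketch, assuming the view exposes the columns nspname, relname, and set_name as in pglogical 2.x, with set_name left NULL for tables outside every set.

```sql
-- Tables on the local node that do not belong to any replication set yet.
-- Assumption: pglogical.tables has columns nspname, relname, set_name.
select nspname, relname
from pglogical.tables
where set_name is null
order by nspname, relname;
```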

10. Subscription synchronization status, via pglogical.show_subscription_status(subscription_name name)

select * from pglogical.show_subscription_status('subscription1');

select subscription_name,status from pglogical.show_subscription_status();

select subscription_name,status,provider_dsn from pglogical.show_subscription_status();

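11. Synchronization status of a subscribed table, via pglogical.show_subscription_table(subscription_name name, relation regclass)

select * from pglogical.show_subscription_table('subscription1', 't04');

Note: all of the following scenarios are demonstrated on the environment from the pglogical installation and configuration article, so the setup steps are omitted here. Please read this article as a continuation of that one.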

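II. Adding a single ordinary table

For one-way logical replication, if the table structure already exists on both ends, simply add the table to the replication set.

Core function: pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean, columns text[], row_filter text)

On the provider, create test table t04 and insert data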
[postgres@host1 ~]$ psql test1 test1
psql (14.6)
Type "help" for help.


test1=> create table t04(id int primary key,name text);
CREATE TABLE
test1=>  insert into t04 select n,'aaaaaa' from generate_series(1,1000) n;
INSERT 0 1000
test1=>


On the subscriber, create the matching table structure
[postgres@host2 ~]$ psql test2 test2
psql (14.6)
Type "help" for help.


test2=> create table t04(id int primary key,name text);
CREATE TABLE


On the provider, add table t04 to the replication set; the log shows the logical replication processes starting up
test1=> \c test1 postgres
You are now connected to database "test1" as user "postgres".
test1=# select pglogical.replication_set_add_table(set_name:='default',relation:='t04',synchronize_data:=true);
replication_set_add_table
---------------------------
t
(1 row)


test1=# 2023-01-03 16:33:56.738 CST [2252] LOG:  logical decoding found consistent point at 0/198DF50
2023-01-03 16:33:56.738 CST [2252] DETAIL:  There are no running transactions.
2023-01-03 16:33:56.738 CST [2252] STATEMENT:  CREATE_REPLICATION_SLOT "pgl_test2_provider1_subscription1_81966cb4" LOGICAL pglogical_output
2023-01-03 16:33:56.738 CST [2252] LOG:  exported logical decoding snapshot: "00000006-000003AF-1" with 0 transaction IDs
2023-01-03 16:33:56.738 CST [2252] STATEMENT:  CREATE_REPLICATION_SLOT "pgl_test2_provider1_subscription1_81966cb4" LOGICAL pglogical_output


On the subscriber, the log shows the sync worker, and a query confirms that the sync has completed
test2=# 2023-01-03 16:33:57.137 CST [2114] LOG:  starting sync of table public.t04 for subscriber subscription1
2023-01-03 16:33:58.144 CST [2114] LOG:  finished sync of table public.t04 for subscriber subscription1
2023-01-03 16:33:58.144 CST [2114] LOG:  sync worker [2114] at slot 2 generation 2 detaching cleanly


test2=# select count(*) from t04;
count
-------
 1000
(1 row)


test2=# select * from pglogical.show_subscription_table('subscription1','t04');
nspname | relname |   status    
---------+---------+-------------
public  | t04     | replicating

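For bidirectional logical replication, first make sure that no application traffic reaches either end, then add the table to the replication set on each end.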
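
The demonstration above leaves the columns and row_filter parameters of the core function at their defaults. The call below is a hedged sketch of how they could be used. The table t05 and its columns are hypothetical and do not appear in the original test environment. It is assumed to exist on both nodes and not yet to be in the set.

```sql
-- Hypothetical table t05(id int primary key, name text, note text), created on both nodes.
select pglogical.replication_set_add_table(
    set_name         := 'default',
    relation         := 't05',
    synchronize_data := true,
    columns          := '{id,name}',   -- replicate only these columns; keep the primary key in the list
    row_filter       := 'id > 100'     -- replicate only rows matching this expression
);
```

With a column list, the subscriber table only needs the listed columns to receive data. The row filter is evaluated on the provider.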

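III. Adding ordinary tables by schema

Core function: pglogical.replication_set_add_all_tables(set_name name, schema_names text[], synchronize_data boolean)

Add all tables in schema test to the default replication set

On the provider, create schema test and the test tables test.test1, test.test2, and test.test3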
postgres=# \c test1 test1
test1=> create schema test;
CREATE SCHEMA
test1=>  create table test.test1(id int primary key,name text);
CREATE TABLE
test1=>  insert into test.test1 select n,'aaaaaa' from generate_series(1,1000) n;
INSERT 0 1000
test1=>  create table test.test2(id int primary key,name text);
CREATE TABLE
test1=>  insert into test.test2 select n,'aaaaaa' from generate_series(1,1000) n;
INSERT 0 1000
test1=>  create table test.test3(id int primary key,name text);
CREATE TABLE
test1=>  insert into test.test3 select n,'aaaaaa' from generate_series(1,1000) n;
INSERT 0 1000


On the subscriber, create schema test and the matching table structures
test2=# \c test2 test2
You are now connected to database "test2" as user "test2".
test2=> create schema test;
CREATE SCHEMA
test2=> create table test.test1(id int primary key,name text);
CREATE TABLE
test2=> create table test.test2(id int primary key,name text);
CREATE TABLE
test2=> create table test.test3(id int primary key,name text);
CREATE TABLE

On the provider, add the tables in schema test to the default replication set
test1=# select pglogical.replication_set_add_all_tables(set_name :='default',schema_names := '{test}',synchronize_data := true);
replication_set_add_all_tables
--------------------------------
t
(1 row)

test1=# 2023-01-04 10:32:23.064 CST [1864] LOG:  logical decoding found consistent point at 0/1A590B0
2023-01-04 10:32:23.064 CST [1864] DETAIL:  There are no running transactions.
2023-01-04 10:32:23.064 CST [1864] STATEMENT:  CREATE_REPLICATION_SLOT "pgl_test2_provider1_subscription1_f52a378d" LOGICAL pglogical_output
2023-01-04 10:32:23.064 CST [1864] LOG:  exported logical decoding snapshot: "00000007-0000006B-1" with 0 transaction IDs
2023-01-04 10:32:23.064 CST [1864] STATEMENT:  CREATE_REPLICATION_SLOT "pgl_test2_provider1_subscription1_f52a378d" LOGICAL pglogical_output
2023-01-04 10:32:24.171 CST [1868] LOG:  logical decoding found consistent point at 0/1A59148
2023-01-04 10:32:24.171 CST [1868] DETAIL:  There are no running transactions.
2023-01-04 10:32:24.171 CST [1868] STATEMENT:  CREATE_REPLICATION_SLOT "pgl_test2_provider1_subscription1_59d23c9f" LOGICAL pglogical_output
2023-01-04 10:32:24.171 CST [1868] LOG:  exported logical decoding snapshot: "00000007-00000071-1" with 0 transaction IDs
2023-01-04 10:32:24.171 CST [1868] STATEMENT:  CREATE_REPLICATION_SLOT "pgl_test2_provider1_subscription1_59d23c9f" LOGICAL pglogical_output
2023-01-04 10:32:25.184 CST [1872] LOG:  logical decoding found consistent point at 0/1A59180
2023-01-04 10:32:25.184 CST [1872] DETAIL:  There are no running transactions.
2023-01-04 10:32:25.184 CST [1872] STATEMENT:  CREATE_REPLICATION_SLOT "pgl_test2_provider1_subscription1_1ca0e353" LOGICAL pglogical_output
2023-01-04 10:32:25.184 CST [1872] LOG:  exported logical decoding snapshot: "00000007-00000077-1" with 0 transaction IDs
2023-01-04 10:32:25.184 CST [1872] STATEMENT:  CREATE_REPLICATION_SLOT "pgl_test2_provider1_subscription1_1ca0e353" LOGICAL pglogical_output
The three tables have now been added to the replication set
test1=# select * from pglogical.replication_set_table;
 set_id   | set_reloid | set_att_list | set_row_filter
-----------+------------+--------------+----------------
290045701 | t01        |              |
290045701 | test.test1 |              |
290045701 | test.test2 |              |
290045701 | test.test3 |              |
(4 rows)


Client log on the subscriber
test2=> 2023-01-04 10:32:24.011 CST [1807] LOG:  starting sync of table test.test1 for subscriber subscription1
2023-01-04 10:32:25.147 CST [1807] LOG:  finished sync of table test.test1 for subscriber subscription1
2023-01-04 10:32:25.147 CST [1807] LOG:  sync worker [1807] at slot 2 generation 2 detaching cleanly
2023-01-04 10:32:25.149 CST [1809] LOG:  starting sync of table test.test2 for subscriber subscription1
2023-01-04 10:32:26.158 CST [1809] LOG:  finished sync of table test.test2 for subscriber subscription1
2023-01-04 10:32:26.158 CST [1809] LOG:  sync worker [1809] at slot 2 generation 3 detaching cleanly
2023-01-04 10:32:26.161 CST [1811] LOG:  starting sync of table test.test3 for subscriber subscription1
2023-01-04 10:32:27.170 CST [1811] LOG:  finished sync of table test.test3 for subscriber subscription1
2023-01-04 10:32:27.170 CST [1811] LOG:  sync worker [1811] at slot 2 generation 4 detaching cleanly



On the subscriber, check the subscription status and the status of each table
test2=#  select pglogical.show_subscription_status('subscription1');
                                                                     show_subscription_status                                              
                       
--------------------------------------------------------------------------------------------------------------------------------------------
------------------------
(subscription1,replicating,provider1,"host=192.168.164.51 port=5432 dbname=test1",pgl_test2_provider1_subscription1,"{default,default_inser
t_only,ddl_sql}",{all})
(1 row)
test2=# select * from pglogical.show_subscription_status();
subscription_name |   status    | provider_node |                provider_dsn                |             slot_name             |        
 replication_sets            | forward_origins
-------------------+-------------+---------------+--------------------------------------------+-----------------------------------+---------
------------------------------+-----------------
subscription1     | replicating | provider1     | host=192.168.164.51 port=5432 dbname=test1 | pgl_test2_provider1_subscription1 | {default
,default_insert_only,ddl_sql} | {all}
test2=# select subscription_name,status from pglogical.show_subscription_status();
subscription_name |   status    
-------------------+-------------
subscription1     | replicating
(1 row)


test2=# select * from pglogical.show_subscription_table('subscription1','test.test1');
nspname | relname |   status    
---------+---------+-------------
test    | test1   | replicating
(1 row)

test2=# select * from pglogical.show_subscription_table('subscription1','test.test2');
nspname | relname |   status    
---------+---------+-------------
test    | test2   | replicating
(1 row)

test2=# select * from pglogical.show_subscription_table('subscription1','test.test3');
nspname | relname |   status    
---------+---------+-------------
test    | test3   | replicating
(1 row)
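
The three per-table checks above can also be folded into one statement. This is a minimal sketch that uses only the pglogical.show_subscription_table function shown earlier. The table list is written out by hand and would need adjusting for other schemas.

```sql
-- Call show_subscription_table once per listed table and return all rows together.
select st.nspname, st.relname, st.status
from (values ('test.test1'), ('test.test2'), ('test.test3')) as v(tbl),
     lateral pglogical.show_subscription_table('subscription1', v.tbl::regclass) as st;
```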

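IV. Removing a single ordinary table

For one-way logical replication, simply remove the target table from the replication set.

Core function: pglogical.replication_set_remove_table(set_name name, relation regclass)

Query which replication set each table belongs to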
test1=# select t.set_reloid,s.set_name from pglogical.replication_set_table t,pglogical.replication_set s where t.set_id=s.set_id;
set_reloid | set_name
------------+----------
t01        | default
t04        | default
(2 rows)


Remove the target table from the replication set
test1=# select pglogical.replication_set_remove_table(set_name:='default',relation:='t04');
replication_set_remove_table
------------------------------
t
(1 row)

The table is no longer in the replication set
test1=# select t.set_reloid,s.set_name from pglogical.replication_set_table t,pglogical.replication_set s where t.set_id=s.set_id;
set_reloid | set_name
------------+----------
t01        | default
(1 row)


On the provider, insert another 1000 test rows
test1=>  insert into t04 select n,'aaaaaa' from generate_series(1001,2000) n;
INSERT 0 1000
test1=> select count(*) from t04;
count
-------
 2000
(1 row)




On the subscriber, the row count shows that replication of the table has stopped
test2=> select count(*) from t04;
count
-------
 1000
(1 row)

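For bidirectional logical replication, first make sure that no application traffic reaches either end, then remove the table from the replication set on each end. No separate example is shown here.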

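V. Removing tables by schema

pglogical does not currently provide a function for removing tables by schema, so this section is not covered. It will be updated once a later version supports it.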
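
In the meantime, one possible workaround is to call pglogical.replication_set_remove_table once for every table of the schema that is currently in the set. The query below is an untested sketch, not a pglogical feature. It assumes the pglogical.tables view exposes relid, nspname, and set_name, and uses the schema test and the set default from the earlier sections.

```sql
-- Run on the provider: remove every table of schema "test" from the "default" set.
-- Assumption: pglogical.tables has columns relid, nspname, set_name.
select nspname, relname,
       pglogical.replication_set_remove_table(set_name, relid::regclass) as removed
from pglogical.tables
where nspname = 'test'
  and set_name = 'default';
```

Running the same query again afterwards should return no rows, because the tables no longer carry that set name.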

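VI. Replicating partitioned tables

Unlike PostgreSQL's built-in logical replication, pglogical requires each partition (child table) to be added to the replication set individually.

We create a new database and user for this test: provider user test3 with database test3, and subscriber user test4 with database test4. The partitioned parent table is running_log, and its partitions are running_log_202211, running_log_202212, and running_log_202301.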

On the provider, create the partitioned table and insert test data
test1=# \c test3 test3
You are now connected to database "test3" as user "test3".
test3=> create table running_log
test3-> (
test3(>     id       serial,
test3(>     commit_time timestamp(0) without time zone,
test3(>     num  int,
test3(>     executor_name varchar(40),
test3(>     primary key (id,commit_time)
test3(> ) PARTITION BY RANGE (commit_time);
CREATE TABLE
test3=>
test3=>
test3=>
test3=> CREATE TABLE running_log_202211 PARTITION OF running_log FOR VALUES FROM ('2022-11-01') TO ('2022-12-01');
CREATE TABLE
test3=> CREATE TABLE running_log_202212 PARTITION OF running_log FOR VALUES FROM ('2022-12-01') TO ('2023-01-01');
CREATE TABLE
test3=> CREATE TABLE running_log_202301 PARTITION OF running_log FOR VALUES FROM ('2023-01-01') TO ('2023-02-01');
CREATE TABLE
test3=> INSERT INTO running_log(id, commit_time, num,executor_name) VALUES (1, '2022-11-11', 1203,'方言');
INSERT 0 1
test3=> INSERT INTO running_log(id, commit_time, num,executor_name) VALUES (2, '2022-11-12', 1178,'方言');
INSERT 0 1
test3=> INSERT INTO running_log(id, commit_time, num,executor_name) VALUES (3, '2022-12-12', 1100,'莫言');
INSERT 0 1
test3=> INSERT INTO running_log(id, commit_time, num,executor_name) VALUES (4, '2022-12-13', 1202,'莫言');
INSERT 0 1
test3=> INSERT INTO running_log(id, commit_time, num,executor_name) VALUES (5, '2023-01-01', 1500,'休言');
INSERT 0 1
test3=> INSERT INTO running_log(id, commit_time, num,executor_name) VALUES (6, '2023-01-02', 1815,'休言');
INSERT 0 1

 
On the subscriber, create the partitioned table structure
test4=# \c test4 test4
You are now connected to database "test4" as user "test4".
test4=> create table running_log
test4-> (
test4(>     id       serial,
test4(>     commit_time timestamp(0) without time zone,
test4(>     num  int,
test4(>     executor_name varchar(40),
test4(>     primary key (id,commit_time)
test4(> ) PARTITION BY RANGE (commit_time);
test4=> CREATE TABLE running_log_202211 PARTITION OF running_log FOR VALUES FROM ('2022-11-01') TO ('2022-12-01');
CREATE TABLE
test4=> CREATE TABLE running_log_202212 PARTITION OF running_log FOR VALUES FROM ('2022-12-01') TO ('2023-01-01');
CREATE TABLE
test4=> CREATE TABLE running_log_202301 PARTITION OF running_log FOR VALUES FROM ('2023-01-01') TO ('2023-02-01');
CREATE TABLE
     


On the provider, create the replication set

test3=# select pglogical.create_replication_set('fenqu');
create_replication_set
------------------------
            3067052044
(1 row)
     
Add the partitions to the replication set
test3=# select pglogical.replication_set_add_table('fenqu','public.running_log_202211');
replication_set_add_table
---------------------------
t
(1 row)
test3=# select pglogical.replication_set_add_table('fenqu','public.running_log_202212');
replication_set_add_table
---------------------------
t
(1 row)
test3=# select pglogical.replication_set_add_table('fenqu','public.running_log_202301');
replication_set_add_table
---------------------------
t
(1 row)
test3=# select * from pglogical.replication_set_table;   -- the partitions are now in the replication set
  set_id   |     set_reloid     | set_att_list | set_row_filter
------------+--------------------+--------------+----------------
3067052044 | running_log_202211 |              |
3067052044 | running_log_202212 |              |
3067052044 | running_log_202301 |              |
(3 rows)
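
With many partitions, adding each one by hand becomes tedious. As a hedged alternative to the three calls above, the partitions can be looked up in the pg_inherits system catalog and added in one statement. This sketch assumes a single level of partitioning and that none of the partitions is in the set yet.

```sql
-- Run on the provider: add every direct partition of running_log to the "fenqu" set.
select i.inhrelid::regclass as partition_name,
       pglogical.replication_set_add_table('fenqu', i.inhrelid::regclass) as added
from pg_inherits i
where i.inhparent = 'public.running_log'::regclass;
```

Partitions created later are not picked up automatically and would need to be added in the same way.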


On the subscriber, create the node
test4=# select pglogical.create_node(node_name :='subscriber2',dsn :='host=192.168.164.52 port=5432 dbname=test4');
create_node
-------------
  770182093
(1 row)

On the subscriber, create the subscription
test4=# SELECT pglogical.create_subscription(subscription_name := 'subscription2',replication_sets:=array['fenqu'],provider_dsn := 'host=192.168.164.51 port=5432 dbname=test3');
create_subscription
---------------------
         1871150101
(1 row)

test4=# select count(*) from running_log;
count
-------
    6
(1 row)


test4=# select subscription_name,status from pglogical.show_subscription_status();
subscription_name |   status    
-------------------+-------------
subscription2     | replicating
(1 row)

test4=#  select * from pglogical.show_subscription_table('subscription2','running_log_202211');
nspname |      relname       |   status    
---------+--------------------+-------------
public  | running_log_202211 | replicating
(1 row)

test4=#  select * from pglogical.show_subscription_table('subscription2','running_log_202212');
nspname |      relname       |   status    
---------+--------------------+-------------
public  | running_log_202212 | replicating
(1 row)
test4=#  select * from pglogical.show_subscription_table('subscription2','running_log_202301');
nspname |      relname       |   status    
---------+--------------------+-------------
public  | running_log_202301 | replicating
(1 row)

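VII. Redoing logical replication (refreshing a few tables or rebuilding the subscription)

Core functions: pglogical.alter_subscription_resynchronize_table(subscription_name name, relation regclass), pglogical.drop_subscription(subscription_name name, ifexists bool), pglogical.create_subscription(subscription_name name, provider_dsn text, replication_sets text[], synchronize_structure boolean, synchronize_data boolean, forward_origins text[], apply_delay interval)

Scenario 1: Logical replication is still running, but a few tables have diverged between the provider and the subscriber because of manual changes, application logic, or similar causes, and only those tables need to be refreshed. Use pglogical.alter_subscription_resynchronize_table(subscription_name name, relation regclass) for this.

The tables on the subscriber need no manual preparation; call the function directly. Note that pglogical truncates the table on the subscriber before copying it again, so the table is empty for a short time during the resync.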

test4=# select count(*) from running_log;

count
-------
    6
(1 row)


test4=# select pglogical.alter_subscription_resynchronize_table('subscription2','running_log_202211');
alter_subscription_resynchronize_table
----------------------------------------
t
(1 row)


test4=# select pglogical.alter_subscription_resynchronize_table('subscription2','running_log_202212');
alter_subscription_resynchronize_table
----------------------------------------
t
(1 row)


test4=# select pglogical.alter_subscription_resynchronize_table('subscription2','running_log_202301');
alter_subscription_resynchronize_table
----------------------------------------
t
(1 row)


test4=# select count(*) from  running_log;
count
-------
    6
(1 row)

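Scenario 2: Logical replication has stopped, a very large number of tables are affected, and the data has diverged significantly. In this case, rebuild the subscription to refresh the data.

On the subscriber, drop the subscription and clear the table data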
test4=# select pglogical.drop_subscription('subscription2');
drop_subscription
-------------------
                1
(1 row)

test4=# delete from running_log;
DELETE 6
test4=# select count(*) from running_log;
count
-------
    0
(1 row)

On the provider, the replication slot for subscription2 has been dropped automatically; only the slot for subscription1 from the earlier test remains
test3=# select * from pg_replication_slots ;
            slot_name             |      plugin      | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmi
n | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase
-----------------------------------+------------------+-----------+--------+----------+-----------+--------+------------+------+------------
--+-------------+---------------------+------------+---------------+-----------
pgl_test2_provider1_subscription1 | pglogical_output | logical   |  16387 | test1    | f         | t      |      16160 |      |          88
1 | 0/1C83F98   | 0/1C83FD0           | reserved   |               | f
(1 row)
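
Because the full output of pg_replication_slots wraps badly in a terminal, a narrower query is easier to read. The sketch below selects a few standard columns of the view and keeps only slots that use the pglogical_output plugin.

```sql
-- Run on the provider: list only pglogical replication slots.
select slot_name, database, active, restart_lsn, confirmed_flush_lsn
from pg_replication_slots
where plugin = 'pglogical_output';
```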


On the subscriber, recreate the subscription; the data is synchronized again
test4=# SELECT pglogical.create_subscription(subscription_name := 'subscription2',replication_sets:=array['fenqu'],provider_dsn := 'host=192.168.164.51 port=5432 dbname=test3');
create_subscription
---------------------
         1871150101
(1 row)

test4=# select count(*) from running_log;
count
-------
    6
(1 row)
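
The initial copy runs asynchronously after create_subscription returns, so a count taken immediately afterwards may still show fewer rows on larger tables. As a hedged sketch, and assuming the installed pglogical version ships pglogical.wait_for_subscription_sync_complete (documented for pglogical 2.x), the session can block until the initial sync has finished before checking the data.

```sql
-- Run on the subscriber: wait for the initial sync of subscription2, then verify the row count.
select pglogical.wait_for_subscription_sync_complete('subscription2');
select count(*) from running_log;
```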