0%

基于PG自动创建分区表的一种实现

PostgreSQL 是一个现代化的数据库,开源,免费试用,社区活跃,和Oracle具有同等强大的功能。作为一款高度可扩展的企业级关系型数据库管理系统,其内置的分区表功能在处理大规模数据场景中扮演着重要角色。尤其是自动表分区技术,通过自动化管理分区,极大地简化了分区表的创建和维护过程,提高了数据库的性能和管理效率。支持JSON格式和自定义类型,支持并行查询,聚合函数、窗口函数和数据透视表非常适合作为统计数据的数据源。

合理规划数据是作为数据库使用的重要手段。目前Filscan作为首款后端使用PostgreSQL数据库的项目,目前已经支撑了亿级以上的Filecoin庞大数据量,后续随着Filecoin链继续发展,数据还将继续膨胀。

Filecoin的状态数据的数量非常庞大,如果在一张表内保存,会非常影响查询效率。为达到保存一段时间内的数据,且可以根据时间范围伸缩,提高查询效率,需要对巨量数据表进行表分区。让数据库按需自动的创建新的表来存放按照规则划分的数据,是可以提高自动化维护表程度,减少人为的干扰和出错的可能。

表分区方案

表分区方案是一直存在的,有三种方式:

预先创建一批分区表

这种方式最大的弊端是需要定期的创建一批新分区表以适应新数据范围分区,其次分区表不能按实际数据区间创建分区表,所有的数据范围都必须紧密存在,才不会漏存对应的数据到分区中。定期创建分区是容易被忽视的一个环节,一旦忘记定期创建,尤其是定期周期较长,就会导致数据丢失。如果有默认分区存在,新表记录会被保存到默认分区内,对后续可持续维护带来了额外的停机时间开销。

使用程序对表进行分区

这种方式没有问题,不过需要做一些额外适配工作,以适应不同的分区表的动态创建,分区表逻辑的更新依赖程序的更新。而基于数据库实现自动表分区,只需要即时更新存储过程或函数就可立即生效,也能够享受数据库自身的特性带来的便捷性,减少开发周期,降低维护成本。

第三方插件

pg_pathman 是一个PostgreSQL数据库的第三方插件。该插件的好处是封装实现了一套维护分区表的完整操作。阿里云RDS PostgreSQL版本支持安装该插件,下面是阿里云RDS各个PostgreSQL版本所支持该插件的情况。

插件名 14 13 12 11 10 9.4 描述
pg_pathman 1.5 1.5 1.5 1.5 高性能分区表插件

https://help.aliyun.com/document_detail/142340.html

这里有关于该插件的全部用法:https://help.aliyun.com/document_detail/140900.html

但是该插件官网做了以下提示:

注意:该项目不再处于开发阶段

pg_pathman支持 Postgres 版本 [9.5..13],但很可能不会移植到 14 及更高版本。声明式分区现在已经相当成熟,几乎所有的东西都在pg_pathman; 我们鼓励用户切换到它。我们仍在维护该项目(修复受支持版本中的错误),但这里不会发生新的开发。

这个项目官方预计停止后续的支持,目前处于维护阶段,提示官方声明式分区版本已经做得足够好,可以不需要依赖第三方插件。从这点上说,选择声明式分区无疑是最优的选择,不会因为服务器版本升级而需要额外升级依赖第三方实现。

声明式分区(DDL-Partition)

PostgreSQL提供了一种方法指定如何把一个表划分成称为分区的片段。被划分的表被称作分区表

表分区是解决一些因单表过大引用的性能问题的方式,比如某张表过大就会造成查询变慢,可能分区是一种解决方案。一般建议当单表大小超过内存就可以考虑表分区了。

PostgreSQL表分区有两种实现方式,声明式分区和表继承。它们各自有优缺点,比如,对声明式分区来说,分区必须具有和分区表正好相同的列集合,而在表继承中,子表可以有父表中没有出现过的额外列。除此之外,声明式分区也有着优秀的特点,可以减轻对分区表的维护代价、提供了操作的便捷性和查询性能,使用表继承将要做更多的工作才能达到同样的效果。

特点

默认分区

PosgtreSQL 11开始,支持为声明式分区表创建一个默认(DEFAULT)的分区,用于存储无法匹配其他任何分区的数据。只有 RANGE 分区表和 LIST 分区表需要默认分区。创建默认分区时使用 DEFAULT 子句替代 FOR VALUES 子句。

1
2
3
4
5
6
7
8
9
CREATE TABLE example_table(
epoch BIGINT NOT NULL ,
data text
) PARTITION BY RANGE (epoch);

CREATE TABLE example_table_1_100 PARTITION OF example_table FOR VALUES FROM (MINVALUE) TO (101);
CREATE TABLE example_table_101_200 PARTITION OF example_table FOR VALUES FROM (101) TO (201);
-- 创建默认分区时使用 DEFAULT 子句替代 FOR VALUES 子句。
CREATE TABLE example_table_default PARTITION OF example_table DEFAULT ;

如果没有创建默认分区,在插入epoch=201时,会报错,提示插入错误。

1
2
3
-- [23514] 错误: 没有为行找到关系"example_table"的分区
-- 详细:失败行的分区键包含(epoch) = (201).
INSERT INTO example_table(epoch, data) VALUES (201, 'good');

默认分区存在以下限制:

  • 一个分区表只能拥有一个 DEFAULT 分区;
  • 对于已经存储在 DEFAULT 分区中的数据,不能再创建相应的分区;参见下文示例;
  • 如果将已有的表挂载为 DEFAULT 分区,将会检查该表中的所有数据;如果在已有的分区中存在相同的数据,将会产生一个错误;
  • 哈希分区表不支持 DEFAULT 分区,实际上也不需要支持。

以列表分区为例:

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE example_table_list(
miner varchar(100) NOT NULL ,
epoch BIGINT NOT NULL ,
data varchar(50)
) PARTITION BY LIST (miner);

CREATE TABLE example_table_list_02438 PARTITION OF example_table_list FOR VALUES IN ('f02438');
CREATE TABLE example_table_list_025002_025005 PARTITION OF example_table_list FOR VALUES IN ('f025002', 'f025005');
CREATE TABLE example_table_list_default PARTITION OF example_table_list DEFAULT ;

INSERT INTO example_table_list VALUES ('f03721', 98, 'Green');

如果此时未创建对f03721的分区表,上面的语句会插入到example_table_list_default表内。如果此时开始创建

1
CREATE TABLE example_table_list_03721 PARTITION OF example_table_list FOR VALUES IN ('f03721');

操作将会失败。因为添加新的分区需要修改默认分区的范围。但是默认分区中已经存在f02731的分区数据项。

错误: 某些行将违反默认分区”example_table_list_default”的更新分区约束

为了解决这个问题,可以先将默认分区从分区表中卸载(DETACH PARTITION),创建新的分区,将默认分区中的相应的数据移动到新的分区,最后重新挂载默认分区。

1
2
3
4
5
6
7
8
9
10
11
-- 临时卸载默认分区
ALTER TABLE example_table_list DETACH PARTITION example_table_list_default;
-- 创建一个新的列表分区表
CREATE TABLE example_table_list_03721 PARTITION OF example_table_list FOR VALUES IN ('f03721');
-- 查询默认分区表内的对应数据并插入到新分区表内
INSERT INTO example_table_list_03721
SELECT * FROM example_table_list_default WHERE miner = 'f03721';
-- 删除默认分区表内的数据
DELETE FROM example_table_list_default WHERE miner = 'f03721';
-- 重新挂载默认分区表
ALTER TABLE example_table_list ATTACH PARTITION example_table_list_default DEFAULT ;

分区自动索引

在 PostgreSQL10 中,分区上的索引需要基于各个分区手动创建,而不能基于分区的父表创建索引。PostgreSQL 11 及以后的版本,声明式分区表可以基于分区表创建索引。分区表上的索引并不会创建一个物理上的索引,而是为每个分区上的索引创建一个模板。表继承方式的分区表需要为每一张分区表创建索引和约束。

如果在声明式分区表上创建了一个索引,PostgreSQL 自动为每个分区创建具有相同属性的索引。

1
2
3
4
5
-- 在主表上创建索引,会在分表上自动创建对应的索引。
-- 在创建新的分区表时,也会自动创建好对应的索引。
CREATE UNIQUE INDEX example_table_epoch ON example_table(epoch DESC);
-- 在主表上执行删除索引时,也会自动在分区表上删除对应的索引。
DROP INDEX example_table_epoch;
  • 自动创建的索引,名称按照 “{partition name}{column name}_idx” 的模式定义。多个字段的复合索引使用下划线()连接字段名称。如果索引名称已经存在,在名称的最后添加一个数字。如果名称过长,使用缩写。

  • 随后新增的分区或者通过 ATTACH PARTITION 挂载的分区都会自动创建相应的索引。

  • 自动创建的索引不能单独删除,可以通过分区表统一删除。

跨分区移动数据

PostgreSQL 10 中,声明式分区表如果 UPDATE 语句修改了分区字段的值,导致数据需要移动到其他分区时,语句将会失败。PostgreSQL 11+ 以后能够正确处理更新分区字段的操作。根据提交记录,这种 UPDATE 语句实际上分为两步执行:从旧的分区中 DELETE相应记录,在新的分区中 INSERT相应记录。另外,跨分区移动数据的 UPDATE 语句将会导致触发器的执行顺序更加复杂。

限制

分区表有下列限制:

  • 没有办法创建跨越所有分区的排除约束,只可能单个约束每个叶子分区。
  • 分区表上的惟一约束(也就是主键)必须包括所有分区键列。存在此限制是因为PostgreSQL只能每个分区中分别强制实施唯一性。
  • BEFORE ROW 触发器无法更改哪个分区是新行的最终目标。
  • 不允许在同一个分区树中混杂临时关系和持久关系。因此,如果分区表是持久的,则其分区也必须是持久的,反之亦然。在使用临时关系时,分区数的所有成员都必须来自于同一个会话。

执行流程

厘清执行流程有助于从宏观上理解在哪个步骤中适合实现自动创建分区表。从数据库提供的机制上,有存储过程和触发器两种武器,分别可以实现在不同阶段处理不同拦截逻辑。

对于声明式分区表,所有被插入的行将被基于分区键的值,路由到分区中。每个分区都有一个由其分区边界定义的数据子集。当前支持的分区方法是范围、列表以及哈希。也就是说分区表的插入/更新操作,又一个内置的路由规则。

根据定义,结合对表的插入/更新操作以及触发器流程,一条数据在插入到分区表时,将经历以下步骤:

  1. 触发分区主表的statement before insert
  2. 自动路由到符合范围的分区子表
  3. 触发符合范围的分区子表row before insert
  4. 数据插入
  5. 触发符合范围的分区子表row after insert
  6. 触发分区主表的statement after insert

下面验证一下上述流程。

触发器行为

创建分区表

举个例子,创建一张测试分区表,并创建三个分区,不创建默认分区。由于默认分区的限制,对于已经存储在 DEFAULT 分区中的数据,不能再创建相应的分区,需要先DETACH默认分区手动做数据转移,然后才可以创建新范围分区,这个步骤无法实现自动创建。

1
2
3
4
5
DROP TABLE example_table CASCADE ;
CREATE TABLE example_table(col1 INT) PARTITION BY RANGE (col1);
CREATE TABLE example_table_1 PARTITION OF example_table FOR VALUES FROM (0) TO (10);
CREATE TABLE example_table_2 PARTITION OF example_table FOR VALUES FROM (11) TO (20);
CREATE TABLE example_table_3 PARTITION OF example_table FOR VALUES FROM (21) TO (30);

创建触发器函数

创建一个用于观察触发器触发阶段的函数

1
2
3
4
5
6
7
8
CREATE OR REPLACE FUNCTION notice_trigger() RETURNS TRIGGER AS
$$
BEGIN
RAISE NOTICE 'trigger name:%; table_name:%; level:%; op:%; when:%; value:%',
tg_name, tg_table_name, tg_level, tg_op, tg_when, new;
RETURN new;
END
$$ LANGUAGE plpgsql;

创建触发器

在分区表主表example_table上绑定测试分区表INSERT和UPDATE触发器,分别是语句级别和行级别,与before 和after的组合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
CREATE TRIGGER show_notice_before_insert_stmt
BEFORE INSERT
ON example_table
FOR STATEMENT
EXECUTE FUNCTION notice_trigger();
CREATE TRIGGER show_notice_after_insert_stmt
AFTER INSERT
ON example_table
FOR STATEMENT
EXECUTE FUNCTION notice_trigger();
CREATE TRIGGER show_notice_before_insert_row
BEFORE INSERT
ON example_table
FOR ROW
EXECUTE FUNCTION notice_trigger();
CREATE TRIGGER show_notice_after_insert_row
AFTER INSERT
ON example_table
FOR ROW
EXECUTE FUNCTION notice_trigger();

CREATE TRIGGER show_notice_before_update_stmt
BEFORE UPDATE
ON example_table
FOR STATEMENT
EXECUTE FUNCTION notice_trigger();
CREATE TRIGGER show_notice_after_update_stmt
AFTER UPDATE
ON example_table
FOR STATEMENT
EXECUTE FUNCTION notice_trigger();
CREATE TRIGGER show_notice_before_update_row
BEFORE UPDATE
ON example_table
FOR ROW
EXECUTE FUNCTION notice_trigger();
CREATE TRIGGER show_notice_after_update_row
AFTER UPDATE
ON example_table
FOR ROW
EXECUTE FUNCTION notice_trigger();

通过观察表结构定义我们可以看到,主表上创建出8个触发器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
example=# \d+ example_table
Partitioned table "public.example_table"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
--------+---------+-----------+----------+---------+---------+--------------+-------------
col1 | integer | | | | plain | |
Partition key: RANGE (col1)
Triggers:
show_notice_after_insert_row AFTER INSERT ON example_table FOR EACH ROW EXECUTE FUNCTION notice_trigger()
show_notice_after_insert_stmt AFTER INSERT ON example_table FOR EACH STATEMENT EXECUTE FUNCTION notice_trigger()
show_notice_after_update_row AFTER UPDATE ON example_table FOR EACH ROW EXECUTE FUNCTION notice_trigger()
show_notice_after_update_stmt AFTER UPDATE ON example_table FOR EACH STATEMENT EXECUTE FUNCTION notice_trigger()
show_notice_before_insert_row BEFORE INSERT ON example_table FOR EACH ROW EXECUTE FUNCTION notice_trigger()
show_notice_before_insert_stmt BEFORE INSERT ON example_table FOR EACH STATEMENT EXECUTE FUNCTION notice_trigger()
show_notice_before_update_row BEFORE UPDATE ON example_table FOR EACH ROW EXECUTE FUNCTION notice_trigger()
show_notice_before_update_stmt BEFORE UPDATE ON example_table FOR EACH STATEMENT EXECUTE FUNCTION notice_trigger()
Partitions: example_table_1 FOR VALUES FROM (0) TO (10),
example_table_2 FOR VALUES FROM (11) TO (20),
example_table_3 FOR VALUES FROM (21) TO (30)

分区表上也自动创建出了4个行级触发器。

1
2
3
4
5
6
7
8
9
10
11
example=# \d example_table_1
Table "public.example_table_1"
Column | Type | Collation | Nullable | Default
--------+---------+-----------+----------+---------
col1 | integer | | |
Partition of: example_table FOR VALUES FROM (0) TO (10)
Triggers:
show_notice_after_insert_row AFTER INSERT ON example_table_1 FOR EACH ROW EXECUTE FUNCTION notice_trigger(), ON TABLE example_table
show_notice_after_update_row AFTER UPDATE ON example_table_1 FOR EACH ROW EXECUTE FUNCTION notice_trigger(), ON TABLE example_table
show_notice_before_insert_row BEFORE INSERT ON example_table_1 FOR EACH ROW EXECUTE FUNCTION notice_trigger(), ON TABLE example_table
show_notice_before_update_row BEFORE UPDATE ON example_table_1 FOR EACH ROW EXECUTE FUNCTION notice_trigger(), ON TABLE example_table

分区表主表类型为(partitioned table),分区表类型为(table)。

  • 在主表上创建触发器,会为每一个子表创建触发器。
  • FOR EACH STATEMENT 只会在主表上创建
  • FOR EACH ROW 才会为每一个子表创建(如下查询结果)。

插入一条数据

1
2
3
4
5
6
7
8
INSERT INTO example_table VALUES (3);

-- Output
[00000] trigger name:show_notice_before_insert_stmt; table_name:example_table; level:STATEMENT; op:INSERT; when:BEFORE; value:<NULL>
[00000] trigger name:show_notice_before_insert_row; table_name:example_table_1; level:ROW; op:INSERT; when:BEFORE; value:(3)
[00000] trigger name:show_notice_after_insert_row; table_name:example_table_1; level:ROW; op:INSERT; when:AFTER; value:(3)
[00000] trigger name:show_notice_after_insert_stmt; table_name:example_table; level:STATEMENT; op:INSERT; when:AFTER; value:<NULL>
1 row affected in 60 ms

分区表INSERT触发器的执行顺序:

  1. 主表example_table statement 级别 INSERT BEFOER触发器
  2. 主表example_table statement 级别 INSERT AFTER 触发器
  3. 分区表example_table_1 行级别 INSERT BEFORE 触发器
  4. 分区表example_table_1 行级别 INSERT AFTER 触发器

插入不存在分区的数据

1
2
3
4
5
6
INSERT INTO example_table VALUES (32);

-- Output
[23514] 错误: 没有为行找到关系"example_table"的分区
详细:失败行的分区键包含(col1) = (32).
[00000] trigger name:show_notice_before_insert_stmt; table_name:example_table; level:STATEMENT; op:INSERT; when:BEFORE; value:<NULL>

插入一条不存在分区的数据时,会导致插入错误,提示没找到分区。分区表INSERT触发器的执行顺序:

  1. 主表example_table statement 级别 INSERT BEFORE 触发器
  2. 主表example_table statement 级别 INSERT AFTER 触发器

修改触发器函数

修改触发器函数notice_trigger使它拥有创建表的能力。以下代码仅供测试,认为新插入数据时为不存在分区范围的数据,且分区列数值在31-40之间。并定义触发器在行级、插入前触发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE OR REPLACE FUNCTION notice_trigger() RETURNS TRIGGER AS
$$
BEGIN
RAISE NOTICE 'trigger name:%; table_name:%; level:%; op:%; when:%; value:%',
tg_name, tg_table_name, tg_level, tg_op, tg_when, new;
IF tg_level = 'ROW' AND tg_op = 'INSERT' AND tg_when = 'BEFORE' THEN
CREATE TABLE example_table_4 PARTITION OF example_table FOR VALUES FROM (31) TO (40);
INSERT INTO example_table_4 SELECT new.*;
RETURN NULL;
END IF;

RETURN new;
END
$$ LANGUAGE plpgsql;

再次执行数据插入

1
2
3
4
5
6
INSERT INTO example_table VALUES (32);

-- Output
[23514] 错误: 没有为行找到关系"example_table"的分区
详细:失败行的分区键包含(col1) = (32).
[00000] trigger name:show_notice_before_insert_stmt; table_name:example_table; level:STATEMENT; op:INSERT; when:BEFORE; value:<NULL>

主表上由于分区表的默认插入行为,并没有触发行级触发器,并由于自动路由,错误提示找不到关系分区报错退出。

再次修改触发器函数

这次计划通过语句级别INSERT BEFORE触发器来创建新分区表。

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE OR REPLACE FUNCTION notice_trigger() RETURNS TRIGGER AS
$$
BEGIN
RAISE NOTICE 'trigger name:%; table_name:%; level:%; op:%; when:%; value:%',
tg_name, tg_table_name, tg_level, tg_op, tg_when, new;
IF tg_level = 'STATEMENT' AND tg_op = 'INSERT' AND tg_when = 'BEFORE' THEN
CREATE TABLE example_table_4 PARTITION OF example_table FOR VALUES FROM (31) TO (40);
RETURN NULL;
END IF;

RETURN new;
END
$$ LANGUAGE plpgsql;

再次尝试插入

1
2
3
4
5
6
7
INSERT INTO example_table VALUES (32);

-- Output
[55006] 错误: 无法CREATE TABLE .. PARTITION OF "example_table" 因为它正在被这个会话中的活动查询使用
在位置:SQL 语句 "CREATE TABLE example_table_4 PARTITION OF example_table FOR VALUES FROM (31) TO (40)"
SQL语句的第6行的PL/pgSQL函数notice_trigger()
[00000] trigger name:show_notice_before_insert_stmt; table_name:example_table; level:STATEMENT; op:INSERT; when:BEFORE; value:<NULL>

从上得知,BEFORE ROW 触发器无法更改哪个分区是新行的最终目标。

自动路由流程

从以上动作得知:

  • 分区表主表只会触发语句级别的触发器
  • 分区表只会触发行级触发器
  • 数据库会对插入和更新动作自动做路由规则处理

自动路由处理流程

整个插入过程在一个事务内,在插入开始期间,会收集整个分区表信息;声明式分区表有一个内置的路由行为,该行为发生在语句级别before触发器之后。它会根据分区列将实际的INSERT/UPDATE动作定向到分区中进行插入和更新。此时主分区表不会触发它的行级触发器,而是触发分区表中的行级触发器,且不会触发分区表的语句级触发器。无法在语句级别动态创建新分区,到达自动路由阶段,对不存在分区的插入会导致插入失败,不再执行后续动作。

规则系统

考虑到高可维护性,简便易行和最大程度使用数据库的特点,最终考虑使用声明式分区来实现自动表分区。

既然无法使用触发器达到在插入数据时,自动创建不存在范围的分区表的行为,有没有办法不通过触发器,而通过数据库自身的特性达到同样的目的呢?

答案是有的。经过调研,PostgreSQL数据库支持重写规则(RULE)。

用法

1
2
3
4
5
6
7
CREATE [ OR REPLACE ] RULE name AS ON event
TO table_name [ WHERE condition ]
DO [ ALSO | INSTEAD ] { NOTHING | command | ( command ; command ... ) }

其中 event 可以是以下之一:

SELECT | INSERT | UPDATE | DELETE

PostgreSQL规则系统允许我们定义 针对数据库表中插入、更新或者删除动作上的替代动作。大约来说,当在 一个给定表上执行给定命令时,一条规则会导致执行额外的命令。或者, INSTEAD规则可以用另一个命令替换给定的命令,或者导致一个命令根本不被执行。

使用规则来改变插入的流程,绕开分区表触发器的执行顺序限制,直接插入到目标表内。

原理是使用规则重定向插入,将INSERT/UPDATE操作替换成用函数来插入。函数插入时判断是否报错了,如果报错,新增表。

设计流程

在分区表主表上创建两个规则,分别是INSERT和 UPDATE,替换数据库向主表默认插入和更新操作。此时的触发器和流程从下图左边变成了右边的流程,规则接管默认插入机制,在插入数据前做一层条件判断,达到自动创建分区的目的。由于规则系统仅支持INSERT/UPDATE/SELECT/DELETE操作,因此需要编写一个函数来托管整个过程。

重写规则

在这个大框架下还需要解决自动化创建表时一致性行为,以及接管一些数据库默认插入/更新时的功能:

  • 表分区名称约定
  • 结合Filecoin业务,比如根据区块高度对数据分区,对高度计算得到分区表后缀
  • 获取分区表名称是否存在
  • 唯一键冲突
  • 分区移动

函数(存储过程)

表分区名称约定

表名称约定

通常,对表的数据进行分区,对数据行记录时间进行划分。以此为基础便于以时间范围查询,因此分区表将设置成主表+后缀名的方式命名。后缀名定义以下4种按时间划分的分区规则:

  1. 按天分区 (分区表命名规则:table_name_2022_02_18_1560240_1563119)
  2. 按周分区 (分区表命名规则:table_name_2022_w7_1548720_1568879)
  3. 按月分区 (分区表命名规则:table_name_2022_02_1560240_1563119)
  4. 按年分区 (分区表命名规则:table_name_2022_1560240_1563119)

以上分区规则统一按照北京时间为基准,以一天的零点作为开始,23点59分59秒作为结束。

按天分区

分区表命名规则为 主表____起始epoch_终止epoch

其中月份为两位数,不足两位前缀补0。

按周分区

分区表命名规则为 主表__w全年周数_起始epoch_终止epoch

按月分区

分区表命名规则为 主表___起始epoch_终止epoch

其中月份为两位数,不足两位前缀补0。

按年分区

分区表命名规则为 主表__起始epoch_终止epoch

以下内容为实现分区表命名规则而设立的函数,执行函数可以计算得到上述规则后缀和分区范围。

分区范围类型定义

定义数据库的类型,拥有以下6个字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
DROP TYPE IF EXISTS PARTITION_TABLE_RANGE_DEFINITION CASCADE ;
-- 创建统一的分区规则类型,定义后缀字段、触发查询时的日期、
-- 分区的起始时间(起始epoch)和终止时间(终止epoch)
-- 终止epoch与终止时间不是一一对应,终止epoch通常是23:59:30。链的高度每30秒变更一次。
CREATE TYPE PARTITION_TABLE_RANGE_DEFINITION AS
(
suffix VARCHAR,
hit_date DATE,
begin_epoch BIGINT,
begin_time TIMESTAMPTZ,
end_epoch BIGINT,
end_time TIMESTAMPTZ
);
分区范围字段 说明
suffix 分区表的后缀名称,配合主表名+后缀定义出分区表名称
hit_date 查询分区范围定义的日期
begin_epoch 分区表开始高度(Filecoin epoch, 下同)
begin_time 分区表开始时间(参考,分区以begin_epoch为主,下同)
end_epoch 分区表结束高度
end_time 分区表结束时间

分区范围实现

注意!分区表FROM (MINVALUE) TO (101) 是前闭后开。也就是插入的分区列应满足 epoch ∈[MINVALUE, 101)

1
2
3
4
5
6
7
CREATE TABLE test_part_table(col1 INT) PARTITION BY RANGE (col1);
CREATE TABLE test_part_table_1 PARTITION OF test_part_table FOR VALUES FROM (0) TO (10);
CREATE TABLE test_part_table_2 PARTITION OF test_part_table FOR VALUES FROM (10) TO (20);
INSERT INTO test_part_table SELECT i FROM generate_series(0, 10) g(i);

-- 插入到 test_part_table_1 的值 从 0-9;
-- 插入到 test_part_table_2 的值 为10.

按天分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
-- 按日分区,返回的分区表的表后缀规则为 '年_月_日'_开始epoch_结束epoch 
-- (例如:table_name_2022_02_18_1560240_1563119)
-- 时间范围 一天的的00:00:00 ~ 一天的23:59:59
-- 高度范围 一天日期的00:00:00 ~ 一天日期的23:59:30
CREATE OR REPLACE FUNCTION calc_partition_range_by_day(current_ts TIMESTAMPTZ = current_timestamp)
RETURNS SETOF PARTITION_TABLE_RANGE_DEFINITION
IMMUTABLE STRICT
AS
$$
DECLARE
startup_time timestamptz = '2020-08-25T06:00:00+08:00'::timestamptz;
"year" INT = extract(YEAR FROM current_ts);
"month" INT = extract(MONTHS FROM current_ts);
"day" INT = extract(DAY FROM current_ts);
begin_time TIMESTAMPTZ = date_trunc('day', current_ts);
begin_epoch BIGINT = calc_epoch_by_ts(begin_time);
day_interval INTERVAL = INTERVAL '1 DAY' - INTERVAL '1 SECOND';
end_time TIMESTAMPTZ = begin_time + day_interval;
end_epoch BIGINT = calc_epoch_by_ts(end_time + INTERVAL '1 SECOND');
mm_month VARCHAR = (CASE WHEN "month" < 10 THEN '0' || "month" ELSE '' || "month" END);
BEGIN
-- 考虑边界条件,当计算得出的起始高度小于0,说明时间已经小于主网上线时间。
IF begin_epoch < 0 THEN
begin_time = startup_time;
begin_epoch = 0;
"year" = extract(YEAR FROM begin_time);
"month" = extract(MONTHS FROM begin_time);
"day" = extract(DAY FROM begin_time);
end_time = date_trunc('day', begin_time) + day_interval;
end_epoch = calc_epoch_by_ts(end_time + INTERVAL '1 SECOND');
mm_month = (CASE WHEN "month" < 10 THEN '0' || "month" ELSE '' || "month" END);
END IF;
RAISE NOTICE '[calc_partition_range_by_day]current_ts: %', current_ts;
RETURN QUERY SELECT concat_ws('_', "year", mm_month, "day", begin_epoch, end_epoch)::VARCHAR,
current_ts::DATE,
begin_epoch,
begin_time,
end_epoch,
end_time;
END;
$$ LANGUAGE plpgsql;

按周分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
-- 按周分区,返回的分区表的表后缀规则为 '年份'_w'第n周'_开始epoch_结束epoch 
-- (例如:table_name_2022_w7_1548720_1568879)
-- 时间范围 周一的00:00:00 ~ 周日的23:59:59
-- 高度范围 周一日期的00:00:00 ~ 周日的23:59:30
CREATE OR REPLACE FUNCTION calc_partition_range_by_week_of_year(current_ts TIMESTAMPTZ = current_timestamp)
RETURNS SETOF PARTITION_TABLE_RANGE_DEFINITION
IMMUTABLE STRICT
AS
$$
DECLARE
startup_time timestamptz = '2020-08-25T06:00:00+08:00'::timestamptz;
week_of_year INT = extract(WEEK FROM current_ts);
iso_year INT = extract(ISOYEAR FROM current_ts);
begin_of_week TIMESTAMPTZ = date_trunc('WEEK', current_ts);
begin_epoch BIGINT = calc_epoch_by_ts(begin_of_week);
week_interval INTERVAL = INTERVAL '7 DAYS';
end_of_week TIMESTAMPTZ = begin_of_week + week_interval;
end_epoch BIGINT = calc_epoch_by_ts(end_of_week);
BEGIN
-- 考虑边界条件,当计算得出的起始高度小于0,说明时间已经小于主网上线时间。
IF begin_epoch < 0 THEN
begin_of_week = startup_time;
begin_epoch = 0;
week_of_year = extract(WEEK FROM begin_of_week);
iso_year = extract(ISOYEAR FROM begin_of_week);
end_of_week = date_trunc('WEEK', begin_of_week) + week_interval;
end_epoch = calc_epoch_by_ts(end_of_week);
END IF;
RETURN QUERY SELECT (concat_ws('_', iso_year, concat('w', week_of_year), begin_epoch, end_epoch))::VARCHAR,
current_ts::DATE,
begin_epoch,
begin_of_week,
end_epoch,
end_of_week;
END
$$ LANGUAGE plpgsql;

按月分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
-- 按月分区,返回的分区表的表后缀规则为 '年_月'_开始epoch_结束epoch 
-- (例如:table_name_2022_02_1560240_1563119)
-- 时间范围 每个月的第一天的的00:00:00 ~ 每个月的最后一天的23:59:59
-- 高度范围 每个月的第一天日期的00:00:00 ~ 每个月的最后一天日期的23:59:30
CREATE OR REPLACE FUNCTION calc_partition_range_by_month(current_ts TIMESTAMPTZ = current_timestamp)
RETURNS SETOF PARTITION_TABLE_RANGE_DEFINITION
IMMUTABLE STRICT
AS
$$
DECLARE
startup_time timestamptz = '2020-08-25T06:00:00+08:00'::timestamptz;
begin_time TIMESTAMPTZ = date_trunc('month', current_ts);
"year" INT = extract(YEAR FROM begin_time);
"month" INT = extract(MONTHS FROM begin_time);
begin_epoch BIGINT = calc_epoch_by_ts(begin_time);
month_interval INTERVAL = INTERVAL '1 month';
end_time TIMESTAMPTZ = begin_time + month_interval;
end_epoch BIGINT = calc_epoch_by_ts(end_time);
mm_month VARCHAR = (CASE WHEN "month" < 10 THEN '0' || "month" ELSE '' || "month" END);
BEGIN
-- 考虑边界条件,当计算得出的起始高度小于0,说明时间已经小于主网上线时间。
IF begin_epoch < 0 THEN
begin_time = startup_time;
begin_epoch = 0;
"year" = extract(YEAR FROM begin_time);
"month" = extract(MONTHS FROM begin_time);
end_time = date_trunc('month', begin_time) + month_interval;
end_epoch = calc_epoch_by_ts(end_time);
mm_month = (CASE WHEN "month" < 10 THEN '0' || "month" ELSE '' || "month" END)::varchar;
END IF;
RETURN QUERY SELECT (concat_ws('_', "year", mm_month, begin_epoch, end_epoch))::VARCHAR,
current_ts::DATE,
begin_epoch,
begin_time,
end_epoch,
end_time;
END;
$$ LANGUAGE plpgsql;

按年分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
-- 按年分区,返回的分区表的表后缀规则为 '年'_开始epoch_结束epoch 
-- (例如:table_name_2022_1560240_1563119)
-- 时间范围 每年的第一天的的00:00:00 ~ 每年的最后一天的23:59:59
-- 高度范围 每年的第一天日期的00:00:00 ~ 每年的最后一天日期的23:59:30
CREATE OR REPLACE FUNCTION calc_partition_range_by_year(current_ts TIMESTAMPTZ = current_timestamp)
RETURNS SETOF PARTITION_TABLE_RANGE_DEFINITION
IMMUTABLE STRICT
AS
$$
DECLARE
startup_time timestamptz = '2020-08-25T06:00:00+08:00'::timestamptz;
first_day_of_year TIMESTAMPTZ = date_trunc('year', current_ts);
year_interval INTERVAL = INTERVAL '1 year';
last_day_of_year TIMESTAMPTZ = first_day_of_year + year_interval;
begin_epoch BIGINT = calc_epoch_by_ts(first_day_of_year);
end_epoch BIGINT = calc_epoch_by_ts(last_day_of_year);
"year" INT = extract(YEAR FROM first_day_of_year);
BEGIN
-- 考虑边界条件,当计算得出的起始高度小于0,说明时间已经小于主网上线时间。
IF begin_epoch < 0 THEN
first_day_of_year = startup_time;
begin_epoch = 0;
last_day_of_year = date_trunc('year', first_day_of_year) + year_interval;
end_epoch = calc_epoch_by_ts(last_day_of_year);
"year" = extract(YEAR FROM first_day_of_year);
END IF;
RETURN QUERY SELECT (concat_ws('_', "year", begin_epoch, end_epoch))::VARCHAR,
current_ts::DATE,
begin_epoch,
first_day_of_year,
end_epoch,
last_day_of_year;
END;
$$ LANGUAGE plpgsql;

区块高度与时间

2020-08-25T06:00:00+08:00 是Filecoin 上线时间,每30秒一个纪元(高度)。通过与上线时间与高度换算的相对秒数,求得高度对应的时间。反之亦然。其计算方式如下:

高度转换时间

1
2
3
4
5
6
7
8
9
10
11
12
13
-- 根据当前时间计算出对应的FileCoin高度,如果不传高度,默认高度为0,即链的起始高度。
CREATE OR REPLACE FUNCTION calc_timestamp_by_epoch(height BIGINT = 0) RETURNS TIMESTAMP WITH TIME ZONE
IMMUTABLE STRICT
LANGUAGE plpgsql
AS
$$
DECLARE
baseEpoch INTEGER = extract(EPOCH FROM TIMESTAMPTZ '2020-08-25T06:00:00+08:00')::INTEGER;
BEGIN
-- 根据高度计算时间
RETURN to_timestamp(baseEpoch + height * 30);
END
$$;

时间转换高度

1
2
3
4
5
6
7
8
9
10
11
12
13
-- 根据FileCoin epoch计算出对应的时间,传入的参数如不填,默认当前时间
CREATE OR REPLACE FUNCTION calc_epoch_by_ts(ts timestamp with time zone = current_timestamp) RETURNS bigint
IMMUTABLE STRICT
LANGUAGE plpgsql
AS
$$
DECLARE
baseTime TIMESTAMP = TIMESTAMPTZ '2020-08-25T06:00:00+08:00';
BEGIN
-- 根据时间计算高度
RETURN extract(EPOCH FROM (ts - baseTime))::INTEGER / 30;
END
$$;

获取分区名称

通过系统数据库(pg_catalog)中的表获得分区表名称。如果不存在分区表,返回NULL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CREATE OR REPLACE FUNCTION
pg_get_part_table_name(main_table VARCHAR, suffix VARCHAR)
RETURNS VARCHAR
IMMUTABLE
AS
$$
DECLARE
part_table_name VARCHAR;
BEGIN
WITH o AS (SELECT inhrelid FROM pg_inherits WHERE inhparent = (concat_ws('.','public', main_table))::REGCLASS)
SELECT relname
FROM pg_class
WHERE oid IN (SELECT * FROM o)
AND relname = concat_ws('_', main_table, suffix)
INTO part_table_name;
RETURN part_table_name;
END
$$ LANGUAGE plpgsql;

获取分区表分区键(废弃⚠️ at 2022-03-25)

这个函数用来插入时作为更新键,但实际作为更新键通常是唯一键。唯一键可以是复合键。该函数不适用。

pg_get_partkeydef PostgreSQL 内置函数,位于pg_catalog 数据库中,用于提取分区表的分区键定义。strpossubstr函数同样位于pg_catalog数据库中定义,这两个函数参数用法与C语言相同。

通过pg_get_partkeydef函数返回的结果如下:

1
2
select pg_get_partkeydef('test'::regclass);
-- Output: RANGE (epoch)

使用strpossubstr得到上述返回结果定义列名称。完整实现如下:

通过调用pg_get_part_table_range_key 输入分区表(主表)名称转换成oid,返回分区键名称。非分区表返回结果为NULL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE OR REPLACE FUNCTION pg_get_part_table_range_key(oid REGCLASS)
RETURNS VARCHAR
IMMUTABLE STRICT
AS
$$
DECLARE
part_key_def TEXT = pg_get_partkeydef(oid); -- Output: RANGE (epoch)
begin_position INT = strpos(part_key_def, '(') + 1;
def_len INT = length(part_key_def);
-- substr(text, int, int) 第二个参数代表位置,第三个参数代表长度
part_key TEXT = substr(part_key_def, begin_position, def_len - begin_position);
BEGIN
-- 如果不是分区表,返回null
RETURN cast(part_key AS VARCHAR);
END
$$ LANGUAGE plpgsql;

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
-- Range分区
CREATE TABLE test11
(
epoch INTEGER NOT NULL,
miner VARCHAR(100) NOT NULL,
balance NUMERIC,
CONSTRAINT uk_epoch_miner
UNIQUE (epoch, miner)
) PARTITION BY RANGE (epoch);

select pg_get_part_table_range_key('test11'::regclass);
-- Output: epoch

-- 哈希分区 以及分区表
CREATE TABLE test22(
miner varchar(50)
) PARTITION BY HASH (miner);

CREATE TABLE test22_1 PARTITION OF test22 FOR VALUES WITH (MODULUS 2, REMAINDER 0);

select pg_get_part_table_range_key('test22'::regclass);
-- Output: miner

-- 列表分区
CREATE TABLE test33(
miner varchar(50)
) PARTITION BY LIST (miner);

CREATE TABLE test33_1 PARTITION OF test33 FOR VALUES IN ('f02439');

select pg_get_part_table_range_key('test33'::regclass);
-- Output: miner

-- 普通非分区表
create table test66(miner varchar(50));

select pg_get_part_table_range_key('test66'::regclass);
-- Output: <NULL>

获取更新唯一键(2022-03-25 新增)

💡 这里约定,自动建分区表,需要建立一个且只有一个constraint定义。例如:

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE IF NOT EXISTS miner_actors
(
epoch BIGINT NOT NULL,
miner VARCHAR NOT NULL,
owner VARCHAR NOT NULL,
worker VARCHAR,
controllers VARCHAR[],
sector_size BIGINT,
power DECIMAL,
CONSTRAINT uk_epoch_miner_actors UNIQUE (epoch, miner)
) PARTITION BY RANGE (epoch);

以上有一个唯一性约束uk_epoch_miner_actors,且只要定义一个,其它的约束以添加index方式在外部添加。

然后以下面的语句查询约束字段。

1
2
3
4
5
6
7
SELECT
DISTINCT kcu.column_name
FROM
information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name
WHERE constraint_type = 'UNIQUE' AND tc.table_name = 'miner_actors';

作用于下面的代码

获取分区表更新列模版

modified at 2022-03-25

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
CREATE FUNCTION pg_compute_part_table_update_columns(main_table CHARACTER VARYING) RETURNS TEXT
IMMUTABLE
LANGUAGE plpgsql
AS
$$
DECLARE
update_columns TEXT;
BEGIN
SELECT string_agg(concat(column_name, '=$1.', column_name), ', ') AS columns
INTO update_columns
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = main_table
AND column_name NOT IN (SELECT
DISTINCT kcu.column_name
FROM
information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name
WHERE constraint_type = 'UNIQUE' AND tc.table_name = main_table);
RETURN update_columns;
END
$$;

创建update 更新约束字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
CREATE OR REPLACE FUNCTION pg_compute_constraint_keys(main_table VARCHAR) RETURNS TEXT
IMMUTABLE
LANGUAGE plpgsql
AS
$$
DECLARE
ukeys TEXT;
BEGIN
WITH t AS (SELECT DISTINCT kcu.column_name
FROM information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
WHERE constraint_type = 'UNIQUE'
AND tc.table_name = main_table)
SELECT string_agg(concat(t.column_name, '=$1.', t.column_name), ' AND ')
FROM t
INTO ukeys;
RETURN ukeys;
END
$$;

创建新分区

分区Meta表

分区meta表记录了分区表的分区信息、创建时间和分区范围。该表内的记录应在创建新分区时被记录。part_type字段表示范围分区、哈希分区或列表分区。

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE IF NOT EXISTS part_tables
(
id SERIAL NOT NULL PRIMARY KEY,
tb_name VARCHAR(100),
part_type SMALLINT NOT NULL DEFAULT 0,
part_table VARCHAR(100),
begin_time TIMESTAMPTZ,
end_time TIMESTAMPTZ,
created_time TIMESTAMPTZ DEFAULT current_timestamp,
-- 分区表需要唯一
CONSTRAINT uk_part_table_name UNIQUE (tb_name, part_table),
CHECK (part_type IN (0, 1, 2)) -- 0 range / 1 hash /2 list
);

主函数

这里创建新分区一定要加上 IF NOT EXISTS判断,在同一个事务中,批量插入时,pg_catalog表内未完成实际分区表的创建,所以查询出来的结果是空,会重复创建。通过IF NOT EXISTS规避同一事务中的重复建表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
CREATE OR REPLACE FUNCTION
create_new_partition_table(main_table VARCHAR, part_table_type SMALLINT, part_def PARTITION_TABLE_RANGE_DEFINITION)
RETURNS VARCHAR
AS
$$
DECLARE
part_table_name TEXT;
BEGIN
part_table_name = concat_ws('_', main_table, part_def.suffix);
EXECUTE format('CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES FROM (%s) TO (%s)',
part_table_name, main_table, part_def.begin_epoch, part_def.end_epoch);

CREATE TABLE IF NOT EXISTS part_tables
(
id SERIAL NOT NULL PRIMARY KEY,
tb_name VARCHAR(100),
part_type SMALLINT NOT NULL DEFAULT 0,
part_table VARCHAR(100),
begin_time TIMESTAMPTZ,
end_time TIMESTAMPTZ,
created_time TIMESTAMPTZ DEFAULT current_timestamp,
-- 分区表需要唯一
CONSTRAINT uk_part_table_name UNIQUE (tb_name, part_table),
CHECK (part_type IN (0, 1, 2)) -- 0 range / 1 hash /2 list
);
INSERT INTO part_tables(tb_name, part_type, part_table, begin_time, end_time, created_time)
VALUES (main_table, part_table_type, part_table_name,
part_def.begin_time, part_def.end_time, current_timestamp)
ON CONFLICT (tb_name, part_table) DO NOTHING ;

RETURN part_table_name;
END
$$ LANGUAGE plpgsql;

重写规则

通过PostgreSQL数据库规则系统,改变数据库分区表默认的INSERT/UPDATE操作,来达到在插入或更新前,判断目标分区是否存在,如不存在创建之的目的。

为方便后续对分区表的维护,额外建立一张独立的数据表,存放新创建的分区以及创建时间。可以根据时间对分区表合理裁剪。

准备分区表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
CREATE TABLE example_table
(
epoch INT,
miner VARCHAR(100),
balance DECIMAL,
code VARCHAR(100),
CONSTRAINT uk_epoch_miner UNIQUE (epoch, miner)
) PARTITION BY RANGE (epoch);

example=# \d+ example_table
Partitioned table "public.example_table"
Column | Type | Nullable | Default | Storage | Stats target |
---------+------------------------+----------+---------+----------+--------------+-
epoch | integer | | | plain | |
miner | character varying(100) | | | extended | |
balance | numeric | | | main | |
code | character varying(100) | | | extended | |
Partition key: RANGE (epoch)
Indexes:
"uk_epoch_miner" UNIQUE CONSTRAINT, btree (epoch, miner)
Number of partitions: 0

插入(INSERT)

替代数据库分区表默认的插入操作,需要考虑默认插入动作的几个关键要素:自动路由到对应的表分区进行插入,处理唯一键冲突。

自动路由

自动路由的前提是对应的表分区已存在。它分成两个步骤:

  • 创建新分区

在插入行数据前,使用函数pg_get_part_table_name查找新插入的记录分区字段所对应的表分区,如果在数据库中不存在,则新创建。

  • 插入数据

向具体的表分区插入数据。

处理唯一键冲突

使用重写规则有一个限制,就是无法处理唯一键冲突更新。

无法对具有INSERT或者UPDATE规则的表使用带有ON CONFLICT子句的INSERT

1
INSERT INTO from_table VALUES (3, '333') ON CONFLICT(col1) DO UPDATE SET data = excluded.data;
1
0A000  feature_not_supported

因此,需要处理unique_violation的执行错误。使用EXCEPTION捕获unique_violation的错误,执行分区表UPDATE。

注意,一旦捕获unique_violation并处理实现唯一键冲突更新的操作,意味着替代的INSERT操作将天生具有upsert的功能。

具体实现

一、创建插入函数。

根据数据库规则语法,规则仅支持SELECT/INSERT/UPDATE/DELETE操作,因此要执行一系列动作需要创建一个存储过程来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
CREATE OR REPLACE FUNCTION insert_action(table_row miner_actors) RETURNS VOID
AS $$
DECLARE
main_table VARCHAR = 'example_table';
part_table_type SMALLINT = 0;
range_ts TIMESTAMPTZ = calc_timestamp_by_epoch(table_row.epoch);
part_def PARTITION_TABLE_RANGE_DEFINITION = calc_partition_range_by_day(range_ts);
part_table_name VARCHAR = pg_get_part_table_name(main_table, part_def.suffix);
update_columns TEXT;
constraint_keys TEXT;
BEGIN
RAISE NOTICE 'suffix:%; begin_epoch:%; end_epoch:%; begin_time:%; end_time:%',
part_def.suffix, part_def.begin_epoch, part_def.end_epoch, part_def.begin_time, part_def.end_time;
RAISE NOTICE 'part_table_name: %', part_table_name;
IF part_table_name ISNULL THEN
SELECT create_new_partition_table(main_table, part_table_type, part_def)
INTO part_table_name;
END IF;
EXECUTE format('INSERT INTO %s SELECT $1.*', part_table_name) USING table_row;
EXCEPTION
WHEN unique_violation THEN
update_columns = pg_compute_part_table_update_columns(main_table);
constraint_keys = pg_compute_constraint_keys(main_table);
RAISE NOTICE '%', format('UPDATE %s SET %s WHERE %s', part_table_name, update_columns, constraint_keys);
EXECUTE format('UPDATE %s SET %s WHERE %s', part_table_name, update_columns, constraint_keys) USING table_row;
END
$$ LANGUAGE plpgsql;

二、创建INSERT RULE。

在分区表主表上创建一条规则,其方式是INSTEAD默认INSERT操作。其中new是一个内置变量,代表新插入的记录行。

1
CREATE RULE upsert_part_data AS ON INSERT TO example_table DO INSTEAD SELECT insert_action(new);

三、插入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
INSERT INTO example_table(epoch, miner, balance, code) VALUES (1, 'f02438', '100', 'Good');

example=# \d+ example_table
Partitioned table "public.example_table"
Column | Type | Nullable | Default | Storage | Stats target |
---------+------------------------+----------+---------+----------+--------------+-
epoch | integer | | | plain | |
miner | character varying(100) | | | extended | |
balance | numeric | | | main | |
code | character varying(100) | | | extended | |
Partition key: RANGE (epoch)
Indexes:
"uk_epoch_miner" UNIQUE CONSTRAINT, btree (epoch, miner)
Rules:
upsert_part_data AS
ON INSERT TO example_table DO INSTEAD SELECT insert_action(new.*) AS insert_action
Partitions: example_table_2020_08_25_0_2159 FOR VALUES FROM (0) TO (2159)

更新(UPDATE)

替代数据库分区表默认更新操作,需要考虑以下几个因素:普通更新和分区移动。

普通更新

对于分区列相同的更新,不需要执行删除命令,也不需要创建新不存在的分区表,直接UPDATE目标表。

分区移动

对于更新了分区列的数据,要查找新的分区表是否存在,如果不存在需要创建。然后将旧数据删除,在新表中插入新纪录。

具体实现

一、创建存储过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
CREATE OR REPLACE FUNCTION
update_action(new_row miner_actors, old_row miner_actors)
RETURNS VOID AS
$$
DECLARE
main_table VARCHAR = 'example_table';
part_table_type SMALLINT = 0;
new_range_ts TIMESTAMPTZ = calc_timestamp_by_epoch(new_row.epoch);
new_part_def PARTITION_TABLE_RANGE_DEFINITION = calc_partition_range_by_day(new_range_ts);
new_part_table_name VARCHAR = pg_get_part_table_name(main_table, new_part_def.suffix);
old_range_ts TIMESTAMPTZ;
old_part_def PARTITION_TABLE_RANGE_DEFINITION;
old_part_table_name VARCHAR;
update_columns TEXT;
constraint_keys TEXT;
BEGIN
constraint_keys = pg_compute_constraint_keys(main_table);
IF new_row.epoch = old_row.epoch THEN
RAISE NOTICE 'update in same table';
-- 旧数据存在或条件不存在,这里只是做单纯的update更新分区数据
update_columns = pg_compute_part_table_update_columns(main_table);
EXECUTE format('UPDATE %s SET %s WHERE %s', new_part_table_name, update_columns, constraint_keys) USING new_row;
RETURN;
END IF;

IF new_part_table_name ISNULL THEN
SELECT create_new_partition_table(main_table, part_table_type, new_part_def)
INTO new_part_table_name;
END IF;

old_range_ts = calc_timestamp_by_epoch(old_row.epoch);
old_part_def = calc_partition_range_by_day(old_range_ts);
old_part_table_name = pg_get_part_table_name(main_table, old_part_def.suffix);

-- delete 动作要放在这里,这里有移动操作。
EXECUTE format('DELETE FROM %s WHERE %s', old_part_table_name, constraint_keys) USING old_row;
EXECUTE format('INSERT INTO %s SELECT $1.*', new_part_table_name) USING new_row;
RETURN;
END
$$ LANGUAGE plpgsql;

二、创建UPDATE RULE

在分区表主表上创建一条更新规则,其方式是INSTEAD默认UPDATE操作。其中newold是内置变量,代表新更新的数据和旧数据行。

这里的旧的数据行是通过where条件获知的,新的数据行是根据update内set获知的。所以在执行该更新执行计划的时候它会先收集数据,然后执行规则。所以old可以根据唯一键来查找。

1
CREATE RULE update_part_data AS ON UPDATE TO example_table DO INSTEAD SELECT update_action(new, old);

三、更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
UPDATE example_table SET epoch = 3000, balance = '99', code = 'Good+' WHERE epoch = 1;

example=# \d+ example_table
Partitioned table "public.example_table"
Column | Type | Nullable | Default | Storage | Stats target |
---------+------------------------+----------+---------+----------+--------------+-
epoch | integer | | | plain | |
miner | character varying(100) | | | extended | |
balance | numeric | | | main | |
code | character varying(100) | | | extended | |
Partition key: RANGE (epoch)
Indexes:
"uk_epoch_miner" UNIQUE CONSTRAINT, btree (epoch, miner)
Rules:
update_part_data AS
ON UPDATE TO example_table DO INSTEAD SELECT update_action(new.*, old.*) AS update_action
upsert_part_data AS
ON INSERT TO example_table DO INSTEAD SELECT insert_action(new.*) AS insert_action
Partitions: example_table_2020_08_25_0_2159 FOR VALUES FROM (0) TO (2159),
example_table_2020_08_26_2160_5039 FOR VALUES FROM (2160) TO (5039)

分区表维护

规则替换

封装一个存储过程,将创建insert_action函数和update_action函数放入其中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
CREATE OR REPLACE PROCEDURE
create_auto_partition(table_name VARCHAR, range_type VARCHAR DEFAULT 'week')
AS
$$
DECLARE
insert_func_name TEXT := concat_ws('_', table_name, 'insert_action');
update_func_name TEXT := concat_ws('_', table_name, 'update_action');
insert_tpl TEXT;
update_tpl TEXT;
range_func_name TEXT;
BEGIN
IF range_type = 'day' THEN
range_func_name = 'calc_partition_range_by_day';
ELSEIF range_type = 'week' THEN
range_func_name = 'calc_partition_range_by_week_of_year';
ELSEIF range_type = 'month' THEN
range_func_name = 'calc_partition_range_by_month';
ELSEIF range_type = 'year' THEN
range_func_name = 'calc_partition_range_by_year';
ELSE
RAISE EXCEPTION 'unsupported range type(only in day, week, month, year)';
END IF;
insert_tpl := E'CREATE OR REPLACE FUNCTION ' || insert_func_name || '(table_row ' || table_name || ') ' ||
'RETURNS VOID AS ' ||
'$body$ ' ||
'DECLARE ' ||
' main_table VARCHAR = ''' || table_name || '''; ' ||
'part_table_type SMALLINT = 0; ' ||
'range_ts TIMESTAMPTZ = calc_timestamp_by_epoch(table_row.epoch); ' ||
'part_def PARTITION_TABLE_RANGE_DEFINITION = ' || range_func_name || '(range_ts); ' ||
'part_table_name VARCHAR = pg_get_part_table_name(main_table, part_def.suffix); ' ||
'update_columns TEXT; ' ||
'constraint_keys TEXT; ' ||
'BEGIN ' ||
'IF part_table_name ISNULL THEN ' ||
'SELECT create_new_partition_table(main_table, part_table_type, part_def) ' ||
'INTO part_table_name; ' ||
'END IF; ' ||
'EXECUTE format(''INSERT INTO %s SELECT $1.*'', part_table_name) USING table_row; ' ||
'EXCEPTION ' ||
' WHEN unique_violation THEN ' ||
' update_columns = pg_compute_part_table_update_columns(main_table); ' ||
' constraint_keys = pg_compute_constraint_keys(main_table); ' ||
'EXECUTE format(''UPDATE %s SET %s WHERE %s '', part_table_name, update_columns, constraint_keys) USING table_row; ' ||
'END $body$ LANGUAGE plpgsql';
EXECUTE insert_tpl;
EXECUTE format('DROP RULE IF EXISTS upsert_part_data ON %s', table_name);
EXECUTE format('CREATE RULE upsert_part_data AS ON INSERT TO %s DO INSTEAD SELECT %s(new)', table_name, insert_func_name);

update_tpl := 'CREATE OR REPLACE FUNCTION '|| update_func_name || '(new_row '|| table_name ||', old_row '|| table_name ||')' ||
' RETURNS VOID AS ' ||
'$body$ ' ||
'DECLARE ' ||
' main_table VARCHAR = '''|| table_name ||'''; ' ||
'part_table_type SMALLINT = 0; ' ||
'new_range_ts TIMESTAMPTZ = calc_timestamp_by_epoch(new_row.epoch); ' ||
'new_part_def PARTITION_TABLE_RANGE_DEFINITION = '|| range_func_name ||'(new_range_ts); ' ||
'new_part_table_name VARCHAR = pg_get_part_table_name(main_table, new_part_def.suffix); ' ||
'old_range_ts TIMESTAMPTZ; ' ||
'old_part_def PARTITION_TABLE_RANGE_DEFINITION; ' ||
'old_part_table_name VARCHAR; ' ||
'update_columns TEXT; ' ||
'constraint_keys TEXT;' ||
'BEGIN ' ||
'constraint_keys = pg_compute_constraint_keys(main_table); ' ||
'IF new_row.epoch = old_row.epoch THEN ' ||
'update_columns = pg_compute_part_table_update_columns(main_table); ' ||
'EXECUTE format(''UPDATE %s SET %s WHERE %s'', new_part_table_name, update_columns, constraint_keys) USING new_row; ' ||
'RETURN; ' ||
'END IF; ' ||
'IF new_part_table_name ISNULL THEN ' ||
'SELECT create_new_partition_table(main_table, part_table_type, new_part_def) ' ||
'INTO new_part_table_name; ' ||
'END IF; ' ||
'old_range_ts = calc_timestamp_by_epoch(old_row.epoch); ' ||
'old_part_def = ' || range_func_name || '(old_range_ts); ' ||
'old_part_table_name = pg_get_part_table_name(main_table, old_part_def.suffix); ' ||
'EXECUTE format(''DELETE FROM %s WHERE %s'', old_part_table_name, constraint_keys) USING old_row; ' ||
'EXECUTE format(''INSERT INTO %s SELECT $1.*'', new_part_table_name) USING new_row; ' ||
'RETURN; ' ||
'END $body$ LANGUAGE plpgsql';
EXECUTE update_tpl;
EXECUTE format('DROP RULE IF EXISTS update_part_data ON %s', table_name);
EXECUTE format('CREATE RULE update_part_data AS ON UPDATE TO %s DO INSTEAD SELECT %s(new, old)', table_name, update_func_name);
END;
$$ LANGUAGE plpgsql;

调用

1
CALL create_auto_partition('example_table');

上述简化了操作步骤。对于公共函数仍然需要提前部署。

裁剪

//TODO

挂载

//TODO

总结

创建自动分区表的步骤

一、准备公共函数

函数 一章,和 规则替换 一节

二、创建分区表

1
2
3
4
5
6
7
8
CREATE TABLE example_table
(
epoch INT,
miner VARCHAR(100),
balance DECIMAL,
code VARCHAR(100),
CONSTRAINT uk_epoch_miner UNIQUE (epoch, miner)
) PARTITION BY RANGE (epoch);

三、创建INSERT/UPDATE替换函数

1
CALL create_auto_partition('example_table');

测试结果

1
2
3
INSERT INTO example_table(epoch, miner, balance, code) SELECT i, 'f03367', '0', 'Good' FROM generate_series(1, 1500000) g(i);

-- completed in 10 m 26 s 339 ms

插入150万测试数据,花费10分钟左右。

参考资料

官方文档

表分区

附录 A. PostgreSQL错误代码

CREATE RULE

网上文章

PostgreSQL中的函数之日期时间函数

自动创建分区实践(写入触发器)

PostgreSQL 异常捕获(EXCEPTION)