PostgreSQL 是一个现代化的数据库,开源,免费试用,社区活跃,和Oracle具有同等强大的功能。作为一款高度可扩展的企业级关系型数据库管理系统,其内置的分区表功能在处理大规模数据场景中扮演着重要角色。尤其是自动表分区技术,通过自动化管理分区,极大地简化了分区表的创建和维护过程,提高了数据库的性能和管理效率。支持JSON格式和自定义类型,支持并行查询,聚合函数、窗口函数和数据透视表非常适合作为统计数据的数据源。
合理规划数据是作为数据库使用的重要手段。目前Filscan作为首款后端使用PostgreSQL数据库的项目,目前已经支撑了亿级以上的Filecoin庞大数据量,后续随着Filecoin链继续发展,数据还将继续膨胀。
Filecoin的状态数据的数量非常庞大,如果在一张表内保存,会非常影响查询效率。为达到保存一段时间内的数据,且可以根据时间范围伸缩,提高查询效率,需要对巨量数据表进行表分区。让数据库按需自动的创建新的表来存放按照规则划分的数据,是可以提高自动化维护表程度,减少人为的干扰和出错的可能。
表分区方案 表分区方案是一直存在的,有三种方式:
预先创建一批分区表
这种方式最大的弊端是需要定期的创建一批新分区表以适应新数据范围分区,其次分区表不能按实际数据区间创建分区表,所有的数据范围都必须紧密存在,才不会漏存对应的数据到分区中。定期创建分区是容易被忽视的一个环节,一旦忘记定期创建,尤其是定期周期较长,就会导致数据丢失。如果有默认分区存在,新表记录会被保存到默认分区内,对后续可持续维护带来了额外的停机时间开销。
使用程序对表进行分区
这种方式没有问题,不过需要做一些额外适配工作,以适应不同的分区表的动态创建,分区表逻辑的更新依赖程序的更新。而基于数据库实现自动表分区,只需要即时更新存储过程或函数就可立即生效,也能够享受数据库自身的特性带来的便捷性,减少开发周期,降低维护成本。
第三方插件
pg_pathman 是一个PostgreSQL数据库的第三方插件。该插件的好处是封装实现了一套维护分区表的完整操作。阿里云RDS PostgreSQL版本支持安装该插件,下面是阿里云RDS各个PostgreSQL版本所支持该插件的情况。
插件名
14
13
12
11
10
9.4
描述
pg_pathman
无
1.5
1.5
1.5
1.5
无
高性能分区表插件
https://help.aliyun.com/document_detail/142340.html
这里有关于该插件的全部用法:https://help.aliyun.com/document_detail/140900.html
但是该插件官网做了以下提示:
注意:该项目不再处于开发阶段 pg_pathman支持 Postgres 版本 [9.5..13],但很可能不会移植到 14 及更高版本。声明式分区 现在已经相当成熟,几乎所有的东西都在pg_pathman; 我们鼓励用户切换到它。我们仍在维护该项目(修复受支持版本中的错误),但这里不会发生新的开发。
这个项目官方预计停止后续的支持,目前处于维护阶段,提示官方声明式分区版本已经做得足够好,可以不需要依赖第三方插件。从这点上说,选择声明式分区无疑是最优的选择,不会因为服务器版本升级而需要额外升级依赖第三方实现。
声明式分区(DDL-Partition) PostgreSQL提供了一种方法指定如何把一个表划分成称为分区的片段。被划分的表被称作分区表 。
表分区是解决一些因单表过大引用的性能问题的方式,比如某张表过大就会造成查询变慢,可能分区是一种解决方案。一般建议当单表大小超过内存就可以考虑表分区了。
PostgreSQL表分区有两种实现方式,声明式分区和表继承。它们各自有优缺点,比如,对声明式分区来说,分区必须具有和分区表正好相同的列集合,而在表继承中,子表可以有父表中没有出现过的额外列。除此之外,声明式分区也有着优秀的特点,可以减轻对分区表的维护代价、提供了操作的便捷性和查询性能,使用表继承将要做更多的工作才能达到同样的效果。
特点 默认分区 PosgtreSQL 11开始,支持为声明式分区表创建一个默认(DEFAULT)的分区,用于存储无法匹配其他任何分区的数据。只有 RANGE 分区表和 LIST 分区表需要默认分区。创建默认分区时使用 DEFAULT 子句替代 FOR VALUES 子句。
1 2 3 4 5 6 7 8 9 CREATE TABLE example_table( epoch BIGINT NOT NULL , data text ) PARTITION BY RANGE (epoch); CREATE TABLE example_table_1_100 PARTITION OF example_table FOR VALUES FROM (MINVALUE) TO (101 );CREATE TABLE example_table_101_200 PARTITION OF example_table FOR VALUES FROM (101 ) TO (201 );CREATE TABLE example_table_default PARTITION OF example_table DEFAULT ;
如果没有创建默认分区,在插入epoch=201时,会报错,提示插入错误。
1 2 3 INSERT INTO example_table(epoch, data) VALUES (201 , 'good' );
默认分区存在以下限制:
一个分区表只能拥有一个 DEFAULT 分区;
对于已经存储在 DEFAULT 分区中的数据,不能再创建相应的分区;参见下文示例;
如果将已有的表挂载为 DEFAULT 分区,将会检查该表中的所有数据;如果在已有的分区中存在相同的数据,将会产生一个错误;
哈希分区表不支持 DEFAULT 分区,实际上也不需要支持。
以列表分区为例:
1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE example_table_list( miner varchar (100 ) NOT NULL , epoch BIGINT NOT NULL , data varchar (50 ) ) PARTITION BY LIST (miner); CREATE TABLE example_table_list_02438 PARTITION OF example_table_list FOR VALUES IN ('f02438' );CREATE TABLE example_table_list_025002_025005 PARTITION OF example_table_list FOR VALUES IN ('f025002' , 'f025005' );CREATE TABLE example_table_list_default PARTITION OF example_table_list DEFAULT ;INSERT INTO example_table_list VALUES ('f03721' , 98 , 'Green' );
如果此时未创建对f03721的分区表,上面的语句会插入到example_table_list_default表内。如果此时开始创建
1 CREATE TABLE example_table_list_03721 PARTITION OF example_table_list FOR VALUES IN ('f03721' );
操作将会失败。因为添加新的分区需要修改默认分区的范围。但是默认分区中已经存在f02731的分区数据项。
错误: 某些行将违反默认分区”example_table_list_default”的更新分区约束
为了解决这个问题,可以先将默认分区从分区表中卸载(DETACH PARTITION ),创建新的分区,将默认分区中的相应的数据移动到新的分区,最后重新挂载默认分区。
1 2 3 4 5 6 7 8 9 10 11 ALTER TABLE example_table_list DETACH PARTITION example_table_list_default;CREATE TABLE example_table_list_03721 PARTITION OF example_table_list FOR VALUES IN ('f03721' );INSERT INTO example_table_list_03721SELECT * FROM example_table_list_default WHERE miner = 'f03721' ;DELETE FROM example_table_list_default WHERE miner = 'f03721' ;ALTER TABLE example_table_list ATTACH PARTITION example_table_list_default DEFAULT ;
分区自动索引 在 PostgreSQL10 中,分区上的索引需要基于各个分区手动创建,而不能基于分区的父表创建索引。PostgreSQL 11 及以后的版本,声明式分区表可以基于分区表创建索引。分区表上的索引并不会创建一个物理上的索引,而是为每个分区上的索引创建一个模板。表继承方式的分区表需要为每一张分区表创建索引和约束。
如果在声明式分区表上创建了一个索引,PostgreSQL 自动为每个分区创建具有相同属性的索引。
1 2 3 4 5 CREATE UNIQUE INDEX example_table_epoch ON example_table(epoch DESC );DROP INDEX example_table_epoch;
自动创建的索引,名称按照 “{partition name}{column name}_idx” 的模式定义。多个字段的复合索引使用下划线( )连接字段名称。如果索引名称已经存在,在名称的最后添加一个数字。如果名称过长,使用缩写。
随后新增的分区或者通过 ATTACH PARTITION 挂载的分区都会自动创建相应的索引。
自动创建的索引不能单独删除,可以通过分区表统一删除。
跨分区移动数据 在 PostgreSQL 10 中,声明式分区表如果 UPDATE 语句修改了分区字段的值,导致数据需要移动到其他分区时,语句将会失败。PostgreSQL 11+ 以后能够正确处理更新分区字段的操作。根据提交记录 ,这种 UPDATE 语句实际上分为两步执行:从旧的分区中 DELETE相应记录,在新的分区中 INSERT相应记录。另外,跨分区移动数据的 UPDATE 语句将会导致触发器的执行顺序更加复杂。
限制 分区表有下列限制:
没有办法创建跨越所有分区的排除约束,只可能单个约束每个叶子分区。
分区表上的惟一约束(也就是主键)必须包括所有分区键列。存在此限制是因为PostgreSQL只能每个分区中分别强制实施唯一性。
BEFORE ROW 触发器无法更改哪个分区是新行的最终目标。
不允许在同一个分区树中混杂临时关系和持久关系。因此,如果分区表是持久的,则其分区也必须是持久的,反之亦然。在使用临时关系时,分区数的所有成员都必须来自于同一个会话。
执行流程 厘清执行流程有助于从宏观上理解在哪个步骤中适合实现自动创建分区表。从数据库提供的机制上,有存储过程和触发器两种武器,分别可以实现在不同阶段处理不同拦截逻辑。
对于声明式分区表,所有被插入的行将被基于分区键的值,路由到分区 中。每个分区都有一个由其分区边界 定义的数据子集。当前支持的分区方法是范围、列表以及哈希。也就是说分区表的插入/更新操作,又一个内置的路由规则。
根据定义,结合对表的插入/更新操作以及触发器流程,一条数据在插入到分区表时,将经历以下步骤:
触发分区主表的statement before insert
自动路由到符合范围的分区子表
触发符合范围的分区子表row before insert
数据插入
触发符合范围的分区子表row after insert
触发分区主表的statement after insert
下面验证一下上述流程。
触发器行为 创建分区表 举个例子,创建一张测试分区表,并创建三个分区,不创建 默认分区。由于默认分区的限制,对于已经存储在 DEFAULT 分区中的数据,不能再创建相应的分区,需要先DETACH默认分区手动做数据转移,然后才可以创建新范围分区,这个步骤无法实现自动创建。
1 2 3 4 5 DROP TABLE example_table CASCADE ;CREATE TABLE example_table(col1 INT ) PARTITION BY RANGE (col1);CREATE TABLE example_table_1 PARTITION OF example_table FOR VALUES FROM (0 ) TO (10 );CREATE TABLE example_table_2 PARTITION OF example_table FOR VALUES FROM (11 ) TO (20 );CREATE TABLE example_table_3 PARTITION OF example_table FOR VALUES FROM (21 ) TO (30 );
创建触发器函数 创建一个用于观察触发器触发阶段的函数
1 2 3 4 5 6 7 8 CREATE OR REPLACE FUNCTION notice_trigger() RETURNS TRIGGER AS $$ BEGIN RAISE NOTICE 'trigger name:%; table_name:%; level:%; op:%; when:%; value:%' , tg_name, tg_table_name, tg_level, tg_op, tg_when, new ; RETURN new ; END $$ LANGUAGE plpgsql;
创建触发器 在分区表主表example_table上绑定测试分区表INSERT和UPDATE触发器,分别是语句级别和行级别,与before 和after的组合。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 CREATE TRIGGER show_notice_before_insert_stmt BEFORE INSERT ON example_table FOR STATEMENT EXECUTE FUNCTION notice_trigger();CREATE TRIGGER show_notice_after_insert_stmt AFTER INSERT ON example_table FOR STATEMENT EXECUTE FUNCTION notice_trigger();CREATE TRIGGER show_notice_before_insert_row BEFORE INSERT ON example_table FOR ROW EXECUTE FUNCTION notice_trigger();CREATE TRIGGER show_notice_after_insert_row AFTER INSERT ON example_table FOR ROW EXECUTE FUNCTION notice_trigger();CREATE TRIGGER show_notice_before_update_stmt BEFORE UPDATE ON example_table FOR STATEMENT EXECUTE FUNCTION notice_trigger();CREATE TRIGGER show_notice_after_update_stmt AFTER UPDATE ON example_table FOR STATEMENT EXECUTE FUNCTION notice_trigger();CREATE TRIGGER show_notice_before_update_row BEFORE UPDATE ON example_table FOR ROW EXECUTE FUNCTION notice_trigger();CREATE TRIGGER show_notice_after_update_row AFTER UPDATE ON example_table FOR ROW EXECUTE FUNCTION notice_trigger();
通过观察表结构定义我们可以看到,主表上创建出8个触发器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 example= # \d+ example_table Partitioned table "public.example_table" Column | Type | Collation | Nullable | Default | Storage | Stats target | Description col1 | integer | | | | plain | | Partition key: RANGE (col1)Triggers: show_notice_after_insert_row AFTER INSERT ON example_table FOR EACH ROW EXECUTE FUNCTION notice_trigger() show_notice_after_insert_stmt AFTER INSERT ON example_table FOR EACH STATEMENT EXECUTE FUNCTION notice_trigger() show_notice_after_update_row AFTER UPDATE ON example_table FOR EACH ROW EXECUTE FUNCTION notice_trigger() show_notice_after_update_stmt AFTER UPDATE ON example_table FOR EACH STATEMENT EXECUTE FUNCTION notice_trigger() show_notice_before_insert_row BEFORE INSERT ON example_table FOR EACH ROW EXECUTE FUNCTION notice_trigger() show_notice_before_insert_stmt BEFORE INSERT ON example_table FOR EACH STATEMENT EXECUTE FUNCTION notice_trigger() show_notice_before_update_row BEFORE UPDATE ON example_table FOR EACH ROW EXECUTE FUNCTION notice_trigger() show_notice_before_update_stmt BEFORE UPDATE ON example_table FOR EACH STATEMENT EXECUTE FUNCTION notice_trigger() Partitions: example_table_1 FOR VALUES FROM (0 ) TO (10 ), example_table_2 FOR VALUES FROM (11 ) TO (20 ), example_table_3 FOR VALUES FROM (21 ) TO (30 )
分区表上也自动创建出了4个行级触发器。
1 2 3 4 5 6 7 8 9 10 11 example= # \d example_table_1 Table "public.example_table_1" Column | Type | Collation | Nullable | Default col1 | integer | | | Partition of : example_table FOR VALUES FROM (0 ) TO (10 )Triggers: show_notice_after_insert_row AFTER INSERT ON example_table_1 FOR EACH ROW EXECUTE FUNCTION notice_trigger(), ON TABLE example_table show_notice_after_update_row AFTER UPDATE ON example_table_1 FOR EACH ROW EXECUTE FUNCTION notice_trigger(), ON TABLE example_table show_notice_before_insert_row BEFORE INSERT ON example_table_1 FOR EACH ROW EXECUTE FUNCTION notice_trigger(), ON TABLE example_table show_notice_before_update_row BEFORE UPDATE ON example_table_1 FOR EACH ROW EXECUTE FUNCTION notice_trigger(), ON TABLE example_table
分区表主表类型为(partitioned table),分区表类型为(table)。
在主表上创建触发器,会为每一个子表创建触发器。
FOR EACH STATEMENT 只会在主表上创建
FOR EACH ROW 才会为每一个子表创建(如下查询结果)。
插入一条数据 1 2 3 4 5 6 7 8 INSERT INTO example_table VALUES (3 );[00000 ] trigger name:show_notice_before_insert_stmt; table_name:example_table; level:STATEMENT; op:INSERT ; when :BEFORE; value :< NULL > [00000 ] trigger name:show_notice_before_insert_row; table_name:example_table_1; level:ROW ; op:INSERT ; when :BEFORE; value :(3 ) [00000 ] trigger name:show_notice_after_insert_row; table_name:example_table_1; level:ROW ; op:INSERT ; when :AFTER; value :(3 ) [00000 ] trigger name:show_notice_after_insert_stmt; table_name:example_table; level:STATEMENT; op:INSERT ; when :AFTER; value :< NULL > 1 row affected in 60 ms
分区表INSERT触发器的执行顺序:
主表example_table statement 级别 INSERT BEFOER触发器
主表example_table statement 级别 INSERT AFTER 触发器
分区表example_table_1 行级别 INSERT BEFORE 触发器
分区表example_table_1 行级别 INSERT AFTER 触发器
插入不存在分区的数据 1 2 3 4 5 6 INSERT INTO example_table VALUES (32 );[23514 ] 错误: 没有为行找到关系"example_table"的分区 详细:失败行的分区键包含(col1) = (32 ). [00000 ] trigger name:show_notice_before_insert_stmt; table_name:example_table; level:STATEMENT; op:INSERT ; when :BEFORE; value :< NULL >
插入一条不存在分区的数据时,会导致插入错误,提示没找到分区。分区表INSERT触发器的执行顺序:
主表example_table statement 级别 INSERT BEFORE 触发器
主表example_table statement 级别 INSERT AFTER 触发器
修改触发器函数 修改触发器函数notice_trigger使它拥有创建表的能力。以下代码仅供测试,认为新插入数据时为不存在分区范围的数据,且分区列数值在31-40之间。并定义触发器在行级、插入前触发。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 CREATE OR REPLACE FUNCTION notice_trigger() RETURNS TRIGGER AS $$ BEGIN RAISE NOTICE 'trigger name:%; table_name:%; level:%; op:%; when:%; value:%' , tg_name, tg_table_name, tg_level, tg_op, tg_when, new ; IF tg_level = 'ROW' AND tg_op = 'INSERT' AND tg_when = 'BEFORE' THEN CREATE TABLE example_table_4 PARTITION OF example_table FOR VALUES FROM (31 ) TO (40 ); INSERT INTO example_table_4 SELECT new.* ; RETURN NULL ; END IF; RETURN new ; END $$ LANGUAGE plpgsql;
再次执行数据插入
1 2 3 4 5 6 INSERT INTO example_table VALUES (32 );[23514 ] 错误: 没有为行找到关系"example_table"的分区 详细:失败行的分区键包含(col1) = (32 ). [00000 ] trigger name:show_notice_before_insert_stmt; table_name:example_table; level:STATEMENT; op:INSERT ; when :BEFORE; value :< NULL >
主表上由于分区表的默认插入行为,并没有触发行级触发器,并由于自动路由,错误提示找不到关系分区报错退出。
再次修改触发器函数 这次计划通过语句级别INSERT BEFORE触发器来创建新分区表。
1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE OR REPLACE FUNCTION notice_trigger() RETURNS TRIGGER AS $$ BEGIN RAISE NOTICE 'trigger name:%; table_name:%; level:%; op:%; when:%; value:%' , tg_name, tg_table_name, tg_level, tg_op, tg_when, new ; IF tg_level = 'STATEMENT' AND tg_op = 'INSERT' AND tg_when = 'BEFORE' THEN CREATE TABLE example_table_4 PARTITION OF example_table FOR VALUES FROM (31 ) TO (40 ); RETURN NULL ; END IF; RETURN new ; END $$ LANGUAGE plpgsql;
再次尝试插入
1 2 3 4 5 6 7 INSERT INTO example_table VALUES (32 );[55006 ] 错误: 无法CREATE TABLE .. PARTITION OF "example_table" 因为它正在被这个会话中的活动查询使用 在位置:SQL 语句 "CREATE TABLE example_table_4 PARTITION OF example_table FOR VALUES FROM (31) TO (40)" 在SQL 语句的第6 行的PL/ pgSQL函数notice_trigger() [00000 ] trigger name:show_notice_before_insert_stmt; table_name:example_table; level:STATEMENT; op:INSERT ; when :BEFORE; value :< NULL >
从上得知,BEFORE ROW 触发器无法更改哪个分区是新行的最终目标。
自动路由流程 从以上动作得知:
分区表主表只会触发语句级别的触发器
分区表只会触发行级触发器
数据库会对插入和更新动作自动做路由规则处理
整个插入过程在一个事务内,在插入开始期间,会收集整个分区表信息;声明式分区表有一个内置的路由行为,该行为发生在语句级别before触发器之后。它会根据分区列将实际的INSERT/UPDATE动作定向到分区中进行插入和更新。此时主分区表不会触发它的行级触发器,而是触发分区表中的行级触发器,且不会触发分区表的语句级触发器。无法在语句级别动态创建新分区,到达自动路由阶段,对不存在分区的插入会导致插入失败,不再执行后续动作。
规则系统 考虑到高可维护性,简便易行和最大程度使用数据库的特点,最终考虑使用声明式分区来实现自动表分区。
既然无法使用触发器达到在插入数据时,自动创建不存在范围的分区表的行为,有没有办法不通过触发器,而通过数据库自身的特性达到同样的目的呢?
答案是有的。经过调研,PostgreSQL数据库支持重写规则(RULE)。
用法 1 2 3 4 5 6 7 CREATE [ OR REPLACE ] RULE name AS ON event TO table_name [ WHERE condition ] DO [ ALSO | INSTEAD ] { NOTHING | command | ( command ; command ... ) } 其中 event 可以是以下之一: SELECT | INSERT | UPDATE | DELETE
PostgreSQL规则系统允许我们定义 针对数据库表中插入、更新或者删除动作上的替代动作。大约来说,当在 一个给定表上执行给定命令时,一条规则会导致执行额外的命令。或者, INSTEAD规则可以用另一个命令替换给定的命令,或者导致一个命令根本不被执行。
使用规则来改变插入的流程,绕开分区表触发器的执行顺序限制,直接插入到目标表内。
原理是使用规则重定向插入,将INSERT/UPDATE操作替换成用函数来插入。函数插入时判断是否报错了,如果报错,新增表。
设计流程 在分区表主表上创建两个规则,分别是INSERT和 UPDATE,替换数据库向主表默认插入和更新操作。此时的触发器和流程从下图左边变成了右边的流程,规则接管默认插入机制,在插入数据前做一层条件判断,达到自动创建分区的目的。由于规则系统仅支持INSERT/UPDATE/SELECT/DELETE操作,因此需要编写一个函数来托管整个过程。
在这个大框架下还需要解决自动化创建表时一致性行为,以及接管一些数据库默认插入/更新时的功能:
表分区名称约定
结合Filecoin业务,比如根据区块高度对数据分区,对高度计算得到分区表后缀
获取分区表名称是否存在
唯一键冲突
分区移动
函数(存储过程) 表分区名称约定 表名称约定 通常,对表的数据进行分区,对数据行记录时间进行划分。以此为基础便于以时间范围查询,因此分区表将设置成主表+后缀名的方式命名。后缀名定义以下4种按时间划分的分区规则:
按天分区 (分区表命名规则:table_name_2022_02_18_1560240_1563119)
按周分区 (分区表命名规则:table_name_2022_w7_1548720_1568879)
按月分区 (分区表命名规则:table_name_2022_02_1560240_1563119)
按年分区 (分区表命名规则:table_name_2022_1560240_1563119)
以上分区规则统一按照北京时间为基准,以一天的零点作为开始,23点59分59秒作为结束。
按天分区
分区表命名规则为 主表_年_月_日_起始epoch_终止epoch
其中月份为两位数,不足两位前缀补0。
按周分区
分区表命名规则为 主表_年_w全年周数_起始epoch_终止epoch
按月分区
分区表命名规则为 主表_年_月_起始epoch_终止epoch
其中月份为两位数,不足两位前缀补0。
按年分区
分区表命名规则为 主表_年_起始epoch_终止epoch
以下内容为实现分区表命名规则而设立的函数,执行函数可以计算得到上述规则后缀和分区范围。
分区范围类型定义 定义数据库的类型,拥有以下6个字段:
1 2 3 4 5 6 7 8 9 10 11 12 13 DROP TYPE IF EXISTS PARTITION_TABLE_RANGE_DEFINITION CASCADE ;CREATE TYPE PARTITION_TABLE_RANGE_DEFINITION AS ( suffix VARCHAR , hit_date DATE , begin_epoch BIGINT , begin_time TIMESTAMPTZ, end_epoch BIGINT , end_time TIMESTAMPTZ );
分区范围字段
说明
suffix
分区表的后缀名称,配合主表名+后缀定义出分区表名称
hit_date
查询分区范围定义的日期
begin_epoch
分区表开始高度(Filecoin epoch, 下同)
begin_time
分区表开始时间(参考,分区以begin_epoch为主,下同)
end_epoch
分区表结束高度
end_time
分区表结束时间
分区范围实现 注意!分区表FROM (MINVALUE) TO (101) 是前闭后开。也就是插入的分区列应满足 epoch ∈[MINVALUE, 101)
1 2 3 4 5 6 7 CREATE TABLE test_part_table(col1 INT ) PARTITION BY RANGE (col1);CREATE TABLE test_part_table_1 PARTITION OF test_part_table FOR VALUES FROM (0 ) TO (10 );CREATE TABLE test_part_table_2 PARTITION OF test_part_table FOR VALUES FROM (10 ) TO (20 );INSERT INTO test_part_table SELECT i FROM generate_series(0 , 10 ) g(i);
按天分区 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 CREATE OR REPLACE FUNCTION calc_partition_range_by_day(current_ts TIMESTAMPTZ = current_timestamp ) RETURNS SETOF PARTITION_TABLE_RANGE_DEFINITION IMMUTABLE STRICT AS $$ DECLARE startup_time timestamptz = '2020-08-25T06:00:00+08:00' ::timestamptz; "year" INT = extract (YEAR FROM current_ts); "month" INT = extract (MONTHS FROM current_ts); "day" INT = extract (DAY FROM current_ts); begin_time TIMESTAMPTZ = date_trunc('day' , current_ts); begin_epoch BIGINT = calc_epoch_by_ts(begin_time); day_interval INTERVAL = INTERVAL '1 DAY' - INTERVAL '1 SECOND' ; end_time TIMESTAMPTZ = begin_time + day_interval; end_epoch BIGINT = calc_epoch_by_ts(end_time + INTERVAL '1 SECOND' ); mm_month VARCHAR = (CASE WHEN "month" < 10 THEN '0' || "month" ELSE '' || "month" END ); BEGIN IF begin_epoch < 0 THEN begin_time = startup_time; begin_epoch = 0 ; "year" = extract (YEAR FROM begin_time); "month" = extract (MONTHS FROM begin_time); "day" = extract (DAY FROM begin_time); end_time = date_trunc('day' , begin_time) + day_interval; end_epoch = calc_epoch_by_ts(end_time + INTERVAL '1 SECOND' ); mm_month = (CASE WHEN "month" < 10 THEN '0' || "month" ELSE '' || "month" END ); END IF; RAISE NOTICE '[calc_partition_range_by_day]current_ts: %' , current_ts; RETURN QUERY SELECT concat_ws('_' , "year", mm_month, "day", begin_epoch, end_epoch)::VARCHAR , current_ts::DATE , begin_epoch, begin_time, end_epoch, end_time; END ;$$ LANGUAGE plpgsql;
按周分区 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 CREATE OR REPLACE FUNCTION calc_partition_range_by_week_of_year(current_ts TIMESTAMPTZ = current_timestamp ) RETURNS SETOF PARTITION_TABLE_RANGE_DEFINITION IMMUTABLE STRICT AS $$ DECLARE startup_time timestamptz = '2020-08-25T06:00:00+08:00' ::timestamptz; week_of_year INT = extract (WEEK FROM current_ts); iso_year INT = extract (ISOYEAR FROM current_ts); begin_of_week TIMESTAMPTZ = date_trunc('WEEK' , current_ts); begin_epoch BIGINT = calc_epoch_by_ts(begin_of_week); week_interval INTERVAL = INTERVAL '7 DAYS' ; end_of_week TIMESTAMPTZ = begin_of_week + week_interval; end_epoch BIGINT = calc_epoch_by_ts(end_of_week); BEGIN IF begin_epoch < 0 THEN begin_of_week = startup_time; begin_epoch = 0 ; week_of_year = extract (WEEK FROM begin_of_week); iso_year = extract (ISOYEAR FROM begin_of_week); end_of_week = date_trunc('WEEK' , begin_of_week) + week_interval; end_epoch = calc_epoch_by_ts(end_of_week); END IF; RETURN QUERY SELECT (concat_ws('_' , iso_year, concat('w' , week_of_year), begin_epoch, end_epoch))::VARCHAR , current_ts::DATE , begin_epoch, begin_of_week, end_epoch, end_of_week; END $$ LANGUAGE plpgsql;
按月分区 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 CREATE OR REPLACE FUNCTION calc_partition_range_by_month(current_ts TIMESTAMPTZ = current_timestamp ) RETURNS SETOF PARTITION_TABLE_RANGE_DEFINITION IMMUTABLE STRICT AS $$ DECLARE startup_time timestamptz = '2020-08-25T06:00:00+08:00' ::timestamptz; begin_time TIMESTAMPTZ = date_trunc('month' , current_ts); "year" INT = extract (YEAR FROM begin_time); "month" INT = extract (MONTHS FROM begin_time); begin_epoch BIGINT = calc_epoch_by_ts(begin_time); month_interval INTERVAL = INTERVAL '1 month' ; end_time TIMESTAMPTZ = begin_time + month_interval; end_epoch BIGINT = calc_epoch_by_ts(end_time); mm_month VARCHAR = (CASE WHEN "month" < 10 THEN '0' || "month" ELSE '' || "month" END ); BEGIN IF begin_epoch < 0 THEN begin_time = startup_time; begin_epoch = 0 ; "year" = extract (YEAR FROM begin_time); "month" = extract (MONTHS FROM begin_time); end_time = date_trunc('month' , begin_time) + month_interval; end_epoch = calc_epoch_by_ts(end_time); mm_month = (CASE WHEN "month" < 10 THEN '0' || "month" ELSE '' || "month" END )::varchar ; END IF; RETURN QUERY SELECT (concat_ws('_' , "year", mm_month, begin_epoch, end_epoch))::VARCHAR , current_ts::DATE , begin_epoch, begin_time, end_epoch, end_time; END ;$$ LANGUAGE plpgsql;
按年分区 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 CREATE OR REPLACE FUNCTION calc_partition_range_by_year(current_ts TIMESTAMPTZ = current_timestamp ) RETURNS SETOF PARTITION_TABLE_RANGE_DEFINITION IMMUTABLE STRICT AS $$ DECLARE startup_time timestamptz = '2020-08-25T06:00:00+08:00' ::timestamptz; first_day_of_year TIMESTAMPTZ = date_trunc('year' , current_ts); year_interval INTERVAL = INTERVAL '1 year' ; last_day_of_year TIMESTAMPTZ = first_day_of_year + year_interval; begin_epoch BIGINT = calc_epoch_by_ts(first_day_of_year); end_epoch BIGINT = calc_epoch_by_ts(last_day_of_year); "year" INT = extract (YEAR FROM first_day_of_year); BEGIN IF begin_epoch < 0 THEN first_day_of_year = startup_time; begin_epoch = 0 ; last_day_of_year = date_trunc('year' , first_day_of_year) + year_interval; end_epoch = calc_epoch_by_ts(last_day_of_year); "year" = extract (YEAR FROM first_day_of_year); END IF; RETURN QUERY SELECT (concat_ws('_' , "year", begin_epoch, end_epoch))::VARCHAR , current_ts::DATE , begin_epoch, first_day_of_year, end_epoch, last_day_of_year; END ;$$ LANGUAGE plpgsql;
区块高度与时间 2020-08-25T06:00:00+08:00 是Filecoin 上线时间,每30秒一个纪元(高度)。通过与上线时间与高度换算的相对秒数,求得高度对应的时间。反之亦然。其计算方式如下:
高度转换时间 1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE OR REPLACE FUNCTION calc_timestamp_by_epoch(height BIGINT = 0 ) RETURNS TIMESTAMP WITH TIME ZONE IMMUTABLE STRICT LANGUAGE plpgsql AS $$ DECLARE baseEpoch INTEGER = extract (EPOCH FROM TIMESTAMPTZ '2020-08-25T06:00:00+08:00' )::INTEGER ; BEGIN RETURN to_timestamp(baseEpoch + height * 30 ); END $$;
时间转换高度 1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE OR REPLACE FUNCTION calc_epoch_by_ts(ts timestamp with time zone = current_timestamp ) RETURNS bigint IMMUTABLE STRICT LANGUAGE plpgsql AS $$ DECLARE baseTime TIMESTAMP = TIMESTAMPTZ '2020-08-25T06:00:00+08:00' ; BEGIN RETURN extract (EPOCH FROM (ts - baseTime))::INTEGER / 30 ; END $$;
获取分区名称 通过系统数据库(pg_catalog)中的表获得分区表名称。如果不存在分区表,返回NULL。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 CREATE OR REPLACE FUNCTION pg_get_part_table_name(main_table VARCHAR , suffix VARCHAR ) RETURNS VARCHAR IMMUTABLE AS $$ DECLARE part_table_name VARCHAR ; BEGIN WITH o AS (SELECT inhrelid FROM pg_inherits WHERE inhparent = (concat_ws('.' ,'public' , main_table))::REGCLASS) SELECT relname FROM pg_class WHERE oid IN (SELECT * FROM o) AND relname = concat_ws('_' , main_table, suffix) INTO part_table_name; RETURN part_table_name; END $$ LANGUAGE plpgsql;
获取分区表分区键(废弃⚠️ at 2022-03-25) 这个函数用来插入时作为更新键,但实际作为更新键通常是唯一键。唯一键可以是复合键。该函数不适用。
pg_get_partkeydef PostgreSQL 内置函数,位于pg_catalog 数据库中,用于提取分区表的分区键定义。strpos和substr函数同样位于pg_catalog数据库中定义,这两个函数参数用法与C语言相同。
通过pg_get_partkeydef函数返回的结果如下:
1 2 select pg_get_partkeydef('test' ::regclass);
使用strpos和substr得到上述返回结果定义列名称。完整实现如下:
通过调用pg_get_part_table_range_key 输入分区表(主表)名称转换成oid,返回分区键名称。非分区表返回结果为NULL。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE OR REPLACE FUNCTION pg_get_part_table_range_key(oid REGCLASS) RETURNS VARCHAR IMMUTABLE STRICT AS $$ DECLARE part_key_def TEXT = pg_get_partkeydef(oid); begin_position INT = strpos(part_key_def, '(' ) + 1 ; def_len INT = length(part_key_def); part_key TEXT = substr(part_key_def, begin_position, def_len - begin_position); BEGIN RETURN cast (part_key AS VARCHAR ); END $$ LANGUAGE plpgsql;
例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 CREATE TABLE test11( epoch INTEGER NOT NULL , miner VARCHAR (100 ) NOT NULL , balance NUMERIC , CONSTRAINT uk_epoch_miner UNIQUE (epoch, miner) ) PARTITION BY RANGE (epoch); select pg_get_part_table_range_key('test11' ::regclass);CREATE TABLE test22( miner varchar (50 ) ) PARTITION BY HASH (miner); CREATE TABLE test22_1 PARTITION OF test22 FOR VALUES WITH (MODULUS 2 , REMAINDER 0 );select pg_get_part_table_range_key('test22' ::regclass);CREATE TABLE test33( miner varchar (50 ) ) PARTITION BY LIST (miner); CREATE TABLE test33_1 PARTITION OF test33 FOR VALUES IN ('f02439' );select pg_get_part_table_range_key('test33' ::regclass);create table test66(miner varchar (50 ));select pg_get_part_table_range_key('test66' ::regclass);
获取更新唯一键(2022-03-25 新增) 💡 这里约定,自动建分区表,需要建立一个且只有一个constraint定义。例如:
1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE IF NOT EXISTS miner_actors( epoch BIGINT NOT NULL , miner VARCHAR NOT NULL , owner VARCHAR NOT NULL , worker VARCHAR , controllers VARCHAR [], sector_size BIGINT , power DECIMAL , CONSTRAINT uk_epoch_miner_actors UNIQUE (epoch, miner) ) PARTITION BY RANGE (epoch);
以上有一个唯一性约束uk_epoch_miner_actors,且只要定义一个,其它的约束以添加index方式在外部添加。
然后以下面的语句查询约束字段。
1 2 3 4 5 6 7 SELECT DISTINCT kcu.column_name FROM information_schema.table_constraints AS tc JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name WHERE constraint_type = 'UNIQUE' AND tc.table_name = 'miner_actors' ;
作用于下面的代码
获取分区表更新列模版 modified at 2022-03-25
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 CREATE FUNCTION pg_compute_part_table_update_columns(main_table CHARACTER VARYING ) RETURNS TEXT IMMUTABLE LANGUAGE plpgsql AS $$ DECLARE update_columns TEXT; BEGIN SELECT string_agg(concat(column_name, '=$1.' , column_name), ', ' ) AS columns INTO update_columns FROM information_schema.columns WHERE table_schema = 'public' AND table_name = main_table AND column_name NOT IN (SELECT DISTINCT kcu.column_name FROM information_schema.table_constraints AS tc JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name WHERE constraint_type = 'UNIQUE' AND tc.table_name = main_table); RETURN update_columns; END $$;
创建update 更新约束字段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 CREATE OR REPLACE FUNCTION pg_compute_constraint_keys(main_table VARCHAR ) RETURNS TEXT IMMUTABLE LANGUAGE plpgsql AS $$ DECLARE ukeys TEXT; BEGIN WITH t AS (SELECT DISTINCT kcu.column_name FROM information_schema.table_constraints AS tc JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name WHERE constraint_type = 'UNIQUE' AND tc.table_name = main_table) SELECT string_agg(concat(t.column_name, '=$1.' , t.column_name), ' AND ' ) FROM t INTO ukeys; RETURN ukeys; END $$;
创建新分区 分区meta表记录了分区表的分区信息、创建时间和分区范围。该表内的记录应在创建新分区时被记录。part_type字段表示范围分区、哈希分区或列表分区。
1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE TABLE IF NOT EXISTS part_tables( id SERIAL NOT NULL PRIMARY KEY , tb_name VARCHAR (100 ), part_type SMALLINT NOT NULL DEFAULT 0 , part_table VARCHAR (100 ), begin_time TIMESTAMPTZ, end_time TIMESTAMPTZ, created_time TIMESTAMPTZ DEFAULT current_timestamp , CONSTRAINT uk_part_table_name UNIQUE (tb_name, part_table), CHECK (part_type IN (0 , 1 , 2 )) );
主函数 这里创建新分区一定要加上 IF NOT EXISTS判断,在同一个事务中,批量插入时,pg_catalog表内未完成实际分区表的创建,所以查询出来的结果是空,会重复创建。通过IF NOT EXISTS规避同一事务中的重复建表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 CREATE OR REPLACE FUNCTION create_new_partition_table(main_table VARCHAR , part_table_type SMALLINT , part_def PARTITION_TABLE_RANGE_DEFINITION) RETURNS VARCHAR AS $$ DECLARE part_table_name TEXT; BEGIN part_table_name = concat_ws('_' , main_table, part_def.suffix); EXECUTE format('CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES FROM (%s) TO (%s)' , part_table_name, main_table, part_def.begin_epoch, part_def.end_epoch); CREATE TABLE IF NOT EXISTS part_tables ( id SERIAL NOT NULL PRIMARY KEY , tb_name VARCHAR (100 ), part_type SMALLINT NOT NULL DEFAULT 0 , part_table VARCHAR (100 ), begin_time TIMESTAMPTZ, end_time TIMESTAMPTZ, created_time TIMESTAMPTZ DEFAULT current_timestamp , CONSTRAINT uk_part_table_name UNIQUE (tb_name, part_table), CHECK (part_type IN (0 , 1 , 2 )) ); INSERT INTO part_tables(tb_name, part_type, part_table, begin_time, end_time, created_time) VALUES (main_table, part_table_type, part_table_name, part_def.begin_time, part_def.end_time, current_timestamp ) ON CONFLICT (tb_name, part_table) DO NOTHING ; RETURN part_table_name; END $$ LANGUAGE plpgsql;
重写规则 通过PostgreSQL数据库规则系统,改变数据库分区表默认的INSERT/UPDATE操作,来达到在插入或更新前,判断目标分区是否存在,如不存在创建之的目的。
为方便后续对分区表的维护,额外建立一张独立的数据表,存放新创建的分区以及创建时间。可以根据时间对分区表合理裁剪。
准备分区表 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 CREATE TABLE example_table( epoch INT , miner VARCHAR (100 ), balance DECIMAL , code VARCHAR (100 ), CONSTRAINT uk_epoch_miner UNIQUE (epoch, miner) ) PARTITION BY RANGE (epoch); example= # \d+ example_table Partitioned table "public.example_table" Column | Type | Nullable | Default | Storage | Stats target | epoch | integer | | | plain | | miner | character varying (100 ) | | | extended | | balance | numeric | | | main | | code | character varying (100 ) | | | extended | | Partition key: RANGE (epoch)Indexes: "uk_epoch_miner" UNIQUE CONSTRAINT , btree (epoch, miner) Number of partitions: 0
插入(INSERT) 替代数据库分区表默认的插入操作,需要考虑默认插入动作的几个关键要素:自动路由到对应的表分区进行插入,处理唯一键冲突。
自动路由 自动路由的前提是对应的表分区已存在。它分成两个步骤:
在插入行数据前,使用函数pg_get_part_table_name查找新插入的记录分区字段所对应的表分区,如果在数据库中不存在,则新创建。
向具体的表分区插入数据。
处理唯一键冲突 使用重写规则有一个限制,就是无法处理唯一键冲突更新。
无法对具有INSERT或者UPDATE规则的表使用带有ON CONFLICT子句的INSERT
1 INSERT INTO from_table VALUES (3 , '333' ) ON CONFLICT(col1) DO UPDATE SET data = excluded.data;
1 0 A000 feature_not_supported
因此,需要处理unique_violation的执行错误。使用EXCEPTION捕获unique_violation的错误,执行分区表UPDATE。
注意,一旦捕获unique_violation并处理实现唯一键冲突更新的操作,意味着替代的INSERT操作将天生具有upsert的功能。
具体实现 一、创建插入函数。
根据数据库规则语法,规则仅支持SELECT/INSERT/UPDATE/DELETE操作,因此要执行一系列动作需要创建一个存储过程来实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 CREATE OR REPLACE FUNCTION insert_action(table_row miner_actors) RETURNS VOIDAS $$DECLARE main_table VARCHAR = 'example_table' ; part_table_type SMALLINT = 0 ; range_ts TIMESTAMPTZ = calc_timestamp_by_epoch(table_row.epoch); part_def PARTITION_TABLE_RANGE_DEFINITION = calc_partition_range_by_day(range_ts); part_table_name VARCHAR = pg_get_part_table_name(main_table, part_def.suffix); update_columns TEXT; constraint_keys TEXT; BEGIN RAISE NOTICE 'suffix:%; begin_epoch:%; end_epoch:%; begin_time:%; end_time:%' , part_def.suffix, part_def.begin_epoch, part_def.end_epoch, part_def.begin_time, part_def.end_time; RAISE NOTICE 'part_table_name: %' , part_table_name; IF part_table_name ISNULL THEN SELECT create_new_partition_table(main_table, part_table_type, part_def) INTO part_table_name; END IF; EXECUTE format('INSERT INTO %s SELECT $1.*' , part_table_name) USING table_row; EXCEPTION WHEN unique_violation THEN update_columns = pg_compute_part_table_update_columns(main_table); constraint_keys = pg_compute_constraint_keys(main_table); RAISE NOTICE '%' , format('UPDATE %s SET %s WHERE %s' , part_table_name, update_columns, constraint_keys); EXECUTE format('UPDATE %s SET %s WHERE %s' , part_table_name, update_columns, constraint_keys) USING table_row; END $$ LANGUAGE plpgsql;
二、创建INSERT RULE。
在分区表主表上创建一条规则,其方式是INSTEAD默认INSERT操作。其中new是一个内置变量,代表新插入的记录行。
1 CREATE RULE upsert_part_data AS ON INSERT TO example_table DO INSTEAD SELECT insert_action(new );
三、插入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 INSERT INTO example_table(epoch, miner, balance, code) VALUES (1 , 'f02438' , '100' , 'Good' );example= # \d+ example_table Partitioned table "public.example_table" Column | Type | Nullable | Default | Storage | Stats target | epoch | integer | | | plain | | miner | character varying (100 ) | | | extended | | balance | numeric | | | main | | code | character varying (100 ) | | | extended | | Partition key: RANGE (epoch)Indexes: "uk_epoch_miner" UNIQUE CONSTRAINT , btree (epoch, miner) Rules: upsert_part_data AS ON INSERT TO example_table DO INSTEAD SELECT insert_action(new.* ) AS insert_action Partitions: example_table_2020_08_25_0_2159 FOR VALUES FROM (0 ) TO (2159 )
更新(UPDATE) 替代数据库分区表默认更新操作,需要考虑以下几个因素:普通更新和分区移动。
普通更新 对于分区列相同的更新,不需要执行删除命令,也不需要创建新不存在的分区表,直接UPDATE目标表。
分区移动 对于更新了分区列的数据,要查找新的分区表是否存在,如果不存在需要创建。然后将旧数据删除,在新表中插入新纪录。
具体实现 一、创建存储过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 CREATE OR REPLACE FUNCTION update_action(new_row miner_actors, old_row miner_actors) RETURNS VOID AS $$ DECLARE main_table VARCHAR = 'example_table' ; part_table_type SMALLINT = 0 ; new_range_ts TIMESTAMPTZ = calc_timestamp_by_epoch(new_row.epoch); new_part_def PARTITION_TABLE_RANGE_DEFINITION = calc_partition_range_by_day(new_range_ts); new_part_table_name VARCHAR = pg_get_part_table_name(main_table, new_part_def.suffix); old_range_ts TIMESTAMPTZ; old_part_def PARTITION_TABLE_RANGE_DEFINITION; old_part_table_name VARCHAR ; update_columns TEXT; constraint_keys TEXT; BEGIN constraint_keys = pg_compute_constraint_keys(main_table); IF new_row.epoch = old_row.epoch THEN RAISE NOTICE 'update in same table' ; update_columns = pg_compute_part_table_update_columns(main_table); EXECUTE format('UPDATE %s SET %s WHERE %s' , new_part_table_name, update_columns, constraint_keys) USING new_row; RETURN ; END IF; IF new_part_table_name ISNULL THEN SELECT create_new_partition_table(main_table, part_table_type, new_part_def) INTO new_part_table_name; END IF; old_range_ts = calc_timestamp_by_epoch(old_row.epoch); old_part_def = calc_partition_range_by_day(old_range_ts); old_part_table_name = pg_get_part_table_name(main_table, old_part_def.suffix); EXECUTE format('DELETE FROM %s WHERE %s' , old_part_table_name, constraint_keys) USING old_row; EXECUTE format('INSERT INTO %s SELECT $1.*' , new_part_table_name) USING new_row; RETURN ; END $$ LANGUAGE plpgsql;
二、创建UPDATE RULE
在分区表主表上创建一条更新规则,其方式是INSTEAD默认UPDATE操作。其中new和old是内置变量,代表新更新的数据和旧数据行。
这里的旧的数据行是通过where条件获知的,新的数据行是根据update内set获知的。所以在执行该更新执行计划的时候它会先收集数据,然后执行规则。所以old可以根据唯一键来查找。
1 CREATE RULE update_part_data AS ON UPDATE TO example_table DO INSTEAD SELECT update_action(new , old );
三、更新
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 UPDATE example_table SET epoch = 3000 , balance = '99' , code = 'Good+' WHERE epoch = 1 ;example= # \d+ example_table Partitioned table "public.example_table" Column | Type | Nullable | Default | Storage | Stats target | epoch | integer | | | plain | | miner | character varying (100 ) | | | extended | | balance | numeric | | | main | | code | character varying (100 ) | | | extended | | Partition key: RANGE (epoch)Indexes: "uk_epoch_miner" UNIQUE CONSTRAINT , btree (epoch, miner) Rules: update_part_data AS ON UPDATE TO example_table DO INSTEAD SELECT update_action(new.* , old.* ) AS update_action upsert_part_data AS ON INSERT TO example_table DO INSTEAD SELECT insert_action(new.* ) AS insert_action Partitions: example_table_2020_08_25_0_2159 FOR VALUES FROM (0 ) TO (2159 ), example_table_2020_08_26_2160_5039 FOR VALUES FROM (2160 ) TO (5039 )
分区表维护 规则替换 封装一个存储过程,将创建insert_action函数和update_action函数放入其中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 CREATE OR REPLACE PROCEDURE create_auto_partition(table_name VARCHAR , range_type VARCHAR DEFAULT 'week' ) AS $$ DECLARE insert_func_name TEXT := concat_ws('_' , table_name, 'insert_action' ); update_func_name TEXT := concat_ws('_' , table_name, 'update_action' ); insert_tpl TEXT; update_tpl TEXT; range_func_name TEXT; BEGIN IF range_type = 'day' THEN range_func_name = 'calc_partition_range_by_day' ; ELSEIF range_type = 'week' THEN range_func_name = 'calc_partition_range_by_week_of_year' ; ELSEIF range_type = 'month' THEN range_func_name = 'calc_partition_range_by_month' ; ELSEIF range_type = 'year' THEN range_func_name = 'calc_partition_range_by_year' ; ELSE RAISE EXCEPTION 'unsupported range type(only in day, week, month, year)' ; END IF; insert_tpl := E'CREATE OR REPLACE FUNCTION ' || insert_func_name || '(table_row ' || table_name || ') ' || 'RETURNS VOID AS ' || '$body$ ' || 'DECLARE ' || ' main_table VARCHAR = ''' || table_name || '''; ' || 'part_table_type SMALLINT = 0; ' || 'range_ts TIMESTAMPTZ = calc_timestamp_by_epoch(table_row.epoch); ' || 'part_def PARTITION_TABLE_RANGE_DEFINITION = ' || range_func_name || '(range_ts); ' || 'part_table_name VARCHAR = pg_get_part_table_name(main_table, part_def.suffix); ' || 'update_columns TEXT; ' || 'constraint_keys TEXT; ' || 'BEGIN ' || 'IF part_table_name ISNULL THEN ' || 'SELECT create_new_partition_table(main_table, part_table_type, part_def) ' || 'INTO part_table_name; ' || 'END IF; ' || 'EXECUTE format(''INSERT INTO %s SELECT $1.*'', part_table_name) USING table_row; ' || 'EXCEPTION ' || ' WHEN unique_violation THEN ' || ' update_columns = pg_compute_part_table_update_columns(main_table); ' || ' constraint_keys = pg_compute_constraint_keys(main_table); ' || 'EXECUTE format(''UPDATE %s SET %s WHERE %s '', part_table_name, update_columns, constraint_keys) USING table_row; ' || 'END $body$ LANGUAGE plpgsql' ; EXECUTE insert_tpl; EXECUTE format('DROP RULE IF EXISTS upsert_part_data ON %s' , table_name); EXECUTE format('CREATE RULE upsert_part_data AS ON INSERT TO %s DO INSTEAD SELECT %s(new)' , table_name, insert_func_name); update_tpl := 'CREATE OR REPLACE FUNCTION ' || update_func_name || '(new_row ' || table_name || ', old_row ' || table_name || ')' || ' RETURNS VOID AS ' || '$body$ ' || 'DECLARE ' || ' main_table VARCHAR = ''' || table_name || '''; ' || 'part_table_type SMALLINT = 0; ' || 'new_range_ts TIMESTAMPTZ = calc_timestamp_by_epoch(new_row.epoch); ' || 'new_part_def PARTITION_TABLE_RANGE_DEFINITION = ' || range_func_name || '(new_range_ts); ' || 'new_part_table_name VARCHAR = pg_get_part_table_name(main_table, new_part_def.suffix); ' || 'old_range_ts TIMESTAMPTZ; ' || 'old_part_def PARTITION_TABLE_RANGE_DEFINITION; ' || 'old_part_table_name VARCHAR; ' || 'update_columns TEXT; ' || 'constraint_keys TEXT;' || 'BEGIN ' || 'constraint_keys = pg_compute_constraint_keys(main_table); ' || 'IF new_row.epoch = old_row.epoch THEN ' || 'update_columns = pg_compute_part_table_update_columns(main_table); ' || 'EXECUTE format(''UPDATE %s SET %s WHERE %s'', new_part_table_name, update_columns, constraint_keys) USING new_row; ' || 'RETURN; ' || 'END IF; ' || 'IF new_part_table_name ISNULL THEN ' || 'SELECT create_new_partition_table(main_table, part_table_type, new_part_def) ' || 'INTO new_part_table_name; ' || 'END IF; ' || 'old_range_ts = calc_timestamp_by_epoch(old_row.epoch); ' || 'old_part_def = ' || range_func_name || '(old_range_ts); ' || 'old_part_table_name = pg_get_part_table_name(main_table, old_part_def.suffix); ' || 'EXECUTE format(''DELETE FROM %s WHERE %s'', old_part_table_name, constraint_keys) USING old_row; ' || 'EXECUTE format(''INSERT INTO %s SELECT $1.*'', new_part_table_name) USING new_row; ' || 'RETURN; ' || 'END $body$ LANGUAGE plpgsql' ; EXECUTE update_tpl; EXECUTE format('DROP RULE IF EXISTS update_part_data ON %s' , table_name); EXECUTE format('CREATE RULE update_part_data AS ON UPDATE TO %s DO INSTEAD SELECT %s(new, old)' , table_name, update_func_name); END ;$$ LANGUAGE plpgsql;
调用
1 CALL create_auto_partition('example_table' );
上述简化了操作步骤。对于公共函数仍然需要提前部署。
裁剪 //TODO
挂载 //TODO
总结 创建自动分区表的步骤 一、准备公共函数
见 函数 一章,和 规则替换 一节
二、创建分区表
1 2 3 4 5 6 7 8 CREATE TABLE example_table( epoch INT , miner VARCHAR (100 ), balance DECIMAL , code VARCHAR (100 ), CONSTRAINT uk_epoch_miner UNIQUE (epoch, miner) ) PARTITION BY RANGE (epoch);
三、创建INSERT/UPDATE替换函数
1 CALL create_auto_partition('example_table' );
测试结果 1 2 3 INSERT INTO example_table(epoch, miner, balance, code) SELECT i, 'f03367' , '0' , 'Good' FROM generate_series(1 , 1500000 ) g(i);
插入150万测试数据,花费10分钟左右。
参考资料 官方文档 表分区
附录 A. PostgreSQL错误代码
CREATE RULE
网上文章 PostgreSQL中的函数之日期时间函数
自动创建分区实践(写入触发器)
PostgreSQL 异常捕获(EXCEPTION)