0%

人工智能求职助手

🤖 有针对性的调整简历,助你快速找到心仪的工作。

🤖 AI Resume Assistant, helping you tailor your resume and find your ideal job quickly.

Features

  • More aligned with practices within China
  • Analyzes the 3 most important responsibilities from the input job posting link
  • Strategically highlights key personal strengths
  • Quantifies past work achievements
  • ATS (Applicant Tracking System) friendly format
  • Customizable resume templates
  • Ready-to-use default resume template style
  • Supports multiple API integrations
  • Supports langsmith observability
  • Generate HTML file and PDF file

Installation

To set up the project locally, follow these steps:

  1. Clone the repository:

    1
    2
    git clone https://github.com/alphaqiu/curriculum_vitae_ai_agent.git
    cd curriculum_vitae_ai_agent
  2. Create a virtual environment:

    1
    python -m venv venv
  3. Activate the virtual environment:

    • On Windows:

      1
      venv\Scripts\activate
    • On macOS and Linux:

      1
      source venv/bin/activate
  4. Install the required packages:

    1
    pip install -r requirements.txt

Usage

Create a .env file in the same directory as main.py. The content of the .env file should be as follows:

1
2
3
4
5
6
7
8
9
10
# if enable dashscope, uncomment the following line
DASHSCOPE_API_KEY="your-aliyun-tongyi-key"

# if enable openai, uncomment the following line
#OPENAI_API_KEY="openai-api-key or deepseek-api-key"

# if enable langsmith, uncomment the following lines
#LANGCHAIN_API_KEY="langchain-api-key"
#LANGCHAIN_TRACING_V2="true"
#LANGCHAIN_ENDPOINT="https://api.smith.langchain.com"

To generate a resume, use the following command:

1
python main.py --link "https://example.com/job-description" --config "./data/config.yml"
  • --link or -l: The URL of the job description (required).
  • --config or -c: Path to the configuration file (optional, defaults to ./data/config.yml).

Generate Example

The resume template has been modified based on
https://github.com/hoochanlon/jsonresume-theme-mix

example

Configuration

The configuration file (config.yml) contains settings for the resume generation process, including model usage and file paths. Customize it according to your needs.

The config.yml file supports the following key parameters:

  • model: Specifies the AI model to use for resume generation

    • Currently supports: qwen-plus (Alibaba Cloud’s Tongyi Qianwen model)
    • This parameter determines which AI service will process your resume
  • task_path: Specifies the directory path for task-related files

    • Default value: ./data
    • This directory contains necessary files like templates, configurations, and resume information

Example configuration:

1
2
model: qwen-plus
task_path: ./data

Resume Information

Resume information file is located at data/info.yml, you can customize it or use the default information.

Basic Profile (resume_profile)

  • name: Your full name
  • gender: Your gender
  • age: Your age
  • email: Contact email address
  • education_background: Brief education summary
  • work_experience: Years of work experience
  • skills: Key technical skills, separated by commas
  • projects: Brief project summary
  • awards: Notable achievements or awards
  • mobile_phone: Contact phone number
  • job_seeking_status: Current employment status
  • desired_positions: List of target positions
    • position: Job title
    • location: Preferred location
    • salary: Expected salary range
    • industry: Target industry
    • employment_type: Type of employment (e.g., full-time, part-time)

Personal Advantages (personal_advantage)

List of key strengths and competencies that make you stand out

Education Details (education_details)

  • school: Institution name
  • major: Field of study
  • degree: Degree type
  • start_date: Start date (YYYY-MM format)
  • graduation_date: End date (YYYY-MM format)
  • school_experience: Notable activities during education
  • graduation_project_topic: Final project title
  • graduation_project_description: Project description

Work Experience (work_experience)

For each position:

  • company: Company name
  • position: Job title
  • start_date: Start date (YYYY-MM format)
  • end_date: End date (YYYY-MM format)
  • work_experience: Overall experience summary
  • work_experience_description: Detailed role description
  • work_experience_achievements: Key accomplishments
  • work_experience_skills: Skills utilized
  • work_experience_projects: Projects involved
  • work_experience_awards: Awards received
  • tags: Relevant skill tags

Project Experience (project_experience)

For each project:

  • project_name: Project title
  • project_role: Your role
  • project_start_date: Start date
  • project_end_date: End date
  • project_description: Project overview
  • project_achievements: Key accomplishments
  • project_link: Project URL (if applicable)
  • project_skills: Technologies used

Certifications (qualification_certificate)

  • certificate_name: Name of certification
  • certificate_date: Date obtained
  • certificate_link: Verification link

Volunteer Experience (volunteer_experience)

  • volunteer_experience_name: Activity name
  • start_date: Start date
  • end_date: End date
  • service_duration: Duration of service
  • volunteer_experience_description: Description of activities

Hobbies and Interests (hobbies_and_interests)

  • hobbies_and_interests_name: Interest category
  • hobbies_and_interests_description: Detailed descriptions

Example:

1
2
3
4
5
6
7
8
9
10
11
resume_profile:
name: 'John Doe'
gender: 'Male'
age: 20
email: '[email protected]'
education_background: 'Shanghai Jiao Tong University'
work_experience: '2 years'
skills: 'Python, Java, C++, Machine Learning, Deep Learning'
# ... other fields ...

# Additional sections follow the same structure as shown above

All fields support both English and Chinese content. Customize the information according to your background and target position.

Resume Template

Resume template file is located at data/resume_templates/resume_template.tpl, you can customize it or use the default template.

The resume template is a simple HTML file that uses Tailwind CSS for styling.

报错 too many open files 大致有以下三种可能 [1] [2] [3]
\1. 操作系统打开的文件句柄数过多(内核的限制)
\2. launchd 对进程进行了限制
\3. shell 对进程进行了限制

内核的限制

整个操作系统可以打开的文件数受内核参数影响,可以通过以下命令查看

1
2
$ sysctl kern.maxfiles
$ sysctl kern.maxfilesperproc

在我电脑上输出如下

1
2
kern.maxfiles: 49152
kern.maxfilesperproc: 42576

好像已经很大了。

如果需要临时修改的话,运行如下命令

1
2
$ sudo sysctl -w kern.maxfiles=20480 # 或其他你选择的数字
$ sudo sysctl -w kern.maxfilesperproc=18000 # 或其他你选择的数字

永久修改,需要在 /etc/sysctl.conf 里加上类似的下述内容

1
2
kern.maxfiles=20480
kern.maxfilesperproc=18000

这个文件可能需要自行创建

launchd 对进程的限制

获取当前的限制:

1
$ launchctl limit maxfiles

输出类似这样:

1
maxfiles    256            unlimited

其中前一个是软限制,后一个是硬件限制。

临时修改:

1
$ sudo launchctl limit maxfiles 65536 200000

系统范围内修改则需要在文件夹 /Library/LaunchDaemons 下创建一个 plist 文件 limit.maxfiles.plist

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>limit.maxfiles</string>
<key>ProgramArguments</key>
<array>
<string>launchctl</string>
<string>limit</string>
<string>maxfiles</string>
<string>65536</string>
<string>200000</string>
</array>
<key>RunAtLoad</key>
<true/>
<key>ServiceIPC</key>
<false/>
</dict>
</plist>

修改文件权限

1
2
$ sudo chown root:wheel /Library/LaunchDaemons/limit.maxfiles.plist
$ sudo chmod 644 /Library/LaunchDaemons/limit.maxfiles.plist

载入新设定

1
$ sudo launchctl load -w /Library/LaunchDaemons/limit.maxfiles.plist

shell 的限制

通过下述命令查看现有限制

1
$ ulimit -a

得到如下输出

1
2
3
...
-n: file descriptors 256
...

通过 ulimit -S -n 4096 来修改。如果需要保持修改,可以将这一句命令加入你的 .bash_profile.zshrc 等。

总结

一般来说,修改了上述三个限制,重启一下,这个问题就可以解决了。

在实际操作中,我修改到第二步重启,第三个就自动修改了。

7月5日,FileCoin网络在一段时间内显示的算力增量较前一日相比,下降很明显,全网掉了48P的算力。需要检查全网终止的扇区、故障的扇区和总扇区数的变化。

设想这样一个场景:服务器需要同步时间服务,希望访问若干个时间同步服务,取最快且正确返回结果的服务作为结果。通过Go并行访问这若干个服务器,只取最快返回且正确的,其它结果统统丢弃。下面是这样的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
var (
ntpServers = []string{
"us.pool.ntp.org:123",
"time.cloudflare.com:123",
"time.windows.com:123",
"time.apple.com:123",
"time.google.com:123",
"time.nist.gov:123",
"ntp.ntsc.ac.cn:123",
"time.smg.gov.mo:123",
"stdtime.gov.hk:123",
}
)

func NewGroupVisitor[T any]() *GroupVisitor[T] {
return &GroupVisitor[T]{result: make(chan T)}
}

type GroupVisitor[T any] struct {
wg sync.WaitGroup
doneOnce sync.Once
result chan T
}

func (g *GroupVisitor[T]) Go(f func() (T, string, bool, error)) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if data, host, done, err := f(); done && err == nil {
g.doneOnce.Do(func() {
g.result <- data
})
}
}()
}

func (g *GroupVisitor[T]) Wait(duration time.Duration) (T, error) {
var empty T
select {
case <-time.After(duration):
return empty, errors.New("request timeout")
case ret := <-g.result:
return ret, nil
}
}

func main() {
g := NewGroupVisitor[*time.Time]()
for _, host := range ntpServers {
host := host
g.Go(func() (*time.Time, string, bool, error) {
respTime, err := ntp.Time(host)
if err != nil {
return nil, host, false, err
}

return &respTime, host, true, nil
})
}

clock, err := g.Wait(6 * time.Second)
if err != nil {
return
}
}

公钥直接加密法

我们先来看看如果直接进行数据分享,有没有什么简单的方法。下面以Alice(数据拥有者)向Bob(数据接收者)分享一份文件为例,说明公钥直接加密法。

1、Alice使用Bob的公钥对文件进行加密。

2、将数据分享给Bob。

3、Bob解密文件,获取信息。

这个过程简单而直接,这就是公钥直接加密法。但如果面对使用存储网络进行数据分享的情况公钥直接加密法有没有优势呢?

由于数据存储在存储网络中,且自己的私钥不能泄漏,所以Alice需要先下载文件,解密、重新使用Bob的公钥加密再上传到存储网络中,然后Bob可以从存储网络中获取文件。

还有一个问题,如果Alice有一天又想把数据分享给Julia,那么他需要再重新将上面的操作进行一遍。

分析上面的问题,我们其实只使用了存储网络的存储功能,而没有使用存储网络的计算功能。所以“直接公钥加密”是一种非常笨拙的方法。

代理重加密(Proxy Re-Encryption, 简称PRE)

代理重加密(Proxy Re-Encryption)是一种密码学技术,用于在保持数据的机密性的同时,允许一个代理将加密数据从一个密钥(发送者密钥)转换为另一个密钥(接收者密钥),而无需解密和重新加密数据。

代理重加密的工作方式如下:

  1. 发送者生成一对非对称密钥,包括公钥和私钥。发送者使用公钥将数据进行加密,并将加密后的数据发送给代理。
  2. 代理在接收到加密数据后,使用代理密钥对数据进行重加密。代理密钥是由代理生成的,与发送者和接收者的密钥相关联。重加密的过程允许代理将数据从发送者密钥转换为接收者密钥,而无需解密数据内容。
  3. 接收者使用自己的私钥解密代理重加密后的数据,以获取原始的明文数据。

代理重加密的主要优势在于,它允许数据所有者(发送者)委托一个代理来转发加密数据给接收者,而无需泄露自己的私钥或要求代理解密数据。这种委托可以在不破坏数据机密性的前提下进行,因为代理只是在加密域中进行转换操作,而不需要访问数据的明文内容。

代理重加密在很多场景中都有应用,如安全数据共享、云存储、跨域认证和密钥管理等。它提供了一种灵活且安全的方式,使得数据所有者能够控制数据的访问权限,并在需要时委托代理进行数据的转发和解密操作,同时确保数据的机密性得到保护。

代理重加密的特点

(1).单向性 (Unidirectional): 在代理重加密方案中,如果代理者仅仅能将授权者的密文转化成受理者的密文,而不能将受理者的密文转化成授权者的密文,则称该方案是单向的。反之如果代理者既能够将授权者的密文转换成受理者的密文,也能够将受理者的密文转化成授权者的密文,则该方案称为双向的(bidirectional)。显然在代理重加密方案中,单向代理重加密方案比双向代理重加密方案更优越。首先,在实际应用中,这种授权关系往往不是双向的。其次双向的代理重加密方案可以有两个单向的代理重加密方案构造,反之单向代理重加密则不能由双向单向代理重加密获得。

(2).多用性 (Multi-use): 在单向代理重加密方案中,如果已经被重加密的密文还能被代理者重加密,则称为多用代理重加密。反之,如果已经被加密的密文不能再被代理者重加密,则称为单用 (single-use) 单向代理重加密。

(3).非交互性(Non-interactive): 在单向代理重加密方案中,授权者在产生代理重加密密钥的过程中不需要受理者或者其他第三方的参与,则称为非交互性代理重加密。反之,如果授权者需要与受理者或者其他第三方交互来产生代理重加密密钥,则称为交互式 (Interavtive)代理重加密。

(4).透明性 (Transparent): 在代理重加密方案中,如果代理者是透明的,即发送者和受理者都不知道代理者的存在,则称该代理重加密方案是透明的。在透明代理重加密方案中,重加密后发送给受理者的密文与原本就发送给受理者的密文是不可区分的。

(5).密钥最优(Key optimal): 在代理重加密方案中,如果受理者所需要存储的私钥是固定的,并不因为该受理者可能是多个授权者的受理者而增加。则称该代理重加密方案是密钥最优的。

(6).非传递性(Non-transitive): 在代理重加密方案中,如果代理者拥有将 Alice 的密文转化成 Bob 的密文的代理重加密密钥rka-B和将 Bob 的密文转化成 Carlo 的密文的代理重加密钥rkB-C,但是代理者不能够自己计算出将 Alice 的密文转化成 Carlo 的密文的代理重加密密钥rkA-C,则该代理重加密方案是非传递性的。

(7).非转移性(Non-transferable): 在代理重加密方案中,如果代理者拥有将 Alice 的密文转化成 Bob 的密文的代理重加密密钥rkA-B,即使代理者和受理者合谋,也不能够获得将Alice 的密文转化成 Carlo 的密文的代理重加密密钥rkA-C,则该代理重加密方案是非转移性的注意,非传递性指代理者自己不能生成新的代理重加密密钥,而非转移性指,代理者不能和受理者合谋生成新的代理重加密密钥。

(8).临时性 (Temporary): 在代理重加密方案中,授权者授权代理者重加密自己的密钥,这种授权关系是只是在一定的时间范围内有效,或者授权者可以通过某种机制去撤销这种授权关系,则称该代理重加密方案是临时的。

(9).抗合谋 (Collusion resistant): 在代理重加密方案中,即使代理者和授权者合谋,也不能够获得授权者的私钥,则称该代理重加密方案是抗合谋的。显然如果一个代理重加密方案是非转移的,则该代理重加密方案是抗合谋的,反之则不然。

(10)源隐藏(Source hiding): 在代理重加密方案中,如果攻击者 (包括受理者) 无法确定某个原始密文是否是某个重加密密文的原密文,则称该代理重加密方案是源隐藏的。

代理重加密的一种方案

本文涉及的代理重加密,并不会对要加密的原文进行二次加密,而是对原文进行加密的密码进行加密。参与重加密的服务器无法得知加密密码,因此密文可以保存在服务器端存储,且不会泄露任何信息。服务器对加密后的密码重加密,授权一侧的用户通过自身的私钥进行密码解析,从而对密文进行反加密。

常用符号

表示大素数,表示集合{}

G表示阶数为素数的有限域上的循环群,g表示群G的生成元。

表示元素s是从集合S中随机选取。

算法逻辑

Alice 和Bob 分别有属于自己的公私钥对。

Alice (, , 其中 , 是私钥,从大素数有限域中随机挑选的一个数,是公钥)。

Bob(, 其中, 是公钥)。

一、Alice 使用自己的公钥创造一个加密密码和加密密码胶囊(胶囊中有加密密码由本次执行所创造的证明)

h is a function that calculate hash on the curve

加密密码胶囊

计算S采用 Schnorr 签名方案

加密密码

二、Alice为Bob授权,生成授权公钥和提供给服务器侧的重加密密钥

由于authKey是授权时生成的授权密钥,d值也是映射到椭圆曲线上的点,当alice的私钥和d值进行乘法计算,得到的结果将隐藏Alice私钥的细节。authKey的将被丢弃,对外只输出pubAuthKey,想要还原的值,只有Bob的私钥+pubAuthKey才能做到。

给到服务器的输出是pubAuthKey。服务器无从知晓authKey,所以无法推得d也就无法知晓

三、服务器端进行重加密

检查胶囊中E,V,S是否能够通过验证

根据素数有限域群的交换律,分别计算$gsV+Eh(E||V)$在椭圆曲线上的点是否重合,如果相等,证明本次胶囊有效,进行服务端重加密。

四、Bob 获取加密密码,需要 这三样东西。

五、Alice自己也可以根据胶囊信息恢复出加密密码。

根据第四步骤中推演的结果

参考

使用代理重加密+PlatONE,来保证数据可信、安全地共享

代理重加密若干问题研究

Proxy Re-Encryption — Allowing Alice To Share Her Protect Secret Key With Bob

Improved Proxy Re-Encryption Schemes with Applications to Secure Distributed Storage

代理重加密综述

代理重加密

PostgreSQL 是一个现代化的数据库,开源,免费试用,社区活跃,和Oracle具有同等强大的功能。作为一款高度可扩展的企业级关系型数据库管理系统,其内置的分区表功能在处理大规模数据场景中扮演着重要角色。尤其是自动表分区技术,通过自动化管理分区,极大地简化了分区表的创建和维护过程,提高了数据库的性能和管理效率。支持JSON格式和自定义类型,支持并行查询,聚合函数、窗口函数和数据透视表非常适合作为统计数据的数据源。

合理规划数据是作为数据库使用的重要手段。目前Filscan作为首款后端使用PostgreSQL数据库的项目,目前已经支撑了亿级以上的Filecoin庞大数据量,后续随着Filecoin链继续发展,数据还将继续膨胀。

Filecoin的状态数据的数量非常庞大,如果在一张表内保存,会非常影响查询效率。为达到保存一段时间内的数据,且可以根据时间范围伸缩,提高查询效率,需要对巨量数据表进行表分区。让数据库按需自动的创建新的表来存放按照规则划分的数据,是可以提高自动化维护表程度,减少人为的干扰和出错的可能。

表分区方案

表分区方案是一直存在的,有三种方式:

预先创建一批分区表

这种方式最大的弊端是需要定期的创建一批新分区表以适应新数据范围分区,其次分区表不能按实际数据区间创建分区表,所有的数据范围都必须紧密存在,才不会漏存对应的数据到分区中。定期创建分区是容易被忽视的一个环节,一旦忘记定期创建,尤其是定期周期较长,就会导致数据丢失。如果有默认分区存在,新表记录会被保存到默认分区内,对后续可持续维护带来了额外的停机时间开销。

使用程序对表进行分区

这种方式没有问题,不过需要做一些额外适配工作,以适应不同的分区表的动态创建,分区表逻辑的更新依赖程序的更新。而基于数据库实现自动表分区,只需要即时更新存储过程或函数就可立即生效,也能够享受数据库自身的特性带来的便捷性,减少开发周期,降低维护成本。

第三方插件

pg_pathman 是一个PostgreSQL数据库的第三方插件。该插件的好处是封装实现了一套维护分区表的完整操作。阿里云RDS PostgreSQL版本支持安装该插件,下面是阿里云RDS各个PostgreSQL版本所支持该插件的情况。

插件名 14 13 12 11 10 9.4 描述
pg_pathman 1.5 1.5 1.5 1.5 高性能分区表插件

https://help.aliyun.com/document_detail/142340.html

这里有关于该插件的全部用法:https://help.aliyun.com/document_detail/140900.html

但是该插件官网做了以下提示:

注意:该项目不再处于开发阶段

pg_pathman支持 Postgres 版本 [9.5..13],但很可能不会移植到 14 及更高版本。声明式分区现在已经相当成熟,几乎所有的东西都在pg_pathman; 我们鼓励用户切换到它。我们仍在维护该项目(修复受支持版本中的错误),但这里不会发生新的开发。

这个项目官方预计停止后续的支持,目前处于维护阶段,提示官方声明式分区版本已经做得足够好,可以不需要依赖第三方插件。从这点上说,选择声明式分区无疑是最优的选择,不会因为服务器版本升级而需要额外升级依赖第三方实现。

声明式分区(DDL-Partition)

PostgreSQL提供了一种方法指定如何把一个表划分成称为分区的片段。被划分的表被称作分区表

表分区是解决一些因单表过大引用的性能问题的方式,比如某张表过大就会造成查询变慢,可能分区是一种解决方案。一般建议当单表大小超过内存就可以考虑表分区了。

PostgreSQL表分区有两种实现方式,声明式分区和表继承。它们各自有优缺点,比如,对声明式分区来说,分区必须具有和分区表正好相同的列集合,而在表继承中,子表可以有父表中没有出现过的额外列。除此之外,声明式分区也有着优秀的特点,可以减轻对分区表的维护代价、提供了操作的便捷性和查询性能,使用表继承将要做更多的工作才能达到同样的效果。

特点

默认分区

PosgtreSQL 11开始,支持为声明式分区表创建一个默认(DEFAULT)的分区,用于存储无法匹配其他任何分区的数据。只有 RANGE 分区表和 LIST 分区表需要默认分区。创建默认分区时使用 DEFAULT 子句替代 FOR VALUES 子句。

1
2
3
4
5
6
7
8
9
CREATE TABLE example_table(
epoch BIGINT NOT NULL ,
data text
) PARTITION BY RANGE (epoch);

CREATE TABLE example_table_1_100 PARTITION OF example_table FOR VALUES FROM (MINVALUE) TO (101);
CREATE TABLE example_table_101_200 PARTITION OF example_table FOR VALUES FROM (101) TO (201);
-- 创建默认分区时使用 DEFAULT 子句替代 FOR VALUES 子句。
CREATE TABLE example_table_default PARTITION OF example_table DEFAULT ;

如果没有创建默认分区,在插入epoch=201时,会报错,提示插入错误。

1
2
3
-- [23514] 错误: 没有为行找到关系"example_table"的分区
-- 详细:失败行的分区键包含(epoch) = (201).
INSERT INTO example_table(epoch, data) VALUES (201, 'good');

默认分区存在以下限制:

  • 一个分区表只能拥有一个 DEFAULT 分区;
  • 对于已经存储在 DEFAULT 分区中的数据,不能再创建相应的分区;参见下文示例;
  • 如果将已有的表挂载为 DEFAULT 分区,将会检查该表中的所有数据;如果在已有的分区中存在相同的数据,将会产生一个错误;
  • 哈希分区表不支持 DEFAULT 分区,实际上也不需要支持。

以列表分区为例:

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE example_table_list(
miner varchar(100) NOT NULL ,
epoch BIGINT NOT NULL ,
data varchar(50)
) PARTITION BY LIST (miner);

CREATE TABLE example_table_list_02438 PARTITION OF example_table_list FOR VALUES IN ('f02438');
CREATE TABLE example_table_list_025002_025005 PARTITION OF example_table_list FOR VALUES IN ('f025002', 'f025005');
CREATE TABLE example_table_list_default PARTITION OF example_table_list DEFAULT ;

INSERT INTO example_table_list VALUES ('f03721', 98, 'Green');

如果此时未创建对f03721的分区表,上面的语句会插入到example_table_list_default表内。如果此时开始创建

1
CREATE TABLE example_table_list_03721 PARTITION OF example_table_list FOR VALUES IN ('f03721');

操作将会失败。因为添加新的分区需要修改默认分区的范围。但是默认分区中已经存在f02731的分区数据项。

错误: 某些行将违反默认分区”example_table_list_default”的更新分区约束

为了解决这个问题,可以先将默认分区从分区表中卸载(DETACH PARTITION),创建新的分区,将默认分区中的相应的数据移动到新的分区,最后重新挂载默认分区。

1
2
3
4
5
6
7
8
9
10
11
-- 临时卸载默认分区
ALTER TABLE example_table_list DETACH PARTITION example_table_list_default;
-- 创建一个新的列表分区表
CREATE TABLE example_table_list_03721 PARTITION OF example_table_list FOR VALUES IN ('f03721');
-- 查询默认分区表内的对应数据并插入到新分区表内
INSERT INTO example_table_list_03721
SELECT * FROM example_table_list_default WHERE miner = 'f03721';
-- 删除默认分区表内的数据
DELETE FROM example_table_list_default WHERE miner = 'f03721';
-- 重新挂载默认分区表
ALTER TABLE example_table_list ATTACH PARTITION example_table_list_default DEFAULT ;

分区自动索引

在 PostgreSQL10 中,分区上的索引需要基于各个分区手动创建,而不能基于分区的父表创建索引。PostgreSQL 11 及以后的版本,声明式分区表可以基于分区表创建索引。分区表上的索引并不会创建一个物理上的索引,而是为每个分区上的索引创建一个模板。表继承方式的分区表需要为每一张分区表创建索引和约束。

如果在声明式分区表上创建了一个索引,PostgreSQL 自动为每个分区创建具有相同属性的索引。

1
2
3
4
5
-- 在主表上创建索引,会在分表上自动创建对应的索引。
-- 在创建新的分区表时,也会自动创建好对应的索引。
CREATE UNIQUE INDEX example_table_epoch ON example_table(epoch DESC);
-- 在主表上执行删除索引时,也会自动在分区表上删除对应的索引。
DROP INDEX example_table_epoch;
  • 自动创建的索引,名称按照 “{partition name}{column name}_idx” 的模式定义。多个字段的复合索引使用下划线()连接字段名称。如果索引名称已经存在,在名称的最后添加一个数字。如果名称过长,使用缩写。

  • 随后新增的分区或者通过 ATTACH PARTITION 挂载的分区都会自动创建相应的索引。

  • 自动创建的索引不能单独删除,可以通过分区表统一删除。

跨分区移动数据

PostgreSQL 10 中,声明式分区表如果 UPDATE 语句修改了分区字段的值,导致数据需要移动到其他分区时,语句将会失败。PostgreSQL 11+ 以后能够正确处理更新分区字段的操作。根据提交记录,这种 UPDATE 语句实际上分为两步执行:从旧的分区中 DELETE相应记录,在新的分区中 INSERT相应记录。另外,跨分区移动数据的 UPDATE 语句将会导致触发器的执行顺序更加复杂。

限制

分区表有下列限制:

  • 没有办法创建跨越所有分区的排除约束,只可能单个约束每个叶子分区。
  • 分区表上的惟一约束(也就是主键)必须包括所有分区键列。存在此限制是因为PostgreSQL只能每个分区中分别强制实施唯一性。
  • BEFORE ROW 触发器无法更改哪个分区是新行的最终目标。
  • 不允许在同一个分区树中混杂临时关系和持久关系。因此,如果分区表是持久的,则其分区也必须是持久的,反之亦然。在使用临时关系时,分区数的所有成员都必须来自于同一个会话。

执行流程

厘清执行流程有助于从宏观上理解在哪个步骤中适合实现自动创建分区表。从数据库提供的机制上,有存储过程和触发器两种武器,分别可以实现在不同阶段处理不同拦截逻辑。

对于声明式分区表,所有被插入的行将被基于分区键的值,路由到分区中。每个分区都有一个由其分区边界定义的数据子集。当前支持的分区方法是范围、列表以及哈希。也就是说分区表的插入/更新操作,又一个内置的路由规则。

根据定义,结合对表的插入/更新操作以及触发器流程,一条数据在插入到分区表时,将经历以下步骤:

  1. 触发分区主表的statement before insert
  2. 自动路由到符合范围的分区子表
  3. 触发符合范围的分区子表row before insert
  4. 数据插入
  5. 触发符合范围的分区子表row after insert
  6. 触发分区主表的statement after insert

下面验证一下上述流程。

触发器行为

创建分区表

举个例子,创建一张测试分区表,并创建三个分区,不创建默认分区。由于默认分区的限制,对于已经存储在 DEFAULT 分区中的数据,不能再创建相应的分区,需要先DETACH默认分区手动做数据转移,然后才可以创建新范围分区,这个步骤无法实现自动创建。

1
2
3
4
5
DROP TABLE example_table CASCADE ;
CREATE TABLE example_table(col1 INT) PARTITION BY RANGE (col1);
CREATE TABLE example_table_1 PARTITION OF example_table FOR VALUES FROM (0) TO (10);
CREATE TABLE example_table_2 PARTITION OF example_table FOR VALUES FROM (11) TO (20);
CREATE TABLE example_table_3 PARTITION OF example_table FOR VALUES FROM (21) TO (30);

创建触发器函数

创建一个用于观察触发器触发阶段的函数

1
2
3
4
5
6
7
8
CREATE OR REPLACE FUNCTION notice_trigger() RETURNS TRIGGER AS
$$
BEGIN
RAISE NOTICE 'trigger name:%; table_name:%; level:%; op:%; when:%; value:%',
tg_name, tg_table_name, tg_level, tg_op, tg_when, new;
RETURN new;
END
$$ LANGUAGE plpgsql;

创建触发器

在分区表主表example_table上绑定测试分区表INSERT和UPDATE触发器,分别是语句级别和行级别,与before 和after的组合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
CREATE TRIGGER show_notice_before_insert_stmt
BEFORE INSERT
ON example_table
FOR STATEMENT
EXECUTE FUNCTION notice_trigger();
CREATE TRIGGER show_notice_after_insert_stmt
AFTER INSERT
ON example_table
FOR STATEMENT
EXECUTE FUNCTION notice_trigger();
CREATE TRIGGER show_notice_before_insert_row
BEFORE INSERT
ON example_table
FOR ROW
EXECUTE FUNCTION notice_trigger();
CREATE TRIGGER show_notice_after_insert_row
AFTER INSERT
ON example_table
FOR ROW
EXECUTE FUNCTION notice_trigger();

CREATE TRIGGER show_notice_before_update_stmt
BEFORE UPDATE
ON example_table
FOR STATEMENT
EXECUTE FUNCTION notice_trigger();
CREATE TRIGGER show_notice_after_update_stmt
AFTER UPDATE
ON example_table
FOR STATEMENT
EXECUTE FUNCTION notice_trigger();
CREATE TRIGGER show_notice_before_update_row
BEFORE UPDATE
ON example_table
FOR ROW
EXECUTE FUNCTION notice_trigger();
CREATE TRIGGER show_notice_after_update_row
AFTER UPDATE
ON example_table
FOR ROW
EXECUTE FUNCTION notice_trigger();

通过观察表结构定义我们可以看到,主表上创建出8个触发器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
example=# \d+ example_table
Partitioned table "public.example_table"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
--------+---------+-----------+----------+---------+---------+--------------+-------------
col1 | integer | | | | plain | |
Partition key: RANGE (col1)
Triggers:
show_notice_after_insert_row AFTER INSERT ON example_table FOR EACH ROW EXECUTE FUNCTION notice_trigger()
show_notice_after_insert_stmt AFTER INSERT ON example_table FOR EACH STATEMENT EXECUTE FUNCTION notice_trigger()
show_notice_after_update_row AFTER UPDATE ON example_table FOR EACH ROW EXECUTE FUNCTION notice_trigger()
show_notice_after_update_stmt AFTER UPDATE ON example_table FOR EACH STATEMENT EXECUTE FUNCTION notice_trigger()
show_notice_before_insert_row BEFORE INSERT ON example_table FOR EACH ROW EXECUTE FUNCTION notice_trigger()
show_notice_before_insert_stmt BEFORE INSERT ON example_table FOR EACH STATEMENT EXECUTE FUNCTION notice_trigger()
show_notice_before_update_row BEFORE UPDATE ON example_table FOR EACH ROW EXECUTE FUNCTION notice_trigger()
show_notice_before_update_stmt BEFORE UPDATE ON example_table FOR EACH STATEMENT EXECUTE FUNCTION notice_trigger()
Partitions: example_table_1 FOR VALUES FROM (0) TO (10),
example_table_2 FOR VALUES FROM (11) TO (20),
example_table_3 FOR VALUES FROM (21) TO (30)

分区表上也自动创建出了4个行级触发器。

1
2
3
4
5
6
7
8
9
10
11
example=# \d example_table_1
Table "public.example_table_1"
Column | Type | Collation | Nullable | Default
--------+---------+-----------+----------+---------
col1 | integer | | |
Partition of: example_table FOR VALUES FROM (0) TO (10)
Triggers:
show_notice_after_insert_row AFTER INSERT ON example_table_1 FOR EACH ROW EXECUTE FUNCTION notice_trigger(), ON TABLE example_table
show_notice_after_update_row AFTER UPDATE ON example_table_1 FOR EACH ROW EXECUTE FUNCTION notice_trigger(), ON TABLE example_table
show_notice_before_insert_row BEFORE INSERT ON example_table_1 FOR EACH ROW EXECUTE FUNCTION notice_trigger(), ON TABLE example_table
show_notice_before_update_row BEFORE UPDATE ON example_table_1 FOR EACH ROW EXECUTE FUNCTION notice_trigger(), ON TABLE example_table

分区表主表类型为(partitioned table),分区表类型为(table)。

  • 在主表上创建触发器,会为每一个子表创建触发器。
  • FOR EACH STATEMENT 只会在主表上创建
  • FOR EACH ROW 才会为每一个子表创建(如下查询结果)。

插入一条数据

1
2
3
4
5
6
7
8
INSERT INTO example_table VALUES (3);

-- Output
[00000] trigger name:show_notice_before_insert_stmt; table_name:example_table; level:STATEMENT; op:INSERT; when:BEFORE; value:<NULL>
[00000] trigger name:show_notice_before_insert_row; table_name:example_table_1; level:ROW; op:INSERT; when:BEFORE; value:(3)
[00000] trigger name:show_notice_after_insert_row; table_name:example_table_1; level:ROW; op:INSERT; when:AFTER; value:(3)
[00000] trigger name:show_notice_after_insert_stmt; table_name:example_table; level:STATEMENT; op:INSERT; when:AFTER; value:<NULL>
1 row affected in 60 ms

分区表INSERT触发器的执行顺序:

  1. 主表example_table statement 级别 INSERT BEFOER触发器
  2. 主表example_table statement 级别 INSERT AFTER 触发器
  3. 分区表example_table_1 行级别 INSERT BEFORE 触发器
  4. 分区表example_table_1 行级别 INSERT AFTER 触发器

插入不存在分区的数据

1
2
3
4
5
6
INSERT INTO example_table VALUES (32);

-- Output
[23514] 错误: 没有为行找到关系"example_table"的分区
详细:失败行的分区键包含(col1) = (32).
[00000] trigger name:show_notice_before_insert_stmt; table_name:example_table; level:STATEMENT; op:INSERT; when:BEFORE; value:<NULL>

插入一条不存在分区的数据时,会导致插入错误,提示没找到分区。分区表INSERT触发器的执行顺序:

  1. 主表example_table statement 级别 INSERT BEFORE 触发器
  2. 主表example_table statement 级别 INSERT AFTER 触发器

修改触发器函数

修改触发器函数notice_trigger使它拥有创建表的能力。以下代码仅供测试,认为新插入数据时为不存在分区范围的数据,且分区列数值在31-40之间。并定义触发器在行级、插入前触发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE OR REPLACE FUNCTION notice_trigger() RETURNS TRIGGER AS
$$
BEGIN
RAISE NOTICE 'trigger name:%; table_name:%; level:%; op:%; when:%; value:%',
tg_name, tg_table_name, tg_level, tg_op, tg_when, new;
IF tg_level = 'ROW' AND tg_op = 'INSERT' AND tg_when = 'BEFORE' THEN
CREATE TABLE example_table_4 PARTITION OF example_table FOR VALUES FROM (31) TO (40);
INSERT INTO example_table_4 SELECT new.*;
RETURN NULL;
END IF;

RETURN new;
END
$$ LANGUAGE plpgsql;

再次执行数据插入

1
2
3
4
5
6
INSERT INTO example_table VALUES (32);

-- Output
[23514] 错误: 没有为行找到关系"example_table"的分区
详细:失败行的分区键包含(col1) = (32).
[00000] trigger name:show_notice_before_insert_stmt; table_name:example_table; level:STATEMENT; op:INSERT; when:BEFORE; value:<NULL>

主表上由于分区表的默认插入行为,并没有触发行级触发器,并由于自动路由,错误提示找不到关系分区报错退出。

再次修改触发器函数

这次计划通过语句级别INSERT BEFORE触发器来创建新分区表。

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE OR REPLACE FUNCTION notice_trigger() RETURNS TRIGGER AS
$$
BEGIN
RAISE NOTICE 'trigger name:%; table_name:%; level:%; op:%; when:%; value:%',
tg_name, tg_table_name, tg_level, tg_op, tg_when, new;
IF tg_level = 'STATEMENT' AND tg_op = 'INSERT' AND tg_when = 'BEFORE' THEN
CREATE TABLE example_table_4 PARTITION OF example_table FOR VALUES FROM (31) TO (40);
RETURN NULL;
END IF;

RETURN new;
END
$$ LANGUAGE plpgsql;

再次尝试插入

1
2
3
4
5
6
7
INSERT INTO example_table VALUES (32);

-- Output
[55006] 错误: 无法CREATE TABLE .. PARTITION OF "example_table" 因为它正在被这个会话中的活动查询使用
在位置:SQL 语句 "CREATE TABLE example_table_4 PARTITION OF example_table FOR VALUES FROM (31) TO (40)"
SQL语句的第6行的PL/pgSQL函数notice_trigger()
[00000] trigger name:show_notice_before_insert_stmt; table_name:example_table; level:STATEMENT; op:INSERT; when:BEFORE; value:<NULL>

从上得知,BEFORE ROW 触发器无法更改哪个分区是新行的最终目标。

自动路由流程

从以上动作得知:

  • 分区表主表只会触发语句级别的触发器
  • 分区表只会触发行级触发器
  • 数据库会对插入和更新动作自动做路由规则处理

自动路由处理流程

整个插入过程在一个事务内,在插入开始期间,会收集整个分区表信息;声明式分区表有一个内置的路由行为,该行为发生在语句级别before触发器之后。它会根据分区列将实际的INSERT/UPDATE动作定向到分区中进行插入和更新。此时主分区表不会触发它的行级触发器,而是触发分区表中的行级触发器,且不会触发分区表的语句级触发器。无法在语句级别动态创建新分区,到达自动路由阶段,对不存在分区的插入会导致插入失败,不再执行后续动作。

规则系统

考虑到高可维护性,简便易行和最大程度使用数据库的特点,最终考虑使用声明式分区来实现自动表分区。

既然无法使用触发器达到在插入数据时,自动创建不存在范围的分区表的行为,有没有办法不通过触发器,而通过数据库自身的特性达到同样的目的呢?

答案是有的。经过调研,PostgreSQL数据库支持重写规则(RULE)。

用法

1
2
3
4
5
6
7
CREATE [ OR REPLACE ] RULE name AS ON event
TO table_name [ WHERE condition ]
DO [ ALSO | INSTEAD ] { NOTHING | command | ( command ; command ... ) }

其中 event 可以是以下之一:

SELECT | INSERT | UPDATE | DELETE

PostgreSQL规则系统允许我们定义 针对数据库表中插入、更新或者删除动作上的替代动作。大约来说,当在 一个给定表上执行给定命令时,一条规则会导致执行额外的命令。或者, INSTEAD规则可以用另一个命令替换给定的命令,或者导致一个命令根本不被执行。

使用规则来改变插入的流程,绕开分区表触发器的执行顺序限制,直接插入到目标表内。

原理是使用规则重定向插入,将INSERT/UPDATE操作替换成用函数来插入。函数插入时判断是否报错了,如果报错,新增表。

设计流程

在分区表主表上创建两个规则,分别是INSERT和 UPDATE,替换数据库向主表默认插入和更新操作。此时的触发器和流程从下图左边变成了右边的流程,规则接管默认插入机制,在插入数据前做一层条件判断,达到自动创建分区的目的。由于规则系统仅支持INSERT/UPDATE/SELECT/DELETE操作,因此需要编写一个函数来托管整个过程。

重写规则

在这个大框架下还需要解决自动化创建表时一致性行为,以及接管一些数据库默认插入/更新时的功能:

  • 表分区名称约定
  • 结合Filecoin业务,比如根据区块高度对数据分区,对高度计算得到分区表后缀
  • 获取分区表名称是否存在
  • 唯一键冲突
  • 分区移动

函数(存储过程)

表分区名称约定

表名称约定

通常,对表的数据进行分区,对数据行记录时间进行划分。以此为基础便于以时间范围查询,因此分区表将设置成主表+后缀名的方式命名。后缀名定义以下4种按时间划分的分区规则:

  1. 按天分区 (分区表命名规则:table_name_2022_02_18_1560240_1563119)
  2. 按周分区 (分区表命名规则:table_name_2022_w7_1548720_1568879)
  3. 按月分区 (分区表命名规则:table_name_2022_02_1560240_1563119)
  4. 按年分区 (分区表命名规则:table_name_2022_1560240_1563119)

以上分区规则统一按照北京时间为基准,以一天的零点作为开始,23点59分59秒作为结束。

按天分区

分区表命名规则为 主表____起始epoch_终止epoch

其中月份为两位数,不足两位前缀补0。

按周分区

分区表命名规则为 主表__w全年周数_起始epoch_终止epoch

按月分区

分区表命名规则为 主表___起始epoch_终止epoch

其中月份为两位数,不足两位前缀补0。

按年分区

分区表命名规则为 主表__起始epoch_终止epoch

以下内容为实现分区表命名规则而设立的函数,执行函数可以计算得到上述规则后缀和分区范围。

分区范围类型定义

定义数据库的类型,拥有以下6个字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
DROP TYPE IF EXISTS PARTITION_TABLE_RANGE_DEFINITION CASCADE ;
-- 创建统一的分区规则类型,定义后缀字段、触发查询时的日期、
-- 分区的起始时间(起始epoch)和终止时间(终止epoch)
-- 终止epoch与终止时间不是一一对应,终止epoch通常是23:59:30。链的高度每30秒变更一次。
CREATE TYPE PARTITION_TABLE_RANGE_DEFINITION AS
(
suffix VARCHAR,
hit_date DATE,
begin_epoch BIGINT,
begin_time TIMESTAMPTZ,
end_epoch BIGINT,
end_time TIMESTAMPTZ
);
分区范围字段 说明
suffix 分区表的后缀名称,配合主表名+后缀定义出分区表名称
hit_date 查询分区范围定义的日期
begin_epoch 分区表开始高度(Filecoin epoch, 下同)
begin_time 分区表开始时间(参考,分区以begin_epoch为主,下同)
end_epoch 分区表结束高度
end_time 分区表结束时间

分区范围实现

注意!分区表FROM (MINVALUE) TO (101) 是前闭后开。也就是插入的分区列应满足 epoch ∈[MINVALUE, 101)

1
2
3
4
5
6
7
CREATE TABLE test_part_table(col1 INT) PARTITION BY RANGE (col1);
CREATE TABLE test_part_table_1 PARTITION OF test_part_table FOR VALUES FROM (0) TO (10);
CREATE TABLE test_part_table_2 PARTITION OF test_part_table FOR VALUES FROM (10) TO (20);
INSERT INTO test_part_table SELECT i FROM generate_series(0, 10) g(i);

-- 插入到 test_part_table_1 的值 从 0-9;
-- 插入到 test_part_table_2 的值 为10.

按天分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
-- 按日分区,返回的分区表的表后缀规则为 '年_月_日'_开始epoch_结束epoch 
-- (例如:table_name_2022_02_18_1560240_1563119)
-- 时间范围 一天的的00:00:00 ~ 一天的23:59:59
-- 高度范围 一天日期的00:00:00 ~ 一天日期的23:59:30
CREATE OR REPLACE FUNCTION calc_partition_range_by_day(current_ts TIMESTAMPTZ = current_timestamp)
RETURNS SETOF PARTITION_TABLE_RANGE_DEFINITION
IMMUTABLE STRICT
AS
$$
DECLARE
startup_time timestamptz = '2020-08-25T06:00:00+08:00'::timestamptz;
"year" INT = extract(YEAR FROM current_ts);
"month" INT = extract(MONTHS FROM current_ts);
"day" INT = extract(DAY FROM current_ts);
begin_time TIMESTAMPTZ = date_trunc('day', current_ts);
begin_epoch BIGINT = calc_epoch_by_ts(begin_time);
day_interval INTERVAL = INTERVAL '1 DAY' - INTERVAL '1 SECOND';
end_time TIMESTAMPTZ = begin_time + day_interval;
end_epoch BIGINT = calc_epoch_by_ts(end_time + INTERVAL '1 SECOND');
mm_month VARCHAR = (CASE WHEN "month" < 10 THEN '0' || "month" ELSE '' || "month" END);
BEGIN
-- 考虑边界条件,当计算得出的起始高度小于0,说明时间已经小于主网上线时间。
IF begin_epoch < 0 THEN
begin_time = startup_time;
begin_epoch = 0;
"year" = extract(YEAR FROM begin_time);
"month" = extract(MONTHS FROM begin_time);
"day" = extract(DAY FROM begin_time);
end_time = date_trunc('day', begin_time) + day_interval;
end_epoch = calc_epoch_by_ts(end_time + INTERVAL '1 SECOND');
mm_month = (CASE WHEN "month" < 10 THEN '0' || "month" ELSE '' || "month" END);
END IF;
RAISE NOTICE '[calc_partition_range_by_day]current_ts: %', current_ts;
RETURN QUERY SELECT concat_ws('_', "year", mm_month, "day", begin_epoch, end_epoch)::VARCHAR,
current_ts::DATE,
begin_epoch,
begin_time,
end_epoch,
end_time;
END;
$$ LANGUAGE plpgsql;

按周分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
-- 按周分区,返回的分区表的表后缀规则为 '年份'_w'第n周'_开始epoch_结束epoch 
-- (例如:table_name_2022_w7_1548720_1568879)
-- 时间范围 周一的00:00:00 ~ 周日的23:59:59
-- 高度范围 周一日期的00:00:00 ~ 周日的23:59:30
CREATE OR REPLACE FUNCTION calc_partition_range_by_week_of_year(current_ts TIMESTAMPTZ = current_timestamp)
RETURNS SETOF PARTITION_TABLE_RANGE_DEFINITION
IMMUTABLE STRICT
AS
$$
DECLARE
startup_time timestamptz = '2020-08-25T06:00:00+08:00'::timestamptz;
week_of_year INT = extract(WEEK FROM current_ts);
iso_year INT = extract(ISOYEAR FROM current_ts);
begin_of_week TIMESTAMPTZ = date_trunc('WEEK', current_ts);
begin_epoch BIGINT = calc_epoch_by_ts(begin_of_week);
week_interval INTERVAL = INTERVAL '7 DAYS';
end_of_week TIMESTAMPTZ = begin_of_week + week_interval;
end_epoch BIGINT = calc_epoch_by_ts(end_of_week);
BEGIN
-- 考虑边界条件,当计算得出的起始高度小于0,说明时间已经小于主网上线时间。
IF begin_epoch < 0 THEN
begin_of_week = startup_time;
begin_epoch = 0;
week_of_year = extract(WEEK FROM begin_of_week);
iso_year = extract(ISOYEAR FROM begin_of_week);
end_of_week = date_trunc('WEEK', begin_of_week) + week_interval;
end_epoch = calc_epoch_by_ts(end_of_week);
END IF;
RETURN QUERY SELECT (concat_ws('_', iso_year, concat('w', week_of_year), begin_epoch, end_epoch))::VARCHAR,
current_ts::DATE,
begin_epoch,
begin_of_week,
end_epoch,
end_of_week;
END
$$ LANGUAGE plpgsql;

按月分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
-- 按月分区,返回的分区表的表后缀规则为 '年_月'_开始epoch_结束epoch 
-- (例如:table_name_2022_02_1560240_1563119)
-- 时间范围 每个月的第一天的的00:00:00 ~ 每个月的最后一天的23:59:59
-- 高度范围 每个月的第一天日期的00:00:00 ~ 每个月的最后一天日期的23:59:30
CREATE OR REPLACE FUNCTION calc_partition_range_by_month(current_ts TIMESTAMPTZ = current_timestamp)
RETURNS SETOF PARTITION_TABLE_RANGE_DEFINITION
IMMUTABLE STRICT
AS
$$
DECLARE
startup_time timestamptz = '2020-08-25T06:00:00+08:00'::timestamptz;
begin_time TIMESTAMPTZ = date_trunc('month', current_ts);
"year" INT = extract(YEAR FROM begin_time);
"month" INT = extract(MONTHS FROM begin_time);
begin_epoch BIGINT = calc_epoch_by_ts(begin_time);
month_interval INTERVAL = INTERVAL '1 month';
end_time TIMESTAMPTZ = begin_time + month_interval;
end_epoch BIGINT = calc_epoch_by_ts(end_time);
mm_month VARCHAR = (CASE WHEN "month" < 10 THEN '0' || "month" ELSE '' || "month" END);
BEGIN
-- 考虑边界条件,当计算得出的起始高度小于0,说明时间已经小于主网上线时间。
IF begin_epoch < 0 THEN
begin_time = startup_time;
begin_epoch = 0;
"year" = extract(YEAR FROM begin_time);
"month" = extract(MONTHS FROM begin_time);
end_time = date_trunc('month', begin_time) + month_interval;
end_epoch = calc_epoch_by_ts(end_time);
mm_month = (CASE WHEN "month" < 10 THEN '0' || "month" ELSE '' || "month" END)::varchar;
END IF;
RETURN QUERY SELECT (concat_ws('_', "year", mm_month, begin_epoch, end_epoch))::VARCHAR,
current_ts::DATE,
begin_epoch,
begin_time,
end_epoch,
end_time;
END;
$$ LANGUAGE plpgsql;

按年分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
-- 按年分区,返回的分区表的表后缀规则为 '年'_开始epoch_结束epoch 
-- (例如:table_name_2022_1560240_1563119)
-- 时间范围 每年的第一天的的00:00:00 ~ 每年的最后一天的23:59:59
-- 高度范围 每年的第一天日期的00:00:00 ~ 每年的最后一天日期的23:59:30
CREATE OR REPLACE FUNCTION calc_partition_range_by_year(current_ts TIMESTAMPTZ = current_timestamp)
RETURNS SETOF PARTITION_TABLE_RANGE_DEFINITION
IMMUTABLE STRICT
AS
$$
DECLARE
startup_time timestamptz = '2020-08-25T06:00:00+08:00'::timestamptz;
first_day_of_year TIMESTAMPTZ = date_trunc('year', current_ts);
year_interval INTERVAL = INTERVAL '1 year';
last_day_of_year TIMESTAMPTZ = first_day_of_year + year_interval;
begin_epoch BIGINT = calc_epoch_by_ts(first_day_of_year);
end_epoch BIGINT = calc_epoch_by_ts(last_day_of_year);
"year" INT = extract(YEAR FROM first_day_of_year);
BEGIN
-- 考虑边界条件,当计算得出的起始高度小于0,说明时间已经小于主网上线时间。
IF begin_epoch < 0 THEN
first_day_of_year = startup_time;
begin_epoch = 0;
last_day_of_year = date_trunc('year', first_day_of_year) + year_interval;
end_epoch = calc_epoch_by_ts(last_day_of_year);
"year" = extract(YEAR FROM first_day_of_year);
END IF;
RETURN QUERY SELECT (concat_ws('_', "year", begin_epoch, end_epoch))::VARCHAR,
current_ts::DATE,
begin_epoch,
first_day_of_year,
end_epoch,
last_day_of_year;
END;
$$ LANGUAGE plpgsql;

区块高度与时间

2020-08-25T06:00:00+08:00 是Filecoin 上线时间,每30秒一个纪元(高度)。通过与上线时间与高度换算的相对秒数,求得高度对应的时间。反之亦然。其计算方式如下:

高度转换时间

1
2
3
4
5
6
7
8
9
10
11
12
13
-- 根据当前时间计算出对应的FileCoin高度,如果不传高度,默认高度为0,即链的起始高度。
CREATE OR REPLACE FUNCTION calc_timestamp_by_epoch(height BIGINT = 0) RETURNS TIMESTAMP WITH TIME ZONE
IMMUTABLE STRICT
LANGUAGE plpgsql
AS
$$
DECLARE
baseEpoch INTEGER = extract(EPOCH FROM TIMESTAMPTZ '2020-08-25T06:00:00+08:00')::INTEGER;
BEGIN
-- 根据高度计算时间
RETURN to_timestamp(baseEpoch + height * 30);
END
$$;

时间转换高度

1
2
3
4
5
6
7
8
9
10
11
12
13
-- 根据FileCoin epoch计算出对应的时间,传入的参数如不填,默认当前时间
CREATE OR REPLACE FUNCTION calc_epoch_by_ts(ts timestamp with time zone = current_timestamp) RETURNS bigint
IMMUTABLE STRICT
LANGUAGE plpgsql
AS
$$
DECLARE
baseTime TIMESTAMP = TIMESTAMPTZ '2020-08-25T06:00:00+08:00';
BEGIN
-- 根据时间计算高度
RETURN extract(EPOCH FROM (ts - baseTime))::INTEGER / 30;
END
$$;

获取分区名称

通过系统数据库(pg_catalog)中的表获得分区表名称。如果不存在分区表,返回NULL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CREATE OR REPLACE FUNCTION
pg_get_part_table_name(main_table VARCHAR, suffix VARCHAR)
RETURNS VARCHAR
IMMUTABLE
AS
$$
DECLARE
part_table_name VARCHAR;
BEGIN
WITH o AS (SELECT inhrelid FROM pg_inherits WHERE inhparent = (concat_ws('.','public', main_table))::REGCLASS)
SELECT relname
FROM pg_class
WHERE oid IN (SELECT * FROM o)
AND relname = concat_ws('_', main_table, suffix)
INTO part_table_name;
RETURN part_table_name;
END
$$ LANGUAGE plpgsql;

获取分区表分区键(废弃⚠️ at 2022-03-25)

这个函数用来插入时作为更新键,但实际作为更新键通常是唯一键。唯一键可以是复合键。该函数不适用。

pg_get_partkeydef PostgreSQL 内置函数,位于pg_catalog 数据库中,用于提取分区表的分区键定义。strpossubstr函数同样位于pg_catalog数据库中定义,这两个函数参数用法与C语言相同。

通过pg_get_partkeydef函数返回的结果如下:

1
2
select pg_get_partkeydef('test'::regclass);
-- Output: RANGE (epoch)

使用strpossubstr得到上述返回结果定义列名称。完整实现如下:

通过调用pg_get_part_table_range_key 输入分区表(主表)名称转换成oid,返回分区键名称。非分区表返回结果为NULL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE OR REPLACE FUNCTION pg_get_part_table_range_key(oid REGCLASS)
RETURNS VARCHAR
IMMUTABLE STRICT
AS
$$
DECLARE
part_key_def TEXT = pg_get_partkeydef(oid); -- Output: RANGE (epoch)
begin_position INT = strpos(part_key_def, '(') + 1;
def_len INT = length(part_key_def);
-- substr(text, int, int) 第二个参数代表位置,第三个参数代表长度
part_key TEXT = substr(part_key_def, begin_position, def_len - begin_position);
BEGIN
-- 如果不是分区表,返回null
RETURN cast(part_key AS VARCHAR);
END
$$ LANGUAGE plpgsql;

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
-- Range分区
CREATE TABLE test11
(
epoch INTEGER NOT NULL,
miner VARCHAR(100) NOT NULL,
balance NUMERIC,
CONSTRAINT uk_epoch_miner
UNIQUE (epoch, miner)
) PARTITION BY RANGE (epoch);

select pg_get_part_table_range_key('test11'::regclass);
-- Output: epoch

-- 哈希分区 以及分区表
CREATE TABLE test22(
miner varchar(50)
) PARTITION BY HASH (miner);

CREATE TABLE test22_1 PARTITION OF test22 FOR VALUES WITH (MODULUS 2, REMAINDER 0);

select pg_get_part_table_range_key('test22'::regclass);
-- Output: miner

-- 列表分区
CREATE TABLE test33(
miner varchar(50)
) PARTITION BY LIST (miner);

CREATE TABLE test33_1 PARTITION OF test33 FOR VALUES IN ('f02439');

select pg_get_part_table_range_key('test33'::regclass);
-- Output: miner

-- 普通非分区表
create table test66(miner varchar(50));

select pg_get_part_table_range_key('test66'::regclass);
-- Output: <NULL>

获取更新唯一键(2022-03-25 新增)

💡 这里约定,自动建分区表,需要建立一个且只有一个constraint定义。例如:

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE IF NOT EXISTS miner_actors
(
epoch BIGINT NOT NULL,
miner VARCHAR NOT NULL,
owner VARCHAR NOT NULL,
worker VARCHAR,
controllers VARCHAR[],
sector_size BIGINT,
power DECIMAL,
CONSTRAINT uk_epoch_miner_actors UNIQUE (epoch, miner)
) PARTITION BY RANGE (epoch);

以上有一个唯一性约束uk_epoch_miner_actors,且只要定义一个,其它的约束以添加index方式在外部添加。

然后以下面的语句查询约束字段。

1
2
3
4
5
6
7
SELECT
DISTINCT kcu.column_name
FROM
information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name
WHERE constraint_type = 'UNIQUE' AND tc.table_name = 'miner_actors';

作用于下面的代码

获取分区表更新列模版

modified at 2022-03-25

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
CREATE FUNCTION pg_compute_part_table_update_columns(main_table CHARACTER VARYING) RETURNS TEXT
IMMUTABLE
LANGUAGE plpgsql
AS
$$
DECLARE
update_columns TEXT;
BEGIN
SELECT string_agg(concat(column_name, '=$1.', column_name), ', ') AS columns
INTO update_columns
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = main_table
AND column_name NOT IN (SELECT
DISTINCT kcu.column_name
FROM
information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name
WHERE constraint_type = 'UNIQUE' AND tc.table_name = main_table);
RETURN update_columns;
END
$$;

创建update 更新约束字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
CREATE OR REPLACE FUNCTION pg_compute_constraint_keys(main_table VARCHAR) RETURNS TEXT
IMMUTABLE
LANGUAGE plpgsql
AS
$$
DECLARE
ukeys TEXT;
BEGIN
WITH t AS (SELECT DISTINCT kcu.column_name
FROM information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
WHERE constraint_type = 'UNIQUE'
AND tc.table_name = main_table)
SELECT string_agg(concat(t.column_name, '=$1.', t.column_name), ' AND ')
FROM t
INTO ukeys;
RETURN ukeys;
END
$$;

创建新分区

分区Meta表

分区meta表记录了分区表的分区信息、创建时间和分区范围。该表内的记录应在创建新分区时被记录。part_type字段表示范围分区、哈希分区或列表分区。

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE IF NOT EXISTS part_tables
(
id SERIAL NOT NULL PRIMARY KEY,
tb_name VARCHAR(100),
part_type SMALLINT NOT NULL DEFAULT 0,
part_table VARCHAR(100),
begin_time TIMESTAMPTZ,
end_time TIMESTAMPTZ,
created_time TIMESTAMPTZ DEFAULT current_timestamp,
-- 分区表需要唯一
CONSTRAINT uk_part_table_name UNIQUE (tb_name, part_table),
CHECK (part_type IN (0, 1, 2)) -- 0 range / 1 hash /2 list
);

主函数

这里创建新分区一定要加上 IF NOT EXISTS判断,在同一个事务中,批量插入时,pg_catalog表内未完成实际分区表的创建,所以查询出来的结果是空,会重复创建。通过IF NOT EXISTS规避同一事务中的重复建表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
CREATE OR REPLACE FUNCTION
create_new_partition_table(main_table VARCHAR, part_table_type SMALLINT, part_def PARTITION_TABLE_RANGE_DEFINITION)
RETURNS VARCHAR
AS
$$
DECLARE
part_table_name TEXT;
BEGIN
part_table_name = concat_ws('_', main_table, part_def.suffix);
EXECUTE format('CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES FROM (%s) TO (%s)',
part_table_name, main_table, part_def.begin_epoch, part_def.end_epoch);

CREATE TABLE IF NOT EXISTS part_tables
(
id SERIAL NOT NULL PRIMARY KEY,
tb_name VARCHAR(100),
part_type SMALLINT NOT NULL DEFAULT 0,
part_table VARCHAR(100),
begin_time TIMESTAMPTZ,
end_time TIMESTAMPTZ,
created_time TIMESTAMPTZ DEFAULT current_timestamp,
-- 分区表需要唯一
CONSTRAINT uk_part_table_name UNIQUE (tb_name, part_table),
CHECK (part_type IN (0, 1, 2)) -- 0 range / 1 hash /2 list
);
INSERT INTO part_tables(tb_name, part_type, part_table, begin_time, end_time, created_time)
VALUES (main_table, part_table_type, part_table_name,
part_def.begin_time, part_def.end_time, current_timestamp)
ON CONFLICT (tb_name, part_table) DO NOTHING ;

RETURN part_table_name;
END
$$ LANGUAGE plpgsql;

重写规则

通过PostgreSQL数据库规则系统,改变数据库分区表默认的INSERT/UPDATE操作,来达到在插入或更新前,判断目标分区是否存在,如不存在创建之的目的。

为方便后续对分区表的维护,额外建立一张独立的数据表,存放新创建的分区以及创建时间。可以根据时间对分区表合理裁剪。

准备分区表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
CREATE TABLE example_table
(
epoch INT,
miner VARCHAR(100),
balance DECIMAL,
code VARCHAR(100),
CONSTRAINT uk_epoch_miner UNIQUE (epoch, miner)
) PARTITION BY RANGE (epoch);

example=# \d+ example_table
Partitioned table "public.example_table"
Column | Type | Nullable | Default | Storage | Stats target |
---------+------------------------+----------+---------+----------+--------------+-
epoch | integer | | | plain | |
miner | character varying(100) | | | extended | |
balance | numeric | | | main | |
code | character varying(100) | | | extended | |
Partition key: RANGE (epoch)
Indexes:
"uk_epoch_miner" UNIQUE CONSTRAINT, btree (epoch, miner)
Number of partitions: 0

插入(INSERT)

替代数据库分区表默认的插入操作,需要考虑默认插入动作的几个关键要素:自动路由到对应的表分区进行插入,处理唯一键冲突。

自动路由

自动路由的前提是对应的表分区已存在。它分成两个步骤:

  • 创建新分区

在插入行数据前,使用函数pg_get_part_table_name查找新插入的记录分区字段所对应的表分区,如果在数据库中不存在,则新创建。

  • 插入数据

向具体的表分区插入数据。

处理唯一键冲突

使用重写规则有一个限制,就是无法处理唯一键冲突更新。

无法对具有INSERT或者UPDATE规则的表使用带有ON CONFLICT子句的INSERT

1
INSERT INTO from_table VALUES (3, '333') ON CONFLICT(col1) DO UPDATE SET data = excluded.data;
1
0A000  feature_not_supported

因此,需要处理unique_violation的执行错误。使用EXCEPTION捕获unique_violation的错误,执行分区表UPDATE。

注意,一旦捕获unique_violation并处理实现唯一键冲突更新的操作,意味着替代的INSERT操作将天生具有upsert的功能。

具体实现

一、创建插入函数。

根据数据库规则语法,规则仅支持SELECT/INSERT/UPDATE/DELETE操作,因此要执行一系列动作需要创建一个存储过程来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
CREATE OR REPLACE FUNCTION insert_action(table_row miner_actors) RETURNS VOID
AS $$
DECLARE
main_table VARCHAR = 'example_table';
part_table_type SMALLINT = 0;
range_ts TIMESTAMPTZ = calc_timestamp_by_epoch(table_row.epoch);
part_def PARTITION_TABLE_RANGE_DEFINITION = calc_partition_range_by_day(range_ts);
part_table_name VARCHAR = pg_get_part_table_name(main_table, part_def.suffix);
update_columns TEXT;
constraint_keys TEXT;
BEGIN
RAISE NOTICE 'suffix:%; begin_epoch:%; end_epoch:%; begin_time:%; end_time:%',
part_def.suffix, part_def.begin_epoch, part_def.end_epoch, part_def.begin_time, part_def.end_time;
RAISE NOTICE 'part_table_name: %', part_table_name;
IF part_table_name ISNULL THEN
SELECT create_new_partition_table(main_table, part_table_type, part_def)
INTO part_table_name;
END IF;
EXECUTE format('INSERT INTO %s SELECT $1.*', part_table_name) USING table_row;
EXCEPTION
WHEN unique_violation THEN
update_columns = pg_compute_part_table_update_columns(main_table);
constraint_keys = pg_compute_constraint_keys(main_table);
RAISE NOTICE '%', format('UPDATE %s SET %s WHERE %s', part_table_name, update_columns, constraint_keys);
EXECUTE format('UPDATE %s SET %s WHERE %s', part_table_name, update_columns, constraint_keys) USING table_row;
END
$$ LANGUAGE plpgsql;

二、创建INSERT RULE。

在分区表主表上创建一条规则,其方式是INSTEAD默认INSERT操作。其中new是一个内置变量,代表新插入的记录行。

1
CREATE RULE upsert_part_data AS ON INSERT TO example_table DO INSTEAD SELECT insert_action(new);

三、插入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
INSERT INTO example_table(epoch, miner, balance, code) VALUES (1, 'f02438', '100', 'Good');

example=# \d+ example_table
Partitioned table "public.example_table"
Column | Type | Nullable | Default | Storage | Stats target |
---------+------------------------+----------+---------+----------+--------------+-
epoch | integer | | | plain | |
miner | character varying(100) | | | extended | |
balance | numeric | | | main | |
code | character varying(100) | | | extended | |
Partition key: RANGE (epoch)
Indexes:
"uk_epoch_miner" UNIQUE CONSTRAINT, btree (epoch, miner)
Rules:
upsert_part_data AS
ON INSERT TO example_table DO INSTEAD SELECT insert_action(new.*) AS insert_action
Partitions: example_table_2020_08_25_0_2159 FOR VALUES FROM (0) TO (2159)

更新(UPDATE)

替代数据库分区表默认更新操作,需要考虑以下几个因素:普通更新和分区移动。

普通更新

对于分区列相同的更新,不需要执行删除命令,也不需要创建新不存在的分区表,直接UPDATE目标表。

分区移动

对于更新了分区列的数据,要查找新的分区表是否存在,如果不存在需要创建。然后将旧数据删除,在新表中插入新纪录。

具体实现

一、创建存储过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
CREATE OR REPLACE FUNCTION
update_action(new_row miner_actors, old_row miner_actors)
RETURNS VOID AS
$$
DECLARE
main_table VARCHAR = 'example_table';
part_table_type SMALLINT = 0;
new_range_ts TIMESTAMPTZ = calc_timestamp_by_epoch(new_row.epoch);
new_part_def PARTITION_TABLE_RANGE_DEFINITION = calc_partition_range_by_day(new_range_ts);
new_part_table_name VARCHAR = pg_get_part_table_name(main_table, new_part_def.suffix);
old_range_ts TIMESTAMPTZ;
old_part_def PARTITION_TABLE_RANGE_DEFINITION;
old_part_table_name VARCHAR;
update_columns TEXT;
constraint_keys TEXT;
BEGIN
constraint_keys = pg_compute_constraint_keys(main_table);
IF new_row.epoch = old_row.epoch THEN
RAISE NOTICE 'update in same table';
-- 旧数据存在或条件不存在,这里只是做单纯的update更新分区数据
update_columns = pg_compute_part_table_update_columns(main_table);
EXECUTE format('UPDATE %s SET %s WHERE %s', new_part_table_name, update_columns, constraint_keys) USING new_row;
RETURN;
END IF;

IF new_part_table_name ISNULL THEN
SELECT create_new_partition_table(main_table, part_table_type, new_part_def)
INTO new_part_table_name;
END IF;

old_range_ts = calc_timestamp_by_epoch(old_row.epoch);
old_part_def = calc_partition_range_by_day(old_range_ts);
old_part_table_name = pg_get_part_table_name(main_table, old_part_def.suffix);

-- delete 动作要放在这里,这里有移动操作。
EXECUTE format('DELETE FROM %s WHERE %s', old_part_table_name, constraint_keys) USING old_row;
EXECUTE format('INSERT INTO %s SELECT $1.*', new_part_table_name) USING new_row;
RETURN;
END
$$ LANGUAGE plpgsql;

二、创建UPDATE RULE

在分区表主表上创建一条更新规则,其方式是INSTEAD默认UPDATE操作。其中newold是内置变量,代表新更新的数据和旧数据行。

这里的旧的数据行是通过where条件获知的,新的数据行是根据update内set获知的。所以在执行该更新执行计划的时候它会先收集数据,然后执行规则。所以old可以根据唯一键来查找。

1
CREATE RULE update_part_data AS ON UPDATE TO example_table DO INSTEAD SELECT update_action(new, old);

三、更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
UPDATE example_table SET epoch = 3000, balance = '99', code = 'Good+' WHERE epoch = 1;

example=# \d+ example_table
Partitioned table "public.example_table"
Column | Type | Nullable | Default | Storage | Stats target |
---------+------------------------+----------+---------+----------+--------------+-
epoch | integer | | | plain | |
miner | character varying(100) | | | extended | |
balance | numeric | | | main | |
code | character varying(100) | | | extended | |
Partition key: RANGE (epoch)
Indexes:
"uk_epoch_miner" UNIQUE CONSTRAINT, btree (epoch, miner)
Rules:
update_part_data AS
ON UPDATE TO example_table DO INSTEAD SELECT update_action(new.*, old.*) AS update_action
upsert_part_data AS
ON INSERT TO example_table DO INSTEAD SELECT insert_action(new.*) AS insert_action
Partitions: example_table_2020_08_25_0_2159 FOR VALUES FROM (0) TO (2159),
example_table_2020_08_26_2160_5039 FOR VALUES FROM (2160) TO (5039)

分区表维护

规则替换

封装一个存储过程,将创建insert_action函数和update_action函数放入其中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
CREATE OR REPLACE PROCEDURE
create_auto_partition(table_name VARCHAR, range_type VARCHAR DEFAULT 'week')
AS
$$
DECLARE
insert_func_name TEXT := concat_ws('_', table_name, 'insert_action');
update_func_name TEXT := concat_ws('_', table_name, 'update_action');
insert_tpl TEXT;
update_tpl TEXT;
range_func_name TEXT;
BEGIN
IF range_type = 'day' THEN
range_func_name = 'calc_partition_range_by_day';
ELSEIF range_type = 'week' THEN
range_func_name = 'calc_partition_range_by_week_of_year';
ELSEIF range_type = 'month' THEN
range_func_name = 'calc_partition_range_by_month';
ELSEIF range_type = 'year' THEN
range_func_name = 'calc_partition_range_by_year';
ELSE
RAISE EXCEPTION 'unsupported range type(only in day, week, month, year)';
END IF;
insert_tpl := E'CREATE OR REPLACE FUNCTION ' || insert_func_name || '(table_row ' || table_name || ') ' ||
'RETURNS VOID AS ' ||
'$body$ ' ||
'DECLARE ' ||
' main_table VARCHAR = ''' || table_name || '''; ' ||
'part_table_type SMALLINT = 0; ' ||
'range_ts TIMESTAMPTZ = calc_timestamp_by_epoch(table_row.epoch); ' ||
'part_def PARTITION_TABLE_RANGE_DEFINITION = ' || range_func_name || '(range_ts); ' ||
'part_table_name VARCHAR = pg_get_part_table_name(main_table, part_def.suffix); ' ||
'update_columns TEXT; ' ||
'constraint_keys TEXT; ' ||
'BEGIN ' ||
'IF part_table_name ISNULL THEN ' ||
'SELECT create_new_partition_table(main_table, part_table_type, part_def) ' ||
'INTO part_table_name; ' ||
'END IF; ' ||
'EXECUTE format(''INSERT INTO %s SELECT $1.*'', part_table_name) USING table_row; ' ||
'EXCEPTION ' ||
' WHEN unique_violation THEN ' ||
' update_columns = pg_compute_part_table_update_columns(main_table); ' ||
' constraint_keys = pg_compute_constraint_keys(main_table); ' ||
'EXECUTE format(''UPDATE %s SET %s WHERE %s '', part_table_name, update_columns, constraint_keys) USING table_row; ' ||
'END $body$ LANGUAGE plpgsql';
EXECUTE insert_tpl;
EXECUTE format('DROP RULE IF EXISTS upsert_part_data ON %s', table_name);
EXECUTE format('CREATE RULE upsert_part_data AS ON INSERT TO %s DO INSTEAD SELECT %s(new)', table_name, insert_func_name);

update_tpl := 'CREATE OR REPLACE FUNCTION '|| update_func_name || '(new_row '|| table_name ||', old_row '|| table_name ||')' ||
' RETURNS VOID AS ' ||
'$body$ ' ||
'DECLARE ' ||
' main_table VARCHAR = '''|| table_name ||'''; ' ||
'part_table_type SMALLINT = 0; ' ||
'new_range_ts TIMESTAMPTZ = calc_timestamp_by_epoch(new_row.epoch); ' ||
'new_part_def PARTITION_TABLE_RANGE_DEFINITION = '|| range_func_name ||'(new_range_ts); ' ||
'new_part_table_name VARCHAR = pg_get_part_table_name(main_table, new_part_def.suffix); ' ||
'old_range_ts TIMESTAMPTZ; ' ||
'old_part_def PARTITION_TABLE_RANGE_DEFINITION; ' ||
'old_part_table_name VARCHAR; ' ||
'update_columns TEXT; ' ||
'constraint_keys TEXT;' ||
'BEGIN ' ||
'constraint_keys = pg_compute_constraint_keys(main_table); ' ||
'IF new_row.epoch = old_row.epoch THEN ' ||
'update_columns = pg_compute_part_table_update_columns(main_table); ' ||
'EXECUTE format(''UPDATE %s SET %s WHERE %s'', new_part_table_name, update_columns, constraint_keys) USING new_row; ' ||
'RETURN; ' ||
'END IF; ' ||
'IF new_part_table_name ISNULL THEN ' ||
'SELECT create_new_partition_table(main_table, part_table_type, new_part_def) ' ||
'INTO new_part_table_name; ' ||
'END IF; ' ||
'old_range_ts = calc_timestamp_by_epoch(old_row.epoch); ' ||
'old_part_def = ' || range_func_name || '(old_range_ts); ' ||
'old_part_table_name = pg_get_part_table_name(main_table, old_part_def.suffix); ' ||
'EXECUTE format(''DELETE FROM %s WHERE %s'', old_part_table_name, constraint_keys) USING old_row; ' ||
'EXECUTE format(''INSERT INTO %s SELECT $1.*'', new_part_table_name) USING new_row; ' ||
'RETURN; ' ||
'END $body$ LANGUAGE plpgsql';
EXECUTE update_tpl;
EXECUTE format('DROP RULE IF EXISTS update_part_data ON %s', table_name);
EXECUTE format('CREATE RULE update_part_data AS ON UPDATE TO %s DO INSTEAD SELECT %s(new, old)', table_name, update_func_name);
END;
$$ LANGUAGE plpgsql;

调用

1
CALL create_auto_partition('example_table');

上述简化了操作步骤。对于公共函数仍然需要提前部署。

裁剪

//TODO

挂载

//TODO

总结

创建自动分区表的步骤

一、准备公共函数

函数 一章,和 规则替换 一节

二、创建分区表

1
2
3
4
5
6
7
8
CREATE TABLE example_table
(
epoch INT,
miner VARCHAR(100),
balance DECIMAL,
code VARCHAR(100),
CONSTRAINT uk_epoch_miner UNIQUE (epoch, miner)
) PARTITION BY RANGE (epoch);

三、创建INSERT/UPDATE替换函数

1
CALL create_auto_partition('example_table');

测试结果

1
2
3
INSERT INTO example_table(epoch, miner, balance, code) SELECT i, 'f03367', '0', 'Good' FROM generate_series(1, 1500000) g(i);

-- completed in 10 m 26 s 339 ms

插入150万测试数据,花费10分钟左右。

参考资料

官方文档

表分区

附录 A. PostgreSQL错误代码

CREATE RULE

网上文章

PostgreSQL中的函数之日期时间函数

自动创建分区实践(写入触发器)

PostgreSQL 异常捕获(EXCEPTION)

何谓10倍有效算力

存有DataCap订单的封装扇区,将可获得最高10倍于没有任何订单的封装扇区(承诺容量扇区,简称CC扇区)或非DataCap订单的封装扇区所获得的有效算力。这部分通过封装存有DataCap订单的扇区所获得的有效算力,我们统称为10倍有效算力。

DataCap是什么

作为一个区块链网络,Filecoin提供了一个稳健的分布式平台,任何人都可以在上面存储数据。网络上的存储提供者根据其提供的存储服务的可靠性获得区块奖励。

为了最大化Filecoin可以支持的有用存储数量,Filecoin Plus 作为一个社交信任层被提出来。用户可以使用一个新型的资源DataCap(数据配额)来与特定的存储提供者达成交易提议,这些存储提供者则有强大的激励因素来存储这些数据,因为这会在未来增加他们的区块奖励分成份额。

DataCap 是Filecoin网络里的一种资源,它从公证人手上流转到可信的用户。除了FIL外,这些用户根据协议价格来使用DataCap来达成存储交易提议。DataCap是一个“一次性积分”,即它一旦使用就会被消耗掉。

这时,DataCap无法在用户地址之间互转。在网络上,存储提供者可以通过提供由DataCap达成的交易提议来获得额外的奖励。

10倍有效算力正确获得方式

有效算力计算公式

对含有DataCap订单 的扇区,参与有效算力计算,将有两部分组成:计算订单权重,计算扇区有效算力。

计算订单权重

订单空间大小 = PieceSize(例如32G或64G,Piece没有固定大小,以扇区大小为上限)

订单存续周期 = EndEpoch - StartEpoch (这里可以理解为订单天数,例如:540天)

订单权重 = 订单空间大小 x 订单存续周期

计算扇区有效算力

扇区存续周期 = 扇区Expiration - Activation (这里可以理解为扇区的存续天数,例如:540天)

*扇区存续周期是首次封装时的时长

扇区权重 = 扇区大小 x 扇区存续周期

有效扇区 = 扇区大小 x [((扇区权重 - 订单权重)x10+订单权重x100)/扇区权重x10]

简化上述公式得出以下公式:

有效扇区 = 扇区大小 x [(扇区权重x10 - 订单权重x90)/扇区权重x10]

当且仅当订单权重=扇区权重时,有效扇区才是10倍于普通扇区。这意味着,订单的存续周期和订单的空间大小与扇区的存续周期以及扇区的空间大小一致时,才是10倍有效算力。

术语说明

Piece

Filecoin Piece 是用户在 Filecoin 网络上存储的数据的主要协商单元。 Filecoin Piece 不是一个存储单元,它没有特定的大小,而是以扇区大小为上限。 Filecoin Piece 可以是任意大小,但如果 Piece 大于矿工支持的 Sector 的大小,则必须将其拆分为更多 Pieces,以便每个 Piece 适合一个 Sector。

Piece 是代表文件的全部或部分的对象,由存储客户端和存储矿工在交易中使用。 存储客户雇用存储矿工来存储碎片。

Piece 数据结构设计用于证明任意 IPLD 图和客户端数据的存储。 此图显示了 Piece 及其证明树的详细组成,包括完整和带宽优化的 Piece 数据结构。

参考资料

Filecoin Plus: 存储提供者如何与已验证用户互动

维基百科上对于BitField是这样解释的:

https://zh.wikipedia.org/wiki/%E4%BD%8D%E6%AE%B5

位段(或称“位域”,Bit field)为一种数据结构,可以把数据以的形式紧凑的储存,并允许程序员对此结构的位进行操作。这种数据结构的好处:

  • 可以使数据单元节省储存空间,当程序需要成千上万个数据单元时,这种方法就显得尤为重要。
  • 位段可以很方便的访问一个整数值的部分内容从而可以简化程序源代码。

FileCoin 有一个 具体的实现:[Go-bitfield][https://github.com/filecoin-project/go-bitfield]

Advanced RLE+ implementation

Features iterator based primitives that scale with number of runs instead of number of bits.

这个实现包含一个Run-length-encoder(RLE)。援引维基百科上面的解释:

游程编码(英语:run-length encoding,缩写RLE),又称行程长度编码变动长度编码法,是一种与资料性质无关的无损数据压缩技术,基于“使用变动长度的码来取代连续重复出现的原始资料”来实现压缩。

说明

举例来说,一组资料串”AAAABBBCCDEEEE”,由4个A、3个B、2个C、1个D、4个E组成,经过变动长度编码法可将资料压缩为4A3B2C1D4E(由14个单位转成10个单位)。

简言之,其优点在于将重复性高的资料量压缩成小单位;然而,其缺点在于─若该资料出现频率不高,可能导致压缩结果资料量比原始资料大,例如:原始资料”ABCDE”,压缩结果为”1A1B1C1D1E”(由5个单位转成10个单位)。

知道了以上大体的功能和说明,针对FileCoin其对BitField的实现版本,目标要将保存在BitField内的数据以JSON格式化形式表示出来。

阅读全文 »