Partitioning and live snapshots of data in PostgreSQL
- Tutorial
Although the topic of partitioning has been raised here before, I want to return to it to describe my experience solving this problem, which arose from the need for analytical processing of large volumes of data. Besides partitioning, I will also cover an extremely simplified implementation of "snapshots" of aggregated queries that are updated automatically when the source data changes.
One of the main requirements for the system under development was the use of free software, so the choice fell on PostgreSQL. When I started the project I knew PostgreSQL only superficially, but I was quite familiar with the capabilities of Oracle Database. Since the task was analytical processing, I wanted analogues of Oracle features such as Partitioning and Materialized Views. After getting acquainted with PostgreSQL's feature set, it became clear that this functionality would, one way or another, have to be written by hand.
Of course, a full-fledged implementation of Materialized Views, with query rewriting, was out of the question. For my needs, the ability to create automatically refreshed single-table aggregate views was quite enough (support for table joins will most likely be added in the near future). For partitioning, I planned to use the frequently described approach based on inherited tables, with inserts routed by a trigger. I also considered using Rules to control partitioning, but rejected the idea because, in my case, single-row inserts prevailed.
I started, of course, with tables for storing metadata:
ps_tables.sql
create sequence ps_table_seq;

create table ps_table (
  id bigint default nextval('ps_table_seq') not null,
  name varchar(50) not null unique,
  primary key(id)
);

create sequence ps_column_seq;

create table ps_column (
  id bigint default nextval('ps_column_seq') not null,
  table_id bigint not null references ps_table(id),
  name varchar(50) not null,
  parent_name varchar(50),
  type_name varchar(8) not null check (type_name in ('date', 'key', 'nullable', 'sum', 'min', 'max', 'cnt')),
  unique (table_id, name),
  primary key(id)
);

create table ps_range_partition (
  table_id bigint not null references ps_table(id),
  type_name varchar(10) not null check (type_name in ('day', 'week', 'month', 'year')),
  start_value date not null,
  end_value date not null,
  primary key(table_id, start_value)
);

create table ps_snapshot (
  snapshot_id bigint not null references ps_table(id),
  table_id bigint not null references ps_table(id),
  type_name varchar(10) not null check (type_name in ('day', 'week', 'month', 'year')),
  primary key(snapshot_id)
);
Everything is pretty obvious here. The only things worth mentioning are the column types:
Type     | Description
date     | The column containing the calendar date used for partitioning and aggregating the data (the date and timestamp PostgreSQL types are supported)
key      | A key used in the group by clause when aggregating the data (all PostgreSQL integer types are supported)
nullable | A key used when aggregating the data that may contain null
sum      | Sum of values
min      | Minimum value
max      | Maximum value
cnt      | Count of non-null values
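To make the metadata concrete, here is a sketch of the rows that would describe a monthly snapshot data_month built over a source table data. The table and column names are illustrative only (they are not part of the scripts), and the ids assume the ps_table sequence handed out 1 and 2:

```sql
-- Hypothetical example: register a source table "data" and a snapshot
-- "data_month" in the metadata (assuming they receive ids 1 and 2).
insert into ps_table(name) values ('data');        -- source table, id = 1
insert into ps_table(name) values ('data_month');  -- snapshot table, id = 2

-- Snapshot columns, each mapped onto a column of the source table:
insert into ps_column(table_id, name, parent_name, type_name)
values (2, 'date_field', 'date_field', 'date'),  -- truncated calendar date
       (2, 'key_field',  'key_field',  'key'),   -- group by key
       (2, 'sum_field',  'sum_field',  'sum'),   -- running sum
       (2, 'cnt_field',  'val_field',  'cnt');   -- count of non-null values

-- The snapshot itself: data_month (id = 2) aggregates data (id = 1) by month.
insert into ps_snapshot(snapshot_id, table_id, type_name)
values (2, 1, 'month');
```

In practice these rows are maintained by the helper functions shown later (ps_add_snapshot_column, ps_add_snapshot), not inserted by hand.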
The basis of the whole solution was the function that performs the rebuilding of the trigger functions for the table containing the source data:
ps_trigger_regenerate (bigint)
create or replace function ps_trigger_regenerate(in p_table bigint) returns void as $$
declare
  l_sql         text;
  l_table_name  varchar(50);
  l_date_column varchar(50);
  l_flag        boolean;
  tabs          record;
  columns       record;
begin
  select name into l_table_name
  from ps_table where id = p_table;
  l_sql :=
    'create or replace function ps_' || l_table_name || '_insert_trigger() returns trigger ' ||
    'as $' || '$ ' ||
    'begin ';
  for tabs in
      select a.snapshot_id as id,
             b.name as table_name,
             a.type_name as snapshot_type
      from   ps_snapshot a, ps_table b
      where  a.table_id = p_table
      and    b.id = a.snapshot_id
  loop
    l_flag := FALSE;
    l_sql := l_sql ||
      'update ' || tabs.table_name || ' set ';
    for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    not type_name in ('date', 'key', 'nullable')
    loop
      if l_flag then
        l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      if columns.type_name = 'sum' then
        l_sql := l_sql ||
          columns.name || ' = ' || columns.name || ' + coalesce(NEW.' || columns.parent_name || ', 0) ';
      end if;
      if columns.type_name = 'min' then
        l_sql := l_sql ||
          columns.name || ' = least(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
      end if;
      if columns.type_name = 'max' then
        l_sql := l_sql ||
          columns.name || ' = greatest(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
      end if;
      if columns.type_name = 'cnt' then
        l_sql := l_sql ||
          columns.name || ' = ' || columns.name || ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';
      end if;
    end loop;
    l_flag := FALSE;
    l_sql := l_sql || ' where ';
    for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('date', 'key', 'nullable')
    loop
      if l_flag then
        l_sql := l_sql || ' and ';
      end if;
      l_flag := TRUE;
      if columns.type_name = 'date' then
        l_sql := l_sql ||
          columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
      end if;
      if columns.type_name = 'key' then
        l_sql := l_sql ||
          columns.name || ' = NEW.' || columns.parent_name || ' ';
      end if;
      if columns.type_name = 'nullable' then
        l_sql := l_sql ||
          columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0) ';
      end if;
    end loop;
    l_sql := l_sql || '; ' ||
      'if not FOUND then ' ||
      'insert into ' || tabs.table_name || '(';
    l_flag := FALSE;
    for columns in
        select name, type_name
        from   ps_column
        where  table_id = tabs.id
    loop
      if l_flag then
        l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      l_sql := l_sql || columns.name;
    end loop;
    l_sql := l_sql || ') values (';
    l_flag := FALSE;
    for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
    loop
      if l_flag then
        l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      if columns.type_name = 'date' then
        l_sql := l_sql || 'date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ')';
      elsif columns.type_name = 'cnt' then
        l_sql := l_sql || 'case when NEW.' || columns.parent_name || ' is null then 0 else 1 end';
      elsif columns.type_name in ('nullable', 'sum') then
        l_sql := l_sql || 'coalesce(NEW.' || columns.parent_name || ', 0)';
      else
        l_sql := l_sql || 'NEW.' || columns.parent_name;
      end if;
    end loop;
    l_sql := l_sql || '); ' ||
      'end if; ';
  end loop;
  select name into l_date_column
  from   ps_column
  where  table_id = p_table
  and    type_name = 'date';
  for tabs in
      select to_char(start_value, 'YYYYMMDD') as start_value,
             to_char(end_value, 'YYYYMMDD') as end_value,
             type_name
      from   ps_range_partition
      where  table_id = p_table
      order  by start_value desc
  loop
    l_sql := l_sql ||
      'if NEW.' || l_date_column || ' >= to_date(''' || tabs.start_value || ''', ''YYYYMMDD'') and NEW.' || l_date_column || ' < to_date(''' || tabs.end_value || ''', ''YYYYMMDD'') then ' ||
      'insert into ' || l_table_name || '_' || tabs.start_value || ' values (NEW.*); ' ||
      'return null; ' ||
      'end if; ';
  end loop;
  l_sql := l_sql ||
    'return NEW; ' ||
    'end; ' ||
    '$' || '$ language plpgsql';
  execute l_sql;
  l_sql :=
    'create or replace function ps_' || l_table_name || '_raise_trigger() returns trigger ' ||
    'as $' || '$ ' ||
    'begin ' ||
    'raise EXCEPTION ''Can''''t support % on MIN or MAX aggregate'', TG_OP; ' ||
    'end; ' ||
    '$' || '$ language plpgsql';
  execute l_sql;
  l_sql :=
    'create or replace function ps_' || l_table_name || '_delete_trigger() returns trigger ' ||
    'as $' || '$ ' ||
    'begin ';
  for tabs in
      select a.snapshot_id as id,
             b.name as table_name,
             a.type_name as snapshot_type
      from   ps_snapshot a, ps_table b
      where  a.table_id = p_table
      and    b.id = a.snapshot_id
  loop
    l_flag := FALSE;
    l_sql := l_sql ||
      'update ' || tabs.table_name || ' set ';
    for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('sum', 'cnt')
    loop
      if l_flag then
        l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      if columns.type_name = 'sum' then
        l_sql := l_sql ||
          columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' ';
      end if;
      if columns.type_name = 'cnt' then
        l_sql := l_sql ||
          columns.name || ' = ' || columns.name || ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ';
      end if;
    end loop;
    l_flag := FALSE;
    l_sql := l_sql || ' where ';
    -- In a delete trigger only OLD is assigned, so the where clause
    -- must be built against OLD (not NEW, which would fail at run time).
    for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('date', 'key', 'nullable')
    loop
      if l_flag then
        l_sql := l_sql || ' and ';
      end if;
      l_flag := TRUE;
      if columns.type_name = 'date' then
        l_sql := l_sql ||
          columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), OLD.' || columns.parent_name || ') ';
      end if;
      if columns.type_name = 'key' then
        l_sql := l_sql ||
          columns.name || ' = OLD.' || columns.parent_name || ' ';
      end if;
      if columns.type_name = 'nullable' then
        l_sql := l_sql ||
          columns.name || ' = coalesce(OLD.' || columns.parent_name || ', 0) ';
      end if;
    end loop;
    l_sql := l_sql || '; ';
  end loop;
  l_sql := l_sql ||
    'return null; ' ||
    'end; ' ||
    '$' || '$ language plpgsql';
  execute l_sql;
  l_sql :=
    'create or replace function ps_' || l_table_name || '_update_trigger() returns trigger ' ||
    'as $' || '$ ' ||
    'begin ';
  for tabs in
      select a.snapshot_id as id,
             b.name as table_name,
             a.type_name as snapshot_type
      from   ps_snapshot a, ps_table b
      where  a.table_id = p_table
      and    b.id = a.snapshot_id
  loop
    l_flag := FALSE;
    l_sql := l_sql ||
      'update ' || tabs.table_name || ' set ';
    for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('sum', 'cnt')
    loop
      if l_flag then
        l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      if columns.type_name = 'sum' then
        l_sql := l_sql ||
          columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' + NEW.' || columns.parent_name || ' ';
      end if;
      if columns.type_name = 'cnt' then
        l_sql := l_sql ||
          columns.name || ' = ' || columns.name ||
          ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end' ||
          ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';
      end if;
    end loop;
    l_flag := FALSE;
    l_sql := l_sql || ' where ';
    for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('date', 'key', 'nullable')
    loop
      if l_flag then
        l_sql := l_sql || ' and ';
      end if;
      l_flag := TRUE;
      if columns.type_name = 'date' then
        l_sql := l_sql ||
          columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
      end if;
      if columns.type_name = 'key' then
        l_sql := l_sql ||
          columns.name || ' = NEW.' || columns.parent_name || ' ';
      end if;
      if columns.type_name = 'nullable' then
        l_sql := l_sql ||
          columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0) ';
      end if;
    end loop;
    l_sql := l_sql || '; ';
  end loop;
  l_sql := l_sql ||
    'return null; ' ||
    'end; ' ||
    '$' || '$ language plpgsql';
  execute l_sql;
end;
$$ language plpgsql;
Despite its intimidating appearance, this function is quite simple. Its task is to generate, from the available metadata, four functions used when building the triggers:
- ps_TABLE_insert_trigger() - routes inserted data
- ps_TABLE_update_trigger() - handles data updates
- ps_TABLE_delete_trigger() - handles data deletion
- ps_TABLE_raise_trigger() - forbids updating and deleting data
Here, TABLE stands for the name of the table containing the source data. A typical definition of ps_TABLE_insert_trigger() looks like this:
create or replace function ps_data_insert_trigger() returns trigger as $$
begin
  update data_month set
         sum_field = sum_field + NEW.sum_field,
         min_field = least(min_field, NEW.min_field)
  where  date_field = date_trunc('month', NEW.date_field)
  and    key_field = NEW.key_field;
  if not FOUND then
    insert into data_month(date_field, key_field, sum_field, min_field)
    values (date_trunc('month', NEW.date_field), NEW.key_field, NEW.sum_field, NEW.min_field);
  end if;
  if NEW.date_field >= to_date('20130101', 'YYYYMMDD') and
     NEW.date_field <  to_date('20130201', 'YYYYMMDD') then
    insert into data_20130101 values (NEW.*);
    return null;
  end if;
  return NEW;
end;
$$ language plpgsql;
In reality the function looks a bit more complicated, since null values get special handling, but as an illustration the example above is adequate. The logic of this code is straightforward:
- When a row is inserted into the source table, we first try to update the counters in the aggregated view data_month
- If that fails (no matching row in data_month), we add a new row
- Next, we check each partition's date range (one partition in the example) and, on a match, insert the row into that partition (since the partition inherits from the parent table, the asterisk is safe to use) and return null to keep the row out of the parent table
- If no partition matches, we return NEW, allowing the insert into the parent table
The last point means that when no suitable partition is found, the row goes into the parent table. In practice this is quite convenient: even if we forget to create a partition in advance, or receive data with an incorrect date, the insert still succeeds. Later, you can inspect the contents of the parent table with the query:
select * from only data
After that, create the missing partitions (as shown below, the data is automatically moved from the parent table into the newly created partition). In such cases the number of records that missed their partition is usually small, and the cost of moving them is negligible.
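Concretely, the cleanup workflow for such stragglers might look like the sketch below. ps_add_range_partition is the partition-creation function described next; the data table and date_field column are illustrative names matching the earlier trigger example:

```sql
-- Which months are sitting in the parent table only?
select date_trunc('month', date_field) as missing_month, count(*)
from only data
group by date_trunc('month', date_field);

-- Create the missing monthly partition; ps_add_range_partition will also
-- move the matching rows out of the parent table into the new partition:
select ps_add_range_partition('data', 'date_field', 'month', date '2013-06-01');
```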
Now all that remains is the plumbing. Let's start with the function that creates a new partition:
ps_add_range_partition (varchar, varchar, varchar, date)
create or replace function ps_add_range_partition(in p_table varchar, in p_column varchar,
                                                  in p_type varchar, in p_start date) returns void as $$
declare
  l_sql        text;
  l_end        date;
  l_start_str  varchar(10);
  l_end_str    varchar(10);
  l_table      bigint;
  l_flag       boolean;
  columns      record;
begin
  perform 1 from ps_table a, ps_column b
  where a.id = b.table_id and lower(a.name) = lower(p_table)
  and   b.type_name = 'date' and lower(b.name) <> lower(p_column);
  if FOUND then
    raise EXCEPTION 'Conflict DATE columns';
  end if;
  l_end := p_start + ('1 ' || p_type)::INTERVAL;
  perform 1
  from ps_table a, ps_range_partition b
  where a.id = b.table_id and lower(a.name) = lower(p_table)
  and (( p_start >= b.start_value and p_start < b.end_value ) or
       ( b.start_value >= p_start and b.start_value < l_end ));
  if FOUND then
    raise EXCEPTION 'Range intervals intersects';
  end if;
  perform 1
  from ps_table
  where lower(name) = lower(p_table);
  if not FOUND then
    insert into ps_table(name) values (lower(p_table));
  end if;
  select id into l_table
  from ps_table
  where lower(name) = lower(p_table);
  perform 1
  from ps_column
  where table_id = l_table and type_name = 'date'
  and   lower(name) = lower(p_column);
  if not FOUND then
    insert into ps_column(table_id, name, type_name)
    values (l_table, lower(p_column), 'date');
  end if;
  insert into ps_range_partition(table_id, type_name, start_value, end_value)
  values (l_table, p_type, p_start, l_end);
  l_start_str := to_char(p_start, 'YYYYMMDD');
  l_end_str := to_char(l_end, 'YYYYMMDD');
  l_sql :=
    'create table ' || p_table || '_' || l_start_str || ' (' ||
    'check (' || p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
    p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')), ' ||
    'primary key (';
  l_flag := FALSE;
  for columns in
      select f.name as name
      from ( select ps_array_to_set(a.conkey) as nn
             from pg_constraint a, pg_class b
             where b.oid = a.conrelid
             and   a.contype = 'p'
             and   b.relname = p_table ) c,
           ( select d.attname as name, d.attnum as nn
             from pg_attribute d, pg_class e
             where e.oid = d.attrelid
             and   e.relname = p_table ) f
      where f.nn = c.nn
      order by f.nn
  loop
    if l_flag then
      l_sql := l_sql || ', ';
    end if;
    l_flag := TRUE;
    l_sql := l_sql || columns.name;
  end loop;
  l_sql := l_sql ||
    ')) inherits (' || p_table || ')';
  execute l_sql;
  l_sql :=
    'create index ' || p_table || '_' || l_start_str || '_date on ' || p_table || '_' || l_start_str || '(' || p_column || ')';
  execute l_sql;
  perform ps_trigger_regenerate(l_table);
  execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
  execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
  execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;
  l_sql :=
    'insert into ' || p_table || '_' || l_start_str || ' ' ||
    'select * from ' || p_table || ' where ' ||
    p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
    p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';
  execute l_sql;
  l_sql :=
    'delete from only ' || p_table || ' where ' ||
    p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
    p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';
  execute l_sql;
  l_sql :=
    'create trigger ps_' || p_table || '_before_insert ' ||
    'before insert on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_insert_trigger()';
  execute l_sql;
  perform 1
  from ps_snapshot a, ps_column b
  where b.table_id = a.snapshot_id and a.table_id = l_table
  and   b.type_name in ('min', 'max');
  if FOUND then
    l_sql :=
      'create trigger ps_' || p_table || '_after_update ' ||
      'after update on ' || p_table || ' for each row ' ||
      'execute procedure ps_' || p_table || '_raise_trigger()';
    execute l_sql;
    l_sql :=
      'create trigger ps_' || p_table || '_after_delete ' ||
      'after delete on ' || p_table || ' for each row ' ||
      'execute procedure ps_' || p_table || '_raise_trigger()';
    execute l_sql;
    l_sql :=
      'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||
      'after update on ' || p_table || '_' || l_start_str || ' for each row ' ||
      'execute procedure ps_' || p_table || '_raise_trigger()';
    execute l_sql;
    l_sql :=
      'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||
      'after delete on ' || p_table || '_' || l_start_str || ' for each row ' ||
      'execute procedure ps_' || p_table || '_raise_trigger()';
    execute l_sql;
  else
    l_sql :=
      'create trigger ps_' || p_table || '_after_update ' ||
      'after update on ' || p_table || ' for each row ' ||
      'execute procedure ps_' || p_table || '_update_trigger()';
    execute l_sql;
    l_sql :=
      'create trigger ps_' || p_table || '_after_delete ' ||
      'after delete on ' || p_table || ' for each row ' ||
      'execute procedure ps_' || p_table || '_delete_trigger()';
    execute l_sql;
    l_sql :=
      'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||
      'after update on ' || p_table || '_' || l_start_str || ' for each row ' ||
      'execute procedure ps_' || p_table || '_update_trigger()';
    execute l_sql;
    l_sql :=
      'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||
      'after delete on ' || p_table || '_' || l_start_str || ' for each row ' ||
      'execute procedure ps_' || p_table || '_delete_trigger()';
    execute l_sql;
  end if;
end;
$$ language plpgsql;
Here, after validating the input, we add the necessary metadata and create the inherited table. We then regenerate the trigger functions by calling ps_trigger_regenerate, move the rows matching the partitioning condition into the new partition with a dynamic query, and recreate the triggers themselves.
Difficulties arose at two points:
- I had to fiddle a bit with adding a day, week, month, or year to the start date (depending on the input parameter p_type):
l_end := p_start + ('1 ' || p_type)::INTERVAL;
- Since the primary key is not inherited, I had to write a query against the system catalogs to get the list of the source table's primary-key columns (describing the primary key in my own metadata seemed inappropriate):
select f.name as name
from ( select ps_array_to_set(a.conkey) as nn
       from pg_constraint a, pg_class b
       where b.oid = a.conrelid
       and   a.contype = 'p'
       and   b.relname = p_table ) c,
     ( select d.attname as name, d.attnum as nn
       from pg_attribute d, pg_class e
       where e.oid = d.attrelid
       and   e.relname = p_table ) f
where f.nn = c.nn
order by f.nn
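The ps_array_to_set helper used here expands the conkey array of attribute numbers into a row set. Its definition is not shown in this article; as a sketch, on PostgreSQL 8.4 and later it can be a thin wrapper over the built-in unnest (assuming smallint[] input, which is how pg_constraint.conkey is stored):

```sql
-- Hypothetical reconstruction of the helper, not the author's original code:
create or replace function ps_array_to_set(p_array smallint[])
returns setof smallint as $$
  select unnest($1);  -- one row per array element
$$ language sql immutable;
```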
It should also be noted that, before creating the index on the partition key of the new partition, it would be worth checking whether that column is already the leading column of the primary key (so as not to create a duplicate index).
The function for deleting a partition is much simpler and needs no special comments:
ps_del_range_partition (varchar, date)
create or replace function ps_del_range_partition(in p_table varchar, in p_start date)
returns void as $$
declare
  l_sql        text;
  l_start_str  varchar(10);
  l_table      bigint;
begin
  select id into l_table
  from ps_table
  where lower(name) = lower(p_table);
  l_start_str := to_char(p_start, 'YYYYMMDD');
  delete from ps_range_partition
  where table_id = l_table
  and   start_value = p_start;
  perform ps_trigger_regenerate(l_table);
  l_sql :=
    'insert into ' || p_table || ' ' ||
    'select * from ' || p_table || '_' || l_start_str;
  execute l_sql;
  perform 1
  from ( select 1
         from ps_range_partition
         where table_id = l_table
         union all
         select 1
         from ps_snapshot
         where table_id = l_table ) a;
  if not FOUND then
    execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
    execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
    execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;
    execute 'drop function ps_' || p_table || '_insert_trigger() cascade';
    execute 'drop function ps_' || p_table || '_raise_trigger() cascade';
    execute 'drop function ps_' || p_table || '_update_trigger() cascade';
    execute 'drop function ps_' || p_table || '_delete_trigger() cascade';
    delete from ps_column where table_id = l_table;
    delete from ps_table where id = l_table;
  end if;
  perform 1
  from ps_range_partition
  where table_id = l_table;
  if not FOUND then
    delete from ps_column
    where table_id = l_table
    and   type_name = 'date';
  end if;
  execute 'drop table ' || p_table || '_' || l_start_str;
end;
$$ language plpgsql;
When a partition is deleted, its data is, of course, not lost but moved back into the parent table (the triggers are dropped first since, as it turned out, the only keyword does not work in an insert statement).
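That limitation can be illustrated with the sketch below (illustrative table names; behavior as described above):

```sql
-- ONLY works in select and delete: both touch just the parent table,
-- skipping inherited partitions.
select * from only data;
delete from only data where date_field < date '2013-05-01';

-- ONLY is not accepted as an insert target, which is why the triggers
-- must be dropped before moving the data back into the parent table:
-- insert into only data select * from data_20130501;  -- syntax error
```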
It remains to add functions for managing “live” data snapshots:
ps_add_snapshot_column (varchar, varchar, varchar, varchar)
create or replace function ps_add_snapshot_column(in p_snapshot varchar,
                                                  in p_column varchar, in p_parent varchar, in p_type varchar) returns void as $$
declare
  l_table bigint;
begin
  perform 1 from ps_table
  where lower(name) = lower(p_snapshot);
  if not FOUND then
    insert into ps_table(name) values (lower(p_snapshot));
  end if;
  select id into l_table
  from ps_table
  where lower(name) = lower(p_snapshot);
  insert into ps_column(table_id, name, parent_name, type_name)
  values (l_table, lower(p_column), lower(p_parent), p_type);
end;
$$ language plpgsql;
ps_add_snapshot (varchar, varchar, varchar)
create or replace function ps_add_snapshot(in p_table varchar, in p_snapshot varchar,
                                           in p_type varchar) returns void as $$
declare
  l_sql       text;
  l_table     bigint;
  l_snapshot  bigint;
  l_flag      boolean;
  columns     record;
begin
  select id into l_snapshot
  from ps_table
  where lower(name) = lower(p_snapshot);
  perform 1
  from ps_column
  where table_id = l_snapshot
  and   type_name in ('date', 'key');
  if not FOUND then
    raise EXCEPTION 'Key columns not found';
  end if;
  perform 1
  from ps_column
  where table_id = l_snapshot
  and   not type_name in ('date', 'key', 'nullable');
  if not FOUND then
    raise EXCEPTION 'Aggregate columns not found';
  end if;
  perform 1
  from ps_table
  where lower(name) = lower(p_table);
  if not FOUND then
    insert into ps_table(name) values (lower(p_table));
  end if;
  select id into l_table
  from ps_table
  where lower(name) = lower(p_table);
  insert into ps_snapshot(table_id, snapshot_id, type_name)
  values (l_table, l_snapshot, p_type);
  perform ps_trigger_regenerate(l_table);
  l_sql := 'create table ' || p_snapshot || ' (';
  l_flag := FALSE;
  for columns in
      select name, type_name
      from ps_column
      where table_id = l_snapshot
  loop
    if l_flag then
      l_sql := l_sql || ', ';
    end if;
    l_flag := TRUE;
    if columns.type_name = 'date' then
      l_sql := l_sql || columns.name || ' date not null';
    else
      l_sql := l_sql || columns.name || ' bigint not null';
    end if;
  end loop;
  l_sql := l_sql || ', primary key (';
  l_flag := FALSE;
  for columns in
      select name
      from ps_column
      where table_id = l_snapshot
      and   type_name in ('date', 'key', 'nullable')
  loop
    if l_flag then
      l_sql := l_sql || ', ';
    end if;
    l_flag := TRUE;
    l_sql := l_sql || columns.name;
  end loop;
  l_sql := l_sql || '))';
  execute l_sql;
  execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
  execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
  execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;
  l_sql :=
    'create trigger ps_' || p_table || '_before_insert ' ||
    'before insert on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_insert_trigger()';
  execute l_sql;
  perform 1
  from ps_snapshot a, ps_column b
  where b.table_id = a.snapshot_id and a.table_id = l_table
  and   b.type_name in ('min', 'max');
  if FOUND then
    l_sql :=
      'create trigger ps_' || p_table || '_after_update ' ||
      'after update on ' || p_table || ' for each row ' ||
      'execute procedure ps_' || p_table || '_raise_trigger()';
    execute l_sql;
    l_sql :=
      'create trigger ps_' || p_table || '_after_delete ' ||
      'after delete on ' || p_table || ' for each row ' ||
      'execute procedure ps_' || p_table || '_raise_trigger()';
    execute l_sql;
  else
    l_sql :=
      'create trigger ps_' || p_table || '_after_update ' ||
      'after update on ' || p_table || ' for each row ' ||
      'execute procedure ps_' || p_table || '_update_trigger()';
    execute l_sql;
    l_sql :=
      'create trigger ps_' || p_table || '_after_delete ' ||
      'after delete on ' || p_table || ' for each row ' ||
      'execute procedure ps_' || p_table || '_delete_trigger()';
    execute l_sql;
  end if;
  l_sql := 'insert into ' || p_snapshot || '(';
  l_flag := FALSE;
  for columns in
      select name
      from ps_column
      where table_id = l_snapshot
  loop
    if l_flag then
      l_sql := l_sql || ', ';
    end if;
    l_flag := TRUE;
    l_sql := l_sql || columns.name;
  end loop;
  l_sql := l_sql || ') select ';
  l_flag := FALSE;
  for columns in
      select parent_name as name, type_name
      from ps_column
      where table_id = l_snapshot
  loop
    if l_flag then
      l_sql := l_sql || ', ';
    end if;
    l_flag := TRUE;
    if columns.type_name = 'date' then
      l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')';
    end if;
    if columns.type_name = 'key' then
      l_sql := l_sql || columns.name;
    end if;
    if columns.type_name = 'nullable' then
      l_sql := l_sql || 'coalesce(' || columns.name || ', 0)';
    end if;
    if columns.type_name = 'sum' then
      l_sql := l_sql || 'sum(' || columns.name || ')';
    end if;
    if columns.type_name = 'min' then
      l_sql := l_sql || 'min(' || columns.name || ')';
    end if;
    if columns.type_name = 'max' then
      l_sql := l_sql || 'max(' || columns.name || ')';
    end if;
    if columns.type_name = 'cnt' then
      l_sql := l_sql || 'count(' || columns.name || ')';
    end if;
  end loop;
  l_sql := l_sql || ' from ' || p_table || ' group by ';
  l_flag := FALSE;
  for columns in
      select parent_name as name, type_name
      from ps_column
      where table_id = l_snapshot
      and   type_name in ('date', 'key', 'nullable')
  loop
    if l_flag then
      l_sql := l_sql || ', ';
    end if;
    l_flag := TRUE;
    if columns.type_name = 'date' then
      l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')';
    else
      l_sql := l_sql || columns.name;
    end if;
  end loop;
  execute l_sql;
end;
$$ language plpgsql;
ps_del_snapshot (varchar)
create or replace function ps_del_snapshot(in p_snapshot varchar) returns void as $$
declare
l_sql text;
p_table varchar(50);
l_table bigint;
l_snapshot bigint;
begin
select a.table_id, c.name into l_table, p_table
from ps_snapshot a, ps_table b, ps_table c
where b.id = a.snapshot_id and c.id = a.table_id
and lower(b.name) = lower(p_snapshot);
select id into l_snapshot
from ps_table
where lower(name) = lower(p_snapshot);
delete from ps_snapshot where snapshot_id = l_snapshot;
delete from ps_column where table_id = l_snapshot;
delete from ps_table where id = l_snapshot;
execute'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
execute'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
execute'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;
perform 1
from ( select 1 from ps_range_partition
where table_id = l_table
union all
select 1 from ps_snapshot
where table_id = l_table ) a;
if not FOUND then
execute'drop function if exists ps_' || p_table || '_insert_trigger() cascade';
execute'drop function if exists ps_' || p_table || '_raise_trigger() cascade';
execute'drop function if exists ps_' || p_table || '_update_trigger() cascade';
execute'drop function if exists ps_' || p_table || '_delete_trigger() cascade';
else
perform ps_trigger_regenerate(l_table);
l_sql :=
'create trigger ps_' || p_table || '_before_insert ' ||
'before insert on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_insert_trigger()';
execute l_sql;
perform 1
from ps_snapshot a, ps_column b
where b.table_id = a.snapshot_id and a.table_id = l_table
and b.type_name in ('min', 'max');
if FOUND then
l_sql :=
'create trigger ps_' || p_table || '_after_update ' ||
'after update on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_after_delete ' ||
'after delete on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
else
l_sql :=
'create trigger ps_' || p_table || '_after_update ' ||
'after update on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_update_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_after_delete ' ||
'after delete on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_delete_trigger()';
execute l_sql;
end if;
end if;
execute 'drop table if exists ' || p_snapshot;
end;
$$ language plpgsql;
There is nothing fundamentally new here either. The only thing worth noting is that when the min or max aggregates are used, the triggers are built with the ps_TABLE_raise_trigger() function, which forbids updates and deletes on the table the snapshot is built from. I did this because I could not come up with an implementation of adequate performance for recalculating these aggregates when update and delete statements are executed against the source table.
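For reference, the generated guard function presumably reduces to something like the sketch below (shown here for a hypothetical table test; the real body is assembled dynamically by ps_trigger_regenerate from the metadata):

```sql
-- Hypothetical sketch of the generated ps_TABLE_raise_trigger() function.
-- It unconditionally aborts the statement, so any update or delete on the
-- source table fails while a min/max snapshot depends on it.
create or replace function ps_test_raise_trigger() returns trigger as $$
begin
  raise exception 'updates and deletes are forbidden on table test: a snapshot with min/max aggregates depends on it';
end;
$$ language plpgsql;
```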
Let's see how it all works. Create a test table:
create sequence test_seq;
create table test (
id bigint default nextval('test_seq') not null,
event_time timestamp not null,
customer_id bigint not null,
value bigint not null,
primary key(id)
);
Now, to add a partition, it is enough to execute the following query:
select ps_add_range_partition('test', 'event_time', 'month', to_date('20130501', 'YYYYMMDD'))
As a result, an inherited table test_20130501 will be created, and all records for May 2013 will automatically be routed into it.
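A quick way to check the routing (assuming the partition above has been created and the before-insert trigger is in place) is to insert a row with a May timestamp and look at the child table directly:

```sql
-- Insert a row dated May 2013; the before-insert trigger should
-- redirect it into the inherited table test_20130501.
insert into test(event_time, customer_id, value)
values (to_timestamp('20130515', 'YYYYMMDD'), 1, 100);

-- "only" restricts the scan to the child table itself, excluding
-- the parent and any other partitions:
select count(*) from only test_20130501;
-- should return 1 if the row was routed correctly
```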
To drop a partition, you can run the following query:
select ps_del_range_partition('test', to_date('20130501', 'YYYYMMDD'))
Creating snapshots is a bit more involved, since you first need to define the columns of interest:
select ps_add_snapshot_column('test_month', 'customer_id', 'key')
select ps_add_snapshot_column('test_month', 'event_time', 'date')
select ps_add_snapshot_column('test_month', 'value_sum', 'value', 'sum')
select ps_add_snapshot_column('test_month', 'value_cnt', 'value', 'cnt')
select ps_add_snapshot_column('test_month', 'value_max', 'value', 'max')
select ps_add_snapshot('test', 'test_month', 'month')
As a result, an automatically updated table will be created based on the following query:
select customer_id, date_trunc('month', event_time),
sum(value) as value_sum,
count(value) as value_cnt,
max(value) as value_max
from test group by customer_id, date_trunc('month', event_time)
You can remove a snapshot by running the following query:
select ps_del_snapshot('test_month')
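To see the snapshot being maintained, one can insert a few rows into the source table and then read the snapshot table directly (column names as defined in the ps_add_snapshot_column calls above):

```sql
-- Two May 2013 events for the same customer.
insert into test(event_time, customer_id, value)
values (to_timestamp('20130501', 'YYYYMMDD'), 1, 10),
       (to_timestamp('20130520', 'YYYYMMDD'), 1, 32);

-- The insert trigger should have updated test_month on the fly,
-- so this returns the same result as the aggregate query above:
select customer_id, value_sum, value_cnt, value_max
from test_month;
```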
That's all for today. The scripts are available on GitHub.