Partitioning and live snapshots of data in PostgreSQL

  • Tutorial
Although the topic of sectioning has already been raised earlier , I want to return to it to talk about my experience in solving this problem that arose in connection with the need for analytical processing of large amounts of data. In addition to sectioning, I will consider the extremely simplified implementation of "snapshots" of aggregated queries, automatically updated when the source data changes.

One of the main requirements for the developed system was the use of free software, and therefore, the choice fell on PostgreSQL. At the time I started working on the project, I knew PostgreSQL quite superficially, but I was quite familiar with the capabilities of Oracle Database. Since it was about analytical processing, I wanted to have analogues of Oracle options such as Partitioning and Materialized Views . After getting acquainted with the features of PostgreSQL , it became clear that this functionality, one way or another, would have to be written manually.

Of course, we were not talking about any full-fledged implementation of Materialized Views, which provides for rewriting requests. For my needs, the ability to create automatically updated aggregated single-table samples was quite enough (support for joining tables will most likely be added in the near future). For partitioning, I planned to use the repeatedly described approach using inherited tables, with data insertion controlled by a trigger. I had an idea to use Rules to control the partitioning , but I refused it, because, in my case, inserting data with single records prevailed.

I started, of course, with tables for storing metadata:

ps_tables.sql
createsequence ps_table_seq;
createtable    ps_table (
  idbigintdefaultnextval('ps_table_seq') notnull,
  namevarchar(50)    notnullunique,
  primary key(id)
);
createsequence ps_column_seq;
createtable    ps_column (
  idbigintdefaultnextval('ps_column_seq') notnull,
  table_id      bigintnotnullreferences ps_table(id),
  namevarchar(50)    notnull,
  parent_name   varchar(50),
  type_name     varchar(8)     notnullcheck (type_name in ('date', 'key', 'nullable', 'sum', 'min', 'max', 'cnt')),
  unique (table_id, name),
  primary key(id)
);
createtable    ps_range_partition (
  table_id      bigintnotnullreferences ps_table(id),
  type_name     varchar(10)    notnullcheck (type_name in ('day', 'week', 'month', 'year')),
  start_value   datenotnull,
  end_value     datenotnull,
  primary key(table_id, start_value)
);
createtable    ps_snapshot (
  snapshot_id   bigintnotnullreferences ps_table(id),
  table_id      bigintnotnullreferences ps_table(id),
  type_name     varchar(10)    notnullcheck (type_name in ('day', 'week', 'month', 'year')),
  primary key(snapshot_id)
);


Everything is pretty obvious here. The only things worth mentioning are the column types:
Type of
Description
date
The column containing the calendar date used for partitioning and aggregating data (date and timestamp PostgreSQL types are supported)
key
The key used in the group by clause in data aggregation (all PostgreSQL integer types are supported)
nullable
A key used in data aggregation, possibly containing null
sum
Summation of values
min
Minimum value
max
Maximum value
cnt
Counting non-null values

The basis of the whole solution was the function that performs the rebuilding of the trigger functions for the table containing the source data:

ps_trigger_regenerate (bigint)
createorreplacefunction ps_trigger_regenerate(in p_table bigint) returnsvoidas $$
declare
  l_sql         text;
  l_table_name  varchar(50);
  l_date_column varchar(50);
  l_flag        boolean;
  tabs          record;
  columns       record;
beginselectnameinto l_table_name
  from   ps_table whereid = p_table;
  l_sql := 
 'createorreplacefunction ps_' || l_table_name || '_insert_trigger() returnstrigger' ||
 'as $'|| '$ ' ||
 'begin';
  for tabs in
    select a.snapshot_id as id,
           b.name as table_name,
           a.type_name as snapshot_type
    from   ps_snapshot a, ps_table b
    where  a.table_id = p_table
    and    b.id = a.snapshot_id
    loop
      l_flag = FALSE;
      l_sql := l_sql ||
     'update' || tabs.table_name || 'set';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    not type_name in ('date', 'key', 'nullable')
        loop
          if l_flag then
             l_sql := l_sql || ', ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'sum' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name || ' + coalesce(NEW.' || columns.parent_name || ', 0) ';
          end if;
          if columns.type_name = 'min' then
             l_sql := l_sql ||
             columns.name || ' = least(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
          end if;
          if columns.type_name = 'max' then
             l_sql := l_sql ||
             columns.name || ' = greatest(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
          end if;
          if columns.type_name = 'cnt' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name || ' + casewhen NEW.' || columns.parent_name || 'isnullthen0else1end';
          end if;
        end loop;
      l_flag = FALSE;
      l_sql := l_sql || 'where';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('date', 'key', 'nullable')
        loop
          if l_flag then
             l_sql := l_sql || 'and';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'date' then
             l_sql := l_sql ||
             columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
          end if;
          if columns.type_name = 'key' then
             l_sql := l_sql ||
             columns.name || ' = NEW.' || columns.parent_name || '';
          end if;
          if columns.type_name = 'nullable' then
             l_sql := l_sql ||
             columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
          end if;
        end loop;
      l_sql := l_sql || '; ' ||
     'if not FOUND then ' ||
     'insertinto' || tabs.table_name || '(';
      l_flag = FALSE;
      for columns in
        select name, type_name
        from   ps_column
        where  table_id = tabs.id
        loop
          if l_flag then
             l_sql := l_sql || ', ';
          end if;
          l_flag := TRUE;
          l_sql := l_sql || columns.name;
        end loop;
      l_sql := l_sql || ') values (';
      l_flag = FALSE;
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        loop
          if l_flag then
             l_sql := l_sql || ', ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'date' then
             l_sql := l_sql || 'date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ')';
          elsif columns.type_name = 'cnt' then
             l_sql := l_sql || 'casewhen NEW.' || columns.parent_name || 'isnullthen0else1end';
          elsif columns.type_name in ('nullable', 'sum') then
             l_sql := l_sql || 'coalesce(NEW.' || columns.parent_name || ', 0)';
          else
             l_sql := l_sql || 'NEW.' || columns.parent_name;
          end if;
        end loop;
      l_sql := l_sql || '); ' ||
     'endif; ';
    endloop;
    selectnameinto l_date_column
    from   ps_column
    where  table_id = p_table
    and    type_name = 'date';
    for tabs in
      select to_char(start_value, 'YYYYMMDD') as start_value,
             to_char(end_value, 'YYYYMMDD') as end_value,
             type_name
      from   ps_range_partition
      where  table_id = p_table
      orderby start_value descloop
        l_sql := l_sql ||
       'if NEW.' || l_date_column || ' >= to_date(''' || tabs.start_value || ''', ''YYYYMMDD'') and NEW.' || l_date_column || ' < to_date(''' || tabs.end_value || ''', ''YYYYMMDD'') then ' ||
          'insert into ' || l_table_name || '_' || tabs.start_value || ' values (NEW.*); ' ||
          'return null; ' ||
       'end if; ';
      endloop;
  l_sql := l_sql ||
 'return NEW; '||
 'end; '||
 '$'||'$ language plpgsql';
  execute l_sql;
  l_sql := 
 'createorreplacefunction ps_' || l_table_name || '_raise_trigger() returnstrigger' ||
 'as $'|| '$ ' ||
 'begin' ||
   'raiseEXCEPTION''Can''''t support % onMINorMAXaggregate'', TG_OP;' ||
 'end; '||
 '$'||'$ language plpgsql';
  execute l_sql;
  l_sql := 
 'createorreplacefunction ps_' || l_table_name || '_delete_trigger() returnstrigger' ||
 'as $'|| '$ ' ||
 'begin';
  for tabs in
    select a.snapshot_id as id,
           b.name as table_name,
           a.type_name as snapshot_type
    from   ps_snapshot a, ps_table b
    where  a.table_id = p_table
    and    b.id = a.snapshot_id
    loop
      l_flag = FALSE;
      l_sql := l_sql ||
     'update' || tabs.table_name || 'set';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('sum', 'cnt')
        loop
          if l_flag then
             l_sql := l_sql || ', ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'sum' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || '';
          end if;
          if columns.type_name = 'cnt' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name || ' - casewhen OLD.' || columns.parent_name || 'isnullthen0else1end';
          end if;
        end loop;
      l_flag = FALSE;
      l_sql := l_sql || 'where';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('date', 'key', 'nullable')
        loop
          if l_flag  then
             l_sql := l_sql || 'and';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'date' then
             l_sql := l_sql ||
             columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
          end if;
          if columns.type_name = 'key' then
             l_sql := l_sql ||
             columns.name || ' = NEW.' || columns.parent_name || '';
          end if;
          if columns.type_name = 'nullable' then
             l_sql := l_sql ||
             columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
          end if;
        end loop;
      l_sql := l_sql || '; ';
    endloop;
  l_sql := l_sql ||
 'return null; '||
 'end; '||
 '$'||'$ language plpgsql';
  execute l_sql;
  l_sql := 
 'createorreplacefunction ps_' || l_table_name || '_update_trigger() returnstrigger' ||
 'as $'|| '$ ' ||
 'begin';
  for tabs in
    select a.snapshot_id as id,
           b.name as table_name,
           a.type_name as snapshot_type
    from   ps_snapshot a, ps_table b
    where  a.table_id = p_table
    and    b.id = a.snapshot_id
    loop
      l_flag = FALSE;
      l_sql := l_sql ||
     'update' || tabs.table_name || 'set';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('sum', 'cnt')
        loop
          if l_flag then
             l_sql := l_sql || ', ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'sum' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' + NEW.' || columns.parent_name || '';
          end if;
          if columns.type_name = 'cnt' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name ||
             ' - casewhen OLD.' || columns.parent_name || 'isnullthen0else1end' ||
             ' + casewhen NEW.' || columns.parent_name || 'isnullthen0else1end';
          end if;
        end loop;
      l_flag = FALSE;
      l_sql := l_sql || 'where';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('date', 'key', 'nullable')
        loop
          if l_flag then
             l_sql := l_sql || 'and';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'date' then
             l_sql := l_sql ||
             columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
          end if;
          if columns.type_name = 'key' then
             l_sql := l_sql ||
             columns.name || ' = NEW.' || columns.parent_name || '';
          end if;
          if columns.type_name = 'nullable' then
             l_sql := l_sql ||
             columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
          end if;
        end loop;
      l_sql := l_sql || '; ';
    endloop;
  l_sql := l_sql ||
 'return null; '||
 'end; '||
 '$'||'$ language plpgsql';
  execute l_sql;
end;
$$ language plpgsql;


Despite its awesome look, this function is quite simple. Its task is to form (based on available metadata) four functions used in the construction of triggers:

  • ps_TABLE_insert_trigger () - Function that controls the insertion of data
  • ps_TABLE_update_trigger () - Function that controls data refresh
  • ps_TABLE_delete_trigger () - Function that controls data deletion
  • ps_TABLE_raise_trigger () - Function prohibiting updating and deleting data

Here, instead of TABLE, the name of the table containing the source data is substituted. A typical definition of ps_TABLE_insert_trigger () will look like this:

createorreplacefunction ps_data_insert_trigger() returnstriggeras $$
beginupdate data_month set
    sum_field = sum_field + NEW.sum_field
  , min_field = least(min_field, NEW.min_field)
  where date_field = date_trunc('month', NEW.date_field)
  and   key_field = NEW.key_field;
  if not FOUND then
     insertinto data_month(date_field, key_field, sum_field, min_field)
     values (date_trunc('month', NEW.date_field), NEW.key_field, NEW.sum_field, NEW.min_field);
  endif;
  if NEW.date_field >= to_date('20130101', 'YYYYMMDD') and 
     NEW.date_field < to_date('20130201', 'YYYYMMDD') then
     insertinto data_20130101 values (NEW.*);
     return null;
  endif;
  return NEW;
end;
$$ language plpgsql;

Actually, the function looks a bit more complicated, since null values ​​are handled in a special way. But, as an illustration, the above example is quite adequate. The logic of this code is obvious:

  • When inserting data into the original table, we try to update the counters in the aggregated data_month view
  • If this failed (record in data_month not found), add a new record
  • Next, check if each section is in the date range (in the example, one section), and if successful, insert the record into the corresponding section (since the section is inherited from the main table, you can safely use an asterisk) and return null to prevent the record from being inserted into the main table
  • If none of the sections fits, return NEW, allowing you to paste into the main table


The last paragraph leads to the fact that if a suitable section is not found, the data is added to the main table. In practice, this is quite convenient. Even if we do not create a section in advance or receive data with an incorrect date, the data insert will succeed. Subsequently, you can analyze the contents of the main table by running the query:

select * fromonlydata

After that, create the missing sections (as will be shown below, the data will be automatically transferred from the main table to the created section). In such cases, the number of records that did not fall into your section is usually not large and the cost of data transfer is negligible.

Now it remains to make a harness. Let's start with the function to create a new section:

ps_add_range_partition (varchar, varchar, varchar, date)
createorreplacefunction ps_add_range_partition(in p_table varchar, in p_column varchar, 
                in p_type varchar, in p_start date) returnsvoidas $$
declare
  l_sql       text;
  l_end       date;
  l_start_str varchar(10);
  l_end_str   varchar(10);
  l_table     bigint;
  l_flag      boolean;
  columns     record;
begin
  perform 1from   ps_table a, ps_column b
  where  a.id = b.table_id andlower(a.name) = lower(p_table)
  and    b.type_name = 'date'andlower(b.name) <> lower(p_column);
  if FOUND then
     raise EXCEPTION 'Conflict DATE columns';
  endif;
  l_end := p_start + ('1 ' || p_type)::INTERVAL;
  perform 1
  from   ps_table a, ps_range_partition b
  where  a.id = b.table_id and lower(a.name) = lower(p_table)
  and (( p_start >= b.start_value and p_start < b.end_value ) or
       ( b.start_value >= p_start and b.start_value < l_end ));
  if FOUND then
     raise EXCEPTION 'Range intervals intersects';
  endif;
  perform 1
  from   ps_table
  where  lower(name) = lower(p_table);
  if not FOUND then
     insertinto ps_table(name) values (lower(p_table));
  endif;
  selectidinto l_table
  from   ps_table
  wherelower(name) = lower(p_table);
  perform 1
  from   ps_column
  where  table_id = l_table and type_name = 'date'
  and    lower(name) = lower(p_column);
  if not FOUND then
     insertinto ps_column(table_id, name, type_name)
     values (l_table, lower(p_column), 'date');
  endif;
  insertinto ps_range_partition(table_id, type_name, start_value, end_value)
  values (l_table, p_type, p_start, l_end);
  l_start_str = to_char(p_start, 'YYYYMMDD');
  l_end_str = to_char(l_end, 'YYYYMMDD');
  l_sql :=
 'createtable' || p_table || '_' || l_start_str || '(' ||
   'check (' || p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and' ||
                p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')), ' ||
   'primary key (';
    l_flag := FALSE;
    for columns in
      select f.name as name
      from ( select ps_array_to_set(a.conkey) as nn
             from   pg_constraint a, pg_class b
             where  b.oid = a.conrelid
             and    a.contype = 'p'
             and    b.relname = p_table ) c, 
           ( select d.attname as name, d.attnum as nn
             from   pg_attribute d, pg_class e
             where  e.oid = d.attrelid
             and    e.relname = p_table ) f
      where  f.nn = c.nn
      order  by f.nn
      loop
        if l_flag then
           l_sql := l_sql || ', ';
        end if;
        l_flag := TRUE;
        l_sql := l_sql || columns.name;
      end loop;
  l_sql := l_sql ||
 ')) inherits (' || p_table || ')';
  execute l_sql;
  l_sql := 
 'createindex' || p_table || '_' || l_start_str || '_date on' || p_table || '_' || l_start_str || '(' || p_column || ')';
  execute l_sql;
  perform ps_trigger_regenerate(l_table);
  execute 'droptriggerifexists ps_' || p_table || '_before_insert on' || p_table;
  execute 'droptriggerifexists ps_' || p_table || '_after_update on'  || p_table;
  execute 'droptriggerifexists ps_' || p_table || '_after_delete on'  || p_table;
  l_sql := 
 'insertinto' || p_table || '_' || l_start_str || '' ||
 'select * from' || p_table || 'where' ||
  p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and' ||
  p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';
  execute l_sql;
  l_sql := 
 'deletefromonly' || p_table || 'where' ||
  p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and' ||
  p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';
  execute l_sql;
  l_sql := 
 'createtrigger ps_' || p_table || '_before_insert ' ||
 'beforeinserton' || p_table || 'foreachrow' ||
 'executeprocedure ps_' || p_table || '_insert_trigger()';
  execute l_sql;
  perform 1
  from   ps_snapshot a, ps_column b
  where  b.table_id = a.snapshot_id and a.table_id = l_table
  and    b.type_name in ('min', 'max');
  if FOUND then
     l_sql := 
    'createtrigger ps_' || p_table || '_after_update ' ||
    'afterupdateon' || p_table || 'foreachrow' ||
    'executeprocedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
     l_sql := 
    'createtrigger ps_' || p_table || '_after_delete ' ||
    'afterdeleteon' || p_table || 'foreachrow' ||
    'executeprocedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
     l_sql := 
    'createtrigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||
    'afterupdateon' || p_table || '_' || l_start_str || 'foreachrow' ||
    'executeprocedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
     l_sql := 
    'createtrigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||
    'afterdeleteon' || p_table || '_' || l_start_str || 'foreachrow' ||
    'executeprocedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
  else
     l_sql := 
    'createtrigger ps_' || p_table || '_after_update ' ||
    'afterupdateon' || p_table || 'foreachrow' ||
    'executeprocedure ps_' || p_table || '_update_trigger()';
     execute l_sql;
     l_sql := 
    'createtrigger ps_' || p_table || '_after_delete ' ||
    'afterdeleteon' || p_table || 'foreachrow' ||
    'executeprocedure ps_' || p_table || '_delete_trigger()';
     execute l_sql;
     l_sql := 
    'createtrigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||
    'afterupdateon' || p_table || '_' || l_start_str || 'foreachrow' ||
    'executeprocedure ps_' || p_table || '_update_trigger()';
     execute l_sql;
     l_sql := 
    'createtrigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||
    'afterdeleteon' || p_table || '_' || l_start_str || 'foreachrow' ||
    'executeprocedure ps_' || p_table || '_delete_trigger()';
     execute l_sql;
  end if;
end;
$$ language plpgsql;


Here, after checking the correctness of the input data, we add the necessary metadata, after which, we create an inherited table. Then, we recreate the functions of the triggers by calling ps_trigger_regenerate, after which we transfer the data falling under the sectioning condition into the created section with a dynamic query and recreate the triggers themselves.

Difficulties arose with two points.

  1. I had to suffer a bit with adding a month, day or year to the starting date (depending on the input parameter p_type:

    l_end := p_start + ('1 '|| p_type)::INTERVAL;
    

  2. Since the primary key is not inherited, I had to compose a request to System Catalogs to get a list of columns of the primary key of the source table (I also found it inappropriate to describe the primary key in my metadata):

    select f.name asnamefrom ( select ps_array_to_set(a.conkey) as nn
                 from   pg_constraint a, pg_class b
                 where  b.oid = a.conrelid
                 and    a.contype = 'p'and    b.relname = p_table ) c, 
               ( select d.attname asname, d.attnum as nn
                 from   pg_attribute d, pg_class e
                 where  e.oid = d.attrelid
                 and    e.relname = p_table ) f
          where  f.nn = c.nn
          orderby f.nn
    


Also, it should be noted that before creating the index, on the partition key (for the created partition), it would be worthwhile to check if it is the leading column of the primary key (so as not to create a duplicate index).

The function for deleting a section is much simpler and does not need special comments:

ps_del_range_partition (varchar, date)
createorreplacefunction ps_del_range_partition(in p_table varchar, in p_start date) 
      returnsvoidas $$
declare
  l_sql       text;
  l_start_str varchar(10);
  l_table     bigint;
beginselectidinto l_table
  from   ps_table
  wherelower(name) = lower(p_table);
  l_start_str = to_char(p_start, 'YYYYMMDD');
  deletefrom ps_range_partition 
  where  table_id = l_table
  and    start_value = p_start;
  perform ps_trigger_regenerate(l_table);
  l_sql := 
 'insertinto' || p_table || '' ||
 'select * from' || p_table || '_' || l_start_str;
  execute l_sql;
  perform 1
  from ( select 1
         from   ps_range_partition
         where  table_id = l_table
         union  all
         select 1
         from   ps_snapshot
         where  table_id = l_table ) a;
  if not FOUND then
     execute 'droptriggerifexists ps_' || p_table || '_before_insert on' || p_table;
     execute 'droptriggerifexists ps_' || p_table || '_after_update on'  || p_table;
     execute 'droptriggerifexists ps_' || p_table || '_after_delete on'  || p_table;
     execute 'dropfunction ps_' || p_table || '_insert_trigger() cascade';
     execute 'dropfunction ps_' || p_table || '_raise_trigger()  cascade';
     execute 'dropfunction ps_' || p_table || '_update_trigger() cascade';
     execute 'dropfunction ps_' || p_table || '_delete_trigger() cascade';
     delete from ps_column where table_id = l_table;
     delete from ps_table where id = l_table;
  end if;
  perform 1
  from   ps_range_partition
  where  table_id = l_table;
  if not FOUND then
     delete from ps_column 
     where  table_id = l_table
     and    type_name = 'date';
  end if;
  execute 'droptable' || p_table || '_' || l_start_str;
end;
$$ language plpgsql;


When a section is deleted, the data, of course, is not lost, but is transferred to the main table (triggers are deleted beforehand, since, as it turned out, the only keyword does not work in the insert statement).

It remains to add functions for managing “live” data snapshots:

ps_add_snapshot_column (varchar, varchar, varchar, varchar)
createorreplacefunction ps_add_snapshot_column(in p_snapshot varchar, 
     in p_column varchar, in p_parent varchar, in p_type varchar) returnsvoidas $$
declare
  l_table bigint;
begin
  perform 1from   ps_table
  wherelower(name) = lower(p_snapshot);
  if not FOUND then
     insertinto ps_table(name) values (lower(p_snapshot));
  endif;
  selectidinto l_table
  from   ps_table
  wherelower(name) = lower(p_snapshot);
  insertinto ps_column(table_id, name, parent_name, type_name)
  values (l_table, lower(p_column), lower(p_parent), p_type);
end;
$$ language plpgsql;


ps_add_snapshot (varchar, varchar, varchar)
createorreplacefunction ps_add_snapshot(in p_table varchar, in p_snapshot varchar, 
     in p_type varchar) returnsvoidas $$
declare
  l_sql      text;
  l_table    bigint;
  l_snapshot bigint;
  l_flag     boolean;
  columns    record;
beginselectidinto l_snapshot
  from   ps_table
  wherelower(name) = lower(p_snapshot);
  perform 1
  from   ps_column
  where  table_id = l_snapshot
  and    type_name in ('date', 'key');
  if not FOUND then
     raise EXCEPTION 'Key columns not found';
  endif;
  perform 1
  from   ps_column
  where  table_id = l_snapshot
  and    not type_name in ('date', 'key', 'nullable');
  if not FOUND then
     raise EXCEPTION 'Aggregate columns not found';
  endif;
  perform 1
  from   ps_table
  where  lower(name) = lower(p_table);
  if not FOUND then
     insertinto ps_table(name) values (lower(p_table));
  endif;
  selectidinto l_table
  from   ps_table
  wherelower(name) = lower(p_table);
  insertinto ps_snapshot(table_id, snapshot_id, type_name)
  values (l_table, l_snapshot, p_type);
  perform ps_trigger_regenerate(l_table);
  l_sql := 'createtable' || p_snapshot || ' (';
  l_flag := FALSE;
  for columns in
    select name, type_name
    from   ps_column
    where  table_id = l_snapshot
    loop
      if l_flag then
         l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      if columns.type_name = 'date' then
         l_sql := l_sql || columns.name || 'datenotnull';
      else
         l_sql := l_sql || columns.name || 'bigintnotnull';
      end if;
    end loop;
  l_sql := l_sql || ', primary key (';
  l_flag := FALSE;
  for columns in
    select name
    from   ps_column
    where  table_id = l_snapshot
    and    type_name in ('date', 'key', 'nullable')
    loop
      if l_flag then
         l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      l_sql := l_sql || columns.name;
    end loop;
  l_sql := l_sql || '))';
  execute l_sql;
  execute 'droptriggerifexists ps_' || p_table || '_before_insert on' || p_table;
  execute 'droptriggerifexists ps_' || p_table || '_after_update on'  || p_table;
  execute 'droptriggerifexists ps_' || p_table || '_after_delete on'  || p_table;
  l_sql := 
 'createtrigger ps_' || p_table || '_before_insert ' ||
 'beforeinserton' || p_table || 'foreachrow' ||
 'executeprocedure ps_' || p_table || '_insert_trigger()';
  execute l_sql;
  perform 1
  from   ps_snapshot a, ps_column b
  where  b.table_id = a.snapshot_id and a.table_id = l_table
  and    b.type_name in ('min', 'max');
  if FOUND then
     l_sql := 
    'createtrigger ps_' || p_table || '_after_update ' ||
    'afterupdateon' || p_table || 'foreachrow' ||
    'executeprocedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
     l_sql := 
    'createtrigger ps_' || p_table || '_after_delete ' ||
    'afterdeleteon' || p_table || 'foreachrow' ||
    'executeprocedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
  else
     l_sql := 
    'createtrigger ps_' || p_table || '_after_update ' ||
    'afterupdateon' || p_table || 'foreachrow' ||
    'executeprocedure ps_' || p_table || '_update_trigger()';
     execute l_sql;
     l_sql := 
    'createtrigger ps_' || p_table || '_after_delete ' ||
    'afterdeleteon' || p_table || 'foreachrow' ||
    'executeprocedure ps_' || p_table || '_delete_trigger()';
     execute l_sql;
  end if;
  l_sql := 'insertinto' || p_snapshot || '(';
  l_flag := FALSE;
  for columns in
    select name
    from   ps_column
    where  table_id = l_snapshot
    loop
      if l_flag then
         l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      l_sql := l_sql || columns.name;
    end loop;
  l_sql := l_sql || ') select';
  l_flag := FALSE;
  for columns in
    select parent_name as name, type_name
    from   ps_column
    where  table_id = l_snapshot
    loop
      if l_flag then
         l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      if columns.type_name = 'date' then
         l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')';
      end if;
      if columns.type_name = 'key' then
         l_sql := l_sql || columns.name;
      end if;
      if columns.type_name = 'nullable' then
         l_sql := l_sql || 'coalesce(' || columns.name || ', 0)';
      end if;
      if columns.type_name = 'sum' then
         l_sql := l_sql || 'sum(' || columns.name || ')';
      end if;
      if columns.type_name = 'min' then
         l_sql := l_sql || 'min(' || columns.name || ')';
      end if;
      if columns.type_name = 'max' then
         l_sql := l_sql || 'max(' || columns.name || ')';
      end if;
      if columns.type_name = 'cnt' then
         l_sql := l_sql || 'count(' || columns.name || ')';
      end if;
    end loop;
  l_sql := l_sql || 'from' || p_table || 'groupby';
  l_flag := FALSE;
  for columns in
    select parent_name as name, type_name
    from   ps_column
    where  table_id = l_snapshot
    and    type_name in ('date', 'key', 'nullable')
    loop
      if l_flag then
         l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      if columns.type_name = 'date' then
         l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')';
      else
         l_sql := l_sql || columns.name;
      end if;
    end loop;
  execute l_sql;
end;
$$ language plpgsql;


ps_del_snapshot (varchar)
createorreplacefunction ps_del_snapshot(in p_snapshot varchar) returnsvoidas $$
declare
  l_sql      text;
  p_table    varchar(50);
  l_table    bigint;
  l_snapshot bigint;
beginselect a.table_id, c.name into l_table, p_table
  from   ps_snapshot a, ps_table b, ps_table c
  where  b.id = a.snapshot_id and c.id = a.table_id
  andlower(b.name) = lower(p_snapshot);
  selectidinto l_snapshot
  from   ps_table
  wherelower(name) = lower(p_snapshot);
  deletefrom ps_snapshot where snapshot_id = l_snapshot;
  deletefrom ps_column where table_id = l_snapshot;
  deletefrom ps_table whereid = l_snapshot;
  execute'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
  execute'drop trigger if exists ps_' || p_table || '_after_update  on ' || p_table;
  execute'drop trigger if exists ps_' || p_table || '_after_delete  on ' || p_table;
  perform 1
  from ( select1from   ps_range_partition
         where  table_id = l_table
         union  all
         select1from   ps_snapshot
         where  table_id = l_table ) a;
  if not FOUND then
     execute'drop function if exists ps_' || p_table || '_insert_trigger() cascade';
     execute'drop function if exists ps_' || p_table || '_raise_trigger()  cascade';
     execute'drop function if exists ps_' || p_table || '_update_trigger() cascade';
     execute'drop function if exists ps_' || p_table || '_delete_trigger() cascade';
  else
     perform ps_trigger_regenerate(l_table);
     l_sql := 
    'createtrigger ps_' || p_table || '_before_insert ' ||
    'beforeinserton' || p_table || 'foreachrow' ||
    'executeprocedure ps_' || p_table || '_insert_trigger()';
     execute l_sql;
     perform 1
     from   ps_snapshot a, ps_column b
     where  b.table_id = a.snapshot_id and a.table_id = l_table
     and    b.type_name in ('min', 'max');
     if FOUND then
        l_sql := 
       'createtrigger ps_' || p_table || '_after_update ' ||
       'afterupdateon' || p_table || 'foreachrow' ||
       'executeprocedure ps_' || p_table || '_raise_trigger()';
        execute l_sql;
        l_sql := 
       'createtrigger ps_' || p_table || '_after_delete ' ||
       'afterdeleteon' || p_table || 'foreachrow' ||
       'executeprocedure ps_' || p_table || '_raise_trigger()';
        execute l_sql;
     else
        l_sql := 
       'createtrigger ps_' || p_table || '_after_update ' ||
       'afterupdateon' || p_table || 'foreachrow' ||
       'executeprocedure ps_' || p_table || '_update_trigger()';
        execute l_sql;
        l_sql := 
       'createtrigger ps_' || p_table || '_after_delete ' ||
       'afterdeleteon' || p_table || 'foreachrow' ||
       'executeprocedure ps_' || p_table || '_delete_trigger()';
        execute l_sql;
     end if;
  end if;
  execute 'droptableifexists' || p_snapshot;
end;
$$ language plpgsql;


Here, too, there is nothing fundamentally new and the only thing I would like to note is that, in the case of using the min or max aggregates, when creating triggers, the ps_TABLE_raise_trigger () function is used, which prohibits deletions and changes to the table, by which built snapshot. This is done because I could not come up with an adequate performance implementation for updating these aggregates when executing the update and delete statements in the source table.

Let's see how it all works. Create a test table:

createsequence test_seq;
createtabletest (
  idbigintdefaultnextval('test_seq') notnull,
  event_time    timestampnotnull,
  customer_id   bigintnotnull,
  valuebigintnotnull,
  primary key(id)
);

Now, to add a section, it is enough to execute the following query:

select ps_add_range_partition('test', 'event_time', 'month', to_date('20130501', 'YYYYMMDD'))

As a result, the inherited table test_20130501 will be created, into which all records for the month of May will automatically fall.

To delete a section, you can run the following query:

select ps_del_range_partition('test', to_date('20130501', 'YYYYMMDD'))


Creating snapshots is a bit more complicated as you need to first determine the columns of interest to us:

select ps_add_snapshot_column('test_month', 'customer_id', 'key')
select ps_add_snapshot_column('test_month', 'event_time', 'date')
select ps_add_snapshot_column('test_month', 'value_sum', 'value', 'sum')
select ps_add_snapshot_column('test_month', 'value_cnt', 'value', 'cnt')
select ps_add_snapshot_column('test_month', 'value_max', 'value', 'max')
select ps_add_snapshot('test', 'test_month', 'month')


As a result, an automatically updated table will be created based on the following query:

select customer_id, date_trunc('month', event_time),
       sum(value) as value_sum,
       count(value) as value_cnt,
       max(value) as value_max
fromtestgroupby customer_id, date_trunc('month', event_time)

You can remove snapshot by running the following query:

select ps_del_snapshot('test_month')

That's all for today. Scripts can be picked up on GitHub .

Also popular now: