Fighting duplicates

Continuing the topic of dynamic SQL, I want to describe a useful tool that I implemented in one of my current projects. It deals with duplicates in directories (lookup tables). By duplicates, in this article, I mean entries that were added to a directory a second time, for example because of a spelling error in the name.

The idea is to allow any directory entry to be declared a duplicate of an existing one. The duplicate record is then deleted, and all references to it are rewritten to point to the correct record. It is also important to be able to roll back such a change in case it was made by mistake.

Let's start with tables for storing service data:

Service tables
create table    mg_table (
  table_name    varchar(100)   not null,
  pk_name       varchar(100)   not null,
  primary key(table_name)
);
create sequence mg_action_seq;
create table    mg_action (
  id            bigint         default nextval('mg_action_seq') not null,
  table_name    varchar(100)   not null references mg_table(table_name),
  old_id        varchar(50)    not null,
  new_id        varchar(50)    not null,
  action_time   timestamp      default now() not null,
  primary key(id)
);
create sequence mg_action_detail_seq;
create table    mg_action_detail (
  id            bigint         default nextval('mg_action_detail_seq') not null,
  action_id     bigint         not null references mg_action(id),
  table_name    varchar(100)   not null,
  pk_name       varchar(100)   not null,
  column_name   varchar(100)   not null,
  obj_id        varchar(50)    not null,
  primary key(id)
);


Here mg_table lists the tables for which duplicate merging is supported. The only requirement for such a table is that its primary key consists of a single numeric or string column. We do not need to maintain mg_table by hand, because it is filled in automatically. The mg_action and mg_action_detail tables hold the data needed to roll changes back: mg_action records each merge, and mg_action_detail records every referencing row that the merge rewrote.
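
To see how the two journal tables fit together, a query like the following can list every recorded merge along with the rows it touched. It is only an illustrative sketch; the column aliases (ref_table, ref_column, ref_pk, ref_row) are made up for readability and are not part of the schema.

```sql
-- One line per rewritten reference; merges that touched no rows still appear
-- thanks to the left join.
select a.id,
       a.table_name,
       a.old_id,
       a.new_id,
       a.action_time,
       d.table_name  as ref_table,   -- table that referenced the duplicate
       d.column_name as ref_column,  -- foreign key column that was rewritten
       d.pk_name     as ref_pk,      -- primary key column of the referencing table
       d.obj_id      as ref_row      -- primary key value of the rewritten row
from   mg_action a
left   join mg_action_detail d on d.action_id = a.id
order  by a.id desc, d.id;
```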

We define a pair of auxiliary functions:

Auxiliary functions
create or replace function mg_get_pk_column(in p_table varchar) returns varchar
as $$
declare
  l_pk  text;
  l_cn  int;
begin
  select max(f.name), count(*) into l_pk, l_cn
  from ( select unnest(a.conkey) as nn  -- built-in unnest() expands the array of key column numbers
         from   pg_constraint a, pg_class b
         where  b.oid = a.conrelid
         and    a.contype = 'p'
         and    b.relname = lower(p_table) ) c, 
       ( select d.attname as name, d.attnum as nn
         from   pg_attribute d, pg_class e
         where  e.oid = d.attrelid
         and    e.relname = lower(p_table) ) f
  where  f.nn = c.nn;
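  -- l_cn is 0 when no primary key is found and greater than 1 for a composite key
  if l_cn <> 1 then
     raise EXCEPTION 'Table % must have a single-column primary key', p_table;
  end if;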
  return l_pk;
end;
$$ language plpgsql;
create or replace function mg_add_dict(in p_table varchar) returns void
as $$
declare
  l_pk  text;
  l_sql text;
begin
  l_pk := mg_get_pk_column(p_table);
  perform 1
  from mg_table where table_name = lower(p_table);
  if not FOUND then
     l_sql := 
    'create table mg_' || lower(p_table) || ' ' ||
    'as select * from ' || lower(p_table) || ' limit 0';
     execute l_sql;
     l_sql :=
    'alter table mg_' || lower(p_table) || ' ' ||
    'add primary key(' || l_pk || ')';
     execute l_sql;
     insert into mg_table(table_name, pk_name) values (lower(p_table), l_pk);
  end if;
end;
$$ language plpgsql;


The mg_get_pk_column function runs the query familiar from the previous article: it returns the name of the primary key column and also checks that the primary key consists of exactly one column.

The mg_add_dict function, besides filling mg_table, creates a table with the 'mg_' prefix in which deleted duplicates are stored in case the change has to be rolled back. This table has the same columns as the original one.
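
To make this concrete: for a table named street whose primary key column is id (the table used in the example at the end of the article), the first call to mg_add_dict('street') should boil down to the statements below. This is just the dynamic SQL written out by hand under that assumption, not extra code that needs to be run.

```sql
-- "limit 0" copies the column names and types but no rows
create table mg_street as select * from street limit 0;

-- create table ... as does not carry over constraints or defaults,
-- so the primary key is added explicitly
alter table mg_street add primary key(id);

-- register the table so that later calls skip the two steps above
insert into mg_table(table_name, pk_name) values ('street', 'id');
```

Because the backup table is created without foreign keys, it is not picked up later as a table that references the directory.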

Now for the most interesting part:

mg_merge
create or replace function mg_merge(in p_table varchar, in p_old varchar, in p_new varchar) returns void
as $$
declare
  l_action bigint;  -- matches the bigint type of mg_action.id
  l_pk     text;
  l_sql    text;
  tabs     record;
begin
  perform mg_add_dict(p_table);
  select pk_name into l_pk
  from   mg_table where table_name = lower(p_table);
  l_action := nextval('mg_action_seq');
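  -- store the lower-cased name: mg_table keeps names in lower case and
  -- mg_action.table_name references it
  insert into mg_action(id, table_name, old_id, new_id)
  values (l_action, lower(p_table), p_old, p_new);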
  l_sql := 
 'insert into mg_' || lower(p_table) || ' ' ||
 'select * from ' || lower(p_table) || ' ' ||
 'where ' || l_pk || ' = ''' || p_old || '''';
  execute l_sql;
  for tabs in
      select b.relname as table_name, 
             d.attname as column_name
      from   pg_constraint a, pg_class b, pg_class c,
             pg_attribute d
      where  a.contype = 'f'
      and    b.oid = a.conrelid
      and    c.oid = a.confrelid
      and    c.relname = lower(p_table)
      and    d.attrelid = b.oid
      and    a.conkey[1] = d.attnum
      loop
         -- remember which rows of the referencing table pointed at the duplicate;
         -- the row key is read from that table's own primary key column
         l_sql := 
        'insert into mg_action_detail(action_id, table_name, column_name, obj_id, pk_name) ' ||
        'select ' || l_action || ', ''' || tabs.table_name || ''', ''' || 
         tabs.column_name || ''', ' || mg_get_pk_column(tabs.table_name::varchar) || ', ' ||
        '''' || mg_get_pk_column(tabs.table_name::varchar) || ''' ' ||
        'from ' || lower(tabs.table_name) || ' ' ||
        'where ' || lower(tabs.column_name) || ' = ''' || p_old || '''';
         execute l_sql;
         -- repoint those rows at the surviving record
         l_sql :=
        'update ' || lower(tabs.table_name) || ' ' ||
        'set ' || lower(tabs.column_name) || ' = ''' || p_new || ''' ' ||
        'where ' || lower(tabs.column_name) || ' = ''' || p_old || '''';
         execute l_sql;
      end loop;
  l_sql :=
 'delete from ' || lower(p_table) || ' where ' || l_pk || ' = ''' || p_old || '''';
  execute l_sql;
end;
$$ language plpgsql;
create or replace function mg_merge(in p_table varchar, in p_old bigint, in p_new bigint) 
returns void
as $$
declare
begin
  perform mg_merge(p_table, p_old::varchar, p_new::varchar);
end;
$$ language plpgsql;


This function finds all tables that reference p_table through a foreign key and replaces p_old with p_new in them, saving the data needed to roll the change back. Since the primary key column is most often numeric, an overload mg_merge(varchar, bigint, bigint) is provided for convenience.
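
Before running a merge it can be handy to see which statements it would issue. The anonymous block below is a rough dry-run sketch: it repeats the foreign key lookup from mg_merge and only prints the UPDATE statements instead of executing them. It assumes the street and address tables from the example further below and a merge of id 2 into id 1. It also builds the statement with format() and its %I and %L placeholders, which quote identifiers and literals; the original function uses plain concatenation, so treat this as an alternative rather than the article's code.

```sql
do $$
declare
  tabs record;
begin
  for tabs in
      select b.relname as table_name,
             d.attname as column_name
      from   pg_constraint a
      join   pg_class b     on b.oid = a.conrelid      -- referencing table
      join   pg_class c     on c.oid = a.confrelid     -- referenced table
      join   pg_attribute d on d.attrelid = b.oid
                           and d.attnum = a.conkey[1]  -- single-column keys only
      where  a.contype = 'f'
      and    c.relname = 'street'
  loop
     -- %I quotes an identifier, %L quotes a literal
     raise notice '%', format('update %I set %I = %L where %I = %L',
                              tabs.table_name, tabs.column_name, '1',
                              tabs.column_name, '2');
  end loop;
end;
$$;
```

With the example schema this should print a single statement that updates address.street_id.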

What remains is the rollback function:

mg_undo
create or replace function mg_undo() returns void
as $$
declare
  l_action bigint;
  l_old    varchar(50);
  l_table  text;
  l_pk     text;
  l_sql    text;
  tabs     record;
begin
  -- the most recent merge is the one to undo
  select max(id) into l_action
  from   mg_action;
  if l_action is null then
     raise EXCEPTION 'Can''t UNDO';
  end if;
  -- take the primary key column from mg_table instead of assuming it is "id"
  select a.table_name, a.old_id, t.pk_name into l_table, l_old, l_pk
  from   mg_action a, mg_table t
  where  a.id = l_action
  and    t.table_name = a.table_name;
  -- restore the deleted duplicate from the backup table
  l_sql := 
 'insert into ' || l_table || ' ' ||
 'select * from mg_' || l_table || ' ' ||
 'where ' || l_pk || ' = ''' || l_old || '''';
  execute l_sql;
  for tabs in
      select table_name,
             pk_name,
             column_name
      from   mg_action_detail
      where  action_id = l_action
      group  by table_name, pk_name, column_name
      loop
         -- point the rows saved for this table and column back at the duplicate;
         -- '' || x converts both sides of the comparison to text
         l_sql := 
        'update ' || tabs.table_name || ' ' ||
        'set ' || tabs.column_name || ' = ''' || l_old || ''' ' ||
        'where '''' || ' || tabs.pk_name || ' in (' ||
        'select '''' || obj_id from mg_action_detail '||
        'where table_name = ''' || tabs.table_name || ''' ' ||
        'and column_name = ''' || tabs.column_name || ''' ' ||
        'and action_id = ' || l_action || ') ';
         execute l_sql;
      end loop;
  l_sql := 
 'delete from mg_' || l_table || ' where ' || l_pk || ' = ''' || l_old || '''';
  execute l_sql;
  delete from mg_action_detail where action_id = l_action;
  delete from mg_action where id = l_action;
end;
$$ language plpgsql;


Changes are rolled back strictly in the reverse order of their creation. For this reason, mg_undo takes no arguments.

Let's see how it all works. Create lookup tables:

create sequence city_seq;
create table    city (
  id            bigint         default nextval('city_seq') not null,
  name          varchar(100)   not null,
  primary key(id)
);
create sequence street_seq;
create table    street (
  id            bigint         default nextval('street_seq') not null,
  city_id       bigint         not null references city(id),
  name          varchar(100)   not null,
  primary key(id)
);
create sequence address_seq;
create table    address (
  id            bigint         default nextval('address_seq') not null,
  street_id     bigint         not null references street(id),
  house         varchar(10)    not null,
  apartment     varchar(10)    not null,
  primary key(id)
);

... and fill them with test data:

insert into city(id, name) values (1, 'Казань');
insert into street(id, city_id, name) values (1, 1, 'Победы');
insert into street(id, city_id, name) values (2, 1, 'Победы проспект');
insert into address(id, street_id, house, apartment) values (1, 1, '10', '1');
insert into address(id, street_id, house, apartment) values (2, 2, '10', '2');


Now, to merge the street 'Победы проспект' (Victory Avenue) into the street 'Победы' (Victory), it is enough to run the following command:

select mg_merge('street', 2, 1);
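
To check the result, the tables involved can simply be queried. The comments describe what I would expect with the test data above; they are expectations, not captured output.

```sql
select * from street  order by id;   -- only the row with id 1 should remain
select * from address order by id;   -- both addresses should now have street_id = 1
select * from mg_street;             -- the deleted duplicate (id 2) is kept here
select * from mg_action_detail;      -- one row: table address, column street_id, obj_id 2
```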

The mg_undo() function, as mentioned above, rolls the change back.
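
A short sketch of the rollback on the same test data, again with expectations in the comments rather than real output:

```sql
select mg_undo();

select * from street  order by id;   -- the street with id 2 should be back
select * from address order by id;   -- address 2 should point at street 2 again
select count(*) from mg_action;      -- the journal should be empty after the undo
```

Calling mg_undo() once more at this point should raise the 'Can't UNDO' exception, because no recorded merges are left.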

I hope this is useful to someone. The sources are posted on GitHub.
