# PostgreSQL. Accelerate deployment seven times using multi-threading
Hello! We use PostgreSQL on the GIS housing and communal services project, and we recently ran into the problem of long-running SQL scripts caused by the rapid growth of data in the database. In February 2018, at PGConf, I talked about how we solved this problem. The presentation slides are available on the conference website; below is the text of my talk.


Given

There was already a detailed article about GIS housing and communal services in the LANIT group blog on Habr. In a nutshell, GIS housing and communal services is the first federal portal in Russia that gathers all information about housing and communal services, and it has been launched in almost all regions (Moscow, St. Petersburg and Sevastopol will join in 2019). Over the past three months, more than 12 TB of data about houses, personal accounts, payment records and much, much more has been loaded into the system; in total, PostgreSQL now holds more than 24 TB.
The project is architecturally divided into subsystems, and each subsystem has its own database. In total there are now about 60 such databases, located on 11 virtual servers. Some subsystems are more heavily loaded than others, and their databases can reach 3-6 terabytes in size.
Houston, we have a problem

Now a little more about the problem. I'll start from afar: the application code and the database migration code (by migration I mean moving the database from one revision to another, with all the SQL scripts needed for that) are stored together in the version control system. This is possible thanks to Liquibase (more about Liquibase on the project can be found in the report by Misha Balayan at TechGuruDay at LANIT).
Now imagine a release. While there are only a couple of terabytes of data or less and all tables stay within a hundred gigabytes, migrations of data or structural changes in any table are (usually) fast.
Now imagine that we already have a couple of dozen terabytes of data and tables of a terabyte and more have appeared (possibly partitioned). In the new version we need to run a migration against one of these tables, or, even worse, against all of them at once. At the same time the maintenance window cannot be extended. At the same time the same migration has to run on the test databases, where the hardware is weaker. And at the same time we need to know in advance how long all the migrations will take in total. This is where the problem begins.
First we tried the tips from the official PostgreSQL documentation (dropping indexes and foreign keys before a mass migration, re-creating tables from scratch, using COPY, changing the config on the fly). This helped, but we wanted it even faster and more convenient (convenience is, of course, subjective :–)). As a result, we implemented parallel execution of mass migrations, which in many cases sped things up severalfold (and sometimes by an order of magnitude). Although what actually runs in parallel is several processes, the term "multi-threading" has stuck within the team.
Multi-threading

The main idea of this approach is to split a large table into disjoint ranges (for example, with the ntile function) and run an SQL script not on all the data at once, but in parallel on several ranges. Each parallel process takes one range, locks it and runs the SQL script only on the data from that range. Once the script has finished, we look for the next unlocked, not-yet-processed range and repeat the operation. It is important to choose the right split key: it should be an indexed field with unique values. If there is no such field, the ctid system column can be used.
The first version of "multi-threading" was implemented with an auxiliary table of ranges and a function that grabs the next range. The required SQL script was substituted into an anonymous block and launched in the required number of sessions, which gave us parallel execution.
Code example
-- The UPDATE_INFO_STEPS table is used to update/fill large tables
-- and to run complex update/fill queries.
CREATE TABLE UPDATE_INFO_STEPS (
    BEGIN_GUID varchar(36),
    END_GUID varchar(36) NOT NULL,
    STEP_NO int,
    STATUS char(1),
    BEGIN_UPD timestamp,
    END_UPD timestamp,
    ROWS_UPDATED int,
    ROWS_UPDATED_TEXT varchar(30),
    DISCR varchar(10)
);
ALTER TABLE UPDATE_INFO_STEPS ADD PRIMARY KEY(discr, step_no);

-- The FUNC_UPDATE_INFO_STEPS function implements the key feature:
-- the ability to "take" the next interval if the current one is busy.
CREATE OR REPLACE FUNCTION func_update_info_steps(
    pStep_no int,
    pDiscr varchar(10)
) RETURNS text AS
$BODY$
DECLARE
    lResult text;
BEGIN
    SELECT 'SUCCESS' INTO lResult
    FROM update_info_steps
    WHERE step_no = pStep_no
      AND discr = pDiscr
      AND status = 'N'
    FOR UPDATE NOWAIT;

    UPDATE UPDATE_INFO_STEPS
    SET status = 'A',
        begin_upd = now()
    WHERE step_no = pStep_no
      AND discr = pDiscr
      AND status = 'N';

    RETURN lResult;
EXCEPTION WHEN lock_not_available THEN
    SELECT 'ERROR' INTO lResult;
    RETURN lResult;
END;
$BODY$
LANGUAGE PLPGSQL VOLATILE;

-- Usage example (1 process per session)
-- Step 1. Fill the auxiliary table with the intervals to process.
DO LANGUAGE PLPGSQL
$$
DECLARE
    -- Specify the number of records processed per iteration
    l_count int := 10000;
    -- Substitute the identifier
    l_discr VARCHAR(10) := '<discr>';
BEGIN
    INSERT INTO UPDATE_INFO_STEPS (
        BEGIN_GUID, END_GUID, STEP_NO, STATUS, DISCR
    )
    SELECT min(guid) BEGIN_GUID,
           max(guid) END_GUID,
           RES2.STEP STEP_NO,
           'N'::char(1) STATUS,
           l_discr DISCR
    FROM (
        SELECT guid,
               floor((ROWNUM - 1) / l_count) + 1 AS STEP
        FROM (
            -- Substitute the column name
            SELECT <column> AS GUID,
                   -- Substitute the column name
                   row_number() over (ORDER BY <column>) AS ROWNUM
            -- Substitute the schema and table name
            FROM <schema>.<table_name>
            ORDER BY 1
        ) RES1
    ) RES2
    GROUP BY RES2.step;
END;
$$;

-- Step 2. Using the auxiliary table, run the UPDATE script.
DO LANGUAGE PLPGSQL
$$
DECLARE
    cur record;
    vCount int;
    vCount_text varchar(30);
    vCurStatus char(1);
    vCurUpdDate date;
    -- Substitute the identifier
    l_discr varchar(10) := '<discr>';
    l_upd_res varchar(100);
BEGIN
    FOR cur IN (
        SELECT *
        FROM UPDATE_INFO_STEPS
        WHERE status = 'N' AND DISCR = l_discr
        ORDER BY step_no
    ) LOOP
        vCount := 0;
        -- An inner transaction is mandatory!
        SELECT result INTO l_upd_res
        FROM dblink(
            '<parameters>',
            'SELECT FUNC_UPDATE_INFO_STEPS(' || cur.step_no || ',''' || l_discr || ''')'
        ) AS T (result text);
        IF l_upd_res = 'SUCCESS' THEN
            -- Main script. In this section perform the required
            -- update/insert actions, etc.
            -- Mandatory requirement: use the cur.begin_guid - cur.end_guid
            -- interval and a dblink back to the same database.
            -- The script below is only an example.
            SELECT dblink_exec(
                '<parameters>',
                'UPDATE FOO SET level = 42
                 WHERE id BETWEEN ''' || cur.begin_guid || ''' AND ''' || cur.end_guid || ''''
            ) INTO vCount_text;
            -- End of the main script.
            SELECT dblink_exec(
                '<parameters>',
                'UPDATE UPDATE_INFO_STEPS
                 SET status = ''P'', end_upd = now(),
                     rows_updated_text = ''' || vCount_text || '''
                 WHERE step_no = ' || cur.step_no || '
                   AND discr = ''' || l_discr || ''''
            ) INTO l_upd_res;
        END IF;
    END LOOP;
END;
$$;

-- Monitoring the progress.
SELECT SUM(CASE status WHEN 'P' THEN 1 ELSE 0 END) done,
       SUM(CASE status WHEN 'A' THEN 1 ELSE 0 END) processing,
       SUM(CASE status WHEN 'N' THEN 1 ELSE 0 END) left_,
       round(SUM(CASE status WHEN 'P' THEN 1 ELSE 0 END)::numeric / COUNT(*) * 100::numeric, 2) done_proc
FROM UPDATE_INFO_STEPS
WHERE discr = '<discr>';
Although this approach worked quickly, it required a lot of manual work. And if the deployment happened at 3 a.m., the DBA had to catch the moment when Liquibase ran the "multi-threaded" script (which it actually executed in a single process) and manually start several more processes to speed it up.
Multi-threading 2.0

The previous version of "multi-threading" was inconvenient to use, so we wrote an application in Go that automates the process (it could just as well be done in Python or many other languages).
First, we split the data in the table being modified into ranges. Then we add information about the script to the auxiliary task table: its name (a unique identifier, for example the Jira task key) and the number of processes to run simultaneously. Then, in the auxiliary statements table, we add the SQL migration text split into ranges.
Code example
-- In the target database, create the objects that store the multi-threaded
-- update configuration (pg_parallel_task) and the task log
-- (pg_parallel_task_statements).
CREATE TABLE IF NOT EXISTS public.pg_parallel_task (
    name text PRIMARY KEY,
    threads_count int NOT NULL DEFAULT 10,
    comment text
);
COMMENT ON TABLE public.pg_parallel_task
    IS 'Parallel execution task';
COMMENT ON COLUMN public.pg_parallel_task.name
    IS 'Unique identifier';
COMMENT ON COLUMN public.pg_parallel_task.threads_count
    IS 'Number of simultaneous processing threads. Default is 10';
COMMENT ON COLUMN public.pg_parallel_task.comment
    IS 'Comment';

CREATE TABLE IF NOT EXISTS public.pg_parallel_task_statements (
    statement_id bigserial PRIMARY KEY,
    task_name text NOT NULL REFERENCES public.pg_parallel_task (name),
    sql_statement text NOT NULL,
    status text NOT NULL CHECK (
        status IN ('new', 'in progress', 'ok', 'error')
    ) DEFAULT 'new',
    start_time timestamp without time zone,
    elapsed_sec float(8),
    rows_affected bigint,
    err text
);
COMMENT ON TABLE public.pg_parallel_task_statements
    IS 'Statements for parallel execution';
COMMENT ON COLUMN public.pg_parallel_task_statements.sql_statement
    IS 'Full text of the query to execute';
COMMENT ON COLUMN public.pg_parallel_task_statements.status
    IS 'Processing status of the current statement. One of new|in progress|ok|error';
COMMENT ON COLUMN public.pg_parallel_task_statements.start_time
    IS 'Start time of the current statement';
COMMENT ON COLUMN public.pg_parallel_task_statements.elapsed_sec
    IS 'For completed statements, the elapsed time in seconds';
COMMENT ON COLUMN public.pg_parallel_task_statements.rows_affected
    IS 'For completed statements, the number of affected rows';
COMMENT ON COLUMN public.pg_parallel_task_statements.err
    IS 'For completed statements, the error text. NULL if execution succeeded.';

-- Main script
INSERT INTO public.pg_parallel_task (name, threads_count)
VALUES ('JIRA-001', 10);

INSERT INTO public.pg_parallel_task_statements (task_name, sql_statement)
SELECT 'JIRA-001' task_name,
       FORMAT(
           'UPDATE FOO SET level = 42 where id >= ''%s'' and id <= ''%s''',
           MIN(d.id),
           MAX(d.id)
       ) sql_statement
FROM (
    SELECT id,
           NTILE(10) OVER (ORDER BY id) part
    FROM foo
) d
GROUP BY d.part;
-- End of the main script
During deployment, the Go application is called; it reads the task configuration and the scripts for this task from the auxiliary tables and automatically runs the scripts with the given number of parallel processes (workers). When it finishes, control is handed back to Liquibase.
Code
<changeSet id="JIRA-001" author="soldatov">
<executeCommand os="Linux, Mac OS X" executable="./pgpar.sh">
<arg value="testdatabase"/><arg value="JIRA-001"/>
</executeCommand>
</changeSet>
The application consists of three main abstractions:
- task - loads the migration parameters, the number of processes and all the ranges into memory, starts the "multi-threading" and brings up a web server for tracking progress;
- statement - represents one range of the operation being performed; it is also responsible for changing the range's execution status, recording the range's execution time, the number of rows in the range, and so on;
- worker - represents a single thread of execution.
In the task.do method, a channel is created into which all the statements of the task are sent, and the specified number of workers is started on this channel. Inside each worker there is an infinite loop that multiplexes over two channels: the one from which it receives statements and executes them, and an empty channel that serves as a shutdown signal. As soon as the empty channel is closed, the worker shuts down; this happens when an error occurs in one of the workers. Since channels in Go are a thread-safe structure, closing one channel lets us cancel all workers at once. When the statements in the channel run out, the worker simply exits the loop and decrements the shared worker counter. Since the task always knows how many workers are working on it, it just waits for this counter to drop to zero and then terminates.
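To make this more concrete, here is a minimal sketch of such a worker pool. The type and field names are illustrative and the status bookkeeping is omitted, so this is a simplification of mine rather than the project's actual code:
// Simplified sketch of the worker scheme described above.
// Names and types are illustrative, not the project's real code.
package parallel

import (
	"database/sql"
	"sync"
)

type statement struct {
	id  int64
	sql string
}

type task struct {
	db      *sql.DB
	workers int
}

func (t *task) do(stmts []statement) error {
	in := make(chan statement)  // statements to be executed
	done := make(chan struct{}) // closed on the first error to cancel all workers

	var once sync.Once
	var firstErr error
	var wg sync.WaitGroup

	// Feed all statements into the channel, then close it.
	go func() {
		defer close(in)
		for _, s := range stmts {
			select {
			case in <- s:
			case <-done:
				return
			}
		}
	}()

	// Start the configured number of workers.
	for i := 0; i < t.workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done() // "decrement the worker counter"
			for {
				select {
				case <-done: // another worker failed, shut down
					return
				case s, ok := <-in:
					if !ok { // no statements left, exit the loop
						return
					}
					if _, err := t.db.Exec(s.sql); err != nil {
						once.Do(func() {
							firstErr = err
							close(done) // closing one channel cancels all workers
						})
						return
					}
				}
			}
		}()
	}

	wg.Wait() // the task waits until every worker has finished
	return firstErr
}
In the real application each worker would additionally record the status, timing and affected-row count of every processed statement in pg_parallel_task_statements; that bookkeeping is left out of the sketch.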
Goodies

Due to this implementation of "multi-threading", several interesting features have appeared:
- Integration with Liquibase (called via the executeCommand tag).
- A simple web interface that comes up when the "multi-threading" starts and shows all the information about its progress.
- A progress bar (we know how long one range takes, how many parallel processes are running and how many ranges are left, so we can estimate the completion time; see the sketch after this list).
- Dynamic changes to the number of parallel processes (for now we do this by hand, but we plan to automate it).
- Logging of information during the execution of multi-threaded scripts for later analysis.
- You can run blocking operations such as UPDATE while blocking almost nothing (if you split the table into very small ranges, every script finishes almost instantly).
- There is a wrapper for calling "multi-threading" directly from the database.
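As an illustration of the progress-bar estimate mentioned above, the completion time boils down to simple arithmetic; here is a small sketch with assumed field names:
// Sketch of the progress/ETA calculation mentioned in the list above.
// Field names are illustrative.
package parallel

import "time"

type progress struct {
	totalRanges int           // how many ranges the task was split into
	doneRanges  int           // how many ranges are already processed
	workers     int           // number of parallel processes
	avgPerRange time.Duration // average processing time of one range
}

// eta estimates how long the remaining ranges will take with the
// current number of workers.
func (p progress) eta() time.Duration {
	remaining := p.totalRanges - p.doneRanges
	if remaining <= 0 || p.workers <= 0 {
		return 0
	}
	// Each "wave" processes up to `workers` ranges in parallel.
	waves := (remaining + p.workers - 1) / p.workers
	return time.Duration(waves) * p.avgPerRange
}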
Not goodies
The main disadvantage is that one full scan of the table is needed to split it into ranges when a text field, date or UUID is used as the key. If the split key is a field with sequentially increasing, dense values, this problem disappears: all the ranges can be specified in advance simply by choosing the required step.
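Here is a rough sketch of that pre-computed variant; the table foo, column id, the UPDATE text and the step size are illustrative assumptions:
// Sketch: pre-computing ranges for a dense, sequentially increasing key
// without a full table scan. Table "foo", column "id" and the step size
// are assumptions for illustration.
package parallel

import "fmt"

type idRange struct{ from, to int64 }

// makeRanges splits [minID, maxID] into consecutive ranges of `step` values.
func makeRanges(minID, maxID, step int64) []idRange {
	var ranges []idRange
	for lo := minID; lo <= maxID; lo += step {
		hi := lo + step - 1
		if hi > maxID {
			hi = maxID
		}
		ranges = append(ranges, idRange{from: lo, to: hi})
	}
	return ranges
}

// statementsFor renders the per-range statements that would be loaded
// into pg_parallel_task_statements.
func statementsFor(ranges []idRange) []string {
	stmts := make([]string, 0, len(ranges))
	for _, r := range ranges {
		stmts = append(stmts, fmt.Sprintf(
			"UPDATE foo SET level = 42 WHERE id >= %d AND id <= %d;", r.from, r.to))
	}
	return stmts
}
The minimum and maximum key values themselves are cheap to obtain (from the sequence or via an indexed MIN()/MAX()), so the full scan is avoided.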
A sevenfold speed-up (a test on a pgbench table)

Finally, here is a comparison of the speed of an UPDATE over 500,000,000 rows with and without "multi-threading". A plain UPDATE took 49 minutes, while the multi-threaded run finished in seven minutes.
Code example
SELECT count(1) FROM pgbench_accounts;
   count
-----------
 500000000
(1 row)

SELECT pg_size_pretty(pg_total_relation_size('pgbench_accounts'));
 pg_size_pretty
----------------
 62 GB
(1 row)

UPDATE pgbench_accounts
SET abalance = 42;
-- Execution time: 49 minutes

vacuum full analyze verbose pgbench_accounts;

INSERT INTO public.pg_parallel_task (name, threads_count) VALUES ('JIRA-002', 25);
INSERT INTO public.pg_parallel_task_statements (task_name, sql_statement)
SELECT 'JIRA-002' task_name,
       FORMAT('UPDATE pgbench_accounts
               SET abalance = 42
               WHERE aid >= ''%s'' AND aid <= ''%s'';',
              MIN(d.aid), MAX(d.aid)) sql_statement
FROM (SELECT aid, ntile(25) over (ORDER BY aid) part
      FROM pgbench_accounts) d
GROUP BY d.part;
-- Execution time: 10 minutes

-- You can also split by ctid, but the ranges come out uneven and the table
-- must not be modified by anyone while the multi-threaded run is in progress
INSERT INTO public.pg_parallel_task_statements (task_name, sql_statement)
SELECT 'JIRA-002-ctid' task_name,
       FORMAT('UPDATE pgbench_accounts
               SET abalance = 45
               WHERE (ctid::text::point)[0]::text > ''%s'' AND (ctid::text::point)[0]::text <= ''%s'';',
              (d.min_ctid), (d.max_ctid)) sql_statement
FROM (
    WITH max_ctid AS (
        SELECT MAX((ctid::text::point)[0]::int) FROM pgbench_accounts)
    SELECT generate_series - (SELECT max / 25 FROM max_ctid) AS min_ctid,
           generate_series AS max_ctid
    FROM generate_series((SELECT max / 25 FROM max_ctid),
                         (SELECT max FROM max_ctid),
                         (SELECT max / 25 FROM max_ctid))) d;
-- Execution time: 9 minutes

./pgpar-linux-amd64 jdbc:postgresql://localhost:5432 soldatov password testdatabase JIRA-002
-- Execution time: 7 minutes
P.S. Do you need it?

Every tool is good for certain tasks; here are a few where "multi-threading" fits well:
- UPDATE of tables with more than 100,000 rows.
- UPDATE with complex logic that can be parallelized (for example, calling functions that compute something).
- UPDATE without locks: by splitting into very small ranges and running a small number of processes, each range is processed almost instantly, so each lock is also almost instantaneous.
- Running changeSets in parallel in Liquibase (e.g. VACUUM).
- Creating new columns in a table and filling them with data.
- Complex reports.
Almost non-blocking UPDATE (50,000 ranges of 10,000 rows each)
<changeSet author="soldatov" id="JIRA-002-01">
<sql>
<![CDATA[
INSERT INTO public.pg_parallel_task (name, threads_count)
VALUES ('JIRA-002', 5);
INSERT INTO public.pg_parallel_task_statements (task_name, sql_statement)
SELECT 'JIRA-002' task_name,
       FORMAT(
           'UPDATE pgbench_accounts
            SET abalance = 42
            WHERE filler IS NULL
            AND aid >= ''%s'' AND aid <= ''%s'';',
           MIN(d.aid),
           MAX(d.aid)
       ) sql_statement
FROM (
    SELECT aid,
           ntile(10000) over (ORDER BY aid) part
    FROM pgbench_accounts
    WHERE filler IS NULL
) d
GROUP BY d.part;
]]>
</sql>
</changeSet>
<changeSet author="soldatov" id="JIRA-002-02">
<executeCommand os="Linux, Mac OS X" executable="./pgpar.sh">
<arg value="pgconfdb"/><arg value="JIRA-002"/>
</executeCommand>
</changeSet>
Concurrent changeSets in Liquibase
<changeSet author="soldatov" id="JIRA-003-01">
<sql>
<![CDATA[
INSERT INTO pg_parallel_task (name, threads_count)
VALUES ('JIRA-003', 2);
INSERT INTO pg_parallel_task_statements (task_name, sql_statement)
SELECT 'JIRA-003' task_name,
       'VACUUM FULL ANALYZE pgbench_accounts;' sql_statement;
INSERT INTO pg_parallel_task_statements (task_name, sql_statement)
SELECT 'JIRA-003' task_name,
       'VACUUM FULL ANALYZE pgbench_branches;' sql_statement;
]]>
</sql>
</changeSet>
<changeSet author="soldatov" id="JIRA-003-02">
<executeCommand os="Linux, Mac OS X" executable="./pgpar.sh">
<arg value="testdatabase"/><arg value="JIRA-003"/>
</executeCommand>
</changeSet>
Almost non-blocking filling of a new table column with data (50,000 ranges of 10,000 rows each), calling the "multi-threading" function from the database
-- SQL part
ALTER TABLE pgbench_accounts ADD COLUMN account_number text;
INSERT INTO public.pg_parallel_task (name, threads_count) VALUES ('JIRA-004', 5);
INSERT INTO public.pg_parallel_task_statements (task_name, sql_statement)
SELECT 'JIRA-004' task_name,
       FORMAT('UPDATE pgbench_accounts
               SET account_number = aid::text || filler
               WHERE aid >= ''%s'' AND aid <= ''%s'';',
              MIN(d.aid), MAX(d.aid)) sql_statement
FROM (SELECT aid,
             ntile(50000) over (ORDER BY aid) part
      FROM pgbench_accounts) d
GROUP BY d.part;
SELECT * FROM func_run_parallel_task('testdatabase','JIRA-004');
By the way, we have a vacancy