Migrating data from MySQL to PostgreSQL

As you work with databases and get to know their strengths and weaknesses, there comes a moment when you decide to migrate from one DBMS to another. In our case, the task was to move services from MySQL to PostgreSQL. Here is a short list of the benefits we expect from the move to PostgreSQL 9.2 (a more detailed list of features can be found here):
  • table inheritance (with limitations that are promised to be fixed in the future)
  • range types: int4range, numrange, daterange
  • out-of-the-box support for several stored-function languages (PL/pgSQL, PL/Tcl, PL/Perl, PL/Python, and plain C)
  • the WITH clause, which allows recursive queries
  • (planned) materialized views (partially available now as INSERT/UPDATE/DELETE rules on a view)
  • (planned) triggers on DDL operations

As a rule, existing solutions work with a ready-made SQL dump, which is converted to the syntax of the target database. But in some cases (an actively used web application with a large amount of data), this approach costs time: the SQL dump has to be created, converted, and then loaded into the target DBMS. An online converter that transfers data directly from one DBMS to the other is therefore a better fit and can significantly reduce service downtime.

The implementation language is C++ (with some C++11 features), the native client libraries are used to connect to MySQL and PostgreSQL, and Qt Creator served as the IDE.

The migration algorithm is as follows. It is assumed that the destination database already contains a table structure matching that of the source database. A list of tables to transfer is built and then distributed across a thread pool. Each thread has its own connection to the source database and to the destination database. That is, several tables are transferred in parallel. Profit!

Traditionally, any application has a certain framework: a set of system components that other components rely on, such as configuration file handling, logging, error handling, memory management, and more. In our case, only the bare minimum needed to solve the problem is used. First, some fundamental and composite types were redefined (for convenience only; yes, I know, alias templates could have been used, but it turned out like this):
simple types
typedef bool t_bool;
typedef char t_char;
typedef unsigned char t_uchar;
typedef signed char t_schar;
typedef int t_int;
typedef unsigned int t_uint;
typedef float t_float;
typedef double t_double;

map
template <class K, class V>
class CMap
    : public std::map<K, V>
{
public:
    CMap();
    virtual ~CMap();
};
template <class K, class V>
CMap<K, V>::CMap()
{
}
template <class K, class V>
CMap<K, V>::~CMap()
{
}

vector
template <class T>
class CVector
    : public std::vector<T>
{
public:
    CVector();
    virtual ~CVector();
};
template <class T>
CVector<T>::CVector()
{
}
template <class T>
CVector<T>::~CVector()
{
}

fstream
class CFileStream
    : public std::fstream
{
public:
    CFileStream();
    virtual ~CFileStream();
};
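
The alias templates mentioned above would give the same short names without deriving from the standard containers, whose destructors are not virtual. A minimal sketch, assuming C++11; the names mirror the types in this article, and countWords is a hypothetical helper added only to show the aliases in use:

```cpp
#include <map>
#include <string>
#include <vector>

// Alias templates (C++11): new names for the standard containers,
// without inheriting from classes that were not designed as bases.
template <class K, class V>
using CMap = std::map<K, V>;

template <class T>
using CVector = std::vector<T>;

// Hypothetical helper, only to show the aliases in use:
// counts how many times each word occurs.
inline CMap<std::string, int> countWords(const CVector<std::string>& words)
{
    CMap<std::string, int> counts;
    for (const std::string& w : words)
    {
        ++counts[w];
    }
    return counts;
}
```

With aliases, CMap<K, V> and std::map<K, V> are the same type, so values pass freely between code that uses either spelling.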

Of the explicit patterns, only singleton is used:
classic Meyers singleton
template <class T>
class CSingleton
{
public:
    static T* instance();
    void free();
protected:
    CSingleton();
    virtual ~CSingleton();
};
template <class T>
T* CSingleton<T>::instance()
{
    static T *instance = new T();
    return instance;
}
template <class T>
void CSingleton<T>::free()
{
    delete this;
}
template <class T>
CSingleton<T>::CSingleton()
{
}
template <class T>
CSingleton<T>::~CSingleton()
{
}
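
Strictly speaking, a Meyers singleton keeps the instance as a function-local static object instead of allocating it with new. The variant below is a sketch of that form, not the code used in the converter; CSingletonSketch and CCounter are hypothetical names. Since C++11 the initialization of a function-local static is thread-safe, and the object is destroyed automatically at program exit.

```cpp
// Meyers singleton: the instance is a function-local static, so it is
// constructed on first use and destroyed at program exit; no free() call
// is needed.
template <class T>
class CSingletonSketch
{
public:
    static T& instance()
    {
        static T object;
        return object;
    }
    CSingletonSketch(const CSingletonSketch&) = delete;
    CSingletonSketch& operator=(const CSingletonSketch&) = delete;
protected:
    CSingletonSketch() = default;
    ~CSingletonSketch() = default;
};

// Hypothetical client class, used only for illustration.
class CCounter : public CSingletonSketch<CCounter>
{
public:
    int next() { return ++m_value; }
private:
    // The base class needs access to the private constructor.
    friend class CSingletonSketch<CCounter>;
    CCounter() : m_value(0) {}
    int m_value;
};
```

Every call to CCounter::instance() returns a reference to the same object, so CCounter::instance().next() keeps counting across call sites.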

Base classes for the task (performed in a separate thread) and the system (starts to execute the task):
task.h
class CTask
{
public:	
    CTask();
    virtual ~CTask();
    void execute();
    t_uint taskID();
    t_bool isExecuted();
protected:
    virtual void executeEvent() = 0;
private:
    t_uint m_task_id;
    std::atomic<t_bool> m_executed; // written by the worker thread, polled by the main thread (needs <atomic>)
};

task.cpp
CTask::CTask()
    : m_executed(false)
{
    static t_uint task_id = 0;
    m_task_id = task_id++;
}
CTask::~CTask()
{
}
void CTask::execute()
{
    executeEvent();
    m_executed = true;
}
t_uint CTask::taskID()
{
    return m_task_id;
}
t_bool CTask::isExecuted()
{
    return m_executed;
}

system.h
class CSystem
{
public:
    CSystem();
    virtual ~CSystem() = 0;
protected:
    void executeTask(CTask *task);
};

system.cpp
CSystem::CSystem()
{
}
CSystem::~CSystem()
{
}
void CSystem::executeTask(CTask *task)
{
    // Capture the pointer by value: the detached thread outlives this function's locals.
    std::thread thread([task]() { task->execute(); });
    thread.detach();
}

To round off the basic types, the string class deserves a mention. It had to be written from scratch so that some operations (substring replacement and concatenation) could work on a caller-supplied buffer (more on it a bit later) without additional memory allocations, and so that some things (converting a string to a number and a number to a string) could be class members. Only the class declaration is given:
string.h
class CString
{
public:
    CString(const t_char *data = nullptr);
    CString(const CString& s);
    ~CString();
    const t_char* ptr() const;
    void setPtr(t_char *p);
    CString& operator= (const CString& s);
    CString operator+ (const t_char *p) const;
    CString operator+ (t_char c) const;
    CString operator+ (const CString& s) const;
    friend CString operator+ (const t_char *p, const CString& s);
    CString& operator+= (const t_char *p);
    CString& operator+= (t_char c);
    CString& operator+= (const CString& s);
    t_bool operator== (const CString& s) const;
    t_bool operator!= (const CString& s) const;
    t_bool operator< (const CString& s) const;
    t_bool operator> (const CString& s) const;
    t_bool operator<= (const CString& s) const;
    t_bool operator>= (const CString& s) const;
    t_char& at(t_uint index);
    t_char at(t_uint index) const;
    t_uint length() const;
    t_bool isEmpty() const;
    void clear();
    t_int search(const CString& s, t_uint from = 0) const;
    CString substr(t_uint from, t_int count = -1) const;
    CString replace(const CString& before, const CString& after) const;
    static CString fromNumber(t_uint value);
    static t_uint toUnsignedInt(const CString& s, t_bool *good = nullptr);
    CVector<CString> split(const CString& splitter) const;
    t_bool match(const CString& pattern) const;
    static t_uint replacePtr(const t_char *src, const t_char *before, const t_char *after, char *buffer);
    static t_uint lengthPtr(const t_char *src);
    static t_uint concatenatePtr(const t_char *src, char *buffer);
private:
    t_char *m_data;
    t_uint length(const t_char *src) const;
    t_char* copy(const t_char *src) const;
    t_char* concatenate(const t_char *src0, t_char c) const;
    t_char* concatenate(const t_char *src0, const t_char *src1) const;
    t_int compare(const t_char *src0, const t_char *src1) const;
};
CString operator+ (const t_char *p, const CString& s);

Inevitably, any application slightly bigger than “Hello, world” needs a log and a configuration file. The method that writes a message to the log is guarded by a mutex, since every task reports its progress to the log while processing a table. Fine-grained locks and lock-free algorithms were not considered, because writing to the log is far from being a bottleneck in the application:
log.h
class CLog
    : public CSingleton<CLog>
{
public:
    enum MessageType
    {
        Information,
        Warning,
        Error
    };
    CLog();
    virtual ~CLog();
    void information(const CString& message);
    void warning(const CString& message);
    void error(const CString& message);
private:
    std::mutex m_mutex;
    CFileStream m_stream;
    void writeTimestamp();
    void writeHeader();
    void writeFooter();
    void writeMessage(MessageType type, const CString& message);
};

log.cpp
CLog::CLog()
{
    m_stream.open("log.txt", std::ios_base::out);
    writeHeader();
}
CLog::~CLog()
{
    writeFooter();
    m_stream.flush();
    m_stream.close();
}
void CLog::information(const CString& message)
{
    writeMessage(Information, message);
}
void CLog::warning(const CString& message)
{
    writeMessage(Warning, message);
}
void CLog::error(const CString& message)
{
    writeMessage(Error, message);
}
void CLog::writeTimestamp()
{
    time_t rawtime;
    tm *timeinfo;
    t_char buffer[32];
    time(&rawtime);
    timeinfo = localtime(&rawtime);
    strftime(buffer, 32, "%Y/%m/%d %H:%M:%S", timeinfo);
    m_stream << buffer << " ";
}
void CLog::writeHeader()
{
    writeMessage(Information, "Log started");
}
void CLog::writeFooter()
{
    writeMessage(Information, "Log ended");
}
void CLog::writeMessage(MessageType type, const CString& message)
{
    std::lock_guard<std::mutex> guard(m_mutex);
    writeTimestamp();
    switch (type)
    {
    case Information:
        {
            m_stream << "Information " << message.ptr();
            break;
        }
    case Warning:
        {
            m_stream << "Warning " << message.ptr();
            break;
        }
    case Error:
        {
            m_stream << "Error " << message.ptr();
            break;
        }
    default:
        {
            break;
        }
    }
    m_stream << "\n";
    m_stream.flush();
}

config.h
class CConfig
    : public CSingleton<CConfig>
{
public:
    CConfig();
    virtual ~CConfig();
    CString value(const CString& name, const CString& defvalue = "") const;
private:
    CFileStream m_stream;
    CMap<CString, CString> m_values;
};

config.cpp
CConfig::CConfig()
{
    m_stream.open("mysql2psql.conf", std::ios_base::in);
    if (m_stream.is_open())
    {
        CString line;
        const t_uint buffer_size = 256;
        t_char buffer[buffer_size];
        while (m_stream.getline(buffer, buffer_size))
        {
            line = buffer;
            if (!line.isEmpty() && line.at(0) != '#')
            {
                t_int pos = line.search("=");
                CString name = line.substr(0, pos);
                CString value = line.substr(pos + 1);
                m_values.insert(std::pair<CString, CString>(name, value));
            }
        }
        m_stream.close();
        CLog::instance()->information("Config loaded");
    }
    else
    {
        CLog::instance()->warning("Can't load config");
    }
}
CConfig::~CConfig()
{
}
CString CConfig::value(const CString& name, const CString& defvalue) const
{
    CMap<CString, CString>::const_iterator iter = m_values.find(name);
    if (iter != m_values.end())
    {
        return iter->second;
    }
    return defvalue;
}

mysql2psql.conf
# MySQL connection
mysql_host=localhost
mysql_port=3306
mysql_database=mysqldb
mysql_username=root
mysql_password=rootpwd
mysql_encoding=UTF8
# PostgreSQL connection
psql_host=localhost
psql_port=5432
psql_database=psqldb
psql_username=postgres
psql_password=postgrespwd
psql_encoding=UTF8
# Migration
# (!) Note: source_schema == mysql_database
source_schema=mysqldb
destination_schema=public
tables=*
use_insert=0
# Other settings
threads=16
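
For comparison, the parsing rules applied to this file (skip empty lines and lines starting with '#', split the rest at the first '=') can be sketched with the standard library alone. This is a minimal illustration rather than the CConfig code; parseConfig is a hypothetical name, and skipping lines without '=' is an assumption of the sketch:

```cpp
#include <istream>
#include <map>
#include <sstream>
#include <string>

// Hypothetical helper: reads "name=value" lines into a map.
// Empty lines, comment lines starting with '#', and lines without '='
// are skipped. The value is everything after the first '='.
inline std::map<std::string, std::string> parseConfig(std::istream& in)
{
    std::map<std::string, std::string> values;
    std::string line;
    while (std::getline(in, line))
    {
        if (line.empty() || line[0] == '#')
        {
            continue;
        }
        const std::string::size_type pos = line.find('=');
        if (pos == std::string::npos)
        {
            continue;
        }
        values[line.substr(0, pos)] = line.substr(pos + 1);
    }
    return values;
}
```

Any std::istream works as input, so the same function can read a file through std::ifstream or a string through std::istringstream.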

Now, about loading the data into PostgreSQL. There are two options: INSERT queries, which did not perform well on a large data set (a consequence of how transactions work), or the COPY command, which lets you stream data in portions and finish the transfer by sending a special end-of-data marker. Another nuance concerns determining the types of table columns in PostgreSQL. The documentation did not say (or perhaps I did not read between the lines carefully enough) how to get a human-readable type name, so a mapping between OIDs (an almost unique identifier of every object in the database) and types was compiled:

case 20: // int8
case 21: // int2
case 23: // int4
case 1005: // int2[] (array)
case 1007: // int4[] (array)
case 1016: // int8[] (array)
case 700: // float4
case 701: // float8
case 1021: // float4[] (array)
case 1022: // float8[] (array)
case 1700: // numeric
case 18: // char
case 25: // text
case 1002: // char[] (array)
case 1009: // text[] (array)
case 1015: // varchar[] (array)
case 1082: // date
case 1182: // date[] (array)
case 1083: // time
case 1114: // timestamp
case 1115: // timestamp[] (array)
case 1183: // time[] (array)
case 1185: // timestamptz[] (array)
case 16: // bool
case 1000: // bool[] (array)
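
Such a mapping can be wrapped in a small lookup function. The sketch below covers only a few scalar types and is not the code from the converter; typeNameFromOid is a hypothetical name. The numbers are the fixed OIDs that built-in types have in the pg_type system catalog, which also stores the type name (typname) for every OID.

```cpp
#include <string>

// Hypothetical helper: maps a few built-in PostgreSQL type OIDs to names.
// User-defined types get their OIDs when they are created and are not
// covered here; they fall through to "unknown".
inline std::string typeNameFromOid(unsigned int oid)
{
    switch (oid)
    {
    case 16:   return "bool";
    case 18:   return "char";
    case 20:   return "int8";
    case 21:   return "int2";
    case 23:   return "int4";
    case 25:   return "text";
    case 700:  return "float4";
    case 701:  return "float8";
    case 1082: return "date";
    case 1083: return "time";
    case 1114: return "timestamp";
    case 1700: return "numeric";
    default:   return "unknown";
    }
}
```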


Tasks are prepared and executed as follows:
  • a list of tables is built
  • connections to the source and destination databases are created (one pair per task)
  • ranges of the table list are distributed among the tasks
  • tasks are launched (each with its range of tables and its database connections)
  • the tasks are awaited (the main thread runs one task itself, the created threads run the rest)
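
The range distribution step can be sketched as a standalone function. This is one possible way to split a table list evenly, not the arithmetic used in the converter; splitRanges is a hypothetical name, and the ranges are half-open, [begin, end), which is an assumption of the sketch:

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper: splits `count` items into at most `parts` contiguous
// half-open ranges [begin, end). Range sizes differ by at most one item,
// and no empty range is produced.
inline std::vector<std::pair<std::size_t, std::size_t>>
splitRanges(std::size_t count, std::size_t parts)
{
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    if (count == 0 || parts == 0)
    {
        return ranges;
    }
    if (parts > count)
    {
        parts = count; // fewer items than workers: one item per range
    }
    const std::size_t base = count / parts;   // minimum items per range
    const std::size_t extra = count % parts;  // the first `extra` ranges get one more
    std::size_t begin = 0;
    for (std::size_t i = 0; i < parts; ++i)
    {
        const std::size_t size = base + (i < extra ? 1 : 0);
        ranges.push_back(std::make_pair(begin, begin + size));
        begin += size;
    }
    return ranges;
}
```

Half-open ranges make the empty case and the "fewer tables than threads" case fall out naturally, without special handling for the last range.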

Each task allocates three static 50 MB buffers in which data is prepared for the COPY command (escaping special characters and concatenating field values):
code snippet with task preparation
// create connection pool
t_uint threads = CString::toUnsignedInt(CConfig::instance()->value("threads", "1"));
CLog::instance()->information("Count of working threads: " + CString::fromNumber(threads));
if (!createConnectionPool(threads - 1))
{
    return false;
}
// create tasks
CString destination_schema = CConfig::instance()->value("destination_schema");
t_uint range_begin = 0;
t_uint range_end = 0;
t_uint range = m_tables.size() / threads;
for (t_uint i = 0, j = 0; i < m_tables.size() - range; i += range + 1, ++j)
{
    range_begin = i;
    range_end = i + range;
    std::unique_ptr<CMigrationTask> task = std::unique_ptr<CMigrationTask>(new CMigrationTask(m_source_pool.at(j), m_destination_pool.at(j), destination_schema, m_tables, range_begin, range_end));
    m_migration_tasks.push_back(std::move(task));
}
range_begin = range_end + 1;
range_end = m_tables.size() - 1;
std::unique_ptr<CMigrationTask> task = std::unique_ptr<CMigrationTask>(new CMigrationTask(std::move(m_source), std::move(m_destination), destination_schema, m_tables, range_begin, range_end));
// executing tasks
for (t_uint i = 0; i < m_migration_tasks.size(); ++i)
{
    executeTask(m_migration_tasks.at(i).get());
}
task->execute();
// wait for completion
for (t_uint i = 0; i < m_migration_tasks.size(); ++i)
{
    while (!m_migration_tasks.at(i)->isExecuted())
    {
        std::this_thread::yield(); // give up the time slice instead of spinning at full speed
    }
}
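
Polling a flag in a loop keeps a CPU core busy while waiting. A possible alternative, sketched below and not taken from the converter, is to keep the std::thread objects and join them instead of detaching; runAll is a hypothetical name, and the jobs are plain std::function objects rather than CTask instances:

```cpp
#include <functional>
#include <thread>
#include <vector>

// Hypothetical helper: runs every job on its own thread and blocks until
// all of them have finished. join() waits without spinning and makes the
// writes done by each job visible to the caller.
inline void runAll(const std::vector<std::function<void()>>& jobs)
{
    std::vector<std::thread> threads;
    threads.reserve(jobs.size());
    for (const std::function<void()>& job : jobs)
    {
        threads.emplace_back(job); // the thread gets its own copy of the job
    }
    for (std::thread& t : threads)
    {
        t.join();
    }
}
```

With this approach no completion flag is needed at all, because a returned join() already means the job is done.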


code snippet with data preparation for COPY inside a task
t_uint count = 0;
t_char *value;
CString copy_query = "COPY " + m_destination_schema + "." + table + " ( ";
m_buffer[0] = '\0';
m_buffer_temp0[0] = '\0';
m_buffer_temp1[0] = '\0';
if (result->nextRecord())
{
    for (t_uint i = 0; i < result->columnCount(); ++i)
    {
        if (i != 0)
        {
            copy_query += ", ";
            CString::concatenatePtr("\t", m_buffer);
        }
        copy_query += result->columnName(i);
        if (!result->isColumnNull(i))
        {
            value = result->columnValuePtr(i);
            CString::replacePtr(value, "\\", "\\\\", m_buffer_temp0);
            CString::replacePtr(m_buffer_temp0, "\b", "\\b", m_buffer_temp1);
            CString::replacePtr(m_buffer_temp1, "\f", "\\f", m_buffer_temp0);
            CString::replacePtr(m_buffer_temp0, "\n", "\\n", m_buffer_temp1);
            CString::replacePtr(m_buffer_temp1, "\r", "\\r", m_buffer_temp0);
            CString::replacePtr(m_buffer_temp0, "\t", "\\t", m_buffer_temp1);
            CString::replacePtr(m_buffer_temp1, "\v", "\\v", m_buffer_temp0);
            CString::concatenatePtr(m_buffer_temp0, m_buffer);
        }
        else
        {
            CString::concatenatePtr("\\N", m_buffer);
        }
    }
    copy_query += " ) FROM STDIN";
    if (!m_destination_connection->copyOpen(copy_query))
    {
        CLog::instance()->error("Can't execute query '" + copy_query + "', error: " + m_destination_connection->lastError());
        return false;
    }
    CString::concatenatePtr("\n", m_buffer);
    if (!m_destination_connection->copyDataPtr(m_buffer))
    {
        CLog::instance()->error("Can't copy data, error: " + m_destination_connection->lastError());
        return false;
    }
    ++count;
    while (result->nextRecord())
    {
        m_buffer[0] = '\0';
        for (t_uint i = 0; i < result->columnCount(); ++i)
        {
            if (i != 0)
            {
                CString::concatenatePtr("\t", m_buffer);
            }
            if (!result->isColumnNull(i))
            {	
                value = result->columnValuePtr(i);
                CString::replacePtr(value, "\\", "\\\\", m_buffer_temp0);
                CString::replacePtr(m_buffer_temp0, "\b", "\\b", m_buffer_temp1);
                CString::replacePtr(m_buffer_temp1, "\f", "\\f", m_buffer_temp0);
                CString::replacePtr(m_buffer_temp0, "\n", "\\n", m_buffer_temp1);
                CString::replacePtr(m_buffer_temp1, "\r", "\\r", m_buffer_temp0);
                CString::replacePtr(m_buffer_temp0, "\t", "\\t", m_buffer_temp1);
                CString::replacePtr(m_buffer_temp1, "\v", "\\v", m_buffer_temp0);
                CString::concatenatePtr(m_buffer_temp0, m_buffer);
            }
            else
            {
                CString::concatenatePtr("\\N", m_buffer);
            }
        }
        CString::concatenatePtr("\n", m_buffer);
        if (!m_destination_connection->copyDataPtr(m_buffer))
        {
            CLog::instance()->error("Can't copy data, error: " + m_destination_connection->lastError());
            return false;
        }
        ++count;
        if (count % 250000 == 0)
        {
            CLog::instance()->information("Working task #" + CString::fromNumber(taskID()) + ":\t\ttable " + table + " processing, record count: " + CString::fromNumber(count));
        }
    }
}
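
The chain of replacePtr calls above copies every value seven times between the two temporary buffers. The same escaping can be done in one pass over the value. The sketch below illustrates the idea with std::string; it gives up the preallocated buffers that the converter relies on, and appendCopyEscaped and makeCopyRow are hypothetical names:

```cpp
#include <string>

// Hypothetical helper: escapes one field value for the PostgreSQL COPY text
// format in a single pass and appends the result to `out`.
inline void appendCopyEscaped(const char* value, std::string& out)
{
    for (const char* p = value; *p != '\0'; ++p)
    {
        switch (*p)
        {
        case '\\': out += "\\\\"; break;
        case '\b': out += "\\b"; break;
        case '\f': out += "\\f"; break;
        case '\n': out += "\\n"; break;
        case '\r': out += "\\r"; break;
        case '\t': out += "\\t"; break;
        case '\v': out += "\\v"; break;
        default:   out += *p; break;
        }
    }
}

// Builds one COPY row: fields separated by tabs, NULL written as \N,
// row terminated by a newline. In this sketch a null pointer stands
// for SQL NULL.
inline std::string makeCopyRow(const char* const* fields, unsigned int count)
{
    std::string row;
    for (unsigned int i = 0; i < count; ++i)
    {
        if (i != 0)
        {
            row += '\t';
        }
        if (fields[i] != nullptr)
        {
            appendCopyEscaped(fields[i], row);
        }
        else
        {
            row += "\\N";
        }
    }
    row += '\n';
    return row;
}
```

A single helper like this would also remove the duplication between the first-record branch and the loop over the remaining records.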
 



Results

It took about 10 minutes to transfer 2 GB of data to PostgreSQL with WAL archiving enabled (using 16 threads).

What to think about

  • Determining the number of tasks / threads at run time, based on the amount of data and the available hardware
  • Determining the amount of memory needed for the buffer in which data is prepared for COPY
  • Distributing tables between tasks on demand rather than by range: tasks take the next table from a thread-safe stack
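
The last idea can be sketched as a mutex-protected stack of table names. This is an illustration of the approach, not code from the converter; CTableStack is a hypothetical name:

```cpp
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch: a shared stack of table names. Each worker pops the
// next table when it becomes free, so one large table does not hold up the
// tables that a fixed range would have assigned to the same thread.
class CTableStack
{
public:
    explicit CTableStack(std::vector<std::string> tables)
        : m_tables(std::move(tables))
    {
    }
    // Returns false when no tables are left; otherwise moves the next
    // table name into `table` and returns true.
    bool pop(std::string& table)
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        if (m_tables.empty())
        {
            return false;
        }
        table = std::move(m_tables.back());
        m_tables.pop_back();
        return true;
    }
private:
    std::mutex m_mutex;
    std::vector<std::string> m_tables;
};
```

Each worker would then loop on pop() until it returns false, which also removes the need to compute ranges up front.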

Source

Source code is available on GitHub.
