Data migration from MySQL to PostgreSQL

As you work with databases and weigh their pros and cons, there comes a moment when you decide to migrate from one DBMS to another. In our case, the task was to move services from MySQL to PostgreSQL. Here is a short list of the goodies we expect from the move to PostgreSQL 9.2 (a more detailed list of features can be found here):
  • table inheritance (with some limitations that are promised to be fixed in the future)
  • ranges: int4range, numrange, daterange
  • out-of-the-box support for several languages for stored functions (PL/pgSQL, PL/Tcl, PL/Perl, PL/Python and plain C)
  • the WITH clause, which allows recursive queries
  • (planned) materialized views (partially achievable even now, e.g. via INSERT/UPDATE/DELETE rules on views)
  • (planned) triggers on DDL operations


Existing solutions typically work from a ready-made SQL dump, which is converted to the syntax of the target database. But in some cases (a widely used web application with a large amount of data), this approach is costly: the SQL dump has to be created from the source database, converted, and then loaded into the target database. An online converter (straight from DBMS to DBMS) is therefore preferable, since it can significantly reduce service downtime.

The implementation language is C++ (with some C++11 features). The native client libraries were used to connect to MySQL and PostgreSQL, and Qt Creator served as the IDE.

The migration algorithm is as follows. It is assumed that the destination database already contains a table structure matching that of the source database. A list of tables to migrate is built and then distributed across a thread pool. Each thread has its own connection to the source database and to the destination database, so several tables are transferred in parallel. Profit!

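A minimal sketch of the scheme just described, under stated assumptions: SourceConnection, DestinationConnection and migrateTable are hypothetical stand-ins (no real database is touched), and tables are dealt out round-robin for brevity, whereas the article's code uses contiguous ranges. What it shows is the structure: each worker thread owns a private pair of connections, and the caller waits for all workers before returning.

```cpp
#include <cstddef>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-ins for a real MySQL / PostgreSQL connection.
struct SourceConnection      { int id = 0; };
struct DestinationConnection { int id = 0; };

// Hypothetical per-table step; a real version would SELECT from `src`
// and COPY into `dst`. Here it only records which table it handled.
static void migrateTable(SourceConnection&, DestinationConnection&,
                         const std::string& table,
                         std::vector<std::string>& migrated)
{
    migrated.push_back(table);
}

// Runs the migration on `thread_count` threads and returns, per worker,
// the tables that worker migrated.
std::vector<std::vector<std::string>>
migrateAll(const std::vector<std::string>& tables, std::size_t thread_count)
{
    // Every worker owns its own pair of connections: nothing is shared.
    std::vector<SourceConnection> sources(thread_count);
    std::vector<DestinationConnection> destinations(thread_count);
    std::vector<std::vector<std::string>> migrated(thread_count);

    std::vector<std::thread> workers;
    for (std::size_t w = 0; w < thread_count; ++w)
    {
        workers.emplace_back([&, w] {
            // Round-robin: worker w takes tables w, w + N, w + 2N, ...
            for (std::size_t i = w; i < tables.size(); i += thread_count)
                migrateTable(sources[w], destinations[w], tables[i], migrated[w]);
        });
    }

    for (std::thread& t : workers)
        t.join();  // results are returned only after every worker finished

    return migrated;
}
```

With the five tables users, orders, items, logs, tags and two workers, worker 0 migrates users, items and tags, and worker 1 migrates orders and logs. Each worker writes only to its own element of `migrated`, so no locking is needed.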
Traditionally, any application has some framework: a set of system components on which the other components are built, such as configuration file handling, a log, an error handler, a memory manager and so on. In our case, we used only what was strictly necessary to solve the problem. First, some fundamental and compound types were redefined (purely for convenience; yes, I know about alias templates, but that's how it turned out):
simple types
typedef bool t_bool;

typedef char t_char;
typedef unsigned char t_uchar;
typedef signed char t_schar;

typedef int t_int;
typedef unsigned int t_uint;

typedef float t_float;
typedef double t_double;

map
template<typename T, typename U>
class CMap
: public std::map<T, U>
{
public: 
CMap();
virtual ~CMap();

};

template<typename T, typename U>
CMap<T, U>::CMap()
{
}

template<typename T, typename U>
CMap<T, U>::~CMap()
{
}

vector
template<typename T>
class CVector
: public std::vector<T>
{
public:
CVector();
virtual ~CVector();

};

template<typename T>
CVector<T>::CVector()
{
}

template<typename T>
CVector<T>::~CVector()
{
}

fstream
class CFileStream
: public std::fstream
{
public:
CFileStream();
virtual ~CFileStream();

};
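As the author notes, C++11 alias templates could replace these thin wrapper classes. A minimal sketch of that alternative (not the code the project uses) that keeps the same names:

```cpp
#include <fstream>
#include <map>
#include <vector>

// C++11 aliases with the article's names. Unlike the derived classes,
// these *are* std::map / std::vector / std::fstream, so every standard
// constructor (initializer lists included) is available unchanged.
template<typename T, typename U>
using CMap = std::map<T, U>;

template<typename T>
using CVector = std::vector<T>;

using CFileStream = std::fstream;
```

A side note on the design: the standard containers do not have virtual destructors, so the `virtual ~CMap()` above does not make deleting a CMap through a `std::map*` safe; an alias avoids the question entirely.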

Of the design patterns, only the singleton was used:
classic Meyers singleton
template<typename T>
class CSingleton
{
public:
static T* instance();
void free();

protected:
CSingleton();
virtual ~CSingleton();

};

template<typename T>
T* CSingleton<T>::instance()
{
static T *instance = new T();

return instance;
}

template<typename T>
void CSingleton<T>::free()
{
delete this;
}

template<typename T>
CSingleton<T>::CSingleton()
{
}

template<typename T>
CSingleton<T>::~CSingleton()
{
}
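Strictly speaking, the template above heap-allocates T inside the function-local static and relies on an explicit free(); in the classic Meyers singleton, the function-local static is the object itself. A sketch of that variant, assuming C++11 (which makes initialization of the static thread-safe); CCounter is a hypothetical client class standing in for something like CLog:

```cpp
// Meyers singleton: the object itself (not a pointer) is a function-local
// static. C++11 guarantees thread-safe initialization on the first call, and
// the destructor runs automatically at normal program exit, so no free().
template<typename T>
class CSingleton
{
public:
    static T& instance()
    {
        static T object;  // constructed once, on first use
        return object;
    }

    CSingleton(const CSingleton&) = delete;
    CSingleton& operator=(const CSingleton&) = delete;

protected:
    CSingleton() = default;
    ~CSingleton() = default;
};

// Hypothetical client class.
class CCounter : public CSingleton<CCounter>
{
    friend class CSingleton<CCounter>;  // lets instance() call the private ctor

public:
    int next() { return ++m_value; }

private:
    CCounter() = default;
    int m_value = 0;
};
```

Here instance() returns a reference rather than a pointer, so call sites such as `CLog::instance()->information(...)` would become `CLog::instance().information(...)`. Because the static object is destroyed at exit, a logger built this way would write its footer without an explicit free() call.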

Base classes for a task (runs in a separate thread) and for the system (starts task execution):
task.h
#include <atomic>

class CTask
{
public:
CTask();
virtual ~CTask();

void execute();

t_uint taskID();
t_bool isExecuted();

protected:
virtual void executeEvent() = 0;

private:
t_uint m_task_id;
std::atomic<t_bool> m_executed; // atomic: set by the worker thread, polled by the main thread

};

task.cpp
CTask::CTask()
: m_executed(false)
{
static t_uint task_id = 0;

m_task_id = task_id++;
}

CTask::~CTask()
{
}

void CTask::execute()
{
executeEvent();

m_executed = true;
}

t_uint CTask::taskID()
{
return m_task_id;
}

t_bool CTask::isExecuted()
{
return m_executed;
}

system.h
class CSystem
{
public:
CSystem();
virtual ~CSystem() = 0;

protected:
void executeTask(CTask *task);

};

system.cpp
CSystem::CSystem()
{
}

CSystem::~CSystem()
{
}

void CSystem::executeTask(CTask *task)
{
// capture the pointer by value: the detached thread must not refer to this function's locals
std::thread thread([task] () { task->execute(); });

thread.detach();
}

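The detach-then-poll approach above works, but the standard library can do the waiting. A sketch of an alternative, assuming each task can be wrapped in a std::function (the signature of this executeTask differs from the article's, and waitAll is a hypothetical helper):

```cpp
#include <functional>
#include <future>
#include <utility>
#include <vector>

// Starts `job` on its own thread and hands back a future for it.
// std::launch::async forces a new thread instead of deferred execution.
std::future<void> executeTask(std::function<void()> job)
{
    return std::async(std::launch::async, std::move(job));
}

// Blocks until every started task has finished; get() also rethrows any
// exception a task threw, instead of losing it in a detached thread.
void waitAll(std::vector<std::future<void>>& futures)
{
    for (std::future<void>& f : futures)
        f.get();
}
```

With this design the main thread sleeps in get() instead of spinning on isExecuted(). Note that a future obtained from std::async also waits in its destructor, so a task cannot silently outlive the code that started it.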
To conclude the overview of the basic types, the string class deserves a mention. It had to be written from scratch so that some operations (substring replacement and concatenation) could work with a caller-supplied buffer (described below) without extra memory allocations, and so that some conversions (string to number and number to string) could be class members (only the class declaration is shown):
string.h
class CString
{
public:
CString(const t_char *data = nullptr);
CString(const CString&s);
~CString();

const t_char* ptr() const;
void setPtr(t_char *p);

CString&operator= (const CString&s);
CString operator+ (const t_char *p) const;
CString operator+ (t_char c) const;
CString operator+ (const CString&s) const;
friend CString operator+ (const t_char *p, const CString&s);
CString&operator+= (const t_char *p);
CString&operator+= (t_char c);
CString&operator+= (const CString&s);

t_bool operator== (const CString&s) const;
t_bool operator!= (const CString&s) const;

t_bool operator< (const CString&s) const;
t_bool operator > (const CString&s) const;
t_bool operator<= (const CString&s) const;
t_bool operator>= (const CString&s) const;

t_char& at(t_uint index);
t_char at(t_uint index) const;

t_uint length() const;
t_bool isEmpty() const;

void clear();

t_int search(const CString& s, t_uint from = 0) const;
CString substr(t_uint from, t_int count = -1) const;
CString replace(const CString& before, const CString& after) const;

static CString fromNumber(t_uint value);
static t_uint toUnsignedInt(const CString& s, t_bool *good = nullptr);

CVector<CString> split(const CString& splitter) const;
t_bool match(const CString&pattern) const;

static t_uint replacePtr(const t_char *src, const t_char *before, const t_char *after, char *buffer);
static t_uint lengthPtr(const t_char *src);
static t_uint concatenatePtr(const t_char *src, char *buffer);

private:
t_char *m_data;

t_uint length(const t_char *src) const;
t_char* copy(const t_char *src) const;
t_char* concatenate(const t_char *src0, t_char c) const;
t_char* concatenate(const t_char *src0, const t_char *src1) const;
t_int compare(const t_char *src0, const t_char *src1) const;
};

CString operator+ (const t_char *p, const CString&s);
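The static replacePtr() declared above writes its result into a caller-supplied buffer. Its implementation is not shown, so the following is only a guess at the idea; replaceInto is a hypothetical name, and the caller is assumed to guarantee that the buffer is large enough:

```cpp
#include <cstddef>
#include <cstring>

// Copies `src` into `buffer`, replacing every occurrence of `before` with
// `after`, and returns the number of characters written (excluding '\0').
// No heap allocation happens here; `buffer` must be large enough for the
// result (strlen(src) * max(1, strlen(after)) + 1 bytes always suffices).
std::size_t replaceInto(const char* src, const char* before,
                        const char* after, char* buffer)
{
    const std::size_t before_len = std::strlen(before);
    const std::size_t after_len = std::strlen(after);
    std::size_t out = 0;

    while (*src != '\0')
    {
        // strncmp stops at the end of `src`, so this never reads past it.
        if (before_len != 0 && std::strncmp(src, before, before_len) == 0)
        {
            std::memcpy(buffer + out, after, after_len);
            out += after_len;
            src += before_len;
        }
        else
        {
            buffer[out++] = *src++;
        }
    }

    buffer[out] = '\0';
    return out;
}
```

For example, replacing a tab with the two characters `\t` turns `x<TAB>y` into `x\ty`, which is exactly the kind of step used below when preparing data for COPY.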

Inevitable for any application a bit bigger than "Hello, world" are a log and a configuration file. The method that writes messages to the log uses a mutex, since every task writes to the log as it processes its tables. Fine-grained locking and lock-free algorithms were not considered, because the log is not a bottleneck in this application:
log.h
class CLog
: public CSingleton<CLog>
{
public:
enum MessageType
{
Information,
Warning,
Error
};

CLog();
virtual ~CLog();

void information(const CString&message);
void warning(const CString&message);
void error(const CString&message);

private:
std::mutex m_mutex;

CFileStream m_stream;

void writeTimestamp();
void writeHeader();
void writeFooter();
void writeMessage(MessageType type, const CString&message);

};

log.cpp
CLog::CLog()
{
m_stream.open("log.txt", std::ios_base::out);

writeHeader();
}

CLog::~CLog()
{
writeFooter();

m_stream.flush();
m_stream.close();
}

void CLog::information(const CString&message)
{
writeMessage(Information, message);
}

void CLog::warning(const CString&message)
{
writeMessage(Warning, message);
}

void CLog::error(const CString&message)
{
writeMessage(Error, message);
}

void CLog::writeTimestamp()
{
time_t rawtime;
tm *timeinfo;
t_char buffer[32];

time(&rawtime);
timeinfo = localtime(&rawtime);

strftime(buffer, 32, "%Y/%m/%d %H:%M:%S", timeinfo);

m_stream << buffer << " ";
}

void CLog::writeHeader()
{
writeMessage(Information, "Log started");
}

void CLog::writeFooter()
{
writeMessage(Information, "Log ended");
}

void CLog::writeMessage(MessageType type, const CString&message)
{
std::lock_guard<std::mutex > guard(m_mutex);

writeTimestamp();

switch (type)
{
case Information:
{
m_stream << "Information: " << message.ptr();

break;
}

case Warning:
{
m_stream << "Warning: " << message.ptr();

break;
}

case Error:
{
m_stream << "Error: " << message.ptr();

break;
}

default:
{
break;
}
}

m_stream << "\n";

m_stream.flush();
}
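The key property of writeMessage() is that the mutex is held for a whole line, so messages from different tasks never interleave. A reduced sketch of the same idea, writing to any std::ostream so it can be checked without a file; Logger and logFromThreads are hypothetical names, and the timestamp is omitted:

```cpp
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Minimal logger in the spirit of CLog: one mutex serializes whole
// messages, so lines written from different threads never interleave.
class Logger
{
public:
    enum MessageType { Information, Warning, Error };

    explicit Logger(std::ostream& out) : m_out(out) {}

    void write(MessageType type, const std::string& message)
    {
        std::lock_guard<std::mutex> guard(m_mutex);  // held for the whole line
        m_out << label(type) << ": " << message << '\n';
        m_out.flush();
    }

private:
    static const char* label(MessageType type)
    {
        switch (type)
        {
        case Information: return "Information";
        case Warning:     return "Warning";
        case Error:       return "Error";
        }
        return "Unknown";
    }

    std::ostream& m_out;
    std::mutex m_mutex;
};

// Usage demo: `threads` threads each log `per_thread` messages.
std::string logFromThreads(int threads, int per_thread)
{
    std::ostringstream out;
    Logger log(out);
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t)
        workers.emplace_back([&log, per_thread] {
            for (int i = 0; i < per_thread; ++i)
                log.write(Logger::Warning, "table processed");
        });
    for (std::thread& w : workers)
        w.join();
    return out.str();
}
```

Since every thread writes the same text, the output equals that line repeated threads × per_thread times only if no two writes were interleaved.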

config.h
class CConfig
: public CSingleton<CConfig>
{
public:
CConfig();
virtual ~CConfig();

CString value(const CString& name, const CString& defvalue = "") const;

private:
CFileStream m_stream;
CMap<CString, CString> m_values;

};

config.cpp
CConfig::CConfig()
{
m_stream.open("mysql2psql.conf", std::ios_base::in);

if (m_stream.is_open())
{
CString line;

const t_uint buffer_size = 256;
t_char buffer[buffer_size];

while (m_stream.getline(buffer, buffer_size))
{
line = buffer;

if (!line.isEmpty() && line.at(0) != '#')
{
t_int pos = line.search("=");

CString name = line.substr(0, pos);
CString value = line.substr(pos + 1);

m_values.insert(std::pair<CString, CString>(name, value));
}
}

m_stream.close();

CLog::instance()->information("Config loaded");
}
else
{
CLog::instance()->warning("Can't load config");
}
}

CConfig::~CConfig()
{
}

CString CConfig::value(const CString&name, CString const&defvalue) const
{
CMap<CString, CString>::const_iterator iter = m_values.find(name);

if (iter != m_values.end())
{
return iter->second;
}

return defvalue;
}

mysql2psql.conf
# MySQL connection
mysql_host=localhost
mysql_port=3306
mysql_database=mysqldb
mysql_username=root
mysql_password=rootpwd
mysql_encoding=UTF8

# PostgreSQL connection
psql_host=localhost
psql_port=5432
psql_database=psqldb
psql_username=postgres
psql_password=postgrespwd
psql_encoding=UTF8

# Migration
# (!) Note: source_schema == mysql_database
source_schema=mysqldb
destination_schema=public
tables=*
use_insert=0

# Other settings
threads=16
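The loader in CConfig splits each non-comment line at the first '='. A standard-library sketch of the same format, with two assumptions of my own: lines without '=' are skipped (the loader above does not check for a missing '='), and a repeated key keeps its last value (the original insert() keeps the first). parseConfig and parseConfigText are hypothetical names:

```cpp
#include <istream>
#include <map>
#include <sstream>
#include <string>

// Parses "name=value" lines as in mysql2psql.conf. Empty lines, lines
// starting with '#', and lines without '=' are skipped.
std::map<std::string, std::string> parseConfig(std::istream& in)
{
    std::map<std::string, std::string> values;
    std::string line;

    while (std::getline(in, line))
    {
        if (!line.empty() && line.back() == '\r')
            line.pop_back();  // tolerate Windows line endings
        if (line.empty() || line[0] == '#')
            continue;

        const std::string::size_type pos = line.find('=');
        if (pos == std::string::npos)
            continue;

        // Split at the first '=' only, so values may themselves contain '='.
        values[line.substr(0, pos)] = line.substr(pos + 1);
    }
    return values;
}

// Convenience wrapper for parsing an in-memory string.
std::map<std::string, std::string> parseConfigText(const std::string& text)
{
    std::istringstream in(text);
    return parseConfig(in);
}
```

Reading from std::getline also removes the fixed 256-byte line limit of the buffer used in CConfig.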

Now, about loading the data into PostgreSQL. There are two options: INSERT queries, which did not prove themselves on large data volumes in terms of performance (because of the transaction machinery), or the COPY command, which lets you send data continuously in portions and then signal the end of the transfer with a special marker (a terminator). Another caveat concerns determining the type of a table column in PostgreSQL. The documentation does not say (or perhaps I did not read between the lines carefully enough) how to get a human-readable type name back from a type identifier, so a mapping between the OID (an almost unique identifier of every object in the database) and the type was compiled:

case 20: // int8
case 21: // int2
case 23: // int4
case 1005: // _int2 (int2[])
case 1007: // _int4 (int4[])
case 1016: // _int8 (int8[])
case 700: // float4
case 701: // float8
case 1021: // _float4 (float4[])
case 1022: // _float8 (float8[])
case 1700: // numeric
case 18: // char
case 25: // text
case 1002: // _char (char[])
case 1009: // _text (text[])
case 1015: // _varchar (varchar[])
case 1082: // date
case 1182: // _date (date[])
case 1083: // time
case 1114: // timestamp
case 1115: // _timestamp (timestamp[])
case 1183: // _time (time[])
case 1185: // _timestamptz (timestamptz[])
case 16: // bool
case 1000: // _bool (bool[])


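The case labels above can be wrapped into a lookup function. This is a sketch, not the article's code: pgTypeName is a hypothetical helper, and it covers only the OIDs listed above. In libpq, the OID of a result column is what PQftype() returns; the authoritative names live in the pg_type system catalog (e.g. `SELECT typname FROM pg_type WHERE oid = 23`), where a leading underscore marks the array type of the base type.

```cpp
#include <string>

// Maps the pg_type OIDs listed above to their type names. Names starting
// with '_' are array types (e.g. _int4 is int4[]). Any other OID yields
// "unknown"; a complete tool would query pg_type instead.
std::string pgTypeName(unsigned int oid)
{
    switch (oid)
    {
    case 16:   return "bool";
    case 18:   return "char";
    case 20:   return "int8";
    case 21:   return "int2";
    case 23:   return "int4";
    case 25:   return "text";
    case 700:  return "float4";
    case 701:  return "float8";
    case 1000: return "_bool";
    case 1002: return "_char";
    case 1005: return "_int2";
    case 1007: return "_int4";
    case 1009: return "_text";
    case 1015: return "_varchar";
    case 1016: return "_int8";
    case 1021: return "_float4";
    case 1022: return "_float8";
    case 1082: return "date";
    case 1083: return "time";
    case 1114: return "timestamp";
    case 1115: return "_timestamp";
    case 1182: return "_date";
    case 1183: return "_time";
    case 1185: return "_timestamptz";
    case 1700: return "numeric";
    default:   return "unknown";
    }
}
```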
Preparing and running the tasks works as follows:
  • build the list of tables
  • create connections (one per task) to the source and destination databases
  • assign each task a range of the table list
  • start the tasks (each with its range of tables and its own connections)
  • wait for all tasks to finish (the main thread plus the created threads)

Each task has three static 50 MB buffers in which the data for the COPY command is prepared (special characters are escaped and field values are concatenated):
code fragment: preparing the tasks
// create connection pool

t_uint threads = CString::toUnsignedInt(CConfig::instance()->value("threads", "1"));
CLog::instance()->information("Count of working threads: " + CString::fromNumber(threads));

if (!createConnectionPool(threads - 1))
{
return false;
}

// create tasks

CString destination_schema = CConfig::instance()->value("destination_schema");

t_uint range_begin = 0;
t_uint range_end = 0;

t_uint range = m_tables.size() / threads;

for (t_uint i = 0, j = 0; i < m_tables.size() - range; i += range + 1, ++j)
{
range_begin = i;
range_end = i + range;

std::unique_ptr < CTask > task = std::unique_ptr < CTask > (new CMigrationTask(m_source_pool.at(j), m_destination_pool.at(j), destination_schema, m_tables, range_begin, range_end));

m_migration_tasks.push_back(std::move(task));
}

range_begin = range_end + 1;
range_end = m_tables.size() - 1;

std::unique_ptr < CTask > task = std::unique_ptr < CTask > (new CMigrationTask(std::move(m_source), std::move(m_destination), destination_schema, m_tables, range_begin, range_end));

// executing tasks

for (t_uint i = 0; i < m_migration_tasks.size(); ++i)
{
executeTask(m_migration_tasks.at(i).get());
}

task->execute();

// wait for completion

for (t_uint i = 0; i < m_migration_tasks.size(); ++i)
{
while (!m_migration_tasks.at(i)->isExecuted())
{
std::this_thread::yield(); // give the CPU to the worker threads while waiting
}
}
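If I read the range loop above correctly, it can leave threads idle: with 16 tables and threads=16, range is 1, so the loop creates 8 two-table tasks and the main thread's task ends up with range_begin = 16 and range_end = 15, i.e. no tables at all. A sketch of an alternative split into balanced, non-empty contiguous ranges; splitRanges is a hypothetical helper, and it uses half-open [begin, end) ranges rather than the inclusive ranges above:

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Splits table indices [0, table_count) into at most `parts` contiguous,
// half-open ranges [begin, end) whose sizes differ by at most one.
// Empty ranges are never produced, so there are never more tasks than tables.
std::vector<std::pair<std::size_t, std::size_t>>
splitRanges(std::size_t table_count, std::size_t parts)
{
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    if (parts == 0)
        return ranges;

    const std::size_t base = table_count / parts;   // minimum size per range
    const std::size_t extra = table_count % parts;  // first `extra` ranges get one more

    std::size_t begin = 0;
    for (std::size_t i = 0; i < parts && begin < table_count; ++i)
    {
        const std::size_t size = base + (i < extra ? 1 : 0);
        ranges.emplace_back(begin, begin + size);
        begin += size;
    }
    return ranges;
}
```

For 10 tables and 4 parts this yields [0,3), [3,6), [6,8), [8,10); for 3 tables and 16 parts it yields only 3 one-table ranges, so no task (and no connection) is created without work.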


code fragment: preparing a task's data for COPY
t_uint count = 0;


CString copy_query = "COPY " + m_destination_schema + "." + table + " ( ";

m_buffer[0] = '\0';
m_buffer_temp0[0] = '\0';
m_buffer_temp1[0] = '\0';

if (result->nextRecord())
{
for (t_uint i = 0; i < result->columnCount(); ++i)
{
if (i != 0)
{
copy_query += ", ";
CString::concatenatePtr("\t", m_buffer);
}

copy_query += result->columnName(i);

if (!result->isColumnNull(i))
{
value = result->columnValuePtr(i);

CString::replacePtr(value, "\\", "\\\\", m_buffer_temp0);
CString::replacePtr(m_buffer_temp0, "\b", "\\b", m_buffer_temp1);
CString::replacePtr(m_buffer_temp1, "\f", "\\f", m_buffer_temp0);
CString::replacePtr(m_buffer_temp0, "\n", "\\n", m_buffer_temp1);
CString::replacePtr(m_buffer_temp1, "\r", "\\r", m_buffer_temp0);
CString::replacePtr(m_buffer_temp0, "\t", "\\t", m_buffer_temp1);
CString::replacePtr(m_buffer_temp1, "\v", "\\v", m_buffer_temp0);

CString::concatenatePtr(m_buffer_temp0, m_buffer);
}
else
{
CString::concatenatePtr("\\N", m_buffer);
}
}

copy_query += ") FROM STDIN";

if (!m_destination_connection->copyOpen(copy_query))
{
CLog::instance()->error("Can't execute query '" + copy_query + "', error: "+ m_destination_connection->lastError());

return false;
}

CString::concatenatePtr("\n", m_buffer);

if (!m_destination_connection->copyDataPtr(m_buffer))
{
CLog::instance()->error("Can't copy data, error: " + m_destination_connection->lastError());

return false;
}

++count;

while (result->nextRecord())
{
m_buffer[0] = '\0';

for (t_uint i = 0; i < result->columnCount(); ++i)
{
if (i != 0)
{
CString::concatenatePtr("\t", m_buffer);
}

if (!result->isColumnNull(i))
{ 
value = result->columnValuePtr(i);

CString::replacePtr(value, "\\", "\\\\", m_buffer_temp0);
CString::replacePtr(m_buffer_temp0, "\b", "\\b", m_buffer_temp1);
CString::replacePtr(m_buffer_temp1, "\f", "\\f", m_buffer_temp0);
CString::replacePtr(m_buffer_temp0, "\n", "\\n", m_buffer_temp1);
CString::replacePtr(m_buffer_temp1, "\r", "\\r", m_buffer_temp0);
CString::replacePtr(m_buffer_temp0, "\t", "\\t", m_buffer_temp1);
CString::replacePtr(m_buffer_temp1, "\v", "\\v", m_buffer_temp0);

CString::concatenatePtr(m_buffer_temp0, m_buffer);
}
else
{

CString::concatenatePtr("\\N", m_buffer);
}
}

CString::concatenatePtr("\n", m_buffer);

if (!m_destination_connection->copyDataPtr(m_buffer))
{
CLog::instance()->error("Can't copy data, error: " + m_destination_connection->lastError());

return false;
}

++count;

if (count % 250000 == 0)
{
CLog::instance()->information("Working task #" + CString::fromNumber(taskID()) + ":\t\ttable " + table + " processing, record count: " + CString::fromNumber(count));
}
}
}
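The seven chained replacePtr() calls per value can be folded into a single pass over the characters. A sketch of that swapped-in technique, assuming a NULL column is represented by a null pointer (the article uses isColumnNull() instead) and trading the fixed static buffers for a growing std::string; appendCopyValue and buildCopyRow are hypothetical names. It follows COPY's text format: backslash escapes for the same control characters, `\N` for NULL, tab between fields, newline at the end of the row.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Appends one value to `out` in COPY text format, escaping in one pass the
// same characters the article handles with seven replacePtr() calls.
void appendCopyValue(const char* value, std::string& out)
{
    if (value == nullptr)  // SQL NULL (assumed representation)
    {
        out += "\\N";
        return;
    }
    for (const char* p = value; *p != '\0'; ++p)
    {
        switch (*p)
        {
        case '\\': out += "\\\\"; break;
        case '\b': out += "\\b";  break;
        case '\f': out += "\\f";  break;
        case '\n': out += "\\n";  break;
        case '\r': out += "\\r";  break;
        case '\t': out += "\\t";  break;
        case '\v': out += "\\v";  break;
        default:   out += *p;     break;
        }
    }
}

// Builds one COPY line: tab-separated values terminated by '\n'.
std::string buildCopyRow(const std::vector<const char*>& values)
{
    std::string row;
    for (std::size_t i = 0; i < values.size(); ++i)
    {
        if (i != 0)
            row += '\t';
        appendCopyValue(values[i], row);
    }
    row += '\n';
    return row;
}
```

Besides reading each value once instead of seven times, this removes the duplicated escaping code between the first record and the `while` loop, since both could call the same helper.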



Results

Transferring 2 GB of data into PostgreSQL with WAL archiving enabled took about 10 minutes (16 threads).

Things to think about

  • determine the number of tasks/threads at run time, based on the amount of data and the available hardware
  • determine the amount of memory required for the buffers in which data for COPY is prepared
  • instead of distributing tables between tasks by range, let each task take the next table from a thread-safe stack as needed

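The last idea can be sketched with a mutex-protected stack that workers pop from until it is empty. TableStack and drain are hypothetical names, and the "migration" is reduced to counting tables; the point is that a worker which draws small tables simply takes more of them instead of idling:

```cpp
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Shared work list: workers pop the next table whenever they become free.
class TableStack
{
public:
    explicit TableStack(std::vector<std::string> tables)
        : m_tables(std::move(tables)) {}

    // Moves the next table into `table`; returns false when none are left.
    bool pop(std::string& table)
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        if (m_tables.empty())
            return false;
        table = std::move(m_tables.back());
        m_tables.pop_back();
        return true;
    }

private:
    std::mutex m_mutex;
    std::vector<std::string> m_tables;
};

// Demo: `threads` workers drain the stack; returns how many tables each took.
std::vector<int> drain(const std::vector<std::string>& tables, int threads)
{
    TableStack stack(tables);
    std::vector<int> taken(static_cast<std::size_t>(threads), 0);
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t)
        workers.emplace_back([&stack, &taken, t] {
            std::string table;
            while (stack.pop(table))
                ++taken[t];  // a real worker would migrate `table` here
        });
    for (std::thread& w : workers)
        w.join();
    return taken;
}
```

The per-worker counts vary from run to run, but their sum always equals the number of tables, because each pop() hands out a table exactly once.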
Source code

Source code is available on GitHub.
Article based on information from habrahabr.ru
