PostgreSQL libpq connection pool

  • Tutorial
For working with PostgreSQL in C ++, there is a wonderful libpq library. The library is well-documented, there is even a full translation into Russian from PostgresPRO .

When writing a server backend, I was faced with the fact that there was no connection pool in this library, and working with the database was supposed to be in fairly intensive mode and there was clearly not enough one connection. Each time, establishing a connection to send the received data would be simply insane, because connection is the longest operation, it was decided to write your own connection pool.

The idea is that at the start of the program we create several connections and store them in the queue.

When data arrives, we simply take a free connection from the queue, and if there are no free connections, wait for it to appear, use it to insert data, and then put the connection back. The idea is quite simple, quickly implemented and most importantly, the speed of work is very high.

Create a database in PostgreSQL with the name demo, a demo plate like this

the structure
-- Table: public.demo
-- DROP TABLE public.demo;
CREATE TABLE public.demo
(
  id integer NOT NULL DEFAULT nextval('demo_id_seq'::regclass),
  name character varying(256),
  CONSTRAINT demo_pk PRIMARY KEY (id)
)
WITH (
  OIDS=FALSE
);
ALTER TABLE public.demo
  OWNER TO postgres;


We are writing a class that will be a connection to the database, the connection parameters will be written directly in the code to simplify it, in reality, of course, they must be stored in the configuration file and read from there at startup, so that when changing the server settings, you do not have to recompile the program.

pgconnection.h
#ifndef PGCONNECTION_H
#define PGCONNECTION_H
#include 
#include 
#include 
class PGConnection
{
public:
    PGConnection();
    std::shared_ptr connection() const;
private:
    void establish_connection();
    std::string m_dbhost = "localhost";
    int         m_dbport = 5432;
    std::string m_dbname = "demo";
    std::string m_dbuser = "postgres";
    std::string m_dbpass = "postgres";
    std::shared_ptr  m_connection;
};
#endif //PGCONNECTION_H

pgconnection.cpp
#include "pgconnection.h"
PGConnection::PGConnection()
{
    m_connection.reset( PQsetdbLogin(m_dbhost.c_str(), std::to_string(m_dbport).c_str(), nullptr, nullptr, m_dbname.c_str(), m_dbuser.c_str(), m_dbpass.c_str()), &PQfinish );
    if (PQstatus( m_connection.get() ) != CONNECTION_OK && PQsetnonblocking(m_connection.get(), 1) != 0 )
    {
       throw std::runtime_error( PQerrorMessage( m_connection.get() ) );
    }
}
std::shared_ptr PGConnection::connection() const
{
    return m_connection;
}


To prevent possible leakage of resources, we will store the connection in a smart pointer.

In the constructor, we call the PQsetdbLogin function, which establishes a connection to the database, returning a pointer to the PGconn * connection and putting the connection into asynchronous operation mode.

Upon completion, the connection must be removed by the PQfinish function, which is passed the pointer returned by the PQsetdbLogin function. Therefore, the last parameter in the m_connection.reset () call is the address of the & PQfinish function. When the smart pointer goes out of scope and the reference counter is reset, it will call this function, thereby correctly terminating the connection.

Now we need a class that will create, store and manage the work of the connection pool.

pgbackend.h

#ifndef PGBACKEND_H
#define PGBACKEND_H
#include 
#include 
#include 
#include 
#include 
#include 
#include "pgconnection.h"
class PGBackend
{
public:
    PGBackend();
    std::shared_ptr connection();
    void freeConnection(std::shared_ptr);
private:
    void createPool();
    std::mutex m_mutex;
    std::condition_variable m_condition;
    std::queue> m_pool;
    const int POOL = 10;
};
#endif //PGBACKEND_H


pgbackend.cpp

#include 
#include 
#include 
#include 
#include "pgbackend.h"
PGBackend::PGBackend()
{
    createPool();
}
void PGBackend::createPool()
{
    std::lock_guard locker_( m_mutex );
    for ( auto i = 0; i< POOL; ++i ){
         m_pool.emplace ( std::make_shared() );
    }
}
std::shared_ptr PGBackend::connection()
{
    std::unique_lock lock_( m_mutex );
    while ( m_pool.empty() ){
            m_condition.wait( lock_ );
    }
    auto conn_ = m_pool.front();
    m_pool.pop();
    return  conn_;
}
void PGBackend::freeConnection(std::shared_ptr conn_)
{
    std::unique_lock lock_( m_mutex );
    m_pool.push( conn_ );
    lock_.unlock();
    m_condition.notify_one();
}


In the createPool function, we create a pool of connections, I established 10 connections. Next, we create the PGBackend class, and work with it through the connection functions - which returns a free connection to the database, and freeConnection - which puts the connection back into the queue.

All this works on the basis of conditional variables, if the queue is empty, then there are no free connections, and the thread falls asleep until it is woken up through the conditional variable.

The simplest example, which uses our backend with a connection pool, is given in the main.cpp file. In "combat conditions" you will certainly have some kind of cycle of events, upon the occurrence of which work with the database will be carried out. I have this boost :: asio, which works asynchronously and receives events from the network, writes everything to the database. It is unnecessary to bring it here so as not to complicate the idea with the pool of connections. Here we simply create 50 threads that work with the server through one PGBackend instance.

main.cpp

#include 
#include 
#include "pgbackend.h"
void testConnection(std::shared_ptr pgbackend)
{
//получаем свободное соединение
    auto conn = pgbackend->connection();
    std::string demo = "SELECT max(id) FROM demo; " ;
    PQsendQuery( conn->connection().get(), demo.c_str() );
    while ( auto res_ = PQgetResult( conn->connection().get()) ) {
        if (PQresultStatus(res_) == PGRES_TUPLES_OK && PQntuples(res_)) {
            auto ID = PQgetvalue (res_ ,0, 0);
            std::cout<< ID<freeConnection(conn);
}
int main(int argc, char const *argv[])
{
	auto pgbackend = std::make_shared();
    std::vector> vec;
    for ( size_t i = 0; i< 50 ; ++i ){
        vec.push_back(std::make_shared(std::thread(testConnection, pgbackend)));
    }
    for(auto &i : vec) {
        i.get()->join();
    }
    return 0;
}


This is compiled with the command:

g++ main.cpp pgbackend.cpp pgconnection.cpp -o pool -std=c++14 -I/usr/include/postgresql/ -lpq -lpthread

Be careful with the number of database connections - this parameter is set by the max_connections (integer) parameter .

Source

Also popular now: