PostgreSQL libpq connection pool
For working with PostgreSQL from C++ there is a wonderful library, libpq. The library is well documented, and there is even a full Russian translation from Postgres Pro.
While writing a server backend, I found that this library has no built-in connection pool. The workload was expected to be fairly intensive, so a single connection was clearly not enough. Opening a new connection every time data arrived would be wasteful, because establishing a connection is the most expensive operation, so I decided to write my own connection pool.
The idea is that at program startup we create several connections and store them in a queue.
When data arrives, we take a free connection from the queue, or wait for one to appear if none is free, use it to insert the data, and then put the connection back. The idea is simple and quick to implement, and, most importantly, it performs well because connections are reused instead of being re-established.
Create a PostgreSQL database named demo, containing a table demo with the following structure:
-- Table: public.demo
-- DROP TABLE public.demo;
-- Note: the sequence demo_id_seq must already exist before this statement is run.
CREATE TABLE public.demo
(
    id integer NOT NULL DEFAULT nextval('demo_id_seq'::regclass),
    name character varying(256),
    CONSTRAINT demo_pk PRIMARY KEY (id)
)
WITH (
    OIDS=FALSE
);
ALTER TABLE public.demo
    OWNER TO postgres;
We start with a class that represents a connection to the database. To keep the example simple, the connection parameters are written directly in the code. In a real application they should be stored in a configuration file and read at startup, so that changing the server settings does not require recompiling the program.
pgconnection.h
#ifndef PGCONNECTION_H
#define PGCONNECTION_H
#include <memory>
#include <string>
#include <libpq-fe.h>
// Owns a single libpq connection to the demo database.
class PGConnection
{
public:
    PGConnection();
    // Returns the shared PGconn handle.
    std::shared_ptr<PGconn> connection() const;
private:
    void establish_connection();
    std::string m_dbhost = "localhost";
    int m_dbport = 5432;
    std::string m_dbname = "demo";
    std::string m_dbuser = "postgres";
    std::string m_dbpass = "postgres";
    std::shared_ptr<PGconn> m_connection;
};
#endif //PGCONNECTION_H
pgconnection.cpp
#include <stdexcept>
#include <string>
#include "pgconnection.h"
PGConnection::PGConnection()
{
    // PQfinish is passed as the deleter, so it runs when the last owner releases the pointer.
    m_connection.reset( PQsetdbLogin(m_dbhost.c_str(), std::to_string(m_dbport).c_str(), nullptr, nullptr, m_dbname.c_str(), m_dbuser.c_str(), m_dbpass.c_str()), &PQfinish );
    // Fail if the connection was not established OR non-blocking mode could not be enabled.
    if (PQstatus( m_connection.get() ) != CONNECTION_OK || PQsetnonblocking(m_connection.get(), 1) != 0 )
    {
        throw std::runtime_error( PQerrorMessage( m_connection.get() ) );
    }
}
std::shared_ptr<PGconn> PGConnection::connection() const
{
    return m_connection;
}
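The connection parameters in the class above are hard-coded. As a rough illustration of reading them from a configuration file instead, here is a minimal sketch. The DbSettings struct, the loadSettings and trim functions, and the "key = value" file format with the keys host, port, dbname, user and password are all assumptions made for this example; they are not part of libpq or of the original code.

```cpp
#include <istream>
#include <sstream>
#include <string>

// Hypothetical settings holder mirroring the hard-coded members of PGConnection.
struct DbSettings {
    std::string host = "localhost";
    int port = 5432;
    std::string name = "demo";
    std::string user = "postgres";
    std::string pass = "postgres";
};

// Strips leading and trailing spaces, tabs and carriage returns.
inline std::string trim(const std::string& s) {
    const char* ws = " \t\r";
    const auto first = s.find_first_not_of(ws);
    if (first == std::string::npos) return "";
    const auto last = s.find_last_not_of(ws);
    return s.substr(first, last - first + 1);
}

// Reads "key = value" lines. Blank lines, lines starting with '#' and lines
// without '=' are skipped. Unknown keys are ignored, missing keys keep defaults.
DbSettings loadSettings(std::istream& in) {
    DbSettings cfg;
    std::string line;
    while (std::getline(in, line)) {
        line = trim(line);
        if (line.empty() || line[0] == '#') continue;
        const auto eq = line.find('=');
        if (eq == std::string::npos) continue;
        const std::string key = trim(line.substr(0, eq));
        const std::string value = trim(line.substr(eq + 1));
        if (key == "host") cfg.host = value;
        else if (key == "port") cfg.port = std::stoi(value);  // throws on non-numeric input
        else if (key == "dbname") cfg.name = value;
        else if (key == "user") cfg.user = value;
        else if (key == "password") cfg.pass = value;
    }
    return cfg;
}
```

With something like this, the PGConnection constructor could take a DbSettings value loaded once at startup (for example from a std::ifstream) instead of relying on the member defaults.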
To prevent resource leaks, we store the connection in a smart pointer.
In the constructor we call PQsetdbLogin, which establishes a connection to the database and returns a pointer to a PGconn structure. We then call PQsetnonblocking to put the connection into non-blocking (asynchronous) mode.
When we are done, the connection must be closed with PQfinish, which takes the pointer returned by PQsetdbLogin. That is why the last argument of the m_connection.reset() call is the address of that function, &PQfinish. When the last smart pointer owning the connection goes out of scope and the reference count drops to zero, it calls this function and the connection is closed correctly.
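The deleter mechanism described above can be tried out without a database. The sketch below uses a hypothetical FakeConn type with fake_connect and fake_finish functions as stand-ins for PGconn, PQsetdbLogin and PQfinish; none of these names exist in libpq, they only mimic the ownership pattern.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for PGconn so the sketch builds without libpq.
struct FakeConn {
    int id;
};

// Counts how many times the deleter has run (for demonstration only).
inline int& closed_count() {
    static int n = 0;
    return n;
}

// Plays the role of PQsetdbLogin: returns a raw pointer the caller must release.
FakeConn* fake_connect(int id) {
    return new FakeConn{id};
}

// Plays the role of PQfinish: releases the resource.
void fake_finish(FakeConn* c) {
    assert(c != nullptr);
    ++closed_count();
    delete c;
}

// Same shape as m_connection.reset(PQsetdbLogin(...), &PQfinish).
std::shared_ptr<FakeConn> make_conn(int id) {
    std::shared_ptr<FakeConn> conn;
    conn.reset(fake_connect(id), &fake_finish);
    return conn;
}
```

If two shared_ptr copies refer to the same FakeConn, resetting one of them does not call fake_finish; the deleter runs exactly once, when the last copy is destroyed.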
Now we need a class that creates, stores, and manages the connection pool.
pgbackend.h
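#ifndef PGBACKEND_H
#define PGBACKEND_H
#include <memory>
#include <mutex>
#include <string>
#include <queue>
#include <condition_variable>
#include <libpq-fe.h>
#include "pgconnection.h"
// Thread-safe pool of PGConnection objects.
class PGBackend
{
public:
    PGBackend();
    // Blocks until a free connection is available, then returns it.
    std::shared_ptr<PGConnection> connection();
    // Puts a connection back into the pool and wakes one waiting thread.
    void freeConnection(std::shared_ptr<PGConnection>);
private:
    void createPool();
    std::mutex m_mutex;
    std::condition_variable m_condition;
    std::queue<std::shared_ptr<PGConnection>> m_pool;
    const int POOL = 10;
};
#endif //PGBACKEND_H
pgbackend.cpp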
#include <memory>
#include <mutex>
#include "pgbackend.h"
PGBackend::PGBackend()
{
    createPool();
}
void PGBackend::createPool()
{
    std::lock_guard<std::mutex> locker_( m_mutex );
    for ( auto i = 0; i < POOL; ++i ){
        m_pool.emplace ( std::make_shared<PGConnection>() );
    }
}
std::shared_ptr<PGConnection> PGBackend::connection()
{
    std::unique_lock<std::mutex> lock_( m_mutex );
    // Sleep until another thread returns a connection to the queue.
    while ( m_pool.empty() ){
        m_condition.wait( lock_ );
    }
    auto conn_ = m_pool.front();
    m_pool.pop();
    return conn_;
}
void PGBackend::freeConnection(std::shared_ptr<PGConnection> conn_)
{
    std::unique_lock<std::mutex> lock_( m_mutex );
    m_pool.push( conn_ );
    // Unlock before notifying so the woken thread can take the mutex immediately.
    lock_.unlock();
    m_condition.notify_one();
}
In the createPool function we create the pool of connections; I open 10 of them. Client code then creates a PGBackend instance and works with it through two functions: connection, which returns a free connection to the database, and freeConnection, which puts the connection back into the queue.
All of this is built on a condition variable: if the queue is empty, there are no free connections, and the calling thread sleeps until it is woken through the condition variable.
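One thing the connection/freeConnection pair leaves to the caller is remembering to return the connection, including when an exception is thrown in between. As a possible refinement, here is a sketch of an RAII guard that does this automatically. BlockingPool is a simplified, generic stand-in for PGBackend so that the example builds without libpq, and Lease is a hypothetical helper; neither is part of the original code.

```cpp
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// Minimal blocking pool with the same connection()/freeConnection() shape as
// PGBackend. T stands in for PGConnection.
template <typename T>
class BlockingPool {
public:
    // Puts an item into the queue and wakes one waiting thread.
    void freeConnection(std::shared_ptr<T> item) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_pool.push(std::move(item));
        }
        m_condition.notify_one();
    }

    // Blocks until the queue is non-empty, then takes the front item.
    std::shared_ptr<T> connection() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condition.wait(lock, [this] { return !m_pool.empty(); });
        auto item = std::move(m_pool.front());
        m_pool.pop();
        return item;
    }

    // Number of items currently free.
    std::size_t available() {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_pool.size();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_condition;
    std::queue<std::shared_ptr<T>> m_pool;
};

// Hypothetical RAII guard: borrows in the constructor, returns in the destructor.
template <typename T>
class Lease {
public:
    explicit Lease(BlockingPool<T>& pool)
        : m_pool(pool), m_item(pool.connection()) {}
    ~Lease() { m_pool.freeConnection(std::move(m_item)); }
    Lease(const Lease&) = delete;
    Lease& operator=(const Lease&) = delete;
    T& operator*() const { return *m_item; }
    T* operator->() const { return m_item.get(); }

private:
    BlockingPool<T>& m_pool;
    std::shared_ptr<T> m_item;
};
```

A worker would then declare a Lease at the top of a scope, use it like a pointer, and let the destructor return the connection when the scope ends, whether normally or through an exception.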
The simplest example that uses our backend with a connection pool is given in main.cpp. In production you will certainly have some kind of event loop, and database work will be triggered by its events. In my case this is boost::asio, which works asynchronously, receives events from the network, and writes everything to the database. I have left it out here so as not to obscure the idea of the connection pool. Here we simply create 50 threads that work with the server through one PGBackend instance.
main.cpp
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include "pgbackend.h"
void testConnection(std::shared_ptr<PGBackend> pgbackend)
{
    // get a free connection
    auto conn = pgbackend->connection();
    std::string demo = "SELECT max(id) FROM demo; " ;
    PQsendQuery( conn->connection().get(), demo.c_str() );
    // PQgetResult returns nullptr once the query has been fully processed
    while ( auto res_ = PQgetResult( conn->connection().get()) ) {
        if (PQresultStatus(res_) == PGRES_TUPLES_OK && PQntuples(res_)) {
            auto ID = PQgetvalue (res_ ,0, 0);
            std::cout<< ID<<std::endl;
        }
        // release the result to avoid a memory leak
        PQclear( res_ );
    }
    // return the connection to the pool
    pgbackend->freeConnection(conn);
}
int main(int argc, char const *argv[])
{
    auto pgbackend = std::make_shared<PGBackend>();
    std::vector<std::shared_ptr<std::thread>> vec;
    for ( size_t i = 0; i< 50 ; ++i ){
        vec.push_back(std::make_shared<std::thread>(std::thread(testConnection, pgbackend)));
    }
    for(auto &i : vec) {
        i.get()->join();
    }
    return 0;
}
This is compiled with the command:
g++ main.cpp pgbackend.cpp pgconnection.cpp -o pool -std=c++14 -I/usr/include/postgresql/ -lpq -lpthread
Be careful with the number of database connections: the server-side limit is set by the max_connections (integer) parameter.