Multithreaded SOCKS 4 server on Qt

    From time to time, questions about programming network applications come up in the Qt sections of Runet forums. One problem that torments the people asking is how to organize the server. Three approaches are usually listed:
    • single-threaded asynchronous;
    • multithreaded, with one thread created per connection;
    • multithreaded, with a thread pool built on QThreadPool and QRunnable.

    When you mention a fixed number of worker threads, each with its own event loop, people ask for an example. What follows is an example of a server with a pool of threads, each of which runs its own event loop.

    Cast of characters:
    • the Server class, which accepts connections and distributes tasks to workers;
    • the Worker class, instances of which create instances of the Client class in the worker threads;
    • the Client class, which encapsulates client requests and implements SOCKS 4.

    The simplest of the three is Worker. It inherits from QObject and implements a single function that creates a client; it exists purely for the "correct" use of threads:
    class Worker: public QObject
    {
    Q_OBJECT
    public:
        Q_INVOKABLE void addClient(qintptr socketDescriptor);
    };
    

    void Worker::addClient(qintptr socketDescriptor)
    {
        new Client(socketDescriptor, this);
    }
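
    The "correct" use of threads here means that addClient is not called directly: the call is queued and runs in the thread that owns the worker. As a rough illustration of that mechanism outside Qt, here is a minimal sketch in standard C++. LoopThread and post are hypothetical names; this is not how Qt implements its event loop, only the same idea in miniature.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for a thread with its own event loop:
// one thread, one task queue.
class LoopThread
{
public:
    LoopThread() : m_quit(false), m_thread([this] { run(); }) {}

    ~LoopThread()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_quit = true;
        }
        m_wakeup.notify_one();
        m_thread.join(); // wait until the loop has drained its queue and exited
    }

    // Queues a task; it runs later on the loop thread, never on the caller's thread.
    void post(std::function<void()> task)
    {
        assert(task); // an empty std::function cannot be called
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_tasks.push_back(std::move(task));
        }
        m_wakeup.notify_one();
    }

private:
    void run()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        for (;;)
        {
            m_wakeup.wait(lock, [this] { return m_quit || !m_tasks.empty(); });
            if (m_tasks.empty())
                return; // quit requested and nothing left to do
            std::function<void()> task = std::move(m_tasks.front());
            m_tasks.pop_front();
            lock.unlock();
            task(); // run the task without holding the lock
            lock.lock();
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_wakeup;
    std::deque<std::function<void()>> m_tasks;
    bool m_quit;
    std::thread m_thread; // declared last, so it starts after the other members exist
};
```

    Calling loop.post(...) from the accepting thread plays the role of a queued invocation: the caller returns immediately, and the task body executes on the loop thread.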
    

    The server is just as simple:
    class Server: public QTcpServer
    {
    Q_OBJECT
    public:
        Server(size_t threads = 4, QObject * parent = nullptr);
        ~Server();
    protected:
        void incomingConnection(qintptr socketDescriptor) override;
    private:
        void initThreads();
    private:
        size_t m_threadCount;
        QVector<QThread*> m_threads;
        QVector<Worker*> m_workers;
        size_t m_rrcounter;
    };
    

    Server::Server(size_t threads, QObject * parent) :
            QTcpServer(parent),
            m_threadCount(threads),
            m_rrcounter(0)
    {
        initThreads();
    }
    Server::~Server()
    {
        for(QThread* thread: m_threads)
        {
            thread->quit();
            thread->wait();
        }
    }
    void Server::initThreads()
    {
        for (size_t i = 0; i < m_threadCount; ++i)
        {
            QThread* thread = new QThread(this);
            Worker* worker = new Worker();
            worker->moveToThread(thread);
            connect(thread, &QThread::finished,
                    worker, &QObject::deleteLater);
            m_threads.push_back(thread);
            m_workers.push_back(worker);
            thread->start();
        }
    }
    void Server::incomingConnection(qintptr socketDescriptor)
    {
        Worker* worker = m_workers[m_rrcounter % m_threadCount];
        ++m_rrcounter;
        QMetaObject::invokeMethod(worker, "addClient",
                Qt::QueuedConnection,
                Q_ARG(qintptr, socketDescriptor));
    }
    

    The server creates the threads and workers in its constructor and moves the workers to the threads. It hands each new connection over to a worker, choosing the worker in turn, that is, by round-robin.
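
    The round-robin choice in incomingConnection can be isolated into a tiny helper. The sketch below uses only standard C++; RoundRobin is a hypothetical name, not part of the server above. It wraps the counter on every step instead of letting it grow, which yields the same sequence as m_rrcounter % m_threadCount.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: hands out worker indices 0, 1, ..., n-1, 0, 1, ...
class RoundRobin
{
public:
    explicit RoundRobin(std::size_t workerCount) :
            m_count(workerCount),
            m_next(0)
    {
        assert(workerCount > 0); // the modulo below needs a non-empty pool
    }

    // Returns the index of the worker that should get the next connection.
    std::size_t next()
    {
        const std::size_t index = m_next;
        m_next = (m_next + 1) % m_count; // wrap here, so the counter stays below m_count
        return index;
    }

private:
    std::size_t m_count;
    std::size_t m_next;
};
```

    Round-robin ignores how busy each worker is; for short-lived proxy connections that is usually an acceptable trade for its simplicity.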

    SOCKS 4 is a very simple protocol. The server only needs to:
    1. read IP address, port number;
    2. establish a connection with the "world";
    3. send a message to the client that the request is confirmed;
    4. forward data from one socket to another until someone closes the connection.
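
    For step 1, the fixed part of a SOCKS 4 request is eight bytes: version (4), command, destination port and IPv4 address, the last two in network byte order, followed by a null-terminated user ID. As a sketch only, the fields can be read byte by byte, which avoids packed structs and endianness macros. Socks4Target and parseSocks4Request are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical plain-data view of the fixed part of a SOCKS 4 request.
struct Socks4Target
{
    std::uint8_t command;  // 1 = CONNECT, 2 = BIND
    std::uint16_t port;    // host byte order
    std::uint32_t address; // IPv4, host byte order
};

// Parses VN, CD, DSTPORT and DSTIP from a raw buffer.
// Returns false if the buffer is too short or the version byte is not 4.
bool parseSocks4Request(const std::uint8_t* data, std::size_t size, Socks4Target& out)
{
    assert(data != nullptr);
    const std::size_t fixedPart = 8; // VN + CD + DSTPORT + DSTIP
    if (size < fixedPart || data[0] != 4)
        return false;

    out.command = data[1];
    // Network byte order is big-endian: the most significant byte comes first.
    out.port = static_cast<std::uint16_t>((data[2] << 8) | data[3]);
    out.address = (static_cast<std::uint32_t>(data[4]) << 24)
            | (static_cast<std::uint32_t>(data[5]) << 16)
            | (static_cast<std::uint32_t>(data[6]) << 8)
            | static_cast<std::uint32_t>(data[7]);
    return true;
}
```

    The size check matters on a real socket: a single read is not guaranteed to deliver the whole request, so a short buffer should be treated as "not ready yet" rather than parsed.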

    class Client: public QObject
    {
    Q_OBJECT
    public:
        Client(qintptr socketDescriptor, QObject* parent = 0);
    public slots:
        void onRequest();
        void client2world();
        void world2client();
        void sendSocksAnsver();
        void onClientDisconnected();
        void onWorldDisconnected();
    private:
        void done();
    private:
        QTcpSocket m_client;
        QTcpSocket m_world;
    };
    

    namespace
    {
    #pragma pack(push, 1)
        struct socks4request
        {
            uint8_t version;
            uint8_t command;
            uint16_t port;
            uint32_t address;
            uint8_t end;
        };
        struct socks4ansver
        {
            uint8_t empty = 0;
            uint8_t status;
            uint16_t field1 = 0;
            uint32_t field2 = 0;
        };
    #pragma pack(pop)
        enum SocksStatus
        {
            Granted = 0x5a,
            Failed = 0x5b,
            Failed_no_identd = 0x5c,
            Failed_bad_user_id = 0x5d
        };
    }
    Client::Client(qintptr socketDescriptor, QObject* parent) :
            QObject(parent)
    {
        m_client.setSocketDescriptor(socketDescriptor);
        connect(&m_client, &QTcpSocket::readyRead,
                this, &Client::onRequest);
        connect(&m_client,&QTcpSocket::disconnected,
                this, &Client::onClientDisconnected);
        connect(&m_world, &QTcpSocket::connected,
                this, &Client::sendSocksAnsver);
        connect(&m_world, &QTcpSocket::readyRead,
                this, &Client::world2client);
        connect(&m_world,&QTcpSocket::disconnected,
                this, &Client::onWorldDisconnected);
    }
    void Client::onRequest()
    {
        QByteArray request = m_client.readAll();
        socks4request* header = reinterpret_cast<socks4request*>(request.data());
    #if Q_BYTE_ORDER == Q_LITTLE_ENDIAN
        const QHostAddress address(qFromBigEndian(header->address));
    #else
        const QHostAddress address(header->address);
    #endif
    #if Q_BYTE_ORDER == Q_LITTLE_ENDIAN
        const uint16_t port = qFromBigEndian(header->port);
    #else
        const uint16_t port = header->port;
    #endif
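        //qDebug()<<"connection:"<<address<<"port:"<<port;
        // Connect to the requested host; sendSocksAnsver() runs once m_world is connected.
        m_world.connectToHost(address, port);
        // From now on, client data is relayed instead of being parsed as a request.
        disconnect(&m_client, &QTcpSocket::readyRead,
                this, &Client::onRequest);
        connect(&m_client, &QTcpSocket::readyRead,
                this, &Client::client2world);
    }
    void Client::sendSocksAnsver()
    {
        socks4ansver ans;
        ans.status = Granted;
        m_client.write(reinterpret_cast<char*>(&ans), sizeof(ans));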
        m_client.flush();
    }
    void Client::client2world()
    {
        m_world.write(m_client.readAll());
    }
    void Client::world2client()
    {
        m_client.write(m_world.readAll());
    }
    void Client::onClientDisconnected()
    {
        m_world.flush();
        done();
    }
    void Client::onWorldDisconnected()
    {
        m_client.flush();
        done();
    }
    void Client::done()
    {
        m_client.close();
        m_world.close();
        deleteLater();
    }
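
    The reply sent back to the client (the socks4ansver struct above) is always eight bytes: a zero byte, the status code, and six bytes that a CONNECT client ignores. Below is a minimal sketch of building it without a packed struct, in standard C++; buildSocks4Reply is a hypothetical name.

```cpp
#include <array>
#include <cassert>
#include <cstdint>

// Builds the eight-byte SOCKS 4 reply for the given status code
// (0x5a = granted, 0x5b..0x5d = the failure codes from the enum above).
std::array<std::uint8_t, 8> buildSocks4Reply(std::uint8_t status)
{
    assert(status >= 0x5a && status <= 0x5d);
    std::array<std::uint8_t, 8> reply{}; // value-initialised: all eight bytes are zero
    reply[1] = status;                   // byte 0 stays 0, bytes 2..7 stay 0
    return reply;
}
```

    Because every field is a single byte or zero, this layout is identical on little-endian and big-endian hosts.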
    

    Epoll and Qt

    A cry like thunder:
    - Give the people rum!
    One way or another,
    the people need to drink rum!


    If we compile the code above and run it under strace -f, we will see poll calls. Surely someone will voice a weighty “ugh” and claim that with epoll it would “finally fly like a rocket”.

    Qt has a QAbstractEventDispatcher class that lets you plug in your own event dispatcher. Naturally, kind people have written and published dispatchers with various backends.

    To use such a dispatcher, we write the following in main.cpp:
    QCoreApplication::setEventDispatcher(new QEventDispatcherEpoll);
    QCoreApplication app(argc, argv);
    

    and the initThreads method on the server becomes:
    void Server::initThreads()
    {
        for (size_t i = 0; i < m_threadCount; ++i)
        {
            QThread* thread = new QThread(this);
            thread->setEventDispatcher(new QEventDispatcherEpoll);
            Worker* worker = new Worker();
            worker->moveToThread(thread);
            connect(thread, &QThread::finished,
                    worker, &QObject::deleteLater);
            m_threads.push_back(thread);
            m_workers.push_back(worker);
            thread->start();
        }
    }
    

    If we run strace again, we will see the coveted calls to functions with the epoll_ prefix.
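
    For readers who have not used the API directly, the calls in question are ones such as epoll_create1, epoll_ctl and epoll_wait. The following Linux-only sketch is unrelated to the internals of Qt or of any particular dispatcher; it only registers the read end of a pipe and waits for it to become readable. epollSeesPipeData is a hypothetical name.

```cpp
#include <cassert>
#include <sys/epoll.h>
#include <unistd.h>

// Returns true if epoll reports the read end of a pipe as readable
// after one byte has been written to the write end.
bool epollSeesPipeData()
{
    int fds[2];
    if (pipe(fds) != 0)
        return false;

    const int epfd = epoll_create1(0);
    if (epfd < 0)
    {
        close(fds[0]);
        close(fds[1]);
        return false;
    }

    // Register interest in "readable" events on the read end.
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = fds[0];
    bool ok = epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ev) == 0;

    const char byte = 'x';
    ok = ok && write(fds[1], &byte, 1) == 1;

    // Wait up to one second for exactly one ready descriptor.
    epoll_event ready{};
    ok = ok && epoll_wait(epfd, &ready, 1, 1000) == 1
            && ready.data.fd == fds[0]
            && (ready.events & EPOLLIN) != 0;

    close(epfd);
    close(fds[0]);
    close(fds[1]);
    return ok;
}
```

    Unlike poll, the set of watched descriptors is registered once with epoll_ctl and kept in the kernel, so each wait does not have to pass the whole list again.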

    Conclusions

    The conclusions are purely pragmatic.

    If you are an application programmer and your tasks do not fall under "big data" or "highload" in Bunin's sense, then write in whatever you like and however you can. The job of an application programmer is to deliver a product of a certain quality while spending a certain amount of resources. Otherwise, epoll on sockets alone will not be enough.

    PS

    The source code is available on GitHub.
