Scheduling server tasks using boost.task

    Recently, on a specialized site, a programmer asked: "What should an MMO server use for multithreading?" He was leaning toward Intel TBB, and not toward its basic primitives but toward its custom task scheduling. Well, if he likes TBB, fine. A little later I saw the source code of an MMO server by another programmer, who had recently started rewriting it from scratch to improve the architecture. It contained a lot of reinvented wheels written by the programmer himself instead of using third-party components such as boost (for example, wrapper classes over pthread, and this in 2010, when boost.thread is almost standard). A thread pool with a task scheduler was implemented there as well. This topic interests me a lot, so I started digging for information about ready-made task scheduling solutions (like the one in TBB) and found boost.task, which I decided to write about.


    Definition


    A task is a logically complete set of actions. A task scheduler executes tasks asynchronously, following certain strategies for choosing which task should run at a given moment and in which thread.
    Tasks let you abstract away from ordinary threads and work at a higher level.

    Why do I need a task scheduler?


    How does a spherical server in a vacuum work? Very simply:
    1. A request comes from the client
    2. The request is processed
    3. A reply is sent

    In addition, the server may run some processes that are not triggered by a client request. For example, sending notifications to the entire user base, cleaning obsolete data out of the database, processing daily statistics, etc.
    The catch is in how exactly the request is processed. We need to figure out how to handle it.
    Take, for example, a memcached-like server: we have a hash_map with data, and there are read requests and write requests that do a simple lookup in the hash map and either return data or write it to the hash map. So far everything happens in one thread, but what if we need to use all the processors in the system?
    We create as many threads as there are cores. Each thread serves its own users, which are distributed among the threads round-robin when their connections are created. Access to the container is guarded by a rwlock (boost::shared_mutex). Excellent. But what about removing items from the container? We create a thread that wakes up every N seconds and cleans the container.
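    The container access described above can be sketched roughly as follows. This is only an illustration: it uses std::shared_mutex from the C++17 standard library as a stand-in for boost::shared_mutex, and the Cache class, its method names and the per-entry expiry time are assumptions, not code from any real server.

```cpp
#include <chrono>
#include <cstddef>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>
#include <unordered_map>

// Hypothetical cache guarded by a reader-writer lock.
class Cache {
public:
    using Clock = std::chrono::steady_clock;

    // Writers take the lock exclusively.
    void Set(const std::string& key, const std::string& value, Clock::duration ttl) {
        std::unique_lock<std::shared_mutex> lock(mutex_);
        data_[key] = Entry{value, Clock::now() + ttl};
    }

    // Readers share the lock, so lookups from different threads run in parallel.
    std::optional<std::string> Get(const std::string& key) const {
        std::shared_lock<std::shared_mutex> lock(mutex_);
        auto it = data_.find(key);
        if (it == data_.end()) return std::nullopt;
        return it->second.value;
    }

    // What the cleaner thread would call every N seconds.
    // Returns the number of entries that were dropped.
    std::size_t RemoveExpired(Clock::time_point now = Clock::now()) {
        std::unique_lock<std::shared_mutex> lock(mutex_);
        std::size_t removed = 0;
        for (auto it = data_.begin(); it != data_.end();) {
            if (it->second.expires_at <= now) {
                it = data_.erase(it);
                ++removed;
            } else {
                ++it;
            }
        }
        return removed;
    }

private:
    struct Entry {
        std::string value;
        Clock::time_point expires_at;
    };

    mutable std::shared_mutex mutex_;
    std::unordered_map<std::string, Entry> data_;
};
```

    The cleaner holds the exclusive lock for the whole sweep, which is acceptable for a sketch but would stall readers on a large container.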
    That was a simple example; now a more complex one: a service that, depending on the user's request, can query a database or make an http request to some website. What happens if the server follows the previous model (all requests to other components are executed synchronously)? Well, the database sits on the same site as the server, so the answer arrives within a couple of milliseconds. Sending email is not a problem either: we put sendmail on the same machine, hand it the data, and it figures out how to deliver the letter by itself.
    Excellent. Although not quite. What about the http request? It can take a very long time: everything depends on a site that is somewhere far away, and there is no telling how long it will take to process the request. Meanwhile the thread sits idle, although the queue holds many requests that could be executed; they have to wait until this thread is free.
    Such a request must be performed asynchronously. It can be implemented like this:
    class LongRequestHandler
    {
    public:
        void Handle()
        {
            // read client request parameters
            // mysql request 1
            // mysql request 2
            HttpRequestExecutor::GetInstance()->Execute(
                "example.com?x=1",
                boost::bind(&LongRequestHandler::HandleStage2, this, _1)
            );
        }
        void HandleStage2(const std::string & http_request_result)
        {
            // mysql request 3
            // write response to client
        }
    };


    The HttpRequestExecutor accepts the request URL and a callback to call when the request completes (the callback type is boost::function).
    This approach works, though it is not very pretty.
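    To make the control flow of the callback approach visible, here is a self-contained sketch. FakeHttpExecutor is a hypothetical stand-in for HttpRequestExecutor that merely queues callbacks and completes them when Poll() is called; std::function and a lambda replace boost::function and boost::bind, and the log vector exists only so the order of the stages can be observed.

```cpp
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical executor: stores requests and "completes" them on Poll().
class FakeHttpExecutor {
public:
    using Callback = std::function<void(const std::string&)>;

    void Execute(const std::string& url, Callback on_done) {
        pending_.emplace(url, std::move(on_done));
    }

    // Completes one pending request; returns false if nothing was pending.
    bool Poll() {
        if (pending_.empty()) return false;
        auto job = std::move(pending_.front());
        pending_.pop();
        job.second("response for " + job.first);
        return true;
    }

private:
    std::queue<std::pair<std::string, Callback>> pending_;
};

class LongRequestHandler {
public:
    LongRequestHandler(FakeHttpExecutor& http, std::vector<std::string>& log)
        : http_(http), log_(log) {}

    void Handle() {
        log_.push_back("stage 1");  // mysql requests 1 and 2 would go here
        http_.Execute("example.com?x=1",
                      [this](const std::string& result) { HandleStage2(result); });
        // Handle() returns immediately; the thread is free for other requests.
    }

private:
    void HandleStage2(const std::string& http_result) {
        log_.push_back("stage 2: " + http_result);  // mysql request 3, reply
    }

    FakeHttpExecutor& http_;
    std::vector<std::string>& log_;
};
```

    The handler object has to outlive the pending request, which is one of the things that makes this style awkward as the number of stages grows.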
    The Thinking Asynchronously in C++ blog shows an interesting way to run asynchronous operations. The result looks like this:
    template <typename Handler> void async_echo(
     tcp::socket& socket,
     mutable_buffer working_buffer,
     Handler handler,
     // coroutine state:
     coroutine coro = coroutine(),
     error_code ec = error_code(),
     size_t length = 0)
    {
     reenter (coro)
     {
     entry:
      while (!ec)
      {
       yield socket.async_read_some(
         buffer(working_buffer),
         bind(&async_echo<Handler>,
          ref(socket), working_buffer,
          box(handler), coro, _1, _2));
       if (ec) break;
       yield async_write(socket,
         buffer(working_buffer, length),
         bind(&async_echo<Handler>,
          ref(socket), working_buffer,
          box(handler), coro, _1, _2));
      }
      handler(ec);
     }
    }

    Coroutine and yield look unusual in C++ ;) They are implemented with preprocessor macros; the blog explains how the author pulled it off.
    Gradually the logic gets more complicated: new steps that need asynchronous processing are added, and the implementation grows more complicated too. Take the following task:
    mysql request 1
    mysql request 2
    http request 1
    mysql request 3
    http request 2
    mysql request 4
    mysql request 5
    

    Executing it sequentially, with pauses on the http requests, we see that the requests
    mysql request 2
    http request 1
    

    and
    mysql request 3
    http request 2
    mysql request 4
    

    can run in parallel, and if we want to do that, the logic has to get even more complicated. I would like to write simple code instead, for example:
    mysql request 1
    x = run (func1)
    y = run (func2)
    wait (x, y)
    mysql request 5
    func1:
      mysql request 2
      http request 1
    func2:
      mysql request 3
      http request 2
      mysql request 4
    

    This is where the task scheduler comes in handy.
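    For comparison, the run/wait pseudocode above maps fairly directly onto futures. The sketch below uses std::async from the C++11 standard library rather than boost.task; Trace, Func1, Func2 and ProcessRequest are hypothetical names, and each "query" only records that it ran.

```cpp
#include <functional>
#include <future>
#include <mutex>
#include <string>
#include <vector>

// Thread-safe log of the steps that ran (illustrative helper).
class Trace {
public:
    void Add(const std::string& step) {
        std::lock_guard<std::mutex> lock(mutex_);
        steps_.push_back(step);
    }
    std::vector<std::string> Steps() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return steps_;
    }

private:
    mutable std::mutex mutex_;
    std::vector<std::string> steps_;
};

void Func1(Trace& t) {
    t.Add("mysql request 2");
    t.Add("http request 1");
}

void Func2(Trace& t) {
    t.Add("mysql request 3");
    t.Add("http request 2");
    t.Add("mysql request 4");
}

std::vector<std::string> ProcessRequest() {
    Trace trace;
    trace.Add("mysql request 1");

    // x = run(func1), y = run(func2): both branches start right away.
    std::future<void> x = std::async(std::launch::async, Func1, std::ref(trace));
    std::future<void> y = std::async(std::launch::async, Func2, std::ref(trace));

    // wait(x, y): block until both branches have finished.
    x.get();
    y.get();

    trace.Add("mysql request 5");
    return trace.Steps();
}
```

    The first and last steps are always in the same place; the five steps in between may interleave differently from run to run, which is exactly the parallelism the pseudocode asks for.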

    Implementations


    You can read about task scheduler support in the new C++0x standard here .

    I liked boost.task the most. A detailed look at it follows.

    Description of boost.task


    boost.task is an implementation of a proposal for the C++0x standard. It supports setting task execution strategies, creating sub-tasks, and interrupting tasks.
    The library depends on boost.fiber, boost.atomic and boost.move.

    boost.task and boost.fiber are compiled libraries (boost.atomic and boost.move are header-only), so they have to be built. To make experimenting more convenient, I gathered all the dependencies in one place, added cmake on top and uploaded the project to github. It works on linux; building under windows would take adding 2-3 lines to the cmake files.

    Usage example


    The library API is quite simple, so implementing the request handler described above will not be difficult. Here it is again:
    mysql request 1
      mysql request 2
      http request 1
      mysql request 3
      http request 2
      mysql request 4
    mysql request 5
    

    A plain sleep for a random time emulates a query to mysql:
    boost::this_thread::sleep(boost::posix_time::milliseconds(rand()%100 + 10));

    An asynchronous timer from boost::asio stands in for an external http request.
    So, Request is the request class:
    class Request
    {
    public:
      Request(const std::string & data);
      const std::string & Read() const;
      void Write(const std::string & answer);
    };


    And RequestHandler is the request handler class:
    class RequestHandler
    {
    public:
      RequestHandler(boost::asio::io_service & io_service, const RequestPtr & request);
      void Process() const;
    };


    io_service is passed in so that the handler can make an external call (using a boost::asio::deadline_timer). So let's start. We define a pool of threads to handle our tasks:
    boost::tasks::static_pool< boost::tasks::unbounded_fifo > pool( boost::tasks::poolsize( 5) );


    boost.task supports two main kinds of task queue:
    • bounded - has a threshold on the number of queued tasks; once it is reached, adding a new task blocks the thread that is adding it. The main purpose is to avoid resource exhaustion when tasks are added faster than they are executed
    • unbounded - lets you add any number of tasks to the queue
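    The blocking behaviour of a bounded queue can be illustrated with a small sketch. This is not boost.task's implementation: BoundedQueue and its single capacity threshold are assumptions made for the example, built on a standard mutex and condition variables.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Illustrative bounded FIFO queue.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    // Blocks the producer while the queue is full.
    void Push(T value) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push_back(std::move(value));
        not_empty_.notify_one();
    }

    // Non-blocking variant: refuses the item instead of waiting.
    bool TryPush(T value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.size() >= capacity_) return false;
        items_.push_back(std::move(value));
        not_empty_.notify_one();
        return true;
    }

    // Blocks the consumer while the queue is empty.
    T Pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        T value = std::move(items_.front());
        items_.pop_front();
        not_full_.notify_one();  // a slot was freed, wake one producer
        return value;
    }

private:
    std::size_t capacity_;
    std::deque<T> items_;
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
};
```

    An unbounded queue is the same structure without the not_full_ wait: Push always succeeds, and memory is the only limit.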

    You can also choose how tasks are ordered within the queue:
    • fifo - the task that was added first is executed first
    • priority - each task has a priority, and the tasks with the highest priority are selected for execution first
    • smart - a queue of this type can be heavily customized through template parameters. By default it lets you index tasks by an arbitrary key and replace an old task with a new one if a task with that key already exists
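    The difference between fifo and priority ordering is easy to see on a toy example. The sketch below uses std::priority_queue and is not tied to boost.task; PrioritizedTask, ByPriority and DrainByPriority are hypothetical names, and "a larger number means more urgent" is an assumed convention.

```cpp
#include <queue>
#include <string>
#include <utility>
#include <vector>

struct PrioritizedTask {
    int priority;      // larger value = more urgent (assumed convention)
    std::string name;
};

// Orders the heap so that the highest priority is on top.
struct ByPriority {
    bool operator()(const PrioritizedTask& a, const PrioritizedTask& b) const {
        return a.priority < b.priority;
    }
};

// Returns task names in the order a priority queue would hand them out.
std::vector<std::string> DrainByPriority(std::vector<PrioritizedTask> tasks) {
    std::priority_queue<PrioritizedTask, std::vector<PrioritizedTask>, ByPriority> queue;
    for (auto& task : tasks) {
        queue.push(std::move(task));
    }

    std::vector<std::string> order;
    while (!queue.empty()) {
        order.push_back(queue.top().name);
        queue.pop();
    }
    return order;
}
```

    With a fifo queue the same tasks would simply come out in the order they were added. Note that std::priority_queue gives no ordering guarantee between tasks of equal priority.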

    Accordingly, the line above creates a pool of 5 threads with an unbounded fifo queue.
    Now we need to create an io_service and a pool of 3 threads to handle external requests.
    boost::asio::io_service io_service;


    If io_service::run is called while io_service has no work, the method returns immediately, whereas we need the worker threads to keep running. Usually this is taken care of by an acceptor on the port that clients connect to; here we can keep io_service busy by waiting on a timer:
    boost::asio::deadline_timer dummy_timer(io_service);
    dummy_timer.expires_from_now(boost::posix_time::seconds(10));
    // void dummy_handler(const boost::system::error_code&) {}
    dummy_timer.async_wait(&dummy_handler);


    After that, you can create a thread pool:
    boost::thread_group io_service_thread_pool;
    for(int i = 0; i < 3; ++i)
      io_service_thread_pool.create_thread(
        boost::bind(&boost::asio::io_service::run, &io_service)
      );

    Next, create a request:
    RequestPtr request(new Request("some data"));
    RequestHandlerPtr handler(new RequestHandler(io_service, request));


    Everything is ready, you can perform the task:
    boost::tasks::handle< void > request_processing(
      boost::tasks::async(
        boost::tasks::make_task( &RequestHandler::Process, handler ),
        pool));

    boost::tasks::make_task(&RequestHandler::Process, handler) creates a task that calls Process on the handler object. boost::tasks::async starts asynchronous execution of the task. boost::tasks::handle is an object through which you can track whether the task has completed and get its result, if there is one.
    boost::tasks::async supports 4 ways of executing a task:
    • own_thread - synchronous execution in the calling thread
    • new_thread - a thread is created for the task, the task runs in it, and then the thread exits
    • as_sub_task - if the current task is running in a pool, the new task is added to that pool; otherwise a new thread is created, as with new_thread. This is the default behavior.
    • static_pool - the task is executed in a thread pool

    Next, wait until the task is completed:
    request_processing.wait();


    And stop io_service:
    io_service.stop();
    io_service_thread_pool.join_all();


    The Process function turned out to be surprisingly simple.
    void Subtask1() const
    {
      Request("query2");
      ExternalRequest("extquery1");
    }

    void Subtask2() const
    {
      Request("query3");
      ExternalRequest("extquery2");
      Request("query4");
    }

    void Process() const
    {
      std::string data = request_->Read();

      Request("query1");

      boost::tasks::handle< void > subtask1(
        boost::tasks::async(
          boost::tasks::make_task( &RequestHandler::Subtask1, this )));
      boost::tasks::handle< void > subtask2(
        boost::tasks::async(
          boost::tasks::make_task( &RequestHandler::Subtask2, this )));

      boost::tasks::waitfor_all( subtask1, subtask2);

      Request("query5");

      request_->Write("some answer");
    }


    Sub-tasks are started with boost::tasks::async without specifying an execution policy, so the as_sub_task algorithm is selected automatically, and it executes them in the same thread pool as the parent task. The implementation of the subtask functions is trivial as well.
    RequestHandler::Request calls boost::this_thread::sleep, while ExternalRequest is a little more complicated:
    void ExternalRequest(const std::string & what) const
    {
      ExternalRequestHandler external_handler(io_service_);
      boost::tasks::spin::auto_reset_event ev;
      external_handler.PerformExternalReqeust(what, &ev);
      ev.wait();
    }

    A handler is created, along with an auto-reset event, boost::tasks::spin::auto_reset_event. The event is passed to the external request handler, which calls ev.set() when the request completes; until then, ev.wait() blocks the task.
    Unlike regular threads and synchronization primitives (boost::condition), ev.wait() does not block the thread; it blocks only the task (by calling this_task::yield() in a loop). This means that the processor can be used by other tasks in the meantime.
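    The "wait by yielding in a loop" idea can be sketched with an atomic flag. This is an illustration under assumptions, not the library's source: SpinAutoResetEvent is a hypothetical class, and std::this_thread::yield() stands in for this_task::yield(), which in boost.task hands the worker thread over to another task rather than to another OS thread.

```cpp
#include <atomic>
#include <thread>

// Illustrative auto-reset event that waits by yielding instead of blocking.
class SpinAutoResetEvent {
public:
    // Signals the event; exactly one successful wait consumes the signal.
    void Set() { signaled_.store(true, std::memory_order_release); }

    // Returns true and resets the event if it was signaled.
    bool TryWait() {
        return signaled_.exchange(false, std::memory_order_acq_rel);
    }

    // Spins until the event is signaled, giving up the processor each round.
    void Wait() {
        while (!TryWait()) {
            std::this_thread::yield();
        }
    }

private:
    std::atomic<bool> signaled_{false};
};
```

    Because the exchange both tests and clears the flag, the event returns to the non-signaled state as soon as one waiter gets through, which is what "auto reset" means.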
    The entire file can be found here .

    Conclusions


    boost.task is a very convenient library for scheduling tasks. It shows what support for asynchronous code execution will look like in the new C++0x standard, and it can be used right now, without waiting for the standard to be released.
    Code that uses boost.task is shorter and much easier to follow than code that uses threads directly.
    There are shortcomings, of course: the code is not yet optimized, which can cause problems in rare cases, and the library has not yet been accepted into boost (nor have its dependencies).



