A bit about multi-threaded programming. Part 1. Syncing evil or not

    At work, I often have to deal with highly loaded multithreaded or multiprocess services (application-, web-, index-server).
    Interesting enough, but sometimes ungrateful work is to optimize this whole economy.
    The growing needs of customers often rest on the inability to simply replace the iron component of the system with a more modern one, because computer performance, read / write speeds of hard drives and networks grow much slower than client requests.
    It rarely helps to increase the number of cluster nodes (the system is usually distributed).
    More often, you have to run the profiler, look for bottlenecks, climb into the source code and correct the mistakes that colleagues left, and sometimes even yourself, which is a sin, many years ago.
    Some of the problems associated with synchronization, I will try to pose here. This will not be an introductory course on multi-threaded programming - it is assumed that the reader is familiar with the concepts of thread and context switch, and knows what mutex, semaphore, etc. are for.

    It is clear to any developer who is multi-threaded in designing something larger than “Hello world” that creating completely asynchronous code is incredibly difficult - you need to write something in a common channel, change the structure in memory (for example, rotate the hash table tree), pick up what from the queue, etc.
    By synchronizing such access, we restrict the simultaneous execution of some critical sections of code. Typically this is one, rarely multiple threads (e.g. 1 writer / N readers).
    The need for synchronization is undeniable. Excessive synchronization is very harmful - a piece of the program works more or less smartly on 2 or 3 threads, for almost 5 threads it can be almost “singlethreaded”, and on 20 even it goes to bed with very good hardware.

    However, practice shows that sometimes insufficient synchronization of execution leads to the same result - the system sticks. This happens when the code executed in parallel contains, for example, access to the HDD (continuous seek), or when multiple accesses to various large chunks of memory (for example, permanent cache reset with context switch - the CPU cache just falls off stupidly).

    Use semaphores

    Семафоры изобрели не только для того, что бы на них строить конструкции вида ReadWriteMutex. Семафоры можно и нужно использовать для уменьшения нагрузки на железо на куске кода, исполняемого параллельно.
    Как правило таким образом можно вылечить множество «залипаний», которые легко найти профилированием кода — когда при увеличении количества потоков, время исполнения отдельных функций заметно растет, при том, что другие функции отрабатывают с той же или сравнимой скоростью.
    Развернуть Profiler-Output
    ========================================================================================================================
    Run # 1 (5 Threads)
      rpcsd (hbgsrv0189, PID:0718, TID:2648)
      # 03-09-2012 | 13:50:45 | Servlet: A::RpcsServlet, URI: /index-search
    ========================================================================================================================
                                 NS | Name                   |  C |  T | Tot(s) | TwR(s) | Avg(s) | AwR(s) | Max(s) | Min(s)
    ========================================================================================================================
                     ::RPC::Service | service                |  1 |  1 |  1.593 |  1.593 |  1.593 |  1.593 |  1.593 |  1.593
                   ::A::RpcsServlet | service                |  1 |  1 |  1.592 |  1.592 |  1.592 |  1.592 |  1.592 |  1.592
                      ::IndexSrvRpc | index-search           |  1 |  1 |  1.584 |  1.584 |  1.584 |  1.584 |  1.584 |  1.584
                  ::Indexer::Search | Search                 |  1 |  1 |  1.584 |  1.584 |  1.584 |  1.584 |  1.584 |  1.584
                  ::Indexer::Search | ParallelSearch         |  2 |  2 |  1.256 |  1.256 |  0.628 |  0.628 |  0.655 |  0.601
           ::Indexer::Search::Cache | SearchL2Index          | 44 | 44 |  0.686 |  0.686 |  0.016 |  0.016 |  0.016 |  0.015
                  ::Indexer::Search | InvalidateCacheIdx     | 20 | 20 |  0.570 |  0.570 |  0.028 |  0.028 |  0.031 |  0.020
           ::Indexer::Search::Cache | InvalidateIdx          | 20 | 20 |  0.276 |  0.276 |  0.014 |  0.014 |  0.016 |  0.002
                  ::Indexer::Search | SearchL1Index          |  1 | 14 |  0.203 |  0.203 |  0.203 |  0.016 |  0.203 |  0.016
                  ::Indexer::Search | MergeJoin              |  1 |  1 |  0.125 |  0.125 |  0.125 |  0.125 |  0.125 |  0.125
    ========================================================================================================================
    Run # 2 (25 Threads w/o semaphore)
      rpcsd (hbgsrv0189, PID:0718, TID:2648)
      # 03-09-2012 | 13:52:03 | Servlet: A::RpcsServlet, URI: /index-search
    ========================================================================================================================
                                 NS | Name                   |  C |  T | Tot(s) | TwR(s) | Avg(s) | AwR(s) | Max(s) | Min(s)
    ========================================================================================================================
                     ::RPC::Service | service                |  1 |  1 |  4.255 |  4.255 |  4.255 |  4.255 |  4.255 |  4.255
                   ::A::RpcsServlet | service                |  1 |  1 |  4.254 |  4.254 |  4.254 |  4.254 |  4.254 |  4.254
                      ::IndexSrvRpc | index-search           |  1 |  1 |  4.244 |  4.244 |  4.244 |  4.244 |  4.244 |  4.244
                  ::Indexer::Search | Search                 |  1 |  1 |  4.244 |  4.244 |  4.244 |  4.244 |  4.244 |  4.244
                  ::Indexer::Search | ParallelSearch         |  2 |  2 |  3.729 |  3.729 |  1.865 |  1.865 |  1.889 |  1.840
                  ::Indexer::Search | InvalidateCacheIdx     | 20 | 20 |  2.497 |  2.497 |  0.125 |  0.125 |  0.126 |  0.125
           ::Indexer::Search::Cache | InvalidateIdx          | 20 | 20 |  2.188 |  2.188 |  0.109 |  0.109 |  0.113 |  0.109
           ::Indexer::Search::Cache | SearchL2Index          | 44 | 44 |  1.231 |  1.231 |  0.028 |  0.028 |  0.031 |  0.015
                  ::Indexer::Search | SearchL1Index          |  1 | 14 |  0.360 |  0.360 |  0.360 |  0.028 |  0.360 |  0.016
                  ::Indexer::Search | MergeJoin              |  1 |  1 |  0.155 |  0.155 |  0.155 |  0.155 |  0.155 |  0.155
    ========================================================================================================================
    Run # 3 (25 Threads with semaphore in InvalidateCacheIdx, before InvalidateIdx)
      rpcsd (hbgsrv0189, PID:0718, TID:2648)
      # 03-09-2012 | 14:02:51 | Servlet: A::RpcsServlet, URI: /index-search
    ========================================================================================================================
                                 NS | Name                   |  C |  T | Tot(s) | TwR(s) | Avg(s) | AwR(s) | Max(s) | Min(s)
    ========================================================================================================================
                     ::RPC::Service | service                |  1 |  1 |  2.213 |  2.213 |  2.213 |  2.213 |  2.213 |  2.213
                   ::A::RpcsServlet | service                |  1 |  1 |  2.213 |  2.213 |  2.213 |  2.213 |  2.213 |  2.213
                      ::IndexSrvRpc | index-search           |  1 |  1 |  2.205 |  2.205 |  2.205 |  2.205 |  2.205 |  2.205
                  ::Indexer::Search | Search                 |  1 |  1 |  2.205 |  2.205 |  2.205 |  2.205 |  2.205 |  2.205
                  ::Indexer::Search | ParallelSearch         |  2 |  2 |  1.690 |  1.690 |  0.845 |  0.845 |  0.889 |  0.801
           ::Indexer::Search::Cache | SearchL2Index          | 44 | 44 |  1.153 |  1.153 |  0.026 |  0.026 |  0.031 |  0.016
                  ::Indexer::Search | InvalidateCacheIdx     | 20 | 20 |  0.537 |  0.537 |  0.027 |  0.027 |  0.031 |  0.007
                  ::Indexer::Search | SearchL1Index          |  1 | 14 |  0.359 |  0.359 |  0.359 |  0.028 |  0.359 |  0.017
           ::Indexer::Search::Cache | InvalidateIdx          | 20 | 20 |  0.278 |  0.278 |  0.014 |  0.014 |  0.016 |  0.004
                  ::Indexer::Search | MergeJoin              |  1 |  1 |  0.156 |  0.156 |  0.156 |  0.156 |  0.156 |  0.156
    


    В третьей выдаче профайлера можно видеть, как изменилось время исполнения метода InvalidateIdx и соответственно метода InvalidateCacheIdx, после окружения семафором invCI_semaphore вызова метода InvalidateIdx
    semaphore invCI_semaphore(config.InvCI_Count/* = 5*/);
    ...
    int InvalidateCacheIdx() {
      ...
      while (...) {
        cache.SearchL2Index();
        invCI_semaphore++;
        while (cache.InvalidateIdx()) {};
        invCI_semaphore--;
      }
      ...
    }
    

    This method of using semaphores is quite simple and does not necessarily require a complete understanding of the process, but it has many drawbacks, including the fact that the maximum number of threads for each block will most likely be selected in battle (in production, on the client system) - which is not always eat well. But the huge advantage of this optimization method is the ability to quickly increase the number of threads of the entire service, without changing the execution plan, i.e. almost without altering the entire engine - just putting a few semaphores to the previous value in bottlenecks. I am not a supporter of using semaphores thoughtlessly, but as a temporary solution (to reassure the client), I used this method more than once to calmly redo “correctly” later, delving into the source code.

    Prioritize

    Priorities are a very convenient mechanism, which also makes it quite easy to "lighten" the application. For example, if the system logs are written in a separate stream, then reducing its priority to the minimum, you can greatly “facilitate” the process without decreasing the log-level.
    For example, a design of the following type can be used if a pool with many threads processes tasks of different priorities:
    // before doing ...
    if ( thisThread.pool.count() > 1 
      && !(currentTaskType in (asap, immediately, now)) 
    ) {
      thisThread.priority = 2 * thisThread.pool.priority;
    } else {
      thisThread.priority = 5 * thisThread.pool.priority;
    }
    // do current task ...
    

    At the same time, you need to understand that the priority of the stream is valid for the whole process, and not just for the pool in which this stream exists - use it with caution.

    Divide et impera (Divide and Conquer)

    Quite often, instant execution of a piece of code is not required - i.e. some action or part of the task can be postponed. For example, write logs, count visits, reindex the cache, etc.
    You can significantly increase the speed of execution by highlighting pieces of synchronous code into separate tasks, and then executing them later (for example, using the so-called background service). It can be a separate thread, a thread pool, or even another aka RPC process (for example, an asynchronous WebService call). Naturally, the time cost of the call (queuing, etc.) of this task should be less than the cost of the execution itself.
    Example with a separate LOG stream:
    // здесь мы пишем лог напрямую :
    int log(int level, ...) {
      if (level >= level2log) {
        logMutex.lock();
        try {
          file.write(...);
          file.flush();
        } finally {
          logMutex.release();
        }
      }
    }
    


    // здесь - фоново :
    int log(int level, ...) {
      if (level >= level2log) {
        // защитить, добавить и освободить очередь :
        logQueue.mutex.lock();
        logQueue.add(currentThread.id, ...);
        logQueue.mutex.release();
        // разбудить лог-worker'а :
        logQueue.threadEvent.pulse();
      }
    }
    // background-logging thread:
    int logThreadProc() {
      ...
      while (true) {
        // делаем задержку - ожидаем латенц /* 500 ms */ или размер очереди /* 10 */:
        if ( logQueue.count < config.LogMaxCount /* = 10 */
          || (sleepTime = currentTime - lastTime) < config.LogLatency /* = 500 */) 
        {
          logQueue.threadEvent.wait(config.LogLatency - sleepTime);
          continue;
        };
        // пишем в буфер и удаляем из очереди :
        logQueue.mutex.lock();
        try {
          foreach (... in logQueue) {
            file.write(...);
            logQueue.delete(...);
          }
        } finally {
          logQueue.mutex.release();
        }
        // пишем буфер в лог:
        file.flush();
        // спать :
        logQueue.threadEvent.wait();
        lastTime = currentTime;
      }
      ...
    }
    

    Such a simple design can significantly reduce the cost of logging and reduce the consequences of the context switch, which will practically not depend on the number of threads using the method log.
    It is important to understand that now, by hanging additional logic on logging, only the stream directly writing to the log is loaded. Those. you can make our log more intelligent as much as you like - introduce the concept of LogLatency, as an example, add some kind of log analyzer (something like fail2ban) or save for example all debug messages in order to log them only in case of an error, group by TID , etc. - All this will practically not load the rest of the threads.
    In addition, when using the first method (the message is written synchronously directly to the log file), the threads are so-called “parallelized”. Those. the more synchronization objects (mutex, critical section, waiting events) and the higher the costs of context switch, the more likely it is that all threads passing through these objects will be executed sequentially.
    Those. the speed of multi-threaded execution of a task approaches or becomes even worse than the speed of its single-threaded execution. Reducing the time between lock () and release (), the code improves in two directions at once - it becomes faster in the thread itself and the likelihood of "parallelization" of the process decreases.
    Having organized the queue of events, sometimes it is possible to create similar constructions without even resorting to the creation of additional flows. For example, to queue some actions so that later, for example, during "idle time", execute them in the same thread, one after another.
    This can easily be illustrated on TCL:
    ## отдать страницу / документ ...
    ...
    ## показать counter :
    set counter [db onecolumn {select cntr from accesslog where userid = $userid}]
    %>
    Вы видели эту страницу <%= $counter %> раз...
    <%
    ## добавить событие "писать access log" in background, когда будет выполнено "update idle":
    after idle UpdateAccess $userid [clock seconds]
    ## завершить.
    ....
    ## где-то в коде приложения :
    proc UpdateAccess {userid lasttime} {
      db exec {update accesslog set cntr = cntr + 1, lastaccess = $lasttime where userid = $userid}
    }
    


    Queues, FIFO, LIFO, and Multithreading

    Organizing a queue, a data pool, or a serial buffer is not a tricky thing, but you need to keep in mind that with multithreading and other things being equal, the LIFO queue should be made number one choice (of course, if the sequence of actions is not important). Sometimes you can combine or group LIFO and FIFO (make LIFO elements small FIFO queues or for example build a buffer from the end, etc.). The meaning of such distortions lies in the processor cache and partly in the virtual organization of memory. Those. the probability that the last elements from LIFO are still in the processor cache is incomparably higher than the probability of the same for FIFOs of the same length.

    Life example - In our own memory manager, a hash table was organized from pools of free objects of the same size (who very often called malloc/freeknows why this is done :). The pools were organized according to the FIFO principle - the function mymallocreturned the first myfreeelement , long ago put into the pool by the function . The reason that prompted the developer to use FIFO then is simple to the point of banality - if some unscrupulous “programmer” uses the object after some time myfree, the program will probably work longer. After replacing it with LIFO, the entire arsenal (application server) that actively uses the memory manager earned about 30% faster.

    ReadWriteMutex

    Very often, synchronization is only necessary if the object changes. For example, when writing to a shared file, when changing the structure of lists or hash tables, etc. At the same time, as a rule, this is allowed only to one thread, and often even reading threads are blocked (to exclude dirty read and program crashes with an exception, since the entries until the end of the change are not entirely valid).
    It is more correct to lock such objects using RW-mutex, where the reading streams do not block each other, and only when the record is locked, the code is fully synchronized (executed by one thread).
    When using read / write-mutex, you must always have an accurate idea of ​​how the object is read, since in some cases, even when reading, the object can change (for example, when building an internal cache during initialization or reinitialization after writing). In this case, the ideal API provides a callback to block, or blocks on its own in case of multithreading, or the possible use of RW-mutex, with all exceptions, is described in more detail in the API documentation. In some RW-mutex implementations, you need to know in advance (tell mutex) the number of reader threads, sometimes writer threads. This is due to the specific implementation of write locks (semaphores are usually used). Despite these and other limitations, with multiple reader streams,

    Read the documentation, read the source code

    The problem of ignorance, sometimes misunderstanding, of what is hidden behind a particular class or object is especially critical when used in a multi-threaded application. This is especially true for basic synchronization objects. I will try to clarify what I mean by the example of improper use of RW-mutex.
    One colleague of mine once used the fair RW-mutex, built on semaphores. He was too lazy to dynamically transfer the number of reader streams to the RWMutex class (he set the statically "maximum possible" value to 500) and wrote the following code for the writer stream:
    ...
    RWMutex mtx(500);
    ...
    mtx.lockWrite();
    hashTab.add(...);
    mtx.releaseWrite();
    ...
    

    And with a good load, the server went into deep binge and went into hibernation. The thing is that he made two mistakes - taking a static value of 500 and did not understand how such an RW-mutex will behave on this particular platform. Because RW-mutex was made fair - code was used similar to the following:
    void RWMutex::lockWrite() {
      writeMutex.lock();
      for (register int i = 0; i < readersCount /* в нашем случае 500 */; i++)
        readSemaphore++;
    }
    void RWMutex::releaseWrite() {
      if (!f4read)
        writeMutex.release();
      readSemaphore -= readersCount;
      if (f4read)
        writeMutex.release();
    }
    

    This design, due to the use of lockWriteincrement in the body readSemaphore++in the loop, instead readSemaphore += readersCount, gives equal chances for reader and writer streams. Perhaps he did not know that the semaphore class for constructing this RWMutex, used one cross-platform library, which produced a simple code for this particular platform that looked something like this:
    int Semaphore::operator ++() {
      mutex.lock();
      if (sema++ > MaxFlowCount) flowMutex.lock();
      mutex.release();
    }
    

    Those. when adding hashTab100 values to the hash table , while reading by several reader threads at the same time, we had 100 * 500 locks (and a few milliseconds were precipitated due to context switch). The most interesting thing in this story is that it was the base class RWSyncHashTable, actively used everywhere in our code.
    Keep in mind: some API constructs may already be in sync. Sometimes it is even a constructor and destructor of an object. In this case, additional synchronization is often harmful. This is just the case when you spoil the porridge with butter.
    Read the sources, look at the documentation for the API - and such mistakes are more likely to bypass you.

    Synchronous! = Waiting

    Synchronization of execution does not mean at all that our process only does what awaits. The blocking methods of modern systems are quite flexible, and allow you to do the following designs:
    static int mtx_locked = 0;
    // уже заблокировано кем-то - нет, подождем 1 мс?
    while ( mtx_locked || !mtx.lock(config.MaxWaitTime /* пример 1 ms */) ) {
      // не могу блокировать - сделай что-то другое ... например ...
      processNextRequest();
    }
    // за мютексом - блокирован ...
    mtx_locked++;
    // исполняем ...
    processInLock();
    // unlock ...
    mtx_locked--;
    mtx.release();
    

    Using this kind of code allows you not to wait for the mutex to lock and goes to bed, but try to do something else at this time. On a similar principle, although often a little differently implemented (callback or event execution, transactional nowait locking, per thread-caching, etc.), the concept of asynchronous programming is based. In this case, you must follow a very simple rule - "do not wait."
    This example shows another trick to avoid or minimize context switch: it is a static variable mtx_locked. This technique allows us not to perform it mtx.lock, if it is already known in advance that the code is blocked ( mtx_locked > 0), and we don’t need to know for sure - we just do something else.

    It’s probably worth finishing the first part (a lot of letters). If I wrote somewhere truths for someone, I ask you to forgive me submissively - not from evil. Suggestions, wishes, criticism are welcome.

    In the next part:

    • Deadlock
    • Events; reaction while waiting;
    • Synchronization in the data banks;
    • Systemwide synchronization (crossprocess, crosscluster)
    • Asynchronous programming;
    • Shared resources
    • Garbage, Releasing Resources
    • Stream Monitoring, HeartBit
    • Profiling
    • Your wishes

    Also popular now: