Thread-safe signals that are really convenient to use

There are many libraries that implement signals in C++. Unfortunately, every implementation I have come across has several problems that keep me from writing simple multithreaded code with it. Here I will talk about these problems and how they can be solved.

What are signals?


I think many readers are already familiar with this concept, but I'll explain it just in case.

A signal is a way to send a notification about an arbitrary event to recipients who can register independently of each other. Think of it as a callback with many recipients or, for those who have worked with .NET, a multicast delegate.
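As a point of reference, a multicast callback can be sketched in a few lines of standard C++. This is a minimal single-threaded illustration under my own naming (simple_signal is hypothetical, not any library's API); it has none of the thread-safety properties discussed below.

```cpp
#include <functional>
#include <utility>
#include <vector>

// A minimal single-threaded multicast callback, for illustration only.
template <typename Signature>
class simple_signal;

template <typename... Args>
class simple_signal<void(Args...)>
{
    std::vector<std::function<void(Args...)>> _handlers;
public:
    // Registers one more recipient; recipients know nothing about each other.
    void connect(std::function<void(Args...)> handler)
    { _handlers.push_back(std::move(handler)); }

    // Notifies every recipient, in registration order.
    void operator()(Args... args) const
    {
        for (const auto& h : _handlers)
            h(args...);
    }
};
```

Each connect call adds an independent recipient, and a single invocation reaches all of them in registration order.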

A couple of examples with boost::signals2.
Declaring a signal:

struct Button
{
    boost::signals2::signal<void()> OnClick;
};

Connecting to and disconnecting from a signal:

void ClickHandler()
{ std::cout << "Button clicked" << std::endl; }
// ...
boost::signals2::connection c = button->OnClick.connect(&ClickHandler);
// ...
c.disconnect();

Invoking the signal:

struct Button
{
    boost::signals2::signal<void()> OnClick;
private:
    void MouseDownHandler()
    {
        OnClick();
    }
};


Now about the problems


In single-threaded code everything looks great and works quite well, but what about multithreaded code?

Unfortunately, there are three problems here that are common to the various implementations:

  1. There is no way to atomically connect to a signal and get the current state
  2. Non-blocking disconnection from a signal
  3. Disconnecting an asynchronous handler does not cancel calls that are already in its thread's queue

Let's look at each of them in detail. To do this, we will write part of the firmware of an imaginary media set-top box, namely three classes:

  • StorageManager - a class that tracks flash drives, DVDs and other media that the user inserts into the set-top box
  • MediaScanner - a class that searches for media files on each of these devices
  • MediaUiModel - a model for displaying these media files in an imaginary Model-View-something framework

I should say right away that the code you will see here is extremely simplified and contains nothing superfluous, so that we can concentrate on these problems. You will also see types named like TypePtr. This is just std::shared_ptr<Type>; don't be alarmed.

There is no way to atomically connect to a signal and get the current state


So, StorageManager. We need a getter for the media already inserted into the set-top box, and a signal that notifies about new ones.

class StorageManager
{
public:
    std::vector<StoragePtr> GetStorages() const;
    boost::signals2::signal<void(const StoragePtr&)> OnStorageAdded;
    // ...
};

Alas, this interface cannot be used without a race condition.

It doesn't work in this order...

storageManager->OnStorageAdded.connect(&StorageHandler);
// If the user inserts a flash drive before the loop, it will be handled twice
for (auto&& storage : storageManager->GetStorages())
    StorageHandler(storage);

...and it doesn't work in this order either.

for (auto&& storage : storageManager->GetStorages())
    StorageHandler(storage);
// If the user inserts a flash drive before we connect to the signal, it will not be handled at all
storageManager->OnStorageAdded.connect(&StorageHandler);
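Both failure modes can be reproduced deterministically, without real threads, by injecting the "other thread's" insertion into the window between the two client steps. In this sketch FakeStorageManager, Insert and RunClient are hypothetical stand-ins, and a storage is just an int.

```cpp
#include <functional>
#include <vector>

// Hypothetical stand-in for StorageManager; a "storage" is just an int here.
struct FakeStorageManager
{
    std::vector<int>                      storages;
    std::vector<std::function<void(int)>> onStorageAdded;

    // What the other thread does when the user inserts a flash drive.
    void Insert(int storage)
    {
        storages.push_back(storage);
        for (auto& h : onStorageAdded)
            h(storage);
    }
};

// Runs one client ordering and injects the insertion of storage 2 into the
// window between the two client steps. Returns how many times each storage
// (index 1 or 2) was handled.
std::vector<int> RunClient(bool connectFirst)
{
    FakeStorageManager mgr;
    mgr.Insert(1);                                  // already present at startup
    std::vector<int> handled(3, 0);
    auto handler = [&handled](int s) { ++handled[s]; };

    if (connectFirst)
    {
        mgr.onStorageAdded.push_back(handler);
        mgr.Insert(2);                              // race window
        for (int s : mgr.storages)
            handler(s);
    }
    else
    {
        for (int s : mgr.storages)
            handler(s);
        mgr.Insert(2);                              // race window
        mgr.onStorageAdded.push_back(handler);
    }
    return handled;
}
```

With connectFirst set to true, storage 2 is handled twice (once by the signal, once by the loop); with false, it is never handled.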

Common solution


Obviously, since we got a race condition, we need a mutex.

class StorageManager
{
    mutable std::recursive_mutex   _mutex;
    std::vector<StoragePtr>        _storages;
public:
    StorageManager()
    { /* ... */ }
    boost::signals2::signal<void(const StoragePtr&)> OnStorageAdded;
    std::recursive_mutex& GetMutex() const
    { return _mutex; }
    std::vector GetStorages() const
    {
        std::lock_guard l(_mutex);
        return _storages;
    }
private:
    void ReportNewStorage(const StoragePtr& storage)
    {
        std::lock_guard l(_mutex);
        _storages.push_back(storage);
        OnStorageAdded(storage);
    }
};
// ...
{
    std::lock_guard l(storageManager->GetMutex());
    storageManager->OnStorageAdded.connect(&StorageHandler);
    for (auto&& storage : storageManager->GetStorages())
        StorageHandler(storage);
}

This code will work, but it has several drawbacks:

  • If you want to use std::mutex instead of std::recursive_mutex, you can no longer lock it inside the GetStorages method (the caller already holds it), which makes the StorageManager class non-thread-safe
  • You cannot avoid copying the collection inside GetStorages without losing the thread safety of StorageManager
  • You have to expose the type std::vector<StoragePtr>, although it is really just an implementation detail
  • Connecting to the signal and getting the current state takes a fair amount of code, and that code is almost the same for every signal

How can we do better?


Let's move everything we do around the connect call (locking the mutex and traversing the collection) inside it.

It is important to understand that the algorithm for obtaining the current state depends on the nature of that state. If it is a collection, the handler has to be called for each element; if it is, say, an enum, the handler has to be called exactly once. So we need some abstraction.

Let's add a populator to the signal: a function that receives the handler being connected. This lets the signal's owner (StorageManager, in our case) decide how the current state is delivered to that handler.

template < typename Signature >
class signal
{
    using populator_type = std::function<void(const std::function<Signature>&)>;
    mutable std::mutex                       _mutex;
    std::list<std::function<Signature> >     _handlers;
    populator_type                           _populator;
public:
    signal(populator_type populator)
        : _populator(std::move(populator))
    { }
    std::mutex& get_mutex() const { return _mutex; }
    signal_connection connect(std::function<Signature> handler)
    {
        std::lock_guard l(_mutex);
        _populator(handler); // The signal owner decides how exactly the current state is delivered
        _handlers.push_back(std::move(handler));
        return signal_connection([this]() { /* remove the handler from _handlers */ } );
    }
    // ...
};

For now, the signal_connection class simply takes a lambda that removes the handler from the signal's list. I will show somewhat more complete code later.
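To experiment with the populator idea in isolation, here is a self-contained sketch of the same signal with the connection object left out. populated_signal, Mode and CountInitialCalls are hypothetical names. The usage function shows both kinds of state mentioned earlier: a collection populator calls the handler once per element, while a single-value populator calls it exactly once.

```cpp
#include <functional>
#include <list>
#include <mutex>
#include <utility>

// Sketch of a signal with a populator (hypothetical name; no disconnect support).
template <typename Signature>
class populated_signal
{
public:
    using handler_type   = std::function<Signature>;
    using populator_type = std::function<void(const handler_type&)>;

    explicit populated_signal(populator_type populator)
        : _populator(std::move(populator))
    { }

    std::mutex& get_mutex() const { return _mutex; }

    void connect(handler_type handler)
    {
        std::lock_guard<std::mutex> l(_mutex);
        _populator(handler);                      // deliver the current state first
        _handlers.push_back(std::move(handler));  // then subscribe to future changes
    }

    // The caller is expected to hold get_mutex() while invoking the signal.
    template <typename... Args>
    void operator()(Args&&... args) const
    {
        for (const auto& h : _handlers)
            h(args...);
    }

private:
    mutable std::mutex      _mutex;
    std::list<handler_type> _handlers;
    populator_type          _populator;
};

enum class Mode { Idle, Playing };   // example single-value state

// Usage sketch: returns the number of handler calls made while connecting.
int CountInitialCalls()
{
    std::list<int> items{1, 2, 3};
    Mode mode = Mode::Playing;
    populated_signal<void(int)> onItemAdded(
        [&items](const std::function<void(int)>& h) { for (int i : items) h(i); });
    populated_signal<void(Mode)> onModeChanged(
        [&mode](const std::function<void(Mode)>& h) { h(mode); });

    int calls = 0;
    onItemAdded.connect([&calls](int) { ++calls; });      // 3 calls: one per element
    onModeChanged.connect([&calls](Mode) { ++calls; });   // 1 call: the current mode
    return calls;
}
```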

Let's rewrite StorageManager using this new concept:

class StorageManager
{
    std::vector<StoragePtr>        _storages;
public:
    StorageManager()
        : OnStorageAdded([&](const std::function<void(const StoragePtr&)>& h) { for (auto&& s : _storages) h(s); })
    { /* ... */ }
    signal<void(const StoragePtr&)> OnStorageAdded;
private:
    void ReportNewStorage(const StoragePtr& storage)
    {
        // We must lock the mutex right here rather than inside the signal call,
        // because it also protects the _storages collection
        std::lock_guard l(OnStorageAdded.get_mutex());
        _storages.push_back(storage);
        OnStorageAdded(storage);
    }
};

With C++14, the populator can be quite short:

StorageManager()
    : OnStorageAdded([&](auto&& h) { for (auto&& s : _storages) h(s); })
{ }

Note that the populator is called while signal::connect holds the mutex, so the populator body does not need to lock it.

The client code becomes very short:

storageManager->OnStorageAdded.connect(&StorageHandler);

With one line, we simultaneously connect to the signal and get the current state of the object. Excellent!
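The claim that this one-line connect is race-free can be checked with a small stress test. The sketch below is self-contained: it repeats a minimal populated_signal (hypothetical name, no disconnection), uses std::shared_ptr<std::string> as a stand-in for StoragePtr, and makes ReportNewStorage public so that another thread can play the role of the hardware. ConnectWhileReporting is likewise hypothetical.

```cpp
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Minimal signal with a populator (hypothetical name; no disconnection).
template <typename Signature>
class populated_signal
{
public:
    using handler_type   = std::function<Signature>;
    using populator_type = std::function<void(const handler_type&)>;

    explicit populated_signal(populator_type p) : _populator(std::move(p)) { }
    std::mutex& get_mutex() const { return _mutex; }

    void connect(handler_type handler)
    {
        std::lock_guard<std::mutex> l(_mutex);
        _populator(handler);                      // current state, under the lock
        _handlers.push_back(std::move(handler));  // then future notifications
    }

    // The caller must hold get_mutex(), as ReportNewStorage does below.
    template <typename... Args>
    void operator()(Args&&... args) const
    {
        for (const auto& h : _handlers)
            h(args...);
    }

private:
    mutable std::mutex      _mutex;
    std::list<handler_type> _handlers;
    populator_type          _populator;
};

using StoragePtr = std::shared_ptr<std::string>;   // assumption: a storage is just its name

class StorageManager
{
    std::vector<StoragePtr> _storages;
public:
    StorageManager()
        : OnStorageAdded([this](const std::function<void(const StoragePtr&)>& h)
                         { for (const auto& s : _storages) h(s); })
    { }
    populated_signal<void(const StoragePtr&)> OnStorageAdded;

    // Public here so that the usage sketch can play the "hardware" thread.
    void ReportNewStorage(const StoragePtr& storage)
    {
        std::lock_guard<std::mutex> l(OnStorageAdded.get_mutex());
        _storages.push_back(storage);
        OnStorageAdded(storage);
    }
};

// Usage sketch: connect while another thread keeps reporting storages.
// Returns how many times the handler saw each of the n storages.
std::vector<int> ConnectWhileReporting(int n)
{
    StorageManager manager;
    std::vector<int> seen(n, 0);
    std::thread reporter([&manager, n] {
        for (int i = 0; i < n; ++i)
            manager.ReportNewStorage(std::make_shared<std::string>(std::to_string(i)));
    });
    manager.OnStorageAdded.connect([&seen](const StoragePtr& s) { ++seen[std::stoi(*s)]; });
    reporter.join();
    return seen;
}
```

Whatever the interleaving, every storage ends up handled exactly once: either the populator finds it in _storages, or the handler is already in the list when the signal fires, and the shared mutex rules out both happening or neither happening.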

Non-blocking disconnection from a signal


Now it's time to write MediaScanner. In the constructor we connect to the StorageManager::OnStorageAdded signal, and in the destructor we disconnect.

class MediaScanner
{
private:
    boost::signals2::connection _connection;
public:
    MediaScanner(const StorageManagerPtr& storageManager)
    { _connection = storageManager->OnStorageAdded.connect([&](const StoragePtr& s) { this->StorageHandler(s); }); }
    ~MediaScanner()
    {
        _connection.disconnect();
        // The signal handler may still be executing in the thread that invoked the signal.
        // In that case, it will go on to access the destroyed MediaScanner object.
    }
private:
    void StorageHandler(const StoragePtr& storage)
    { /* Something long-running here */ }
};

Alas, this code will crash from time to time. The reason is the way the disconnect method works in every implementation I know of. It guarantees that the next time the signal is invoked, the corresponding handler will not run. However, if the handler is executing in another thread at that moment, it is not interrupted and keeps working with the destroyed MediaScanner object.
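The problem can be observed without undefined behavior. The sketch below uses a hypothetical nonblocking_signal whose disconnect only removes the handler, which is how the text describes existing implementations (it is not boost's code). A promise keeps the handler "busy" until we have checked whether disconnect returned while it was still running.

```cpp
#include <atomic>
#include <functional>
#include <future>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical signal with a non-blocking disconnect: it only removes the
// handler, so a call that is already running is not waited for.
class nonblocking_signal
{
    std::mutex            _mutex;
    std::function<void()> _handler;
public:
    void connect(std::function<void()> handler)
    {
        std::lock_guard<std::mutex> l(_mutex);
        _handler = std::move(handler);
    }
    void disconnect()
    {
        std::lock_guard<std::mutex> l(_mutex);
        _handler = nullptr;                 // returns immediately
    }
    void operator()()
    {
        std::function<void()> h;
        {
            std::lock_guard<std::mutex> l(_mutex);
            h = _handler;                   // take a copy, then call without the lock
        }
        if (h)
            h();
    }
};

// Returns true if disconnect() returned while the handler was still running,
// i.e. at the point where ~MediaScanner would continue destroying the object.
bool DisconnectReturnsDuringHandler()
{
    nonblocking_signal sig;
    std::promise<void> entered;
    std::promise<void> finish;
    std::future<void> enteredFuture = entered.get_future();
    std::future<void> finishFuture = finish.get_future();
    std::atomic<bool> handlerRunning{false};

    sig.connect([&] {
        handlerRunning = true;
        entered.set_value();
        finishFuture.wait();                // "something long-running"
        handlerRunning = false;
    });

    std::thread emitter([&] { sig(); });    // the thread that invokes the signal
    enteredFuture.wait();                   // the handler has started
    sig.disconnect();                       // does not wait for it
    bool stillRunning = handlerRunning;
    finish.set_value();
    emitter.join();
    return stillRunning;
}
```

DisconnectReturnsDuringHandler returns true: at the moment disconnect returns, the handler is still running.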

Solution in Qt


In Qt, each object belongs to a specific thread, and its handlers are called only in that thread. To disconnect from signals safely, you call QObject::deleteLater, which ensures that the actual deletion happens in the right thread and that no handler is called after deletion.

mediaScanner->deleteLater();

This is a good option if you are ready to fully commit to Qt (giving up std::thread in the core of your program in favor of QObject, QThread, etc.).

Solution in boost::signals2


To solve this problem, boost suggests using the track/track_foreign methods of a slot (i.e., a handler). These methods take a weak_ptr to an arbitrary object, and the handler stays connected to the signal only as long as every object the slot "tracks" is alive.

It works quite simply: each slot holds a collection of weak_ptrs to the tracked objects, which are "locked" for the duration of the handler call. So these objects are guaranteed not to be destroyed while the handler code can access them. If any of them has already been destroyed, the connection is broken.
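The core of this mechanism can be sketched with the standard library alone. This is not boost's implementation; tracking_slot and Scanner are hypothetical names used only to show the idea of locking a weak_ptr for the duration of the call.

```cpp
#include <functional>
#include <memory>
#include <utility>

// Sketch of a slot that tracks an object through a weak_ptr, in the spirit of
// boost::signals2's track_foreign (this is not the boost implementation).
template <typename T, typename... Args>
class tracking_slot
{
    std::weak_ptr<T>                 _tracked;
    std::function<void(T&, Args...)> _func;
public:
    tracking_slot(std::weak_ptr<T> tracked, std::function<void(T&, Args...)> func)
        : _tracked(std::move(tracked)), _func(std::move(func))
    { }

    // Returns false once the tracked object is gone, i.e. the connection is broken.
    bool operator()(Args... args) const
    {
        std::shared_ptr<T> locked = _tracked.lock();   // object cannot die during the call
        if (!locked)
            return false;
        _func(*locked, args...);
        return true;
    }
};

struct Scanner
{
    int handled = 0;
};
```

While operator() holds the locked shared_ptr, the object survives even if its last external owner lets go of it during the call; once the object is gone, the slot reports itself as disconnected.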

The problem is that the object being subscribed needs a weak_ptr to this. In my opinion, the most sensible way to get one is a factory method in the MediaScanner class that subscribes the newly created object to all the signals it is interested in:

class MediaScanner
{
public:
    static std::shared_ptr<MediaScanner> Create(const StorageManagerPtr& storageManager)
    {
        std::lock_guard l(storageManager->GetMutex());
        MediaScannerPtr result(new MediaScanner);
        boost::signals2::signal<void(const StoragePtr&)>::slot_type
            slot(boost::bind(&MediaScanner::StorageHandler, result.get(), _1));
        slot.track_foreign(result);
        storageManager->OnStorageAdded.connect(slot);
        for (auto&& storage : storageManager->GetStorages())
            result->StorageHandler(storage);
        return result;
    }
private:
    MediaScanner() // private constructor!
    { /* Initialize everything except the signal handlers */ }
    void StorageHandler(const StoragePtr& storage)
    { /* Something long-running here */ }
};

So, the drawbacks are:

  • A lot of code that you copy every time
  • MediaScanner's initialization is split into two parts: subscribing to signals in the Create method, and everything else in the constructor
  • You have to use shared_ptr to store MediaScanner
  • You cannot be sure that MediaScanner has been destroyed when you release the last external reference to it. This can be a problem if it uses some limited resource that you want to reuse after MediaScanner is released.

How can we do better?


Let's make the disconnect method blocking, so that once it returns, it is guaranteed to be safe to destroy everything the signal handler had access to. Something like std::thread::join.

Looking ahead, we will need three classes for this:

  • life_token - controls the lifetime of the handler, lets you mark it as "dying", and waits for a running call to complete if necessary
  • life_token::checker - stored inside the signal next to the handler, refers to its life_token
  • life_token::checker::execution_guard - created on the stack for the duration of the handler's execution; it locks the corresponding life_token and lets you check whether the handler has already "died"

The signal_connection class:

class signal_connection
{
    life_token               _token;
    std::function<void()>    _eraseHandlerFunc;
public:
    signal_connection(life_token token, std::function<void()> eraseHandlerFunc)
        : _token(std::move(token)), _eraseHandlerFunc(std::move(eraseHandlerFunc))
    { }
    ~signal_connection()
    { disconnect(); }
    void disconnect()
    {
        if (_token.released())
            return;
        _token.release(); // Here we wait if the handler is currently locked (i.e., executing)
        _eraseHandlerFunc(); // The very lambda that removes the handler from the list
    }
};

I should mention that I am a supporter of RAII connection objects. I won't go into this in detail; I'll only say that it doesn't matter much in this context.

The signal class will change a bit too:

template < typename Signature >
class signal
{
    using populator_type = std::function<void(const std::function<Signature>&)>;
    struct handler_info
    {
        std::function<Signature>    handler_func;
        life_token::checker         life_checker;
    };
    mutable std::mutex            _mutex;
    std::list<handler_info>       _handlers;
    populator_type                _populator;
public:
    // ...
    signal_connection connect(std::function<Signature> handler)
    {
        std::lock_guard l(_mutex);
        life_token token;
        _populator(handler);
        _handlers.push_back(handler_info{std::move(handler), life_token::checker(token)});
        return signal_connection(token, [this]() { /* remove the handler from _handlers */ } );
    }
    template < typename... Args >
    void operator() (Args&&... args) const
    {
        for (auto&& handler : _handlers)
        {
            life_token::checker::execution_guard g(handler.life_checker);
            if (g.is_alive())
                handler.handler_func(args...); // not forwarded: every handler gets the same arguments
        }
    }
};

Now, next to each handler, there is a life_token::checker object that refers to the life_token stored in signal_connection. We hold it for the duration of the handler call using a life_token::checker::execution_guard object.

The implementation of these objects follows. If you are tired, feel free to skip it.
Inside life_token, we need the following things:

  • Some operating system primitive to wait on in life_token::release (here, for simplicity, a mutex)
  • A live/dead flag
  • A counter of locks held through execution_guard (omitted here for simplicity)

class life_token
{
    struct impl
    {
        std::mutex              mutex;
        bool                    alive = true;
    };
    std::shared_ptr<impl>       _impl;
public:
    life_token() : _impl(std::make_shared<impl>()) { }
    // No destructor: copies of a token share one impl, and signal_connection
    // releases its copy explicitly in disconnect()
    bool released() const { return !_impl; }
    void release()
    {
        if (released())
            return;
        {
            std::lock_guard<std::mutex> l(_impl->mutex); // waits here while a handler is executing
            _impl->alive = false;
        }
        _impl.reset();
    }
    class checker
    {
        std::shared_ptr<impl>   _impl;
    public:
        checker(const life_token& t) : _impl(t._impl) { }
        class execution_guard
        {
            std::shared_ptr<impl>   _impl;
        public:
            execution_guard(const checker& c) : _impl(c._impl) { _impl->mutex.lock(); }
            ~execution_guard() { _impl->mutex.unlock(); }
            bool is_alive() const { return _impl->alive; }
        };
    };
};

The mutex is held for the whole lifetime of execution_guard. So if life_token::release is called in another thread at that moment, it blocks on the same mutex and waits for the signal handler to finish. After that, it clears the alive flag, and subsequent signal invocations no longer call the handler.
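Here is a self-contained version of the simplified life_token above (template arguments restored, no destructor, so copies of a token can be made freely), together with a small two-thread check of the behavior just described. ReleaseWaitsForHandler is a hypothetical test helper; the sleep only makes the handler long enough to observe, and the result does not depend on its duration.

```cpp
#include <atomic>
#include <chrono>
#include <memory>
#include <mutex>
#include <thread>

class life_token
{
    struct impl
    {
        std::mutex  mutex;
        bool        alive = true;
    };
    std::shared_ptr<impl> _impl;
public:
    life_token() : _impl(std::make_shared<impl>()) { }
    bool released() const { return !_impl; }

    // Waits for a running handler (it holds impl::mutex), then marks the handler dead.
    void release()
    {
        if (released())
            return;
        {
            std::lock_guard<std::mutex> l(_impl->mutex);
            _impl->alive = false;
        }
        _impl.reset();
    }

    class checker
    {
        std::shared_ptr<impl> _impl;
    public:
        explicit checker(const life_token& t) : _impl(t._impl) { }

        // Keeps impl::mutex locked for the duration of one handler call.
        class execution_guard
        {
            std::shared_ptr<impl> _impl;
        public:
            explicit execution_guard(const checker& c) : _impl(c._impl) { _impl->mutex.lock(); }
            ~execution_guard() { _impl->mutex.unlock(); }
            execution_guard(const execution_guard&) = delete;
            execution_guard& operator=(const execution_guard&) = delete;
            bool is_alive() const { return _impl->alive; }
        };
    };
};

// Usage sketch: returns true if release() did not return before the running
// handler finished, and the handler is reported dead afterwards.
bool ReleaseWaitsForHandler()
{
    life_token token;
    life_token::checker chk(token);
    std::atomic<bool> started{false};
    std::atomic<bool> finished{false};

    std::thread emitter([&] {
        life_token::checker::execution_guard g(chk);   // one "signal invocation"
        started = true;
        if (g.is_alive())
            std::this_thread::sleep_for(std::chrono::milliseconds(50));   // long handler
        finished = true;
    });

    while (!started)
        std::this_thread::yield();
    token.release();                     // blocks until the guard above is destroyed
    bool waited = finished;
    emitter.join();

    life_token::checker::execution_guard after(chk);
    return waited && !after.is_alive();
}
```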

What does the MediaScanner code look like now? Exactly the way we wanted to write it at the very beginning:

class MediaScanner
{
private:
    signal_connection    _connection;
public:
    MediaScanner(const StorageManagerPtr& storageManager)
        : _connection(storageManager->OnStorageAdded.connect([this](const StoragePtr& s) { this->StorageHandler(s); }))
    { }
    ~MediaScanner()
    { _connection.disconnect(); }
private:
    void StorageHandler(const StoragePtr& storage)
    { /* Something long-running here */ }
};

Disconnecting an asynchronous handler does not cancel calls that are already in its thread's queue


Now let's write MediaUiModel, which reacts to found media files and adds rows to display them.

To do this, add the following signal to MediaScanner:

signal<void(const MediaPtr&)> OnMediaFound;

There are two important things here:

  • A model is a UI library object, so all operations on it must be performed from the UI thread.
  • UI libraries often use their own ownership hierarchy, so we cannot use shared_ptr to store the model. Accordingly, the track/track_foreign trick will not work here, but that is not the main point right now, so let's pretend everything is fine

class MediaUiModel : public UiModel
{
private:
    boost::asio::io_service&       _uiThread;
    boost::signals2::connection    _connection;
public:
    MediaUiModel(boost::asio::io_service& uiThread, const MediaScanner& scanner)
        : _uiThread(uiThread)
    {
        std::lock_guard l(scanner.GetMutex());
        _connection = scanner.OnMediaFound.connect([this](const MediaPtr& m) { this->MediaHandler(m); });
        for (auto&& m : scanner.GetMedia())
            AppendRow(MediaUiModelRow(m));
    }
    ~MediaUiModel()
    { _connection.disconnect(); }
private:
    // This method runs in MediaScanner's thread and posts all the real work to the UI thread.
    void MediaHandler(const MediaPtr& m)
    { _uiThread.post([this, m]() { this->AppendRow(MediaUiModelRow(m)); }); }
};

Besides the previous problem, there is one more. Each time the signal fires, we post the handler to the UI thread. If at some point we delete the model (for example, the user exits the Gallery application), all those posted handlers later reach a dead object. And again we crash.
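The effect can be counted without touching a destroyed object. In this sketch ManualUiQueue and CallsAfterDisconnect are hypothetical, the UI thread is replaced by a queue drained by hand, and the model's lifetime is modeled with plain flags.

```cpp
#include <deque>
#include <functional>
#include <utility>

// Hypothetical stand-in for the UI thread: posted tasks run only when drained.
struct ManualUiQueue
{
    std::deque<std::function<void()>> tasks;
    void post(std::function<void()> task) { tasks.push_back(std::move(task)); }
    void drain()
    {
        while (!tasks.empty())
        {
            std::function<void()> task = std::move(tasks.front());
            tasks.pop_front();
            task();
        }
    }
};

// Models the boost-based MediaUiModel with flags instead of real objects.
// Returns how many posted calls still run after the model disconnected; in the
// real code each of them would touch a destroyed MediaUiModel.
int CallsAfterDisconnect()
{
    ManualUiQueue uiThread;
    bool connected = true;        // the signal connection
    bool modelAlive = true;       // whether MediaUiModel still exists
    int lateCalls = 0;

    // MediaHandler: runs in the scanner's thread, posts the work to the UI thread.
    auto mediaHandler = [&](int /*media*/) {
        if (connected)
            uiThread.post([&] { if (!modelAlive) ++lateCalls; });
    };

    mediaHandler(1);
    mediaHandler(2);
    connected = false;            // ~MediaUiModel(): _connection.disconnect()
    modelAlive = false;           // ...and the model is gone
    mediaHandler(3);              // correctly ignored: no longer connected
    uiThread.drain();             // but the two earlier posts still run
    return lateCalls;
}
```

The function returns 2: disconnecting stops new posts, but does nothing about the two calls that were already queued.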

Solution in Qt


The same deleteLater, with the same caveats.

Solution in boost::signals2


If you are lucky and your UI framework lets you call deleteLater on models, you are saved. You just need a public method that first disconnects the model from the signals and then calls deleteLater, and you get roughly the same behavior as in Qt. You still have to solve the previous problem, though. For that, you will most likely create, inside the model, a shared_ptr-owned object that subscribes to the signals. The code is not small, but it is just a matter of technique.

If you are not lucky, and your UI framework insists on deleting the model exactly when it wants to, you will end up inventing your own life_token.

For example, something like this (again, better skipped if you are tired):
template < typename Signature_ >
class AsyncToUiHandlerWrapper
{
private:
    boost::asio::io_service&    _uiThread;
    std::function<Signature_>   _realHandler;
    bool                        _released;
    mutable std::mutex          _mutex;
public:
    AsyncToUiHandlerWrapper(boost::asio::io_service& uiThread, std::function<Signature_> realHandler)
        : _uiThread(uiThread), _realHandler(realHandler), _released(false)
    { }
    void Release()
    {
        std::lock_guard l(_mutex);
        _released = true;
    }
    template < typename... Args_ >
    static void AsyncHandler(const std::weak_ptr<AsyncToUiHandlerWrapper>& selfWeak, Args_&&... args)
    {
        auto self = selfWeak.lock();
        if (!self)
            return;
        std::lock_guard l(self->_mutex);
        if (!self->_released) // The wrapper has not been released, so _uiThread still refers to a live object
            self->_uiThread.post([selfWeak, args...]() { UiThreadHandler(selfWeak, args...); });
    }
private:
    template < typename... Args_ >
    static void UiThreadHandler(const std::weak_ptr<AsyncToUiHandlerWrapper>& selfWeak, const Args_&... args)
    {
        auto self = selfWeak.lock();
        if (!self)
            return;
        if (!self->_released) // The wrapper has not been released, so the objects _realHandler accesses are still alive
            self->_realHandler(args...);
    }
};
class MediaUiModel : public UiModel
{
private:
    using AsyncMediaHandler = AsyncToUiHandlerWrapper<void(const MediaPtr&)>;
private:
    std::shared_ptr<AsyncMediaHandler>    _asyncHandler;
public:
    MediaUiModel(boost::asio::io_service& uiThread, const MediaScanner& scanner)
    {
        try
        {
            _asyncHandler = std::make_shared<AsyncMediaHandler>(uiThread, [this](const MediaPtr& m) { this->AppendRow(MediaUiModelRow(m)); });
            std::lock_guard l(scanner.GetMutex());
            boost::signals2::signal<void(const MediaPtr&)>::slot_type
                slot(std::bind(&AsyncMediaHandler::AsyncHandler<const MediaPtr&>, std::weak_ptr<AsyncMediaHandler>(_asyncHandler), std::placeholders::_1));
            slot.track_foreign(_asyncHandler);
            scanner.OnMediaFound.connect(slot);
            for (auto&& m : scanner.GetMedia())
                AppendRow(MediaUiModelRow(m));
        }
        catch (...)
        {
            Destroy();
            throw;
        }
    }
    ~MediaUiModel()
    { Destroy(); }
private:
    void Destroy()
    {
        if (_asyncHandler)
            _asyncHandler->Release(); // Asynchronous code no longer touches MediaUiModel after this line, so the object can be safely destroyed
        _asyncHandler.reset();
    }
};

I won't even comment on this code; let's just be a little sad.

How can we do better?


Very simple. First, define an interface for a thread as a task queue:

struct task_executor
{
    virtual ~task_executor() { }
    virtual void add_task(const std::function<void()>& task) = 0;
};
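One possible implementation of this interface, assuming a dedicated worker thread with a FIFO queue, is sketched below. thread_task_executor is a hypothetical name; a UI framework would typically supply its own event loop instead.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

struct task_executor
{
    virtual ~task_executor() { }
    virtual void add_task(const std::function<void()>& task) = 0;
};

// Hypothetical implementation: one worker thread draining a FIFO queue.
class thread_task_executor : public task_executor
{
    std::mutex                        _mutex;
    std::condition_variable           _cv;
    std::deque<std::function<void()>> _tasks;
    bool                              _stopping = false;
    std::thread                       _thread;   // declared last: starts after the members above

public:
    thread_task_executor() : _thread([this] { run(); }) { }
    ~thread_task_executor()
    {
        {
            std::lock_guard<std::mutex> l(_mutex);
            _stopping = true;
        }
        _cv.notify_one();
        _thread.join();               // remaining tasks are executed before the thread exits
    }
    void add_task(const std::function<void()>& task) override
    {
        {
            std::lock_guard<std::mutex> l(_mutex);
            _tasks.push_back(task);
        }
        _cv.notify_one();
    }

private:
    void run()
    {
        std::unique_lock<std::mutex> l(_mutex);
        for (;;)
        {
            _cv.wait(l, [this] { return _stopping || !_tasks.empty(); });
            if (_tasks.empty())
                return;               // stopping and nothing left to do
            std::function<void()> task = std::move(_tasks.front());
            _tasks.pop_front();
            l.unlock();               // never run a task while holding the queue lock
            task();
            l.lock();
        }
    }
};
```

Tasks run on the worker thread in the order they were added, and the destructor lets the queue drain before joining.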

Second, add an overload of the signal's connect method that takes the thread:

signal_connection connect(const std::shared_ptr<task_executor>& worker, std::function<Signature> handler);

This method puts a wrapper around the handler into the _handlers collection. When invoked, the wrapper posts the handler together with its life_token::checker to the target thread. To call the real handler in that thread, we use execution_guard exactly as before.

Thus the disconnect method also guarantees that asynchronous handlers will not be called after we disconnect from the signal. I will not show the code for the wrapper and the overloaded connect method here; I think the idea is clear enough.
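For readers who want something concrete, here is a hedged, self-contained sketch of how such a wrapper might look. All names are hypothetical: manual_executor stands in for the UI thread, life_state plays the role of life_token's shared state, and the mutex locked around the real call plays the role of execution_guard. For brevity, disconnect does not remove the wrapper from _handlers; it only turns further calls, including already queued ones, into no-ops.

```cpp
#include <deque>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <utility>

struct task_executor
{
    virtual ~task_executor() { }
    virtual void add_task(const std::function<void()>& task) = 0;
};

// Hypothetical executor drained by hand; stands in for the UI thread.
class manual_executor : public task_executor
{
    std::mutex                        _mutex;
    std::deque<std::function<void()>> _tasks;
public:
    void add_task(const std::function<void()>& task) override
    {
        std::lock_guard<std::mutex> l(_mutex);
        _tasks.push_back(task);
    }
    void run_all()
    {
        for (;;)
        {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> l(_mutex);
                if (_tasks.empty())
                    return;
                task = std::move(_tasks.front());
                _tasks.pop_front();
            }
            task();
        }
    }
};

// Compact stand-in for life_token's shared state.
struct life_state
{
    std::mutex mutex;
    bool       alive = true;
};

class async_connection
{
    std::shared_ptr<life_state> _state;
public:
    explicit async_connection(std::shared_ptr<life_state> state) : _state(std::move(state)) { }
    // Waits for a call that is executing right now; queued calls become no-ops.
    void disconnect()
    {
        std::lock_guard<std::mutex> l(_state->mutex);
        _state->alive = false;
    }
};

template <typename... Args>
class async_signal
{
    std::list<std::function<void(Args...)>> _handlers;
public:
    // The stored wrapper posts the call, with the shared life state, to `worker`;
    // liveness is checked in the worker thread right before the real call.
    async_connection connect(const std::shared_ptr<task_executor>& worker,
                             std::function<void(Args...)> handler)
    {
        auto state = std::make_shared<life_state>();
        _handlers.push_back([worker, state, handler](Args... args) {
            worker->add_task([state, handler, args...] {
                std::lock_guard<std::mutex> l(state->mutex);   // plays execution_guard
                if (state->alive)
                    handler(args...);
            });
        });
        return async_connection(state);
    }
    void operator()(Args... args) const
    {
        for (const auto& h : _handlers)
            h(args...);
    }
};
```

Calls posted before disconnect but not yet executed are silently dropped, and because disconnect takes the same mutex, it also waits for a call that is executing at that moment.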

The model code becomes very simple:

class MediaUiModel : public UiModel
{
private:
    signal_connection    _connection;
public:
    MediaUiModel(const std::shared_ptr<task_executor>& uiThread, const MediaScanner& scanner)
        : _connection(scanner.OnMediaFound.connect(uiThread, [this](const MediaPtr& m) { this->AppendRow(MediaUiModelRow(m)); }))
    { }
    ~MediaUiModel()
    { _connection.disconnect(); }
};

Here, the AppendRow method will be called strictly in the UI thread, and only until we disconnect.

To summarize


So, there are three key things that let you write much simpler code with signals:

  1. Populators let you conveniently get the current state while connecting to the signal
  2. A blocking disconnect method lets you unsubscribe an object in its own destructor
  3. For the previous point to hold for asynchronous handlers, disconnect must also mark calls already in the thread queue as "stale"

Of course, the signal code I've shown here is very simple and primitive, and not very fast. My goal was to describe an alternative approach that seems more attractive to me than the ones dominant today. In reality, all of this can be implemented much more efficiently.

We have been using this approach in our project for about five years and are very happy with it.

A ready-made implementation


I rewrote our signals from scratch in C++11 and improved those parts of the implementation that had long needed improvement.
Feel free to use it: https://github.com/koplyarov/wigwag

Mini FAQ


Judging by the reactions on reddit and Twitter, people mostly ask about three things:

Q: You have to lock a life_token to call each handler. Isn't that slow?
A: Oddly enough, no. You can use atomic variables instead of the mutex, and only if disconnect is actually called while the handler is executing, wait on a std::condition_variable. The result is then exactly the opposite: without the overhead of track/track_foreign (which require working with collections of weak_ptrs), this implementation leaves boost::signals2 far behind in both memory and speed, and even outperforms Qt.
Benchmarks can be found here.
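The lock-light variant described in this answer could look roughly like the sketch below. It is my own illustration under stated assumptions, not the code from the wigwag repository: a handler call costs a few atomic operations, and the mutex and condition_variable are touched only when a release has to wait (or right after one). atomic_life_state and AtomicReleaseWaits are hypothetical names.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical lock-light replacement for life_token's shared state. In a real
// signal this object would be owned through a shared_ptr by both the connection
// and the signal, because leave() may still touch it after release() returns.
class atomic_life_state
{
    std::atomic<int>        _executing{0};
    std::atomic<bool>       _alive{true};
    std::mutex              _mutex;
    std::condition_variable _cv;
public:
    // Called right before the handler; false means the handler was released.
    bool try_enter()
    {
        _executing.fetch_add(1);
        if (_alive.load())
            return true;
        leave();
        return false;
    }
    // Called right after the handler returns.
    void leave()
    {
        if (_executing.fetch_sub(1) == 1 && !_alive.load())
        {
            std::lock_guard<std::mutex> l(_mutex);   // pairs with the wait in release()
            _cv.notify_all();
        }
    }
    // Marks the handler dead, then waits only if a call is in progress.
    void release()
    {
        _alive.store(false);
        std::unique_lock<std::mutex> l(_mutex);
        _cv.wait(l, [this] { return _executing.load() == 0; });
    }
};

// Usage sketch: release() must not return while the handler is still running.
bool AtomicReleaseWaits()
{
    atomic_life_state state;
    std::atomic<bool> started{false};
    std::atomic<bool> finished{false};

    std::thread emitter([&] {
        if (state.try_enter())
        {
            started = true;
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            finished = true;
            state.leave();
        }
    });
    while (!started)
        std::this_thread::yield();
    state.release();
    bool waited = finished;
    emitter.join();
    return waited && !state.try_enter();
}
```

With sequentially consistent atomics, either try_enter sees the cleared flag, or release sees the incremented counter and waits, so a handler can never start after release has returned.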

Q: Will there be deadlocks due to a blocking disconnect method?
A: Yes, deadlocks are indeed somewhat easier to run into than with boost and Qt. In my opinion, this is outweighed by simpler code using signals and their higher speed. Moreover, if you carefully track who subscribes to whom, such situations are the exception rather than the rule.

Of course, deadlocks still need to be caught and fixed. On Linux, I recommend Helgrind for this. For Windows, a two-minute Google search turns up Intel Inspector and CHESS.

If for some reason you cannot afford any of the above (for example, your platform lacks the memory to run Helgrind, or it runs some exotic operating system), there is a crutch in the form of this (again, simplified) mutex class:

class mutex
{
private:
    std::timed_mutex    _m;
public:
    void lock()
    {
        if (_m.try_lock())
            return;
        while (!_m.try_lock_for(std::chrono::seconds(10)))
            Logger::Warning() << "Could not lock mutex " << (void*)this << " for a long time:\n" << get_backtrace_string();
    }
    // ...
};

Both Visual Studio and GCC have facilities for getting backtraces from code, and there is also the excellent libunwind.
With this approach, QA will catch most of your deadlocks, and a single glance at the logs will show you where everything got stuck. All that remains is to fix them.

Q: Can I use one mutex for several signals? Can I handle exceptions the way I want? Can I skip synchronization and get fast single-threaded signals?
A: Yes, yes, and yes. There are template policies for this. Read more in the documentation.
