QThread + QtSql the right way

    Today's article inspired me to share my own way of moving database work into a separate thread. The method suits not only databases but any interaction that fits the pattern "an object lives in a separate thread, and you need to ask it for something and do something with the answer." It also tries to be type-safe and extensible: no stringly-typed QMetaObject::invokeMethod(), no passing the results of a call on the in-thread object back through signals. Just a direct function call, just QFuture!

    Disclaimer: the code given here is a piece of one of my big projects, so it uses some auxiliary library functions from that project. I will try not to miss any such use and will describe the semantics of each.

    So, let's start with the most important thing: how would we like to work with an object in a separate thread? Ideally, we simply call some methods of some object, and these methods return a QFuture<T> whose readiness means that the corresponding asynchronous method has finished executing and has produced a result of type T. Recall that decomposition is our friend, so we take our original task "we need to call something in a separate thread" and consider one piece of it: "we need to keep a separate thread and provide a thread-safe way to call something in it that returns a QFuture<T>". We will solve this problem as follows: the QThread subclass responsible for managing the thread has a method ScheduleImpl(), callable from the main thread (and from others too), which takes a functor, wraps it so that its result ends up in a QFuture, and stores everything that is needed in a special queue, which is then processed inside QThread::run().

    You get something like this:
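    #include <functional>
    #include <QFuture>
    #include <QFutureInterface>
    #include <QList>
    #include <QMutex>
    #include <QThread>
    class WorkerThreadBase : public QThread
    {
    	Q_OBJECT
    	QMutex FunctionsMutex_;	// guards Functions_
    	QList<std::function<void ()>> Functions_;	// wrappers waiting to run in this thread
    public:
    	using QThread::QThread;
    protected:
    	void run () override;
    	virtual void Initialize () = 0;
    	virtual void Cleanup () = 0;
    	// Callable from any thread: queues func and returns a future for its result.
    	template<typename F>
    	QFuture<ResultOf_t<F ()>> ScheduleImpl (const F& func)
    	{
    		QFutureInterface<ResultOf_t<F ()>> iface;
    		iface.reportStarted ();
    		// The wrapper has the signature void () and reports into iface.
    		auto reporting = [func, iface] () mutable
    		{
    			ReportFutureResult (iface, func);
    		};
    		{
    			QMutexLocker locker { &FunctionsMutex_ };
    			Functions_ << reporting;
    		}
    		emit rotateFuncs ();	// wakes up the queue processing in the worker thread
    		return iface.future ();
    	}
    private:
    	void RotateFuncs ();
    signals:
    	void rotateFuncs ();
    };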
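    For readers without the project's helpers at hand, the same idea can be sketched with the standard library alone. This is only an analogue under stated assumptions, not the code from my project: MiniWorker, Schedule and ComputeOnWorker are hypothetical names, std::promise and std::future stand in for QFutureInterface and QFuture, and a condition variable stands in for Qt's event loop and the rotateFuncs() signal. For brevity the sketch handles only functors that return a value.

```cpp
#include <cassert>
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical std-only analogue of WorkerThreadBase (not the article's class).
class MiniWorker
{
	std::mutex Mutex_;
	std::condition_variable Cond_;
	std::vector<std::function<void ()>> Functions_;
	bool Quit_ = false;
	std::thread Thread_;	// declared last, so everything above exists before it starts
public:
	MiniWorker ()
	: Thread_ { [this] { Run (); } }
	{
	}

	~MiniWorker ()
	{
		{
			std::lock_guard<std::mutex> locker { Mutex_ };
			Quit_ = true;
		}
		Cond_.notify_one ();
		Thread_.join ();
	}

	MiniWorker (const MiniWorker&) = delete;
	MiniWorker& operator= (const MiniWorker&) = delete;

	// Callable from any thread: queues func and returns a future for its result.
	template<typename F>
	auto Schedule (F func) -> std::future<decltype (func ())>
	{
		using Result_t = decltype (func ());
		static_assert (!std::is_void<Result_t>::value,
				"void functors are left out of this sketch");

		// std::function needs a copyable callable, hence the shared_ptr.
		auto promise = std::make_shared<std::promise<Result_t>> ();
		auto future = promise->get_future ();
		auto reporting = [func, promise] () mutable
		{
			try
			{
				promise->set_value (func ());
			}
			catch (...)
			{
				promise->set_exception (std::current_exception ());
			}
		};
		{
			std::lock_guard<std::mutex> locker { Mutex_ };
			Functions_.push_back (std::move (reporting));
		}
		Cond_.notify_one ();
		return future;
	}
private:
	void Run ()
	{
		for (;;)
		{
			decltype (Functions_) funcs;
			{
				std::unique_lock<std::mutex> locker { Mutex_ };
				Cond_.wait (locker, [this] { return Quit_ || !Functions_.empty (); });
				if (Functions_.empty ())
					return;	// quit was requested and nothing is left to run
				funcs.swap (Functions_);
			}
			for (auto& func : funcs)
				func ();
		}
	}
};

// Usage example: the sum is computed on the worker thread.
inline int ComputeOnWorker (int a, int b)
{
	MiniWorker worker;
	auto future = worker.Schedule ([a, b] { return a + b; });
	assert (future.valid ());
	return future.get ();
}
```

    Unlike the Qt version, this sketch has no event loop, so the worker can do nothing except run queued functors; the destructor drains whatever is still queued before joining.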
    


    Explanations of ReportFutureResult and ResultOf_t
    ResultOf_t is a direct analogue of std::result_of_t from C++14. My project, unfortunately, still has to support C++11 compilers.
    template<typename T>
    using ResultOf_t = typename std::result_of<T>::type;
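    One caveat for newer compilers: std::result_of was deprecated in C++17 and removed in C++20. A minimal sketch of a replacement that still accepts the F (Args...) spelling used in this article, assuming only plain functors need to be supported (CallResultImpl, CallResult_t, Length and DoNothing are hypothetical names, not part of my project):

```cpp
#include <cassert>
#include <type_traits>
#include <utility>

// The primary template is left undefined: only "F (Args...)" signatures are supported.
template<typename Signature>
struct CallResultImpl;

template<typename F, typename... Args>
struct CallResultImpl<F (Args...)>
{
	// The type of calling an F with arguments of the given types.
	using type = decltype (std::declval<F> () (std::declval<Args> ()...));
};

// Hypothetical stand-in for ResultOf_t that does not rely on std::result_of.
template<typename Signature>
using CallResult_t = typename CallResultImpl<Signature>::type;

struct Length
{
	int operator() (const char *str) const
	{
		int len = 0;
		while (str [len])
			++len;
		return len;
	}
};

struct DoNothing
{
	void operator() () const {}
};

static_assert (std::is_same<CallResult_t<Length (const char*)>, int>::value,
		"calling Length with a C string yields int");
static_assert (std::is_same<CallResult_t<DoNothing ()>, void>::value,
		"a functor returning nothing yields void");
```

    Unlike std::result_of, this sketch does not handle pointers to members; it only covers objects callable with the usual call syntax.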
    


    ReportFutureResult takes a functor and its arguments, executes the functor, and marks the corresponding QFutureInterface as finished, passing it the result of the functor, or hands the QFutureInterface an exception if the functor exits with one. Unfortunately, things are somewhat complicated by functors returning void: a separate function has to be written for them, because in C++ you cannot declare a variable of type void. Such is our type system: the type exists, it even has a value, but a variable of it cannot be declared.
    // EnableIf_t is presumably the project's C++11 stand-in for std::enable_if_t.
    // This overload handles functors that return a value.
    template<typename R, typename F, typename... Args>
    EnableIf_t<!std::is_same<R, void>::value>
    	ReportFutureResult (QFutureInterface<R>& iface, F&& f, Args... args)
    {
    	try
    	{
    		const auto result = f (args...);
    		iface.reportFinished (&result);
    	}
    	catch (const QtException_t& e)
    	{
    		iface.reportException (e);
    		iface.reportFinished ();
    	}
    	catch (const std::exception& e)
    	{
    		iface.reportException (ConcurrentStdException { e });
    		iface.reportFinished ();
    	}
    }
    // This overload handles functors returning void: there is no result to store.
    template<typename F, typename... Args>
    void ReportFutureResult (QFutureInterface<void>& iface, F&& f, Args... args)
    {
    	try
    	{
    		f (args...);
    	}
    	catch (const QtException_t& e)
    	{
    		iface.reportException (e);
    	}
    	catch (const std::exception& e)
    	{
    		iface.reportException (ConcurrentStdException { e });
    	}
    	iface.reportFinished ();
    }
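    The same split between value-returning and void functors shows up with any future-like type. A hedged sketch with std::promise standing in for QFutureInterface (ReportResult, DemoValue and DemoVoidFailure are hypothetical names, and std::enable_if is spelled out instead of the project's EnableIf_t):

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <stdexcept>
#include <type_traits>

// Value-returning functors: the result is stored in the promise.
template<typename R, typename F>
typename std::enable_if<!std::is_void<R>::value>::type
	ReportResult (std::promise<R>& promise, F&& f)
{
	try
	{
		promise.set_value (f ());
	}
	catch (...)
	{
		promise.set_exception (std::current_exception ());
	}
}

// void functors: there is no value to store, so set_value () takes no argument.
template<typename F>
void ReportResult (std::promise<void>& promise, F&& f)
{
	try
	{
		f ();
		promise.set_value ();
	}
	catch (...)
	{
		promise.set_exception (std::current_exception ());
	}
}

inline int DemoValue ()
{
	std::promise<int> promise;
	auto future = promise.get_future ();
	ReportResult (promise, [] { return 42; });
	return future.get ();
}

inline bool DemoVoidFailure ()
{
	std::promise<void> promise;
	auto future = promise.get_future ();
	ReportResult (promise, [] { throw std::runtime_error { "no avatar" }; });
	try
	{
		future.get ();
	}
	catch (const std::runtime_error&)
	{
		return true;	// the exception travelled through the future
	}
	return false;
}
```

    Because std::promise can store any exception through std::exception_ptr, this sketch needs no wrapper class for standard exceptions, which is one place where it differs from the Qt code above.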
    


    QtException_t is needed to support building with Qt4:
    #if QT_VERSION < 0x050000
    	using QtException_t = QtConcurrent::Exception;
    #else
    	using QtException_t = QException;
    #endif
    


    ConcurrentStdException wraps a standard exception into one that Qt's QFuture machinery understands; its implementation is a little more involved and is not so important here.
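    Since that implementation is not shown, here is only a rough idea of what such a wrapper might look like, sketched without Qt. The assumption is that the future machinery needs exceptions it can copy and rethrow polymorphically, which is what QException's clone() and raise() provide. CloneableException, StdExceptionWrapper and DemoRoundTrip are hypothetical stand-ins, not the project's ConcurrentStdException.

```cpp
#include <cassert>
#include <exception>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for the clone ()/raise () interface of QException.
class CloneableException : public std::exception
{
public:
	virtual CloneableException* Clone () const = 0;
	virtual void Raise () const = 0;
};

// Copies the message out of any std::exception, so the wrapper can outlive
// the original exception object and be handed over to another thread.
class StdExceptionWrapper : public CloneableException
{
	std::string What_;
public:
	explicit StdExceptionWrapper (const std::exception& e)
	: What_ (e.what ())
	{
	}

	const char* what () const noexcept override
	{
		return What_.c_str ();
	}

	StdExceptionWrapper* Clone () const override
	{
		return new StdExceptionWrapper (*this);
	}

	void Raise () const override
	{
		throw *this;
	}
};

// Stores a wrapped copy of a standard exception, then rethrows it later.
inline std::string DemoRoundTrip ()
{
	std::unique_ptr<CloneableException> stored;
	try
	{
		throw std::runtime_error { "disk is full" };
	}
	catch (const std::exception& e)
	{
		stored.reset (StdExceptionWrapper (e).Clone ());
	}

	try
	{
		stored->Raise ();
	}
	catch (const std::exception& e)
	{
		return e.what ();
	}
	return {};
}
```

    Note what this simple version loses: only the message survives, while the dynamic type of the original exception does not. A real implementation may well preserve more.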


    That is, ScheduleImpl() takes a functor with a signature like T (), returns a QFuture<T>, wraps the functor in a special function, now with the signature void (), that is tied to the returned QFuture<T> and marks it ready once the functor has run, and adds this wrapper to the queue. After that, the rotateFuncs() signal is emitted, which inside run() is connected to the RotateFuncs() method, the one responsible for processing the queue of stored functor wrappers. Now let's look at the implementation of run() and RotateFuncs():
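    void WorkerThreadBase::run ()
    {
    	// Created here, so the closure lives in the worker thread.
    	SlotClosure<NoDeletePolicy> rotator
    	{
    		[this] { RotateFuncs (); },
    		this,
    		SIGNAL (rotateFuncs ()),
    		nullptr
    	};
    	Initialize ();
    	QThread::run ();	// runs the event loop until quit () is called
    	Cleanup ();
    }
    void WorkerThreadBase::RotateFuncs ()
    {
    	decltype (Functions_) funcs;
    	{
    		// Hold the lock only long enough to take over the pending wrappers.
    		QMutexLocker locker { &FunctionsMutex_ };
    		using std::swap;
    		swap (funcs, Functions_);
    	}
    	for (const auto& func : funcs)
    		func ();
    }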
    


    A bit about SlotClosure
    SlotClosure is a helper class that attaches signals to lambdas rather than to slots. Qt5 has a more adequate syntax for this, but, unfortunately, I still need to support the Qt4 build as well.

    SlotClosure is simple: it calls its first argument every time the object given as the second argument emits the signal given as the third argument. The fourth argument is the parent object. Here the SlotClosure is created on the stack, so no parent is needed.

    The template argument NoDeletePolicy means that the object should not delete itself after the first signal. Among the other deletion policies there is, for example, DeleteLaterPolicy, which deletes the connection object after the signal fires for the first time; this is convenient for various one-shot tasks.
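    To make the idea of deletion policies more concrete, here is a toy model without Qt. Everything in it is an assumption made for illustration: MiniSignal is a hypothetical miniature signal, AttachClosure plays the role of SlotClosure, and KeepPolicy and OneShotPolicy only loosely correspond to NoDeletePolicy and DeleteLaterPolicy. The real class is built on QObject and Qt's connection mechanism.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical miniature signal: a handler returns false to be disconnected.
class MiniSignal
{
	std::vector<std::function<bool ()>> Handlers_;
public:
	void Connect (std::function<bool ()> handler)
	{
		Handlers_.push_back (std::move (handler));
	}

	void Emit ()
	{
		decltype (Handlers_) current;
		current.swap (Handlers_);
		for (auto& handler : current)
			if (handler ())
				Handlers_.push_back (std::move (handler));
	}
};

// The closure stays connected after it has been called.
struct KeepPolicy
{
	static bool KeepAfterFiring () { return true; }
};

// The closure is dropped after its first call.
struct OneShotPolicy
{
	static bool KeepAfterFiring () { return false; }
};

// Attaches func to the signal; the policy decides whether it survives a firing.
template<typename Policy>
void AttachClosure (MiniSignal& source, std::function<void ()> func)
{
	source.Connect ([func] () -> bool
	{
		func ();
		return Policy::KeepAfterFiring ();
	});
}

// Emits three times and reports how often each closure ran.
inline std::pair<int, int> DemoPolicies ()
{
	MiniSignal source;
	int kept = 0;
	int oneShot = 0;
	AttachClosure<KeepPolicy> (source, [&kept] { ++kept; });
	AttachClosure<OneShotPolicy> (source, [&oneShot] { ++oneShot; });
	for (int i = 0; i < 3; ++i)
		source.Emit ();
	return { kept, oneShot };
}
```

    After three emissions the KeepPolicy closure has run three times and the OneShotPolicy closure once.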


    With these functions everything is simple: we connect the rotateFuncs() signal to the RotateFuncs() function (hmm, I wonder how many comments there will be about the naming style?), call the function that initializes the thread's objects, defined somewhere in the subclass, and start spinning the thread's event loop. When the thread's owner calls quit() on it, QThread::run() returns control, and the subclass can clean up after itself in Cleanup().

    Note that it is precisely Qt's signal-slot mechanism that makes the rotateFuncs() signal, emitted from the main thread, invoke RotateFuncs() in our WorkerThreadBase thread: the SlotClosure is created inside run(), so it lives in the worker thread, and the call is delivered through that thread's event loop.

    RotateFuncs(), in turn, briefly locks the main queue, moves its contents into a local one, and then executes the stored functions sequentially.
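    Why swap the queue out instead of running the functions under the lock? A small Qt-free sketch of the same pattern shows one benefit. FunctionQueue, Push, Rotate and DemoReentrantPush are hypothetical names, and std::mutex stands in for QMutex. Since user code never runs while the mutex is held, a queued function may itself schedule more work without locking the mutex a second time.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the Functions_ queue and RotateFuncs ().
class FunctionQueue
{
	std::mutex Mutex_;
	std::vector<std::function<void ()>> Functions_;
public:
	void Push (std::function<void ()> func)
	{
		std::lock_guard<std::mutex> locker { Mutex_ };
		Functions_.push_back (std::move (func));
	}

	// Runs everything queued so far; returns how many functions were executed.
	std::size_t Rotate ()
	{
		decltype (Functions_) funcs;
		{
			// The lock is held only for the swap, never while user code runs.
			std::lock_guard<std::mutex> locker { Mutex_ };
			funcs.swap (Functions_);
		}
		for (const auto& func : funcs)
			func ();
		return funcs.size ();
	}
};

inline std::string DemoReentrantPush ()
{
	FunctionQueue queue;
	std::string log;
	queue.Push ([&]
	{
		log += "a";
		// If Rotate () still held the non-recursive mutex here, this Push ()
		// would lock it twice from one thread (undefined behaviour, typically
		// a deadlock). With the swap it simply lands in the next batch.
		queue.Push ([&] { log += "c"; });
	});
	queue.Push ([&] { log += "b"; });

	const auto first = queue.Rotate ();	// runs "a" and "b"
	const auto second = queue.Rotate ();	// runs "c"
	assert (first == 2 && second == 1);
	return log;
}
```

    The other benefit is the one mentioned above: callers of Push() are blocked only for the duration of a swap, not for as long as the queued functions take to run.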

    Actually, that's all. As a usage example, here is a piece of the system that stores avatars on disk in an IM client:
    avatarsstoragethread.h
    class AvatarsStorageThread final : public Util::WorkerThreadBase
    {
    	std::unique_ptr<AvatarsStorageOnDisk> Storage_;
    public:
    	using Util::WorkerThreadBase::WorkerThreadBase;
    	QFuture<void> SetAvatar (const QString& entryId, IHaveAvatars::Size size, const QByteArray& imageData);
    	QFuture> GetAvatar (const QString& entryId, IHaveAvatars::Size size);
    	QFuture<void> DeleteAvatars (const QString& entryId);
    protected:
    	void Initialize () override;
    	void Cleanup () override;
    };
    


    avatarsstoragethread.cpp
    QFuture<void> AvatarsStorageThread::SetAvatar (const QString& entryId,
    		IHaveAvatars::Size size, const QByteArray& imageData)
    {
    	return ScheduleImpl ([=] { Storage_->SetAvatar (entryId, size, imageData); });
    }
    QFuture> AvatarsStorageThread::GetAvatar (const QString& entryId, IHaveAvatars::Size size)
    {
    	return ScheduleImpl ([=] { return Storage_->GetAvatar (entryId, size); });
    }
    QFuture<void> AvatarsStorageThread::DeleteAvatars (const QString& entryId)
    {
    	return ScheduleImpl ([=] { Storage_->DeleteAvatars (entryId); });
    }
    void AvatarsStorageThread::Initialize ()
    {
    	Storage_.reset (new AvatarsStorageOnDisk);
    }
    void AvatarsStorageThread::Cleanup ()
    {
    	Storage_.reset ();
    }
    



    The implementation of AvatarsStorageOnDisk, however, is a separate interesting topic, related to my homegrown not-quite-ORM framework, which generates tables, SQL queries and the corresponding insert/delete/update functions from a description of a data structure via Boost.Fusion. That implementation has nothing to do with multithreading specifically, which is, in general, a good thing, especially from the point of view of decomposing the original problem.

    And finally, we note the shortcomings of the proposed solution:
    1. All the methods that you want to call on the object moved into the separate thread have to be duplicated in the public API of the WorkerThreadBase subclass. I have not yet come up with an effective way to solve this.
    2. Initialize() and Cleanup() are just begging to be turned into some kind of RAII. It's worth coming up with something here.
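    Both shortcomings point in a similar direction, so here is one possible sketch of it, not something the code above does. It uses only the standard library and rests on several assumptions: ObjectWorker, Invoke, Counter and DemoCounter are hypothetical names, the worked-on type T is default-constructible, and only non-const member functions are supported. The object is a local variable of the thread function, so its constructor and destructor take over the roles of Initialize() and Cleanup(), and a single generic Invoke() forwards calls to any member function.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical generic worker: owns a T that lives entirely inside the thread.
template<typename T>
class ObjectWorker
{
	std::mutex Mutex_;
	std::condition_variable Cond_;
	std::vector<std::function<void (T&)>> Functions_;
	bool Quit_ = false;
	std::thread Thread_;	// declared last, so everything above exists before it starts
public:
	ObjectWorker ()
	: Thread_ { [this] { Run (); } }
	{
	}

	~ObjectWorker ()
	{
		{
			std::lock_guard<std::mutex> locker { Mutex_ };
			Quit_ = true;
		}
		Cond_.notify_one ();
		Thread_.join ();
	}

	// Forwards a call to any non-const member function of T.
	template<typename R, typename... Params, typename... Args>
	std::future<R> Invoke (R (T::*method) (Params...), Args... args)
	{
		// std::packaged_task takes care of both void results and exceptions.
		auto task = std::make_shared<std::packaged_task<R (T&)>> (
				[=] (T& object) mutable { return (object.*method) (args...); });
		auto future = task->get_future ();
		{
			std::lock_guard<std::mutex> locker { Mutex_ };
			Functions_.push_back ([task] (T& object) { (*task) (object); });
		}
		Cond_.notify_one ();
		return future;
	}
private:
	void Run ()
	{
		T object;	// constructed in the worker thread: replaces Initialize ()
		for (;;)
		{
			decltype (Functions_) funcs;
			{
				std::unique_lock<std::mutex> locker { Mutex_ };
				Cond_.wait (locker, [this] { return Quit_ || !Functions_.empty (); });
				if (Functions_.empty ())
					return;	// object is destroyed here: replaces Cleanup ()
				funcs.swap (Functions_);
			}
			for (auto& func : funcs)
				func (object);
		}
	}
};

struct Counter
{
	int Value_ = 0;
	int Add (int delta) { return Value_ += delta; }
	void Reset () { Value_ = 0; }
};

inline int DemoCounter ()
{
	ObjectWorker<Counter> worker;
	worker.Invoke (&Counter::Add, 5);
	worker.Invoke (&Counter::Reset).get ();	// a void call still yields a future
	worker.Invoke (&Counter::Add, 2);
	return worker.Invoke (&Counter::Add, 3).get ();
}
```

    The price is a loss of flexibility: with this shape the subclass can no longer offer a hand-written API that combines several calls on the object into one scheduled operation, and overloaded member functions have to be disambiguated with a cast.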
