Multithreading in C ++ and SObjectizer with CSP-channels, but absolutely no actors ...

    Earlier we talked about SObjectizer as an actor framework for C ++, although in reality this is not entirely true. For example, for a long time in SObjectizer there is such a cool thing as mchains (they are channels from the CSP model ). Mchain-s allow you to easily and naturally organize the exchange of data between workflows. Not creating agents that are not always needed. Just the other day, I once again had the opportunity to take advantage of this feature and simplify my life by transferring data between streams through channels (i.e., SObjectizer mchains). So not only in Go you can enjoy using CSP. In C ++, this is also possible. Who cares what and how, I ask for a cat.


    The task was as follows: there is a certain third-party system to which you need to make synchronous requests. It was necessary to see how this system behaves if requests to it go not in one stream, but in several. To do this, you should make an existing single-threaded multi-threaded client, the working threads of which would issue their own stream of requests to a third-party system.


    The full list of requests that were to be executed was in a separate file. So it was necessary to read this file sequentially, get another request and give it to one of the free working threads. Each thread counted the number of completed requests. It was necessary to determine how long it would take to proofread and process all the requests, as well as to calculate how many requests were completed.


    The obvious solution was a simple solution. There is a main working thread that reads the request file. Each request is put in the general queue of requests. From where requests are parsed by working threads. Those. the working thread takes the first request from the queue, executes it, then takes the new first request from the queue, etc. If the queue is empty, then the working thread should pause until something appears in the queue. If it is very full, the main thread should be suspended until a free space appears in the queue.


    Mchain's from SObjectizer just let you do without writing your thread-safe queues.


    To solve this problem, two mchain were required. The first mchain is used to send read requests to work threads. The main thread writes requests to it,
    working threads read requests from there. When the request file is fully read, the main thread just closes this mchain. Accordingly, as soon as worker threads see that there is nothing in mchain and it is closed, they will complete their work.


    The second mchain was required so that the working threads could transmit information to the main thread that they assured their work and how many requests they processed. In this mchain, worker threads write just one message. And the main thread only reads from this mchain.


    Well, now you can see how it all looks in code. Code without comments, because it was a one-time ejection program. Therefore, the necessary explanations will be given after the corresponding piece of code.


    Let's start with the run_app function, which is called from main () right after the program parses the command line parameters:


    void
    run_app( const app_args_t & args )
    {
    	so_5::wrapped_env_t sobj(
    			[]( so_5::environment_t & ) {},
    			[]( so_5::environment_params_t & params ) {
    				params.infrastructure_factory(
    						so_5::env_infrastructures::simple_mtsafe::factory() );
    			} );
    	auto tasks_ch = create_mchain( sobj,
    			std::chrono::seconds(5),
    			50,
    			so_5::mchain_props::memory_usage_t::preallocated,
    			so_5::mchain_props::overflow_reaction_t::abort_app );
    	auto finish_ack_ch = create_mchain( sobj );
    	std::vector< std::thread > workers;
    	const auto cleanup = cpp_util_3::at_scope_exit( [&] {
    			so_5::close_drop_content( finish_ack_ch );
    			so_5::close_drop_content( tasks_ch );
    			for( auto & t : workers )
    				t.join();
    		} );
    	cpp_util_3::n_times( args.m_threads_count, [&] {
    		workers.emplace_back( [&] {
    				worker_thread( args, tasks_ch, finish_ack_ch );
    			} );
    		} );
    	do_main_work( args, tasks_ch, finish_ack_ch );
    }

    Here, first, an instance of SObjectizer Environment is created, to which the mchain will belong. Without an SOEnvironment, you cannot create mchain, so you have to create an SOEnvironment.


    But we do not need a full-fledged SOEnvironment, which is designed to create clouds of agents in the application, for the effective management of which SOEnvironment is forced to create several own auxiliary threads. Therefore, in the parameters of SOEnvironment, we ask to use a special, single-threaded version of SObjectizer . In this case, wrapped_env_t will create one auxiliary thread on which so_5 :: launch () will be called and that’s it. More SObjectizer will not do anything. Yes, and this auxiliary thread will sleep in so_5 :: launch () until there is a return from run_app.


    Next, we need mchain to distribute requests to work threads. This is tasks_ch. But this is not a simple mchain. Firstly, it is a limited capacity mchain. Attempting to add another message to the filled mchain will block the current thread. But block not forever, but only for 5 seconds. If even after 5 seconds there is no free space in mchain, the entire application will be interrupted by calling std :: abort (). In this case, it is justified, because under normal conditions, none of the working threads should fall asleep for more than a few milliseconds, not to mention 5 seconds. So if there is no free space in tasks_ch in 5 seconds, then something is definitely going wrong, so you need to call std :: abort (). In addition, since tasks_ch has a predetermined size,


    With the second mchain, in which worker threads will send finish_ack messages, everything is much simpler. Therefore, finish_ack_ch is created by a simple call to create_mchain, with default parameters (dimensionless mchain without blocking on send operations).


    Next, we need to run N worker threads and save them in the workers vector. But it’s not so simple here. We can get an exception when creating the next working thread. In this case, it would be useful for us to normally complete those threads that have already been created.


    To simplify your life with the rollback of previously performed operations, an analog of D-scope sc_exit is used (well, either an analog of BOOST_SCOPE_EXIT or G-section defer, here it’s closer to anyone). The cleanup variable is essentially an object with a lambda inside. This lambda is called when the cleanup variable is called. It is created by cleanup using the small cpp_util helper library . Another explanation about cleanup: the first thing we need to do when cleaning is to close the mchains. If any of the working threads has already started and fell asleep on the receive call from tasks_ch, then closing tasks_ch in cleanup will immediately wake this thread and allow it to complete its work.


    Well, then we create the working threads and call do_main_work. Inside do_main_work, the main work of the main thread of the application is performed: reading a file with requests, sending requests to working threads, collecting results. Here's what a simplified version of do_main_work looks like, from which minor details were removed:


    void
    do_main_work(
    	const app_args_t & args,
    	so_5::mchain_t tasks_ch,
    	so_5::mchain_t finish_ack_ch )
    {
    	data_file_handler_t file{
    			args.m_data_file,
    			args.m_force_keep_alive
    		};
    	const auto started_at = hires_clock::now();
    	while( !file.is_eof() )
    	{
    		auto request = file.get_next_request();
    		if( !request )
    			break;
    		so_5::send< std::string >( tasks_ch, *request );
    	}
    	so_5::close_retain_content( tasks_ch );
    	unsigned long long total_requests{};
    	so_5::receive( from(finish_ack_ch).handle_n( args.m_threads_count ),
    			[&]( const finish_ack_t & what ) {
    				total_requests += what.m_requests;
    			} );
    	const auto total_time = hires_clock::now() - started_at;
    	if( total_requests )
    	{
    		... // Print the results...
    	}
    }

    All the most interesting is collected in two places.


    First, inside the while. There, requests from the file are read sequentially and transmitted to the working threads by calling send. If send is called when tasks_ch is full, then the main thread will be suspended (but no more than 5 seconds).


    Secondly, when the entire request file is read, we need to wait for answers from all the working threads. To do this, we first close tasks_ch so that the working threads understand that it is time to complete their work. But you need to close it so that those requests that are already in the queue, but have not yet been processed, are lost. Therefore, close_retain_content is called (but for the cleanup action in run_app, close_drop_content was used, because there we do not need to save anything in the channel being closed).


    After tasks_ch is closed, you need to wait for a response from N work threads. This expectation of exactly N answers is recorded in one magic line:


    so_5::receive( from(finish_ack_ch).handle_n( args.m_threads_count ),

    She says literally the following: read from the finish_ack_ch channel until exactly threads_count messages have been read and processed.


    Well, to complete the picture, you need to show how the code of the working thread looks. It is very simple:


    void
    worker_thread(
    	const app_args_t & args,
    	so_5::mchain_t tasks_ch,
    	so_5::mchain_t finish_ack_ch )
    {
    	io_performer_t io_performer{ args.m_srv, args.m_port };
    	unsigned long long total_requests{};
    	so_5::receive( from(tasks_ch),
    		[&]( const std::string & request ) {
    			io_performer.request_response( request );
    			++total_requests;
    		} );
    	so_5::send< finish_ack_t >( finish_ack_ch, total_requests );
    }

    The thread just hangs inside receive from the tasks_ch channel. The return from receive will happen when tasks_ch is closed. If tasks_ch is empty, then receive will sleep until
    something arrives in the channel (or until the channel is closed). And when the return from receive occurs, the working thread simply sends a finish_ack message to finish_ack_ch and ends.


    That, in fact, is all.


    I must say that there were no problems with the multithreading and information exchange between the flows. Literally the first time it started up and worked. Problems arose inside the implementation of io_performer_t :: request_response, when due to errors in the implementation of the interaction between the client and the server, the current thread hung. And then the 5-second limit on the time to wait for writing to the full tasks_ch helped: when the threads started to hang, a timeout worked and the multithreaded client crashed. It immediately became clear that there was a bug, moreover, a bug in request_response, because it was only hanging there that could stop normal reading from tasks_ch.


    In conclusion, I want to say that both the Model of Actors and the Model of Interactive Sequential Processes (aka CSP) are great things. Somewhere one works well, somewhere the second. SObjectizer allows you to use both. And then all at once, sometimes this is sometimes necessary.


    Also popular now: