Parallel Programming Using a Computational Graph

    Some applications are naturally implemented as message-passing systems. In a broad sense, messages can be anything: data blocks, control “signals”, and so on. The logic consists of nodes that process messages and the connections between them. Such a structure is naturally represented by a graph along whose edges messages “flow” and in whose nodes they are processed. The best-known name for this model is a computational graph.

    Using a computational graph, you can establish dependencies between tasks and, to some extent, programmatically implement a “dataflow architecture”.

    In this post I will describe how to implement such a model in C++ using the Intel Threading Building Blocks (Intel TBB) library, namely the tbb::flow::graph class.



    What are Intel TBB and the tbb::flow::graph class


    Intel Threading Building Blocks is a C++ template library for parallel programming. It is distributed free of charge as open source, and there is also a commercial version. It is available in binary form for Windows*, Linux*, and OS X*.

    TBB has many ready-made algorithms, constructs, and data structures tailored for parallel computing. Among them are the constructs for building a computational graph, which are discussed here.

    A graph, as you know, consists of vertices (nodes) and edges. The computational graph tbb::flow::graph likewise consists of nodes, edges, and the graph object itself.



    The nodes of the graph have sender and receiver interfaces; they manage messages or perform some function. Edges connect the nodes and are the "channels" through which messages travel.

    The body of each node runs as a TBB task and can execute in parallel with others if there are no dependencies between them. In TBB, many (perhaps all) parallel algorithms are built on tasks: small units of work that are executed by worker threads. Tasks may depend on one another, and they can be dynamically redistributed between threads. Tasks make it possible to achieve good granularity and load balance on the CPU, and to build higher-level parallel constructs on top of them, such as tbb::flow::graph.

    The simplest dependency graph


    A graph consisting of two vertices connected by one edge, one of which prints “Hello”, and the second “World”, can be schematically depicted as follows:



    And in the code it will look like this:

    #include <iostream>
    #include <tbb/flow_graph.h>
    int main(int argc, char *argv[]) {
        tbb::flow::graph g;
        // Each node runs its body when a continue_msg arrives.
        tbb::flow::continue_node< tbb::flow::continue_msg >
            h( g, []( const tbb::flow::continue_msg & ) { std::cout << "Hello "; } );
        tbb::flow::continue_node< tbb::flow::continue_msg >
            w( g, []( const tbb::flow::continue_msg & ) { std::cout << "World\n"; } );
        tbb::flow::make_edge( h, w );           // w runs only after h
        h.try_put(tbb::flow::continue_msg());   // start the graph
        g.wait_for_all();                       // wait until all tasks have finished
        return 0;
    }

    This creates a graph object g and two nodes of type continue_node, h and w. These nodes receive and send messages of type continue_msg, an internal control message. They are used to build dependency graphs, in which a node's body executes only after a message has been received from its predecessor.

    Each continue_node executes some nominally useful code: printing “Hello” and “World”. The nodes are connected by an edge with the make_edge function. That is all: the structure of the computational graph is ready, and you can start it by sending it a message with the try_put method. The graph then runs, and to make sure that all of its tasks have completed, we wait with the wait_for_all method.
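
    The dependency rule described above can be sketched without TBB at all. The following is a minimal single-threaded model, not the library's implementation: the class dep_node and the functions precede, signal, and run_hello_world are hypothetical names invented for this illustration. It only shows the counting idea: a node runs its body once a signal has arrived from every predecessor, then signals its successors.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a dependency node; this is NOT a TBB class.
class dep_node {
public:
    explicit dep_node(std::function<void()> body) : body_(std::move(body)) {}

    // Rough analogue of make_edge(*this, next): next gains one predecessor.
    void precede(dep_node& next) {
        successors_.push_back(&next);
        ++next.predecessors_;
    }

    // Rough analogue of try_put(continue_msg()): count one signal and run the
    // body once a signal has arrived from every predecessor.
    void signal() {
        const int needed = predecessors_ > 0 ? predecessors_ : 1;
        if (++received_ == needed) {
            received_ = 0;                       // ready for the next round
            body_();
            for (dep_node* s : successors_) s->signal();
        }
    }

private:
    std::function<void()> body_;
    std::vector<dep_node*> successors_;
    int predecessors_ = 0;
    int received_ = 0;
};

// Builds the two-node Hello -> World graph and returns what the bodies wrote.
std::string run_hello_world() {
    std::string out;
    dep_node h([&out] { out += "Hello "; });
    dep_node w([&out] { out += "World\n"; });
    h.precede(w);
    h.signal();
    return out;
}
```

    In this model, a node with two predecessors runs only after both of them have finished, which is the property a dependency graph relies on. The real continue_node additionally runs bodies as tasks on worker threads, which this sketch leaves out.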

    Simple messaging graph


    Imagine that our program must compute the expression x^2 + x^3 for x from 1 to 10. Yes, this is not the hardest computational task, but it will do for a demonstration.

    Let's try to represent the computation of the expression as a graph. The first node takes x values from the incoming data stream and sends them to the nodes that square and cube them. The exponentiation operations are independent of each other and can execute in parallel. To smooth out possible imbalances, they pass their results to buffer nodes. Next comes a joining node, which delivers the exponentiation results to the summing node, where the computation ends:



    The code for such a graph:

    #include <cstdio>
    #include <tuple>
    #include <windows.h>  // Sleep() (Windows-only)
    #include <tbb/flow_graph.h>
    using namespace tbb::flow;
    struct square {
      int operator()(int v) {
        printf("squaring %d\n", v);
        Sleep(1000);
        return v*v;
      }
    };
    struct cube {
      int operator()(int v) {
        printf("cubing %d\n", v);
        Sleep(1000);
        return v*v*v;
      }
    };
    class sum {
      int &my_sum;  // running total, owned by main()
    public:
      sum( int &s ) : my_sum(s) {}
      int operator()( std::tuple<int,int> v ) {
        printf("adding %d and %d to %d\n", std::get<0>(v), std::get<1>(v), my_sum);
        my_sum += std::get<0>(v) + std::get<1>(v);
        return my_sum;
      }
    };
    int main(int argc, char *argv[]) {
      int result = 0;
      graph g;
      broadcast_node<int> input(g);
      function_node<int,int> squarer( g, unlimited, square() );
      function_node<int,int> cuber( g, unlimited, cube() );
      buffer_node<int> square_buffer(g);
      buffer_node<int> cube_buffer(g);
      join_node< std::tuple<int,int>, queueing > join(g);
      function_node< std::tuple<int,int>, int >
        summer( g, serial, sum(result) );
      make_edge( input, squarer );
      make_edge( input, cuber );
      // Results reach the join through the buffers, as described above.
      make_edge( squarer, square_buffer );
      make_edge( square_buffer, input_port<0>(join) );
      make_edge( cuber, cube_buffer );
      make_edge( cube_buffer, input_port<1>(join) );
      make_edge( join, summer );
      for (int i = 1; i <= 10; ++i)
        input.try_put(i);
      g.wait_for_all();
      printf("Final result is %d\n", result);
      return 0;
    }

    The Sleep(1000) call was added to make the process visible (the example was compiled on Windows; use equivalent calls on other platforms). After that, everything is as in the first example: we create nodes, connect them with edges, and start execution. The second parameter of function_node (unlimited or serial) determines how many instances of the node body may execute in parallel. A join_node checks whether input data or messages are available at each input, and when both are ready, it passes them to the next node as a std::tuple.
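
    To make the pairing behaviour of a queueing join more concrete, here is a small single-threaded model written with the standard library only. It is a sketch under assumptions, not TBB code: queueing_join_model, put_left, put_right, and sum_through_join are hypothetical names. The model keeps one FIFO queue per input and emits a tuple as soon as both queues are non-empty. It also shows that a square and a cube paired together need not come from the same x; since the summing node adds everything up, the total is unaffected.

```cpp
#include <deque>
#include <tuple>
#include <vector>

// Hypothetical single-threaded model of a two-input queueing join; NOT a TBB class.
class queueing_join_model {
public:
    void put_left(int v)  { left_.push_back(v);  try_emit(); }
    void put_right(int v) { right_.push_back(v); try_emit(); }
    const std::vector<std::tuple<int, int>>& output() const { return out_; }

private:
    // Emit one tuple per matched pair of queue heads.
    void try_emit() {
        while (!left_.empty() && !right_.empty()) {
            out_.emplace_back(left_.front(), right_.front());
            left_.pop_front();
            right_.pop_front();
        }
    }
    std::deque<int> left_, right_;
    std::vector<std::tuple<int, int>> out_;
};

// Feeds the squares and cubes of 1..n in different orders and sums the joined pairs.
int sum_through_join(int n) {
    queueing_join_model join;
    for (int x = n; x >= 1; --x) join.put_left(x * x);       // squares arrive in reverse
    for (int x = 1; x <= n; ++x) join.put_right(x * x * x);  // cubes arrive in order
    int sum = 0;
    for (const auto& t : join.output()) sum += std::get<0>(t) + std::get<1>(t);
    return sum;
}
```

    For n = 10 the sum of the squares is 385 and the sum of the cubes is 3025, so sum_through_join(10) returns 3410, which is also the value the graph above should print as its final result.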

    Solving the dining philosophers problem with tbb::flow::graph


    From Wikipedia:
    “The dining philosophers problem” is a classic example used in computer science to illustrate synchronization problems in the design of parallel algorithms and techniques for solving them.

    In the problem, several philosophers sit at a table and can either eat or think, but not both at once. In our version, the philosophers eat noodles with chopsticks: eating requires two chopsticks, but there is only one per philosopher:



    In this situation a deadlock can occur if, for example, each philosopher picks up the chopstick on his left, so the diners' actions must be synchronized.

    Let's try to represent the table with philosophers as a tbb::flow::graph. Each philosopher is represented by two nodes: a join_node for picking up chopsticks and a function_node for carrying out the “eat” and “think” tasks. A place for a chopstick on the table is implemented as a queue_node. The queue of a queue_node holds at most one chopstick, and if one is there, it is available to be picked up. The graph looks like this:



    Main function with some constants and header files:

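    #include <cstdio>
    #include <cstdlib>
    #include <tuple>
    #include <vector>
    #include <windows.h>  // Sleep() (Windows-only)
    #include <tbb/flow_graph.h>
    #include <tbb/task_scheduler_init.h>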
    using namespace tbb::flow;
    const char *names[] = 
    { "Archimedes", "Aristotle", "Democritus", "Epicurus", "Euclid", 
    "Heraclitus", "Plato", "Pythagoras", "Socrates", "Thales" };
    ….
    int main(int argc, char *argv[]) {
      int num_threads = 0;
      int num_philosophers = 10;
      if ( argc > 1 ) num_threads = atoi(argv[1]);
      if ( argc > 2 ) num_philosophers = atoi(argv[2]);
      if ( num_threads < 1 || num_philosophers < 1 || num_philosophers > 10 ) exit(1);
      tbb::task_scheduler_init init(num_threads);
      graph g;
      printf("\n%d philosophers with %d threads\n\n", 
             num_philosophers, num_threads);
      std::vector< queue_node<chopstick> * > places;
      for ( int i = 0; i < num_philosophers; ++i ) {
        queue_node<chopstick> *qn_ptr = new queue_node<chopstick>(g);
        qn_ptr->try_put(chopstick());
        places.push_back( qn_ptr );
      }
      std::vector< philosopher > philosophers;
      for ( int i = 0; i < num_philosophers; ++i ) {
        philosophers.push_back( philosopher( names[i], g,
                                             places[i], 
                                             places[(i+1)%num_philosophers] ) );
        g.run( philosophers[i] );
      }
      g.wait_for_all();
      for ( int i = 0; i < num_philosophers; ++i ) philosophers[i].check();
      return 0;
    }

    After processing the command-line parameters, the library is initialized by creating an object of type tbb::task_scheduler_init. This lets you control when initialization happens and set the number of worker threads manually. Without it, initialization happens automatically. Next, the graph object g is created. The chopstick places (queue_node objects) are stored in a std::vector, and a chopstick is put on each place.

    The philosophers are then created in a similar way and stored in a std::vector. Each philosopher object is passed to the run function of the graph object. The philosopher class has an operator(), and the run function executes this functor in a task that is a child of the root task of the graph object g. This way we can wait for these tasks to finish during the call to g.wait_for_all().

    Philosopher class:

    const int think_time = 1000; 
    const int eat_time = 1000; 
    const int num_times = 10; 
    class chopstick {}; 
    class philosopher { 
    public: 
      typedef queue_node< chopstick > chopstick_buffer; 
      // reserving policy: inputs are taken only when both are available
      typedef join_node< std::tuple<chopstick,chopstick>, reserving > join_type; 
      philosopher( const char *name, graph &the_graph,
                   chopstick_buffer *left, chopstick_buffer *right ) : 
      my_name(name), my_graph(&the_graph),
      my_left_chopstick(left), my_right_chopstick(right),
      my_join(new join_type(the_graph)), my_function_node(NULL),
      my_count(new int(num_times)) {} 
      void operator()(); 
      void check(); 
    private: 
      const char *my_name; 
      graph *my_graph; 
      chopstick_buffer *my_left_chopstick; 
      chopstick_buffer *my_right_chopstick; 
      join_type *my_join; 
      function_node< join_type::output_type, continue_msg > *my_function_node; 
      int *my_count; 
      friend class node_body; 
      void eat_and_think( ); 
      void eat( ); 
      void think( ); 
      void make_my_node(); 
    };

    Each philosopher has a name, pointers to the graph object and to the left and right chopstick places, a join_node, a function_node, and the counter my_count, which tracks how many more times the philosopher has to eat and think.

    operator()(), called by the graph's run function, is implemented so that the philosopher first thinks and then attaches himself to the graph.

    void philosopher::operator()() { 
      think(); 
      make_my_node(); 
    } 

    The think and eat methods simply sleep for the allotted time:

    void philosopher::think() { 
      printf("%s thinking\n", my_name ); 
      Sleep(think_time); 
      printf("%s done thinking\n", my_name ); 
    } 
    void philosopher::eat() { 
      printf("%s eating\n", my_name ); 
      Sleep(eat_time); 
      printf("%s done eating\n", my_name ); 
    }

    The make_my_node method creates the function node and connects both it and the join_node to the rest of the graph:

    void philosopher::make_my_node() { 
      my_left_chopstick->register_successor( input_port<0>(*my_join) ); 
      my_right_chopstick->register_successor( input_port<1>(*my_join) ); 
      my_function_node = 
        new function_node< join_type::output_type, continue_msg >( *my_graph, 
          serial, node_body( *this ) ); 
      make_edge( *my_join, *my_function_node ); 
    }

    Note that the graph is built dynamically: the edges are formed with the register_successor method. It is not necessary to create the whole graph structure first and only then run it. TBB lets you change the structure on the fly, even while the graph is running, by removing and adding nodes. This adds even more flexibility to the computational graph concept.

    The node_body class is a simple functor that calls the philosopher::eat_and_think() method:

    class node_body { 
      philosopher &my_philosopher; 
    public: 
      node_body( philosopher &p ) : my_philosopher(p) { } 
      void operator()( philosopher::join_type::output_type ) { 
        my_philosopher.eat_and_think(); 
      } 
    };

    The eat_and_think method calls eat() and decrements the counter. Then the philosopher puts his chopsticks back on the table and thinks. Once he has eaten and thought the required number of times, he gets up from the table: he disconnects his join_node from the graph with the remove_successor method. Here again the dynamic structure of the graph is visible: some nodes are removed while the rest continue to work.

    void philosopher::eat_and_think( ) { 
      eat(); 
      --(*my_count); 
      if (*my_count > 0) { 
        my_left_chopstick->try_put( chopstick() ); 
        my_right_chopstick->try_put( chopstick() ); 
        think(); 
      } else { 
        my_left_chopstick->remove_successor( input_port<0>(*my_join) );
        my_right_chopstick->remove_successor( input_port<1>(*my_join) );
        my_left_chopstick->try_put( chopstick() ); 
        my_right_chopstick->try_put( chopstick() ); 
      } 
    }

    In our graph there is an edge from the queue_node (the place for a chopstick) to the philosopher, or more precisely to his join_node, but there is no edge in the opposite direction. Nevertheless, the eat_and_think method can call try_put to put the chopstick back in the queue.

    At the end of main(), the check method is called for every philosopher; it verifies that the philosopher has eaten and thought the required number of times and performs the necessary cleanup:

    void philosopher::check() { 
      if ( *my_count != 0 ) { 
        printf("ERROR: philosopher %s still had to run %d more times\n", my_name, *my_count); 
        exit(1); 
      } else { 
        printf("%s done.\n", my_name); 
      } 
      delete my_function_node; 
      delete my_join; 
      delete my_count; 
    }

    Deadlock does not occur in this example thanks to join_node with the reserving policy. This node type creates a std::tuple from the objects received on its two inputs. The input data are not consumed immediately on arrival: join_node first waits until data are available on both inputs, then tries to reserve them one after another. Only if this succeeds are they "consumed" and a std::tuple created from them. If the reservation of even one input “channel” fails, those already reserved are released. That is, if a philosopher can pick up one chopstick but the other is taken, he puts the first one down and waits, without needlessly blocking his neighbors.
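
    The reserve-then-consume rule can be illustrated with a tiny single-threaded model. This is only a sketch of the idea under simplifying assumptions, not how TBB implements it: place_model and try_take_both are hypothetical names, and real reservations happen concurrently between nodes. The point it shows is the all-or-nothing step: if the second reservation fails, the first one is released.

```cpp
// Hypothetical single-threaded model of a chopstick place; NOT a TBB class.
struct place_model {
    bool has_item = true;   // a chopstick lies on this place
    bool reserved = false;  // someone currently holds a reservation on it

    // A reservation succeeds only if the item is present and unreserved.
    bool try_reserve() {
        if (!has_item || reserved) return false;
        reserved = true;
        return true;
    }
    void release()  { reserved = false; }                    // give the reservation back
    void consume()  { has_item = false; reserved = false; }  // take the item for good
    void put_back() { has_item = true; }                     // return the item to the table
};

// Reserve the left place, then the right one; take both chopsticks or neither.
bool try_take_both(place_model& left, place_model& right) {
    if (!left.try_reserve()) return false;
    if (!right.try_reserve()) {
        left.release();  // do not hold one chopstick while waiting for the other
        return false;
    }
    left.consume();
    right.consume();
    return true;
}
```

    With three places around a table, a first call try_take_both(p0, p1) succeeds, and a neighbor's call try_take_both(p2, p0) then fails while leaving p2 untouched, so a third diner is never blocked by a half-finished grab.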

    This dining philosophers example demonstrates several features of the TBB graph:
    • Using join_node to synchronize access to resources
    • Dynamic graph construction: nodes can be added and removed while the graph is running
    • No single entry or exit point is required, and the graph may contain cycles
    • Using the graph's run function

    Types of Nodes


    tbb::flow::graph provides a fairly wide range of node types. They can be divided into four groups: functional, buffering, joining and splitting, and others. List of node types with legend:



    Conclusion


    Using the graph implemented in Intel TBB, you can create complex and interesting parallel program logic, sometimes called "unstructured parallelism". The computational graph lets you organize dependencies between tasks and build applications based on passing messages and events.

    The graph structure can be either static or dynamic - nodes and edges can be added and removed on the fly. You can connect individual subgraphs into a large graph.

    Most of the material is based on the English publications of my overseas colleagues.

    For those who are interested, try:

    Download the Intel Threading Building Blocks library (Open Source Version):
    http://threadingbuildingblocks.org

    Commercial version of Intel TBB (functionally no different):
    http://software.intel.com/en-us/intel-tbb

    English-language blogs about tbb::flow::graph:
    http://software.intel.com/en-us/tags/17218
    http://software.intel.com/en-us/tags/17455
