Parallelizing tasks with dependencies: an example in .NET

Original author: Riccardo Terrell
  • Translation
Hello, colleagues!

This week we handed over Manning's ambitiously complex book "Concurrency in .NET" for translation:



The author has kindly posted an excerpt from Chapter 13 on Medium, and we invite you to take a look at it well before the translation comes out.
Enjoy reading!

Suppose you need to write a tool that runs a number of asynchronous tasks, each with its own set of dependencies that affect the order of operations. Such problems can be solved with sequential, imperative execution, but if you want maximum performance, sequential execution won't do. Instead, you need to run the tasks in parallel. Many concurrent problems can be viewed as static collections of atomic operations with dependencies between their inputs and outputs. When an operation completes, its output is used as the input for other, dependent operations. To optimize performance, the tasks must be scheduled according to their dependencies, and the algorithm must be tuned so that dependent tasks run sequentially only as far as necessary and in parallel as much as possible.

You want to build a reusable component that runs a series of tasks in parallel while making sure that every dependency that could affect the order of operations is respected. How do you build a programming model that exposes the underlying parallelism of a collection of operations, running each one efficiently either in parallel or sequentially depending on its dependencies on the other operations?

Solution: implement the dependency graph with the F# MailboxProcessor class and expose its methods as standard tasks (Task) so that they can be consumed from C#.

This solution is known as a directed acyclic graph (DAG). It forms a graph by splitting operations into sequences of atomic tasks with well-defined dependencies. The acyclic nature of the graph matters here because it rules out deadlocks between tasks, provided the tasks are truly atomic. When specifying the graph, it is important to understand all the dependencies between the tasks, especially implicit dependencies that can lead to deadlocks or race conditions. Figure 1 shows a typical example of a graph-like data structure, which you can use to picture the constraints that arise when scheduling interactions between operations in a given graph.

A graph is an extremely powerful data structure, and you can build powerful algorithms on top of it.



Fig. 1 A graph is a collection of vertices connected by edges. In this representation of a directed graph, node 1 depends on nodes 4 and 5, node 2 depends on node 5, node 3 depends on nodes 5 and 6, and so on.
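To make Figure 1 concrete, here is a minimal C# sketch. It stores the graph as a map from each task ID to the IDs it depends on, using the same edges that the C# example registers with AddTask. It then inverts that map into one from each task to the tasks waiting for it. The inverted map plays the same role as the fromTo dictionary that ExecuteTasks builds in Listing 2. The names `dependsOn`, `Invert`, and `dependentsOf` are illustrative and are not part of the book's code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Figure 1 as "task -> tasks it depends on" (the same edges the C# example registers).
var dependsOn = new Dictionary<int, int[]>
{
    [1] = new[] { 4, 5 },
    [2] = new[] { 5 },
    [3] = new[] { 6, 5 },
    [4] = new[] { 6 },
    [5] = new[] { 7, 8 },
    [6] = new[] { 7 },
    [7] = Array.Empty<int>(),
    [8] = Array.Empty<int>(),
};

// Invert the edges into "task -> tasks waiting for it". When a task completes,
// only these dependents can become ready to run.
Dictionary<int, List<int>> Invert(IReadOnlyDictionary<int, int[]> graph)
{
    var dependents = new Dictionary<int, List<int>>();
    foreach (var (task, deps) in graph)
        foreach (var dep in deps)
        {
            if (!dependents.TryGetValue(dep, out var waiting))
                dependents[dep] = waiting = new List<int>();
            waiting.Add(task);
        }
    return dependents;
}

var dependentsOf = Invert(dependsOn);
foreach (var (key, waitingTasks) in dependentsOf.OrderBy(kv => kv.Key))
    Console.WriteLine($"{key} -> {string.Join(", ", waitingTasks)}");
```

Tasks 1, 2, and 3 have no entry in the inverted map because nothing depends on them. They are the leaves of the execution order.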

A DAG can serve as a strategy for running tasks in parallel while respecting the order of their dependencies, which improves performance. The structure of such a graph can be defined using the F# MailboxProcessor class. The agent keeps the tasks registered for execution as internal state, in the form of edge dependencies.
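MailboxProcessor is specific to F#. For C# readers, here is a hedged sketch of the same agent idea that swaps in System.Threading.Channels (built into .NET Core 3.0 and later). Many threads post messages into an unbounded channel, and a single reader loop is the only code that touches the task registry, so the mutable dictionary needs no lock. This is not the book's implementation. The message shape (an ID plus its dependency edges) and the names `mailbox`, `RunAgentAsync`, and `registry` are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// The mailbox: any number of writers, exactly one reader.
var mailbox = Channel.CreateUnbounded<(int Id, int[] Edges)>(
    new UnboundedChannelOptions { SingleReader = true });

// The agent loop: the only code that touches `registry`, one message at a time,
// so the mutable dictionary needs no lock (the role MailboxProcessor plays in F#).
async Task<Dictionary<int, int[]>> RunAgentAsync(ChannelReader<(int Id, int[] Edges)> inbox)
{
    var registry = new Dictionary<int, int[]>();
    await foreach (var message in inbox.ReadAllAsync())
        registry[message.Id] = message.Edges;
    return registry;   // the loop ends once the writer is marked complete
}

var agent = RunAgentAsync(mailbox.Reader);

// Register tasks 1..8 concurrently from thread-pool threads (edges omitted for brevity).
await Task.WhenAll(Enumerable.Range(1, 8).Select(id =>
    Task.Run(() => mailbox.Writer.TryWrite((id, Array.Empty<int>())))));

mailbox.Writer.Complete();
var registered = await agent;
Console.WriteLine($"The agent registered {registered.Count} tasks");
```

In Listing 2, `inbox.Receive()` plays the role of `ReadAllAsync`, and the dictionaries passed through `loop` play the role of `registry`.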

Validation of a directed acyclic graph

When working with any graph data structure, such as a DAG, you have to make sure that the edges are registered correctly. For example, go back to Figure 1: what happens if node 2 is registered with a dependency on node 5, but node 5 does not exist? It can also happen that some edges depend on each other, which creates a directed cycle. In the presence of a directed cycle, it is critical to run some tasks in parallel. Otherwise, some tasks may wait forever for others to complete, and a deadlock occurs.
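Here is a minimal C# sketch of the first check. It assumes the graph is held as a map from each task ID to the IDs it depends on, and lists every edge whose target was never registered, such as node 2 pointing at a missing node 5. `FindMissingDependencies` is a hypothetical helper. The book's own validation code is in its online source.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: every edge whose target task ID was never registered.
List<(int TaskId, int MissingDependency)> FindMissingDependencies(IReadOnlyDictionary<int, int[]> dependsOn) =>
    dependsOn
        .SelectMany(kv => kv.Value.Select(dep => (TaskId: kv.Key, MissingDependency: dep)))
        .Where(edge => !dependsOn.ContainsKey(edge.MissingDependency))
        .ToList();

// Node 2 is registered with a dependency on node 5, but node 5 was never added.
var registered = new Dictionary<int, int[]>
{
    [1] = new[] { 4 },
    [2] = new[] { 5 },
    [4] = Array.Empty<int>(),
};

foreach (var (taskId, missing) in FindMissingDependencies(registered))
    Console.WriteLine($"Task {taskId} depends on task {missing}, which is not registered");
```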

The problem is solved with a topological sort: we number all the vertices of the graph so that every edge leads from a vertex with a smaller number to a vertex with a larger number. Suppose task A must complete before task B, task B must complete before task C, and task C in turn must complete before task A. Then a circular reference arises, and the system notifies you of the error by throwing an exception. If the ordering constraints contain a directed cycle, no valid order exists. This kind of check is called cycle detection in a directed graph. A directed graph that satisfies these rules is a directed acyclic graph, perfectly suited for running several interdependent tasks in parallel.
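One standard way to perform this check is Kahn's algorithm, sketched below in C#. The graph is a map from each task ID to the IDs it depends on, and every dependency is assumed to be a registered task. The algorithm repeatedly takes tasks that have no unfinished dependencies. If some tasks are never freed, the graph contains a directed cycle, and the sketch throws an exception. `TopologicalOrder` is an illustrative helper, not necessarily the algorithm used in the book's online validation code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Kahn's algorithm over "task -> tasks it depends on". Returns an order in which every task
// comes after all of its dependencies, or throws if the graph contains a directed cycle.
// Assumes every dependency ID is itself a registered task.
List<int> TopologicalOrder(IReadOnlyDictionary<int, int[]> dependsOn)
{
    // Unfinished dependencies per task: the same role EdgesLeft plays in Listing 1.
    var remaining = dependsOn.ToDictionary(kv => kv.Key, kv => kv.Value.Length);
    var dependents = dependsOn.Keys.ToDictionary(k => k, _ => new List<int>());
    foreach (var (id, deps) in dependsOn)
        foreach (var dep in deps)
            dependents[dep].Add(id);

    var ready = new Queue<int>(remaining.Where(kv => kv.Value == 0).Select(kv => kv.Key));
    var order = new List<int>();
    while (ready.Count > 0)
    {
        var current = ready.Dequeue();
        order.Add(current);
        foreach (var next in dependents[current])
            if (--remaining[next] == 0)   // last dependency done: the task becomes ready
                ready.Enqueue(next);
    }

    if (order.Count != dependsOn.Count)
    {
        var blocked = remaining.Where(kv => kv.Value > 0).Select(kv => kv.Key);
        throw new InvalidOperationException(
            $"Directed cycle detected; these tasks can never start: {string.Join(", ", blocked)}");
    }
    return order;
}

// Figure 1.
var figure1 = new Dictionary<int, int[]>
{
    [1] = new[] { 4, 5 }, [2] = new[] { 5 }, [3] = new[] { 6, 5 }, [4] = new[] { 6 },
    [5] = new[] { 7, 8 }, [6] = new[] { 7 }, [7] = Array.Empty<int>(), [8] = Array.Empty<int>(),
};
Console.WriteLine(string.Join(" ", TopologicalOrder(figure1)));

// A before B, B before C, C before A: each task depends on the previous one in the loop.
var cyclic = new Dictionary<int, int[]> { [1] = new[] { 3 }, [2] = new[] { 1 }, [3] = new[] { 2 } };
try
{
    TopologicalOrder(cyclic);
}
catch (InvalidOperationException ex)
{
    Console.WriteLine(ex.Message);
}
```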

The full version of Listing 2, containing the DAG validation code, is in the source code posted online.

In the following listings, the F# MailboxProcessor class is an ideal candidate for implementing a DAG that runs related operations concurrently. First, let's define the discriminated union that we'll use to manage the tasks and run their dependencies.

Listing 1: Message type and data structure for coordinating tasks according to their dependencies

namespace DAG // the C# example refers to DAG.ParallelTasksDAG

open System
open System.Threading
open System.Threading.Tasks

type TaskMessage = // #A
    | AddTask of int * TaskInfo
    | QueueTask of TaskInfo
    | ExecuteTasks
and TaskInfo = // #B
    { Context : System.Threading.ExecutionContext
      Edges : int array; Id : int; Task : Func<Task>
      EdgesLeft : int option; Start : DateTimeOffset option; End : DateTimeOffset option }


#A Messages sent to dagAgent, the agent underlying ParallelTasksDAG, which is responsible for coordinating the execution of tasks.

#B Wraps the details of each task to run.

The TaskMessage type represents the message envelopes sent to dagAgent, the agent underlying ParallelTasksDAG. These messages are used to coordinate tasks and synchronize dependencies. The TaskInfo type contains and tracks the details of the registered tasks during the execution of the DAG, including the dependency edges. The execution context (https://msdn.microsoft.com/en-us/library/system.threading.executioncontext(v=vs.110).aspx) is captured so that its information is available during deferred execution. This includes the current user, any state associated with the logical thread of execution, code access security information, and so on. The start and end times of each task are recorded and published when the completion event is triggered.
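The following C# sketch shows what capturing the ExecutionContext buys you. It uses `AsyncLocal<T>`, whose value flows with the execution context, as a stand-in for state such as the current user. The `currentUser` variable and its values are hypothetical. A value set before `ExecutionContext.Capture()` is still visible when deferred work later runs through `ExecutionContext.Run` on the captured context, even though the ambient value has changed in the meantime.

```csharp
using System;
using System.Threading;

// Ambient state that flows with the ExecutionContext (a stand-in for "current user").
var currentUser = new AsyncLocal<string>();

currentUser.Value = "alice";
// Capture the context at registration time, as AddTask does in Listing 2.
var captured = ExecutionContext.Capture();   // null only if context flow is suppressed
if (captured is null) throw new InvalidOperationException("ExecutionContext flow is suppressed");

// Later, the ambient value on this thread changes.
currentUser.Value = "bob";

// Deferred work runs inside a copy of the captured context, as in #I of Listing 2.
var seenInside = "";
ExecutionContext.Run(captured.CreateCopy(), _ => seenInside = currentUser.Value, null);

Console.WriteLine($"Inside captured context: {seenInside}; outside: {currentUser.Value}");
// prints: Inside captured context: alice; outside: bob
```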

Listing 2: F# DAG agent for parallelizing operations with dependencies

open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

type ParallelTasksDAG() =
    let onTaskCompleted = new Event<TaskInfo>() // #A

    // Helper used below but not shown in the original listing (reconstructed here):
    // decrements EdgesLeft for each dependent task and returns those with no edges left.
    let getDependentOperation (deps : int list) (tasks : Dictionary<int, TaskInfo>) =
        [ for id in deps do
            let op = tasks.[id]
            let left = (defaultArg op.EdgesLeft 0) - 1
            tasks.[id] <- { op with EdgesLeft = Some left }
            if left = 0 then yield tasks.[id] ]

    let dagAgent = new MailboxProcessor<TaskMessage>(fun inbox ->
        let rec loop (tasks : Dictionary<int, TaskInfo>) // #B
                     (edges : Dictionary<int, int list>) = async { // #B
            let! msg = inbox.Receive() // #C
            match msg with
            | ExecuteTasks -> // #D
                let fromTo = new Dictionary<int, int list>()
                let ops = new Dictionary<int, TaskInfo>() // #E
                for KeyValue(key, value) in tasks do // #F
                    let operation = { value with EdgesLeft = Some(value.Edges.Length) }
                    for from in operation.Edges do
                        let exists, lstDependencies = fromTo.TryGetValue(from)
                        if not exists then fromTo.Add(from, [ operation.Id ])
                        else fromTo.[from] <- operation.Id :: lstDependencies
                    ops.Add(key, operation)
                ops |> Seq.iter (fun kv -> // #F
                    match kv.Value.EdgesLeft with
                    | Some(n) when n = 0 -> inbox.Post(QueueTask(kv.Value))
                    | _ -> ())
                return! loop ops fromTo
            | QueueTask(op) -> // #G
                Async.Start <| async { // #G
                    let start = DateTimeOffset.Now
                    match op.Context with // #H
                    | null -> do! op.Task.Invoke() |> Async.AwaitTask
                    | ctx -> // #I
                        // ExecutionContext.Run takes a void callback, so keep the Task it starts and await it afterwards
                        let started = ref (null : Task)
                        ExecutionContext.Run(ctx.CreateCopy(), (fun _ -> started.Value <- op.Task.Invoke()), null)
                        do! started.Value |> Async.AwaitTask
                    let end' = DateTimeOffset.Now
                    onTaskCompleted.Trigger({ op with Start = Some(start); End = Some(end') }) // #L
                    // This continuation runs on a thread-pool thread, outside the agent loop, so
                    // the shared dictionaries are locked in case several tasks complete at once
                    let ready =
                        lock edges (fun () ->
                            let exists, deps = edges.TryGetValue(op.Id)
                            if exists && deps.Length > 0 then
                                edges.Remove(op.Id) |> ignore
                                getDependentOperation deps tasks
                            else [])
                    ready |> List.iter (fun nestedOp -> inbox.Post(QueueTask(nestedOp))) }
                return! loop tasks edges
            | AddTask(id, op) -> // #M
                tasks.Add(id, op)
                return! loop tasks edges }
        loop (new Dictionary<int, TaskInfo>(HashIdentity.Structural))
             (new Dictionary<int, int list>(HashIdentity.Structural)))

    // An agent created with `new` must be started before it processes messages
    do dagAgent.Start()

    [<CLIEvent>]
    member this.OnTaskCompleted = onTaskCompleted.Publish // #L
    member this.ExecuteTasks() = dagAgent.Post ExecuteTasks // #N
    member this.AddTask(id, task, [<ParamArray>] edges : int array) =
        let data = { Context = ExecutionContext.Capture()
                     Edges = edges; Id = id; Task = task
                     EdgesLeft = None; Start = None; End = None }
        dagAgent.Post(AddTask(id, data)) // #O


#A An instance of Event<TaskInfo>, onTaskCompleted, used to notify subscribers when a task completes.

#B The agent's internal state, which tracks the registered tasks and their dependencies. The collections are mutable because the state of ParallelTasksDAG changes during execution. They get their thread safety from being owned by the agent.

#C Waits asynchronously for the next message

#D The message that starts executing the ParallelTasksDAG

#E A collection that maps a monotonically increasing index to the task to run

#F Iterates over the task list, analyzing the dependencies between tasks to build a topological structure that represents the order in which the tasks run

#G The message envelope for queuing a task, running it, and finally removing it from the agent state as an active dependency once the task completes

#H If the captured ExecutionContext is null, run the task in the current context; otherwise, go to #I

#I Run the task in the captured ExecutionContext

#L Trigger and publish the onTaskCompleted event to notify that the task has completed. The event carries the task's information

#M The message envelope for adding a task to run according to its dependencies, if any

#N Starts the execution of registered tasks

#O Add a task, its dependencies, and the current ExecutionContext for DAG execution

The purpose of the AddTask function is to register a task with arbitrary dependency edges. It accepts a unique ID, the task to run, and a set of edges. The edges are the IDs of all the other registered tasks that must complete before this task can start, and an empty array means there are no dependencies. The MailboxProcessor instance named dagAgent keeps the registered tasks in its current state, tasks, a dictionary (tasks : Dictionary<int, TaskInfo>) that maps the ID of each task to its details. The agent also stores the edge dependencies by the ID of each task (edges : Dictionary<int, int list>). When the agent receives the notification to start execution, it checks that all edge dependencies are registered and that there are no cycles in the graph. This verification step is available in the full implementation of ParallelTasksDAG in the online code. Next is a C# example that references the F# library to run (and consume) ParallelTasksDAG. The registered tasks reflect the dependencies shown in Figure 1.

using System;
using System.Threading;
using System.Threading.Tasks;

Func<int, int, Func<Task>> action = (id, delay) => async () => {
  Console.WriteLine($"Starting operation {id} in Thread Id {Thread.CurrentThread.ManagedThreadId}…");
  await Task.Delay(delay);
};
var dagAsync = new DAG.ParallelTasksDAG();
// [<CLIEvent>] exposes OnTaskCompleted to C# as a regular .NET event
dagAsync.OnTaskCompleted += (sender, op) =>
     Console.WriteLine($"Operation {op.Id} completed in Thread Id {Thread.CurrentThread.ManagedThreadId}");
dagAsync.AddTask(1, action(1, 600), 4, 5);
dagAsync.AddTask(2, action(2, 200), 5);
dagAsync.AddTask(3, action(3, 800), 6, 5);
dagAsync.AddTask(4, action(4, 500), 6);
dagAsync.AddTask(5, action(5, 450), 7, 8);
dagAsync.AddTask(6, action(6, 100), 7);
dagAsync.AddTask(7, action(7, 900));
dagAsync.AddTask(8, action(8, 700));
dagAsync.ExecuteTasks();
// ExecuteTasks only posts a message to the agent and returns immediately,
// so keep the console process alive while the tasks run
Console.ReadLine();

The helper function action prints a message when a task starts, including the ID of the current thread to confirm that the work runs on multiple threads. The OnTaskCompleted event handler reports the completion of each task by printing the task ID and the current thread ID to the console. Here is the output we get when calling the ExecuteTasks method.

Starting operation 8 in Thread Id 23…
Starting operation 7 in Thread Id 24…
Operation 8 Completed in Thread Id 23
Operation 7 Completed in Thread Id 24
Starting operation 5 in Thread Id 23…
Starting operation 6 in Thread Id 25…
Operation 6 Completed in Thread Id 25
Starting operation 4 in Thread Id 24…
Operation 5 Completed in Thread Id 23
Starting operation 2 in Thread Id 27…
Starting operation 3 in Thread Id 30…
Operation 4 Completed in Thread Id 24
Starting operation 1 in Thread Id 28…
Operation 2 Completed in Thread Id 27
Operation 1 Completed in Thread Id 28
Operation 3 Completed in Thread Id 30

As you can see, the tasks run in parallel on different threads (the thread IDs differ), and the order of dependencies is preserved.
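For comparison, the same dependency-respecting parallelism can be expressed in plain C# with task continuations instead of an agent. This is a swapped-in technique, not the book's ParallelTasksDAG. The hypothetical `RunDag` helper gives each task a `Lazy<Task>` that first awaits `Task.WhenAll` over its dependencies. As a result, every task starts exactly once and only after all of its dependencies have finished. The sketch assumes the graph has already been validated (no missing IDs and no cycles), and it reuses the edges and delays from the C# example above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: run each task once, as soon as all of its dependencies have completed.
// Assumes a validated graph: every dependency is registered and there are no cycles.
Task RunDag(IReadOnlyDictionary<int, (int[] DependsOn, Func<Task> Work)> dag)
{
    var runs = new Dictionary<int, Lazy<Task>>();
    foreach (var (id, node) in dag)
        runs[id] = new Lazy<Task>(async () =>
        {
            // Wait for all of this task's dependencies; independent branches proceed in parallel.
            await Task.WhenAll(node.DependsOn.Select(dep => runs[dep].Value));
            await node.Work();
        });
    // Forcing every Lazy starts the root tasks and chains the rest behind their dependencies.
    return Task.WhenAll(runs.Values.Select(run => run.Value));
}

// Edges and delays from the C# example (Figure 1).
var edges = new Dictionary<int, int[]>
{
    [1] = new[] { 4, 5 }, [2] = new[] { 5 }, [3] = new[] { 6, 5 }, [4] = new[] { 6 },
    [5] = new[] { 7, 8 }, [6] = new[] { 7 }, [7] = Array.Empty<int>(), [8] = Array.Empty<int>(),
};
var delays = new Dictionary<int, int>
{
    [1] = 600, [2] = 200, [3] = 800, [4] = 500, [5] = 450, [6] = 100, [7] = 900, [8] = 700,
};

var clock = Stopwatch.StartNew();
var log = new ConcurrentDictionary<int, (long Start, long End)>();

var graph = edges.ToDictionary(
    kv => kv.Key,
    kv => (DependsOn: kv.Value, Work: (Func<Task>)(async () =>
    {
        var start = clock.ElapsedMilliseconds;
        Console.WriteLine($"Starting operation {kv.Key} in Thread Id {Thread.CurrentThread.ManagedThreadId}");
        await Task.Delay(delays[kv.Key]);
        log[kv.Key] = (start, clock.ElapsedMilliseconds);
        Console.WriteLine($"Operation {kv.Key} completed in Thread Id {Thread.CurrentThread.ManagedThreadId}");
    })));

await RunDag(graph);
```

This version needs no message loop and no mutable scheduling state shared between threads. The agent-based design, by contrast, lets tasks be registered one at a time and centralizes completion notifications in one place.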

That, in essence, is how tasks with dependencies are parallelized. You can read more in the book Concurrency in .NET.
