DryadLINQ. Distributed LINQ from Microsoft Research

    Yesterday's post on Habr covered Dryad, the distributed computing framework from Microsoft Research.

    The framework represents a job as a directed acyclic graph in which the vertices are programs and the edges are the channels that carry data between them. That post also surveyed the Dryad ecosystem and examined in detail the architecture of one of its central components, the execution environment for Dryad distributed applications.

    This article discusses the top-level component of the Dryad software stack: DryadLINQ, a query language for distributed storage.



    #region Lyrical digression (on the motivation for writing)
    In yesterday's article about Dryad I left out one paragraph that you should always include when writing about Microsoft products.

    I emphasize: I neither recommend nor discourage the use of Dryad in research projects (only an academic license is available for now). Moreover, I repeat that Dryad is an "internal" product of the evil corporation we all know, and Microsoft has every right to decide its development strategy [for both the product and the evil] on its own (which is quite fair).

    None of these facts makes studying the ideas and concepts of the Dryad platform less interesting or less useful for professional development (I speak for myself). If you see it differently, that is solely your business.

    For those who read between the lines and worry that this article is about DryadLINQ rather than a comparison with the open-source Hadoop project, a hint: the comparison with alternative solutions comes only in the next article.

    1. General information

    We want them to be able to write sequential and declarative code, and then, that same code can be run on a single machine, on a multicore machine, or on a cluster of machines. That's the beauty of the DryadLINQ programming model.
    - Yuan Yu, Principal Researcher, Microsoft Research
    Dryad  Software stack

    DryadLINQ is a high-level language with SQL-like syntax for querying data stored in a distributed file system. DryadLINQ is based on the .NET Language Integrated Query (LINQ) programming model, implements a dedicated LINQ provider for interacting with the Dryad runtime, and gives the developer an API for writing distributed LINQ expressions.

    Unlike the query languages of the Hadoop platform (HiveQL, Pig Latin), DryadLINQ is not yet another query language with its own syntax that has to be learned. Instead, DryadLINQ builds on things already familiar to .NET developers:
    • the unified LINQ programming model;
    • => as a consequence, an elegant functional approach to writing data queries;
    • the .NET Framework object model;
    • the MS Visual Studio development environment;
    • high-level programming languages such as C#, F#, or any CLS-compliant language.

    To expand on the first item of the list above: from the start, LINQ contained no explicit references to the nature of the data store being queried. The DryadLINQ API, built on top of LINQ, likewise strives not to "betray" its distributed nature.

    Because the syntax for querying a database (with LINQ-to-SQL) and a distributed file system (with DryadLINQ) differs so little, one of the most common scenarios becomes much easier: migrating from database-based storage to storage based on a distributed file system.
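
    A minimal sketch of this idea using only standard LINQ, not the DryadLINQ API: the query is written once against IQueryable<T>, and the provider behind the source decides where it runs. The names LogEntry and CountErrorsByHost are hypothetical and exist only for this illustration; the source here is an in-memory array wrapped with AsQueryable(), standing in for a table object that a provider such as LINQ-to-SQL or DryadLINQ would supply.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical record type, used only for this illustration.
public class LogEntry
{
    public string Host { get; set; }
    public string Level { get; set; }
}

public static class StorageAgnosticQuery
{
    // The query depends only on IQueryable<T>; it does not know whether the
    // provider behind it is in-memory, a database, or a cluster.
    public static IQueryable<KeyValuePair<string, int>> CountErrorsByHost(
        IQueryable<LogEntry> source)
    {
        return source
            .Where(e => e.Level == "ERROR")
            .GroupBy(e => e.Host)
            .Select(g => new KeyValuePair<string, int>(g.Key, g.Count()));
    }

    public static void Main()
    {
        // In-memory stand-in for a real data source.
        var entries = new[]
        {
            new LogEntry { Host = "a", Level = "ERROR" },
            new LogEntry { Host = "a", Level = "INFO" },
            new LogEntry { Host = "b", Level = "ERROR" },
            new LogEntry { Host = "a", Level = "ERROR" },
        }.AsQueryable();

        foreach (var pair in CountErrorsByHost(entries))
            Console.WriteLine(pair.Key + ": " + pair.Value);
    }
}
```

    Swapping the storage then means swapping only the object passed as source; the body of CountErrorsByHost stays untouched.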

    This is how DryadLINQ looks to the developer of a distributed application. Below we discuss its internals: the stages of query execution and the components and concepts that underlie DryadLINQ.

    2. Execution stages


    DryadLINQ.  Execution stages
    Illustration Source [5]

    Step 1. A user's distributed application containing a LINQ expression starts. LINQ expressions are evaluated lazily: they are not executed until the data returned by the query is actually needed. DryadLINQ expressions are deferred in the same way.

    Step 2. While the LINQ expression is being processed, the DryadLINQ-specific trigger ToDryadTable() is called. DryadLINQ intercepts this call, so at this stage it becomes clear that the query will be distributed.

    Step 3. DryadLINQ compiles the LINQ expression into a distributed Dryad query plan: the LINQ expression tree is decomposed into subqueries, each of which becomes a separate vertex in the future Dryad execution graph. DryadLINQ also generates the auxiliary data needed to run the remote vertex operations, generates the executable code for the vertices, and serializes the required data types.

    Step 4. DryadLINQ invokes the application-specific Dryad Job Manager.

    Step 5. The Job Manager creates the application's execution graph from the plan generated in step 3.

    Step 6. The vertex programs are executed at the vertices assigned to them.

    Step 7. When the Dryad job completes, the result is written to the output table(s).

    Step 8. The Job Manager returns the result to the node that launched the DryadLINQ job and terminates.

    Step 9. Control returns to the application that initiated the execution of the DryadLINQ expression. The result of the query is a DryadTable. DryadTable implements IEnumerable<T>, so the contents of the strongly typed DryadTable collection can be accessed like ordinary .NET objects.
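
    The deferred execution mentioned in step 1 can be observed with ordinary LINQ to Objects. The sketch below is not DryadLINQ code: the class DeferredExecutionDemo and its Calls counter are hypothetical, and ToList() plays the role that the table-materializing call plays in step 2.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DeferredExecutionDemo
{
    // Counts how many times the selector has actually run.
    public static int Calls;

    public static IEnumerable<int> BuildQuery(IEnumerable<int> source)
    {
        // Building the query only records what to do; nothing runs yet.
        return source.Select(x => { Calls++; return x * x; });
    }

    public static void Main()
    {
        var query = BuildQuery(new[] { 1, 2, 3 });
        Console.WriteLine("Calls after building the query: " + Calls);

        // Enumerating the query is the trigger that executes it.
        var squares = query.ToList();
        Console.WriteLine("Calls after enumeration: " + Calls);
        Console.WriteLine(string.Join(", ", squares));
    }
}
```

    The counter stays at zero until the query is enumerated, which is what gives a provider like DryadLINQ the chance to see the whole expression before anything runs.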

    3. DryadLINQ compiler


    The heart of the DryadLINQ query language is the DryadLINQ parallel compiler. To draw an analogy with the world of SQL, the DryadLINQ compiler can be compared to a DBMS query planner/optimizer.

    The compiler is responsible for turning a DryadLINQ expression into a distributed program that runs on a Dryad cluster. It contains both a static component, which generates the execution plan, and a dynamic component, which optimizes execution according to various policies by changing the execution plan at run time.

    3.1. Execution plan graph


    When control passes to the compiler, it transforms the LINQ expression into an execution plan graph (EPG). The EPG is a skeleton of the execution graph, not the final plan.

    The DryadLINQ optimizer also annotates the EPG with metadata that provides additional information about the distributed job during planning and execution. For the vertices of the graph this is the data partitioning scheme; for the edges it is the .NET type of the data and the compression scheme, if any.
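
    To give a feel for the raw material the compiler starts from, the sketch below walks a LINQ expression tree with the standard ExpressionVisitor class and lists the query operators in data-flow order. It is a toy, not the DryadLINQ compiler: the OperatorCollector class is hypothetical, and mapping each operator to one plan stage is a simplifying assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

// Collects the names of the LINQ query operators that make up a query.
public class OperatorCollector : ExpressionVisitor
{
    public readonly List<string> Operators = new List<string>();

    protected override Expression VisitMethodCall(MethodCallExpression node)
    {
        // Visit the arguments first: the source of an operator is its first
        // argument, so operators end up listed in data-flow order.
        var result = base.VisitMethodCall(node);
        if (node.Method.DeclaringType == typeof(Queryable))
            Operators.Add(node.Method.Name);
        return result;
    }

    public static List<string> Collect(IQueryable query)
    {
        var collector = new OperatorCollector();
        collector.Visit(query.Expression);
        return collector.Operators;
    }
}

public static class ExpressionTreeDemo
{
    public static void Main()
    {
        var lines = new[] { "a b", "b c" }.AsQueryable();
        var query = lines
            .SelectMany(line => line.Split(new[] { ' ' }))
            .GroupBy(word => word)
            .Select(g => g.Count());

        // Each operator is a candidate stage of an execution plan.
        Console.WriteLine(string.Join(" -> ", OperatorCollector.Collect(query)));
    }
}
```

    A real plan would additionally carry the metadata described above (partitioning scheme, data type, compression) on its vertices and edges.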

    3.2. DryadLINQ Optimizations


    The DryadLINQ optimizer, in turn, performs both static optimization based on greedy heuristics and dynamic optimization based on statistics collected during execution.

    Static Optimization


    The static optimizer has two main goals: minimizing disk I/O and minimizing network I/O. This is logical, since the disk subsystem and the interconnect between machines are traditionally the bottlenecks of distributed computing environments.

    The most interesting static optimization techniques are listed below:
    1. Pipelining (in-process interaction): the optimizer tries to keep as much of the computation as possible within a single compute node;
    2. I/O reduction: the optimizer tries to use TCP pipes and in-memory FIFOs to pass data between vertex operations instead of the default mechanism, writing temporary files to disk and reading them back (Dryad data channels were discussed in detail in the previous article);
    3. Removing redundancy: the optimizer removes redundant or unnecessary hash-partitioning and range-partitioning steps.
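
    The third technique can be illustrated with a toy plan rewriter. Everything in this sketch is hypothetical (the PlanStep type, the operator names, and the assumption that every other operator preserves the existing partitioning); it only shows the shape of the rewrite, not how DryadLINQ implements it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical plan step: an operator name plus the key it works on.
public sealed class PlanStep
{
    public string Op { get; }
    public string Key { get; }
    public PlanStep(string op, string key) { Op = op; Key = key; }
    public override string ToString() { return Op + "(" + Key + ")"; }
}

public static class RedundancyRemoval
{
    // Drops a HashPartition step when the data is already hash-partitioned
    // on the same key. Assumes all other operators keep the partitioning.
    public static List<PlanStep> RemoveRedundantPartitions(IEnumerable<PlanStep> plan)
    {
        var result = new List<PlanStep>();
        string currentKey = null; // key the data is currently partitioned on
        foreach (var step in plan)
        {
            if (step.Op == "HashPartition")
            {
                if (step.Key == currentKey)
                    continue;        // already partitioned this way: skip
                currentKey = step.Key;
            }
            result.Add(step);
        }
        return result;
    }

    public static void Main()
    {
        var plan = new[]
        {
            new PlanStep("HashPartition", "word"),
            new PlanStep("GroupBy", "word"),
            new PlanStep("HashPartition", "word"), // redundant
            new PlanStep("Count", "word"),
        };
        Console.WriteLine(string.Join(" -> ", RemoveRedundantPartitions(plan)));
    }
}
```

    Each partitioning step removed this way saves a full redistribution of the data set over the network.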

    Dynamic optimization


    The dynamic optimizer changes the execution graph while the distributed job is running. Based on the statistics collected (potentially even on a specially trained model), the optimizer can rewrite the graph. The main dynamic optimization techniques are given below:
    Dynamic aggregation: aggregating data is one of the most effective ways to reduce the amount of data transferred between nodes. Aggregation happens in turn at the level of the compute node, the rack, and the cluster. This optimization depends heavily on the topological location of the nodes and of the data being aggregated, so it is most efficient to perform it at run time (that is, dynamically).
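
    A minimal sketch of such multi-level aggregation, assuming the per-machine partial word counts have already been computed. The HierarchicalAggregation class and the rack layout are hypothetical; a real system would pick the aggregation tree from the actual cluster topology at run time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class HierarchicalAggregation
{
    // Merges several partial word counts into one.
    public static Dictionary<string, int> Merge(
        IEnumerable<Dictionary<string, int>> partials)
    {
        var total = new Dictionary<string, int>();
        foreach (var partial in partials)
        {
            foreach (var pair in partial)
            {
                int current;
                total.TryGetValue(pair.Key, out current); // 0 if key is absent
                total[pair.Key] = current + pair.Value;
            }
        }
        return total;
    }

    // machinesByRack: rack name -> partial counts of each machine in the rack.
    public static Dictionary<string, int> Aggregate(
        Dictionary<string, List<Dictionary<string, int>>> machinesByRack)
    {
        // Rack level: one merged result per rack, so that only a single
        // aggregate has to leave each rack.
        var perRack = machinesByRack.Values.Select(rack => Merge(rack)).ToList();
        // Cluster level: merge the per-rack aggregates.
        return Merge(perRack);
    }

    public static void Main()
    {
        var cluster = new Dictionary<string, List<Dictionary<string, int>>>
        {
            ["rack1"] = new List<Dictionary<string, int>>
            {
                new Dictionary<string, int> { ["a"] = 1, ["b"] = 2 },
                new Dictionary<string, int> { ["a"] = 3 },
            },
            ["rack2"] = new List<Dictionary<string, int>>
            {
                new Dictionary<string, int> { ["b"] = 1 },
            },
        };
        foreach (var pair in Aggregate(cluster))
            Console.WriteLine(pair.Key + ": " + pair.Value);
    }
}
```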

    Data-dependent partitioning: the optimizer dynamically chooses the number of partitions for a data set depending on the size of the input. As with dynamic aggregation, the size of the input set can be estimated accurately only while the distributed job is running.
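
    The arithmetic behind such a decision can be as simple as a ceiling division. The sketch below is only an assumption about what a sizing rule might look like; PartitionPlanner, ChoosePartitionCount, and the notion of a target partition size are hypothetical and not taken from the DryadLINQ documentation.

```csharp
using System;

public static class PartitionPlanner
{
    // Picks the number of partitions so that each holds at most
    // targetBytesPerPartition bytes; always returns at least 1.
    public static int ChoosePartitionCount(long inputBytes, long targetBytesPerPartition)
    {
        if (targetBytesPerPartition <= 0)
            throw new ArgumentOutOfRangeException(nameof(targetBytesPerPartition));
        if (inputBytes <= 0)
            return 1;

        // Ceiling division, written to avoid overflow near long.MaxValue.
        long count = inputBytes / targetBytesPerPartition;
        if (inputBytes % targetBytesPerPartition != 0)
            count++;
        return (int)Math.Min(count, int.MaxValue);
    }

    public static void Main()
    {
        // The input size is known only once the previous stage has finished.
        long observedBytes = 1000;
        Console.WriteLine(ChoosePartitionCount(observedBytes, 256));
    }
}
```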

    4. Practice


    Word count


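    DryadLINQ offers a surprisingly concise syntax for writing data queries. The following listing is a complete implementation of a computation that follows the map/reduce model:

    Listing 1. Implementation of the map/reduce programming model.
    // Requires: System, System.Collections.Generic, System.Linq, System.Linq.Expressions.
    // As an extension method, MapReduce must be declared inside a static class.
    public static IQueryable<TResult> MapReduce<TSource, TMapped, TKey, TResult>(
              this IQueryable<TSource> source,
              Expression<Func<TSource, IEnumerable<TMapped>>> mapper,
              Expression<Func<TMapped, TKey>> keySelector,
              Expression<Func<IGrouping<TKey, TMapped>, TResult>> reducer)
    {
         return source
              .SelectMany(mapper)     // map: one input record -> many intermediate records
              .GroupBy(keySelector)   // group the intermediate records by key
              .Select(reducer);       // reduce: one result per group
    }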

    Listing 2 shows how to use the map/reduce implementation above to build a Dryad job that counts words in a data source foo.pt (a partitioned table) stored in a distributed file system.

    Listing 2. Word counting with DryadLINQ.
    const string inputPath = @"file://\\machine\directory\foo.pt";
    const string outputPath = @"file://\\machine\directory\count.pt";
    // Assumption: the element type is LineRecord (one record per line of text).
    PartitionedTable<LineRecord> inputTable = PartitionedTable.Get<LineRecord>(inputPath);
    var result = inputTable.MapReduce(
              r => r.Line.Split(' '), // r: rows
              w => w, // w: words
              g => new Tuple<string, int>(g.Key, g.Count())); // g: groups
    result.ToDryadPartitionedTable(outputPath);

    The Dryad framework generates the following execution graph for this application:

    MapReduce Execution Plan
    Source of illustration [3].

    Moreover, stages (2) and (3) of the execution graph are generated dynamically, based on the amount of data sent between the vertices and on the topological location of the vertex operations that process this data.
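
    The same map/reduce shape can be tried without a cluster by running it over an in-memory sequence. This sketch uses only standard LINQ; the LocalWordCount class and the CountWords helper are hypothetical, and the MapReduce method is repeated so that the example is self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

public static class LocalWordCount
{
    // Same shape as the MapReduce method in Listing 1.
    public static IQueryable<TResult> MapReduce<TSource, TMapped, TKey, TResult>(
        this IQueryable<TSource> source,
        Expression<Func<TSource, IEnumerable<TMapped>>> mapper,
        Expression<Func<TMapped, TKey>> keySelector,
        Expression<Func<IGrouping<TKey, TMapped>, TResult>> reducer)
    {
        return source.SelectMany(mapper).GroupBy(keySelector).Select(reducer);
    }

    // Counts words in the given lines using the in-memory LINQ provider.
    public static Dictionary<string, int> CountWords(IEnumerable<string> lines)
    {
        return lines.AsQueryable()
            .MapReduce(
                line => line.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries),
                word => word,
                g => new KeyValuePair<string, int>(g.Key, g.Count()))
            .ToDictionary(pair => pair.Key, pair => pair.Value);
    }

    public static void Main()
    {
        foreach (var pair in CountWords(new[] { "a b a", "b c" }))
            Console.WriteLine(pair.Key + ": " + pair.Value);
    }
}
```

    Here the in-memory provider executes the query in a single process; no execution graph is built.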

    PageRank Calculation


    Listings 3-5 show the code for the distributed PageRank calculation algorithm.

    Listing 3. Implementation of the PageRank calculation algorithm [5].
    public static IQueryable<Rank> PRStep(IQueryable<Page> pages, IQueryable<Rank> ranks)
    {
         // join pages with ranks, and disperse each page's rank to its links
         var updates = from page in pages
              join rank in ranks on page.Name equals rank.Name
              select page.Disperse(rank);
         // re-accumulate: sum the contributions received by each page
         return from list in updates
              from rank in list
              group rank.Value by rank.Name into g
              select new Rank(g.Key, g.Sum());
    }

    Listing 4. Calculating PageRank using DryadLINQ. Source [5].
    const string inputPath = @"dfs://pages.txt";
    const string outputPath = @"dfs://outputranks.txt";
    var pages = PartitionedTable.Get<Page>(inputPath);
    // every page starts with the same rank
    var ranks = pages.Select(page => new Rank(page.Name, 1.0));
    const int iterationCount = 1000;
    for (int iter = 0; iter < iterationCount; iter++)
         ranks = PRStep(pages, ranks);
    ranks.ToPartitionedTable(outputPath);

    Listing 5. Helper classes. Source [5]
    public class Page
    {
         public Page(Int64 name, Int64 degree, Int64[] links)
         {
              this.Name = name;
              this.Degree = degree;
              this.Links = links;
         }
         public Int64 Name { get; set; }
         public Int64 Degree { get; set; }
         public Int64[] Links { get; set; }
         // Splits this page's rank evenly among the pages it links to.
         public Rank[] Disperse(Rank rank)
         {
              Rank[] ranks = new Rank[Links.Length];
              double score = rank.Value / this.Degree;
              for (int i = 0; i < ranks.Length; i++)
                   ranks[i] = new Rank(this.Links[i], score);
              return ranks;
         }
    }
    public class Rank
    {
         public Rank(Int64 name, double rank)
         {
              this.Name = name;
              this.Value = rank;
         }
         public Int64 Name { get; set; }
         public double Value { get; set; }
    }

    Data is passed between iterations through an in-memory FIFO channel, which promises performance an order of magnitude higher than transferring data over the network, as happens when a similar algorithm is implemented on Hadoop (meaning the latest release of [plain] Hadoop).
    PageRank Execution Plan
    Source of illustration [5]

    Note on the illustration: data transfer between iterations (iteration 1 > iteration 2 > ... > iteration n) happens exclusively through the in-memory FIFO channel.

    5. Limitations


    The Dryad framework, unlike Hadoop MapReduce, does not mix the responsibility for executing a distributed application with the programming model / query language in which such applications are written.

    Despite this separation, in my opinion the DryadLINQ programming model still mixes responsibilities internally: besides its direct duty of translating LINQ expressions into Dryad programs, it also takes part in building the EPG and in optimization. This inevitably lengthens the start-up time of a Dryad job: more CPU cycles are spent translating the DryadLINQ expression than would be needed with fewer duties.

    As a result, translating many DryadLINQ expressions on a single compute node will hurt job execution time both locally and across the cluster as a whole. That said, I do not yet see how this problem could grow into a scalability problem for the Dryad cluster as a whole.

    Another remark concerns the static optimizer: to apply its optimizations effectively it needs to know too much, including the internal "affairs" of the Dryad runtime components, such as the network topology and the data partitioning scheme.

    The documentation does not make clear what statistics the dynamic optimizer has access to: the number of I/O operations, for instance, is again internal data of the execution engine (the Dryad runtime) that should not be exposed at the level of the programming model (DryadLINQ).
    DryadLINQ performs both static and dynamic optimizations. [3]
    The passage quoted above immediately raises a question: why does dynamic optimization fall within DryadLINQ's area of responsibility at all? Semantically, the dynamic optimizer works after the DryadLINQ expression has been fully translated, that is, at the level of the runtime environment.

    6. Advantages


    Full-fledged programming language


    Development is done in modern high-level programming languages, and the LINQ model allows data queries to be written in a functional style.

    Strong data typing


    The Dryad framework operates on strongly typed data and returns strongly typed collections of objects.

    Automatic data serialization


    Data is automatically serialized and deserialized by the framework when it is transmitted over channels.

    Automatic parallelization of execution


    DryadLINQ generates a distributed execution plan that runs on a cluster. Multiprocessor compute nodes are utilized better because PLINQ (Parallel LINQ) is used for the parts of a job that run locally.
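
    For readers who have not used PLINQ, the sketch below shows the standard AsParallel() call on a single machine. It is plain PLINQ rather than DryadLINQ code, and the LocalParallelism class is hypothetical; it only illustrates the kind of local parallelism the paragraph refers to.

```csharp
using System;
using System.Linq;

public static class LocalParallelism
{
    // Sums the squares of the values, letting PLINQ spread the work
    // across the cores of the local machine.
    public static long SumOfSquares(int[] values)
    {
        return values
            .AsParallel()
            .Select(v => (long)v * v)
            .Sum();
    }

    public static void Main()
    {
        var values = Enumerable.Range(1, 1000).ToArray();
        Console.WriteLine(SumOfSquares(values));
    }
}
```

    The query text is identical to the sequential version apart from AsParallel(), which is the same "do not betray the execution model" idea applied to a single node.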

    Automatic performance optimization


    The execution graph is optimized by dedicated components of the Dryad framework both when the execution plan is created, using optimization policies, and dynamically at run time, relying on statistics.

    Familiar Development Tools


    MPP applications built on the DryadLINQ programming model can be written in MS Visual Studio, using VS features such as IntelliSense, code refactoring, integrated debugging, build, and source control.

    100% compatible with .NET Framework


    DryadLINQ can be used with any .NET libraries and with statically typed CLS-compliant programming languages.

    Conclusion


    DryadLINQ is a programming model familiar to .NET developers: it integrates neatly into the existing .NET Framework stack and is expressive and concise, as befits a functional style of programming. In addition, DryadLINQ gives developers a LINQ-like syntax for writing queries against a distributed data store while encapsulating the distributed nature of the query, its scheduling, and its optimization.
    For those who are bored (or a bonus)
    The third and final part of the series will compare the Dryad framework with other MPP "tools": relational DBMSs, GPU computing, and the Hadoop platform. So a 'fascinating' debate in the comments on the topic 'Windows is buggy', along with falling karma, awaits us.

    List of sources


    [1] The DryadLINQ Project. Microsoft Research.
    [2] M. Isard and Y. Yu. Distributed data-parallel computing using a high-level programming language. In International Conference on Management of Data (SIGMOD), 2009.
    [3] Y. Yu, M. Isard, D. Fetterly, M. Budiu, U. Erlingsson, P. K. Gunda, and J. Currey. DryadLINQ: A system for general-purpose distributed data-parallel computing using a high-level language. In Proceedings of the 8th Symposium on Operating Systems Design and Implementation (OSDI), 2008.
    [4] Y. Yu, M. Isard, D. Fetterly, M. Budiu, U. Erlingsson, P. K. Gunda, and J. Currey. Technical Report MSR-TR-2008-74, Microsoft Research, 2008.
    [5] Jinyang Li. Dryad / DryadLINQ. Slides adapted from those of Yuan Yu and Michael Isard, 2009.
