Distributed processing of graphs with Spark GraphX

image

"Simplicity is prerequisite for reliability" by Edsger Dijkstra

Prologue


The graphs are so visual and easy to understand the data structure, since the time of Leonard Euler forced to break the minds of mankind over heterogeneous tasks, such as how you can go through all seven bridges of Königsberg without passing through any of them twice or as a traveling mediator, to find the most advantageous route.

Since Euler, many things have changed: transistors, programming languages ​​and distributed computing have appeared. It is the latter from this list that drastically simplified the storage and processing of graphs. Actually, this is what will be discussed in this article.

If you are not familiar with basic Apache Spark concepts such as RDD, Driver program, Worker node etc., then I would recommend that you read the documentation from Databricks before continuing to read this article .

As for me, the best way to deal with any technology is to try to write something on it. In this article, we will analyze the similarity of the “social network” using the basic concepts of graph theory.

Write the code


I chose the simplest and most intuitive way to store our “social network”: tsv files on disk, of course it could be files of any other format like Parquet, Avro. The location for storing files may not be HDFS or S3, even if we need to change something, Spark SQL will do most of the work for us. The structure of the network will be as follows: the first file is a pair of Id of the user and his name, the second file of the Id of the user and a list of his peers. Apache Spark supports the following programming languages ​​Java, Scala, and Python as an API. I chose the second.

Immediately I want to answer the popular question of whether Spark GraphX ​​should be used to store graphs when you have many insert / update operations - the answer is no, all RDD change operations force you to change the whole RDD in a cluster, which is not the optimal solution, special ones are suitable for this case NoSql solution such Neo4J, Titanium or even Cassandra, Hbase. Nothing prevents you from using Spark GraphX ​​together with them specifically for processing graphs, loading the data from the database itself, for example, on a sheduler or in event driven style.

Well, let's get down to writing code. First we need to load the graph into memory, take the source files and pull out the necessary vertices and edges (the main points are shown here, you can find the link to the full listing with the source code at the end of the article):

defverts: RDD[(VertexId, String)] = sc.textFile(USER_NAMES)
                                       .flatMap(InputDataFlow.parseNames)
defedges: RDD[Edge[PartitionID]] = sc.textFile(USER_GRAPH)
                                       .flatMap(InputDataFlow.makeEdges)

Pregel


The main mechanism for graph iteration in GraphX ​​is the Pregel algorithm. The algorithm is developed by Google, the Pregel model uses message transfer between vertices in a graph. Messaging through a sequence of iterations called supersteps is the main idea behind this algorithm. Also, the main idea can be described as: “think like a vertex” , that is, the state of the current vertex depends only on the state of its neighbors.

Pregel becomes extremely necessary in the case when solving a problem with an ordinary MapReduce becomes an extremely difficult process. Interestingly, the name Pregel comes from the name of the river, which covered the seven bridges of Königsberg.

The main primitive for traversing a graph is a triplet - it consists of the following components: the current vertex (a source vertex), the vertex to which we go (a destination vertex) and the edge between them (an edge connecting) - everything is clear here: how we go where we go and what way we go. Also for Pregel, you need to specify the default distance between the vertices, as a rule, this is PositiveInfinity, a UDF (user defined function) function for each vertex, to process the incoming message and calculate the next vertex, and UDF to merge two incoming messages, this function must be commutative and associative. Since Scala is a functional language, the last two functions will be represented as two lambda expressions.

When we have already disassembled the main components of Pregel, it is worthwhile to come to practice. The first algorithm that we implement will be Dijkstra's algorithm for finding the shortest path from an arbitrary vertex to all the others.

defdijkstraShortestPath[VT](graph: GenericGraph[VT], sourceId: VertexId) = {
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == sourceId) 0.0elseDouble.PositiveInfinity)
    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      (_, dist, newDist) => math.min(dist, newDist),
      triplet => {
        //Distance accumulatorif (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else {
          Iterator.empty
        }
      },
      (a, b) => math.min(a, b)
    )
    sssp.vertices.sortByKey(ascending = true).collect.mkString("\n")
  }

Everything is obvious here: we start from a given vertex, use the minimum function to determine the minimum distance at each step. The first function used in Pregel maintains the shortest distance between the incoming message and the current vertex. The second function distributes messages to neighbors while maintaining a distance. The last function is an analogue of the Reduce stage - selects the minimum value in the case of multiple incoming messages. Then simply form a convenient output of the graph.

Degree of separation


I am sure that many readers of this article have heard about the theory of six handshakes ( Six degrees of separation ) - this is an unproved theory, according to which any two people are separated by no more than five levels of common friends, that is, a maximum of 6 handshakes are needed in order to combine two arbitrary man on earth. In terms of graph theory, it sounds like this: the diameter of the dating graph does not exceed 6 for any two people on Earth.

Let's start writing the code with the following, we will need to search for a width on the graph to search for contacts of the specified vertex, for this we need to modify the code of the Dijkstra algorithm:

defgetBFS(root: VertexId) = {
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == root) 0.0elseDouble.PositiveInfinity)
    val bfs = initialGraph.pregel(Double.PositiveInfinity, maxIterations = 10)(
      (_, attr, msg) => math.min(attr, msg),
      triplet => {
        if (triplet.srcAttr != Double.PositiveInfinity) {
          Iterator((triplet.dstId, triplet.srcAttr + 1))
        } else {
          Iterator.empty
        }
      },
      (a, b) => math.min(a, b)).cache()
    bfs
  }

Everything is very similar to what was higher, but we already indicate the number of iterations - for your graph this may be a different number - 10 for my graph I got it in an empirical way. Then we join with the names of users and take the first 100 values ​​for an arbitrary user:

defdegreeOfSeparation(root: VertexId): Array[(VertexId, DegreeOfSeparation)] = {
    getBFS(root).vertices.join(verts).take(100)
  }

Now we are looking for a degree of separation from a given vertex to all the others, you can also do a degree of separation search for two arbitrary vertices:

defdegreeOfSeparationTwoUser(firstUser: VertexId, secondUser: VertexId) = {
    getBFS(firstUser)
      .vertices
      .filter { case (vertexId, _) => vertexId == secondUser }
      .collect.map { case (_, degree) => degree }
  }

Spark GraphX ​​from the box gives you the opportunity to get a lot of information about the graph, for example, to get the connected component of the graph (connected component):

defgetMostConnectedUsers(amount: Int): Array[(VertexId, ConnectedUser)] = {
    graph.degrees.join(verts)
      .sortBy({ case (_, (userName, _)) => userName }, ascending = false)
      .take(amount)
  }

Or get such a metric as the number of triangles in the graph (triangle count):

defsocialGraphTriangleCount= graph.triangleCount()

Page rank


The PageRank algorithm appeared thanks to the post-graduate students at Stanford Larry Page and Sergey Brin. For each vertex of the graph, the algorithm assigns importance among all the others. For example, if a Twitter user has a large number of subscriptions from other users, then he will have a high rating, therefore, it can be easily found in a search engine.

GraphX ​​has a static and dynamic version of the implementation of PageRank. The static version has a fixed number of iterations, while the dynamic version will work until the rating begins to converge to the specified value.

For our graph, it will look like this:

defdynamicRanks(socialGraph: SocialGraph, tolerance: Double) = 
  socialGraph.graph.pageRank(tol = tolerance).vertices
defstaticRanks(socialGraph: SocialGraph, tolerance: Double) = 
  socialGraph.graph.staticPageRank(numIter = 20).vertices

Conclusion


The attentive reader noted that the topic of this article is distributed processing of graphs, but at the time of writing the code, we did nothing so that the processing was really distributed. And here we should remember the quote of Edsger Dijkstra at the very beginning. Spark drastically simplifies our life by taking on the burden and burden of distributed computing. Writing code that will be performed on a distributed cluster becomes not such a difficult task as it might seem at the beginning. And there are even several options for cluster resource management: Hadoop YARN, Apache Mesos (personally my favorite option) and, more recently, there is support for Kubernetes. All the source code that was analyzed in this article can be found on the github .

Also popular now: