Big Data Learning: Spark MLlib

    Hi, Habr!

    image

    The last time we met with the tool the Apache the Spark , which has recently become almost the most popular means for processing large data and, in particular, Large the Scale the Learning Machine . Today we will take a closer look at the MlLib library , namely, we will show how to solve the problems of machine learning - classification, regression, clustering, as well as collaborative filtering. In addition, we will show how it is possible to investigate features in order to select and highlight new ones (the so-called Feature Engineering , which we spoke about earlier , more than once ).

    Plan


    First of all, let's see how to store the objects of our training set, how to read basic statistics of attributes, then machine learning algorithms (classification, regression, clustering), and finally, consider an example of constructing a recommendation system - the so-called collaborative filtering methods, or more precisely - one of the most common ALS algorithms.

    Vectors


    For simple "dense" vectors, there is a special class Vector.dense :

    from pyspark.mllib.linalg import Vectors
    my_vec = Vectors.dence ([1.12, 4.10, 1.5, -2.7, 3.5, 10.7, 0.7])
    

    For sparse vectors, the Vectors.sparse class is used :

    from pyspark.mllib.linalg import Vectors
    my_vec = Vectors.sparse(10, [0,2,4,9], [-1.2, 3.05, -4.08, 0.46])
    

    Here, the first argument is the number of features (vector length), then the list is followed by the numbers of nonzero features, and then the values ​​of the features themselves.

    Marked vectors


    For marked points in Spark there is a special LabeledPoint class :

    from pyspark.mllib.regression import LabeledPoint
    my_point = LabeledPoint(1.0, my_vec)
    

    Where in the LabeledPoint class we have LabeledPoint.features is any of the vectors described above, and LabeledPoint.label is, accordingly, a label that can take any real value in the case of a regression task and the value [0.0,1.0,2.0, ...] - for classification tasks

    Work with tags


    It is no secret that often, in order to build a good machine learning algorithm, just look at the signs, select from the most relevant ones or come up with new ones. For this purpose, in the spark class, the Statistics class , with which you can do all these things, for example:

    from pyspark.mllib.stat import Statistics
    summary = Statistics.colStats(features)
    # meas of features
    summary.mean
    # non zeros features
    summary.numNonzeros
    # variance
    summary.variance
    # correlations of features
    Statistics.corr(features)
    

    In addition, Spark has a huge number of additional features like sampling, generating standard features (like TF-IDF for texts), as well as such an important thing as scaling features (the reader is invited to see this in the documentation after reading this article). There is a special Scaler class for the latter :

    from pyspark.mllib.feature import StandardScaler
    scaler = StandardScaler(withMean=True, withStd=True).fit(features)
    scaler.transform (features.map(lambda x:x.toArray()))
    

    The only thing that is important to remember is that in the case of sparse vectors this does not work and the scaling strategy must be thought out for a specific task. Now we turn directly to the problems of machine learning.

    Classification and Regression


    Linear methods


    The most common methods are, as always, linear classifiers. Learning a linear classifier reduces to the problem of convex minimization of the functional of the vector of weights. The difference lies in the choice of the loss function, the regularization function, the number of iterations, and many other parameters. As an example, we consider below the logistic function of losses (and, accordingly, the so-called logistic regression method), 500 iterations, and L2 - regularization.

    import pyspark.mllib.classification as cls
    model = cls.LogisticRegressionWithSGD.train(train, iterations=500, regType="l2")
    

    Similarly, linear regression is done:

    import pyspark.mllib.regression as regr
    model = regr.RidgeRegressionWithSGD.train(train)
    

    Naive Bayes


    In this case, the learning algorithm takes as input only 2 parameters - the training sample itself and the smoothing parameter:

    from pyspark.mllib.classification import NaiveBayes
    model = NaiveBayes.train(train, 8.5)
    model.predict(test.features)
    

    Decisive trees


    In the spark, as in many other packages, regression and classification trees are implemented. The learning algorithm accepts many parameters as inputs, such as many classes, maximum tree depth. The algorithm also needs to indicate which categories have categorical features, as well as many other parameters. However, one of the most important of them when learning trees is the so-called impurity - a criterion for calculating the so-called information gain , which can usually take the following values: entropy and gini - for classification problems, variance - for regression problems. For example, consider a binary classification with the parameters defined below:

    from pyspark.mllib.tree import DecisionTree
    model = DecisionTree.trainClassifier(train, numClasses=2, impurity='gini', maxDepth=5)
    model.predict(test.map(lambda x: x.features))
    

    Random forest


    Random forests, as you know, is one of the universal algorithms and one would expect that they will be implemented in this tool. They use the trees described above. Here, in the same way, there are trainClassifier and trainRegression methods - for training the classifier and regression function, respectively. One of the most important parameters is - the number of trees in the forest, impurity already known to us , as well as featureSubsetStrategy - the number of attributes that are considered when breaking on the next tree node (for more details on the values, see the documentation). Accordingly, below is an example of binary classification using 50 trees:

    from pyspark.mllib.tree import RandomForest
    model = RandomForest.trainClassifier(train, numClasses=2, numTrees=50, featureSubsetStrategy="auto", impurity='gini', maxDepth=20, seed=12)
    model.predict(test.map(lambda x:x.features))
    

    Clustering


    As elsewhere, the spark implements the well-known KMeans algorithm , the training of which takes directly the dataset, the number of clusters, the number of iterations, as well as the strategy for selecting the initial cluster centers (the initializationMode parameter , which defaults to k-means , can also take random value ):

    from pyspark.mllib.clustering import KMeans
    clusters = KMeans.train(features, 3, maxIterations=100, runs=5, initializationMode="random")
    clusters.predict(x.features))
    

    Collaborative filtering


    Given that the best-known example of using Big Data is a recommender system, it would be strange if the simplest algorithms were not implemented in many packages. This also applies to Spark. It implements the ALS algorithm (Alternative Least Square) - perhaps one of the most famous collaborative filtering algorithms. The description of the algorithm itself deserves a separate article. Here we just say in a nutshell that the algorithm is actually decomposing the feedback matrix (the rows of which are users and the columns are products) - into product matrices - topic and topic-user , where topics are some hidden variables, the meaning of which is often not clear (all the charm of the ALS algorithmjust to find the topics themselves and their values). The essence of these topics is that each user and each film is now characterized by a set of features, and the scalar product of these vectors is the rating of the film of a particular user. The training sample for this algorithm is set in the form of a table userID -> productID -> rating . After that, the model is trained using ALS (which, like other algorithms, takes many parameters as inputs, which the reader is invited to read about):

    from pyspark.mllib.recommendation import ALS
    model = ALS.train (ratings, 20, 60)
    predictions = model.predictAll(ratings.map (lambda x: (x[0],x[1])))
    

    Conclusion


    So, we briefly reviewed the MlLib library from the Apache Spark framework, which was developed for distributed processing of big data. Recall that the main advantage of this tool, as discussed earlier , is that data can be cached in RAM, which can significantly speed up calculations in the case of iterative algorithms, which are the majority of machine learning algorithms.

    Also popular now: