Pipelines with spark.ml

Today I would like to talk about spark.ml, a package that appeared in Spark 1.2. It is designed to provide a single, high-level API for machine learning algorithms, simplifying their creation and configuration as well as the combination of several algorithms into a single pipeline, or workflow. The current release is 1.4.1, and the developers claim that the package has come out of alpha, although many components are still labeled Experimental or DeveloperApi.

Well, let's check what the new package can do and how good it is. First of all, we need to get acquainted with the basic concepts introduced in spark.ml.

1. ML Dataset - for data, spark.ml uses the DataFrame from the spark.sql package. A DataFrame is a distributed collection in which data is stored as named columns. Conceptually, a DataFrame is equivalent to a table in a relational database or to a data frame in R or Python, but with richer optimizations under the hood. (Examples and ways of working with it are given later in the article.)

2. Transformer (modifier) is simply any algorithm that can convert one DataFrame into another. For example, any trained model is a Transformer, because it converts a set of features into predictions.

3. Estimator (evaluation algorithm) is an algorithm that can produce a Transformer from a DataFrame. For example, any learning algorithm is also an Estimator, because it takes a training data set and produces a trained model as output.

4. Pipeline - a pipeline that combines any number of modifiers and evaluation algorithms to create a machine learning workflow.

5. Param is a general type that modifiers and evaluation algorithms use to set parameters.

According to this interface, every Estimator must have a fit method that accepts a DataFrame and returns a Transformer. A Transformer, in turn, must have a transform method that converts one DataFrame into another.
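
In other words, the contract between the two can be sketched roughly like this (a simplified illustration, not the actual spark.ml class definitions, which also deal with parameters and schemas):
  import org.apache.spark.sql.DataFrame

  // Simplified sketch of the spark.ml contract:
  // an Estimator builds a Transformer from data,
  // and a Transformer maps one DataFrame to another.
  trait SimpleTransformer {
    def transform(dataset: DataFrame): DataFrame
  }

  trait SimpleEstimator[M <: SimpleTransformer] {
    def fit(dataset: DataFrame): M
  }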

Those familiar with the Scalable Machine Learning course know that in one of the lab assignments the instructors, while discussing linear regression, solved the problem of "predicting the year a song was released from a set of audio features". The lab implemented quite a few methods, both for data processing and for evaluating and finding the best model. This was done to familiarize students with the main machine learning processes in more detail, but let's see how the spark.ml package makes our life easier.

In the lab we were provided with prepared and slightly trimmed data. But since we are interested in going the whole way, I suggest taking the raw data set. Each line has the form:
2007, 45.17809 46.34234 -40.65357 -2.47909 1.21253 -0.65302 -6.95536 -12.20040 17.02512 2.00002 -1.87785 9.85499 25.59837 1905.18577 3676.09074 1976.85531 913.11216 1957.52415 955.98525 942.72667 439.85991 591.66138 493.40770 496.38516 33.94285 -255.90134 -762.28079 -66.10935 -128.02217 198.12908 -34.44957 176.00397 -140.80069 -22.56380 12.77945 193.30164 314.20949 576.29519 -429.58643 -72.20157 59.59139 -5.12110 -182.15958 31.80120 -10.67380 -8.13459 -122.96813 208.69408 -138.66307 119.52244 -17.48938 75.58779 93.29243 85.83507 47.13972 312.85482 135.50478 -32.47886 49.67063 -214.73180 -77.83503 -47.26902 7.58366 -352.56581 -36.15655 -53.39933 -98.60417 -82.37799 45.81588 -16.91676 18.35888 -315.68965 -3.14554 125.45269 -130.18808 -3.06337 42.26602 -9.04929 26.41570 23.36165 -4.36742 -87.55285 -70.79677 76.57355 -7.71727 3.26926 -298.49845 11.49326 -89.21804 -15.09719
where the year comes first, the next 12 numbers are the timbre averages, and the last 78 are the timbre covariances.

First of all, we need to load this data into a DataFrame, but before that we will slightly transform its format:
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD
  import org.apache.spark.mllib.linalg
  import org.apache.spark.mllib.linalg.Vectors

  val sc = new SparkContext("local[*]", "YearPrediction")
  // Parse each line into (year, vector of 12 timbre averages, vector of 78 timbre covariances)
  val rawData: RDD[(Double, linalg.Vector, linalg.Vector)] = sc.textFile("data/YearPredictionMSD.txt")
    .map(_.split(','))
    .map(x => (
      x.head.toDouble,
      Vectors.dense(x.tail.take(12).map(_.toDouble)),
      Vectors.dense(x.takeRight(78).map(_.toDouble))
    ))

Now each RDD element is a tuple containing a year and two feature vectors. To get a DataFrame, we need to perform one more transformation:
  import org.apache.spark.sql.{DataFrame, SQLContext}

  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
  val rawDF: DataFrame = rawData.toDF("label", "avg", "cov")

Note that we created a sqlContext and pulled in the implicit conversion methods (in this case it would have been enough to write import sqlContext.implicits.rddToDataFrameHolder) in order to use the toDF method. We also specified the column names, and now the data structure looks like this:

  label | avg                                     | cov
 -------|-----------------------------------------|---------------------------------------------
  2001  | [49.94357 21.47114 73.07750 8.74861...  | [10.20556 611.10913 951.08960 698.11428...
 -------|-----------------------------------------|---------------------------------------------
  2007  | [50.57546 33.17843 50.53517 11.5521...  | [44.38997 2056.93836 605.40696 457.4117...
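
If you want to check the schema and contents yourself, the DataFrame can be inspected directly; the table above is just an abridged rendering of this kind of output:
  // Inspect the resulting DataFrame: column names, types and a couple of rows
  rawDF.printSchema()
  rawDF.show(2)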

The gradient descent method used in linear regression is sensitive to the spread of feature values, so the data must be normalized or standardized before training. For these purposes, the spark.ml.feature package has two classes: StandardScaler and Normalizer.
  import org.apache.spark.ml.feature.{Normalizer, StandardScalerModel, StandardScaler}

  val scalerAvg: StandardScalerModel = new StandardScaler()
    .setWithMean(true)
    .setWithStd(true)
    .setInputCol("avg")
    .setOutputCol("features")
    // feed in our raw data so the algorithm can compute
    // the statistics it needs (mean and standard deviation)
    .fit(rawDF)

  val normAvg: Normalizer = new Normalizer()
    .setP(2.0)
    .setInputCol("avg")
    .setOutputCol("features")

Note that StandardScaler is an Estimator, which means we have to call its fit method to get the Transformer, in this case a StandardScalerModel. All classes working with a DataFrame have two common methods:
setInputCol - specifies the name of the column to read the data from;
setOutputCol - specifies the name of the column to write the converted data to.

The difference between the two in this setup is that StandardScaler standardizes each feature to zero mean and unit standard deviation, while Normalizer with p = 2 rescales each feature vector to unit L2 norm. More details on how each algorithm works can be found in the Spark documentation.
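
Since a fitted StandardScalerModel and a Normalizer are both Transformers, they can also be applied directly, outside of a pipeline, which is handy for a quick sanity check (the column names here mirror the ones set above):
  // Apply the fitted scaler and the normalizer directly to the raw DataFrame;
  // each adds a "features" column computed from the "avg" column
  val scaledDF = scalerAvg.transform(rawDF)
  val normedDF = normAvg.transform(rawDF)
  scaledDF.select("features").show(2)
  normedDF.select("features").show(2)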

We have prepared the training data (or rather obtained the Transformers that we will use for preprocessing); now we need to create an evaluation algorithm (Estimator) that will produce a trained model. We use almost default settings, as they are not particularly interesting at this stage.
  import org.apache.spark.ml.regression.LinearRegression
  val linReg = new LinearRegression()
    .setFeaturesCol("features")
    .setLabelCol("label")
    .setElasticNetParam(0.5)
    .setMaxIter(500)
    .setRegParam(1e-10)
    .setTol(1e-6)

Now we have everything we need to assemble a simple pipeline:
  import org.apache.spark.ml.Pipeline
  val pipeline = new Pipeline().setStages(Array(
    normAvg,
    linReg
  ))

Pipeline has a setStages method that accepts an array of stages to be executed in the specified order when the training data arrives. Now all that remains is to remember to split the data into training and test sets:
  val splitData = rawDF.randomSplit(Array(0.8, 0.2), 42).map(_.cache())
  val trainData = splitData(0)
  val testData = splitData(1)

Let's run the pipeline we created and evaluate the result of its work:
  import org.apache.spark.mllib.evaluation.RegressionMetrics

  val pipelineModel = pipeline.fit(trainData)
  val fullPredictions = pipelineModel.transform(testData)
  val predictions = fullPredictions.select("prediction").map(_.getDouble(0))
  val labels = fullPredictions.select("label").map(_.getDouble(0))
  val rmseTest = new RegressionMetrics(predictions.zip(labels)).rootMeanSquaredError
  > (2003.0,1999.6153819348176)
    (1997.0,2000.9207184703566)
    (1996.0,2000.4171327880172)
    (1997.0,2002.022142263423)
    (2000.0,1997.6327888556184)
  RMSE: 10.552024

At this stage everything should already be clear. Note that to evaluate the model we used the ready-made RegressionMetrics class, which implements, along with the familiar RMSE, other basic regression metrics as well.
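
For example, reusing the predictions and labels RDDs from above, the other metrics are available as plain fields of the same object:
  // RegressionMetrics provides several standard regression metrics besides RMSE
  val metrics = new RegressionMetrics(predictions.zip(labels))
  println(s"MAE: ${metrics.meanAbsoluteError}")
  println(s"MSE: ${metrics.meanSquaredError}")
  println(s"R2:  ${metrics.r2}")
  println(s"Explained variance: ${metrics.explainedVariance}")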

Moving on: in the Scalable Machine Learning course we created new features by expanding the original ones into a polynomial of degree 2. The spark.ml developers took care of this too: we just need to create another Transformer and add it to the pipeline; the main thing is not to get confused and to specify the column names correctly.
  import org.apache.spark.ml.feature.PolynomialExpansion

  // Create a transformer that takes the data from the "features" column
  // and writes the polynomial expansion to the "polyFeatures" column
  val polynomAvg = new PolynomialExpansion()
    .setInputCol("features")
    .setOutputCol("polyFeatures")
    .setDegree(2)

  // Tell the estimator which column to take the features from
  linReg.setFeaturesCol("polyFeatures")

  // And add the new transformer to the pipeline
  val pipeline = new Pipeline().setStages(Array(
    normAvg,
    polynomAvg,
    linReg
  ))
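
The extended pipeline is trained and evaluated exactly as before; nothing else in the code changes (the variable names below simply mirror the earlier example):
  // Re-fit the pipeline that now includes the polynomial expansion step
  val polyPipelineModel = pipeline.fit(trainData)
  val polyPredictions = polyPipelineModel.transform(testData)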


So far we have used only 12 features for training, but as I recall there were 78 more in the raw data; maybe we should try combining them? For this case spark.ml has a ready-made solution: VectorAssembler. Let's use it:
  import org.apache.spark.ml.feature.VectorAssembler
  val assembler = new VectorAssembler()
    .setInputCols(Array("avg", "cov"))
    .setOutputCol("united")
  normAvg.setInputCol("united")
  val pipeline = new Pipeline().setStages(Array(
    assembler,
    normAvg,
    polynomAvg,
    linReg
  ))
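
VectorAssembler is an ordinary Transformer, so, if you want, you can check outside the pipeline that the two columns really get concatenated into a single 90-dimensional vector:
  // "united" should contain a 90-dimensional vector (12 averages + 78 covariances)
  assembler.transform(rawDF).select("united").show(2)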

We have sorted out data preparation a bit, but the question of selecting optimal parameters for the algorithm remains. Doing it by hand is tedious, and we don't have to: for this purpose spark.ml provides the CrossValidator class. CrossValidator takes an Estimator (in our case the entire pipeline ending with linReg), a set of parameters we would like to test, and an evaluator (when we evaluated the model manually we used RMSE). CrossValidator starts by splitting the data set into several folds (k, 3 by default), randomly choosing training and validation samples (the validation sample is 1/k of the original). Then, for each set of parameters, a model is trained on each fold, its performance is evaluated, and the best model is selected. Note that model selection with CrossValidator is a rather expensive operation, but it is statistically more sound than heuristic manual tuning.

For convenience, spark.ml provides a utility class for building a set of parameters, ParamGridBuilder, which we will use:
  import org.apache.spark.ml.evaluation.RegressionEvaluator
  import org.apache.spark.ml.param.ParamMap
  import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

  val paramGrid: Array[ParamMap] = new ParamGridBuilder()
    .addGrid(linReg.maxIter, Array(5, 500))
    .addGrid(linReg.regParam, Array(1e-15, 1e-10))
    .addGrid(linReg.tol, Array(1e-9, 1e-6, 1e-3))
    .addGrid(linReg.elasticNetParam, Array(0.0, 0.5, 1.0))
    .build()

  val crossVal = new CrossValidator()
    .setEstimator(pipeline)
    .setEvaluator(new RegressionEvaluator)
    .setEstimatorParamMaps(paramGrid)
    .setNumFolds(3)

  val bestModel = crossVal.fit(trainData)
  > Best set of parameters:
    {
	  linReg_3a964d0300fd-elasticNetParam: 0.5,
	  linReg_3a964d0300fd-maxIter: 500,
	  linReg_3a964d0300fd-regParam: 1.0E-15,
	  linReg_3a964d0300fd-tol: 1.0E-9
    }
    Best cross-validation metric: -10.47433119891316 
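
crossVal.fit returns a CrossValidatorModel, which is itself a Transformer, so the held-out test set can be scored with it directly, for example like this:
  // Score the test set with the best model found by cross-validation
  val testPredictions = bestModel.transform(testData)
  val bestRmse = new RegressionMetrics(
    testPredictions.select("prediction").map(_.getDouble(0))
      .zip(testPredictions.select("label").map(_.getDouble(0)))
  ).rootMeanSquaredError
  println(s"Test RMSE of the best model: $bestRmse")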

Well, that is probably all as far as linear regression is concerned; for classification and clustering algorithms spark.ml also offers a ready-made set of tools to help organize the workflow conveniently.

Materials used: UCI Machine Learning Repository, Scalable Machine Learning, White Paper.

