Data Analysis in Scala. Computing the 21st Century Correlation


    Choosing the right tool for data analysis is very important. On the forums of Kaggle.com, which hosts international data science competitions, people often ask which tool is best. The top spots in popularity belong to R and Python. In this article we will talk about an alternative data analysis stack, based on the Scala programming language and the Spark distributed computing platform.

    How did we get here? At Retail Rocket we do a lot of machine learning on very large datasets. We used to develop prototypes with the IPython + Pyhs2 (a Hive driver for Python) + Pandas + Sklearn stack. At the end of the summer of 2014 we made the fundamental decision to switch to Spark, as experiments showed it would give us a 3-4x performance gain on the same server fleet.

    Another plus is that we can use a single programming language both for modeling and for the code that runs on production servers. This was a big advantage for us, since until then we had been using four languages at once (Hive, Pig, Java, Python), which is a serious problem for a small team.

    Spark works well with Python, Scala, and Java through its API. We decided on Scala, since Spark itself is written in it, which means we can read its source code and fix bugs if necessary; it also runs on the JVM, on which all of Hadoop runs. Our analysis of forum discussions about programming languages for Spark boiled down to the following:

    Scala:
    + functional;
    + native to Spark;
    + runs on the JVM, and is therefore native to Hadoop;
    + strict static typing;
    - a rather steep learning curve, but the code is readable.

    Python:
    + popular;
    + simple;
    - dynamic typing;
    - performance is worse than Scala's.

    Java:
    + popular;
    + native to Hadoop;
    - too much boilerplate code.

    You can read more about choosing a programming language for Spark here.

    I must say that the choice was not easy, since at that time nobody on the team knew Scala.
    It is a well-known fact that to learn to communicate well in a language you need to immerse yourself in its environment and use it as often as possible. So for modeling and quick data analysis we abandoned the Python stack in favor of Scala.

    First of all, we needed to find a replacement for IPython; the options were as follows:
    1) Zeppelin - an IPython-like notebook for Spark;
    2) ISpark;
    3) Spark Notebook;
    4) Spark IPython Notebook from IBM.

    In the end the choice fell on ISpark: it is simple - essentially IPython for Scala/Spark - and it was relatively easy to hook up HighCharts and R graphics to it. We also had no problems connecting it to the YARN cluster.

    Our story about the Scala data mining environment consists of three parts:
    1) A simple task in Scala in ISpark, executed locally on Spark.
    2) Configuring and installing the components needed to work in ISpark.
    3) Writing a machine learning task in Scala using R libraries.
    And if this article proves popular, I will write the other two. ;)

    Task


    Let's try to answer the question: does the average order value in an online store depend on static client parameters such as city, device type (mobile/desktop), operating system, and browser version? This can be done with the help of mutual information.

    At Retail Rocket we use entropy in many places for our recommendation algorithms and analysis: the classic Shannon formula, the Kullback-Leibler divergence, and mutual information. We even submitted a talk on this topic to the RecSys conference. A separate, albeit small, section of Murphy's well-known machine learning textbook is devoted to these measures.

    We will run the analysis on real Retail Rocket data. Beforehand, I copied a sample from our cluster to my computer as a CSV file.

    Data loading


    Here we use ISpark and Spark launched in local mode, which means all computation happens locally and is distributed across CPU cores. Everything is actually explained in the comments. Most importantly, at the output we get an RDD (a Spark data structure) that is a collection of case classes of type Row, defined in the code below. This gives access to the fields through ".", for example _.categoryId.

    Input:
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql._
    import org.tribbloid.ispark.display.dsl._
    import scala.util.Try
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    // Declare a case class; we will need it for the DataFrame
    case class Row(categoryId: Long, orderId: String, cityId: String, osName: String,
                   osFamily: String, uaType: String, uaName: String, aov: Double)
    // read the file into a val using sc (the Spark context), which IPython declares in advance
    val aov = sc.textFile("file:///Users/rzykov/Downloads/AOVC.csv")
    // parse the fields
    val dataAov = aov.flatMap { line => Try { line.split(",") match {
        case Array(categoryId, orderId, cityId, osName, osFamily, uaType, uaName, aov) =>
            Row(categoryId.toLong + 100, orderId, cityId, osName, osFamily, uaType, uaName, aov.toDouble)
        } }.toOption }
    

    Output:
    MapPartitionsRDD[4] at map at <console>:28

    Now let's look at the data itself:

    This snippet uses the DataFrame data type, added in Spark 1.3.0; it is very similar to the analogous structure in Python's pandas library. toDF picks up our Row case class, from which it gets the field names and their types.
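    The snippet itself is not reproduced above, so here is a minimal sketch of what it likely looked like, assuming the Spark 1.3 DataFrame API (toDF/show):

    // A sketch only: convert the RDD of Row case classes into a DataFrame
    // and look at the first rows
    val aovDf = dataAov.toDF() // column names and types come from the Row case class
    aovDf.show(5)              // print the first five rows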

    For further analysis we need to pick a single category, preferably one with a lot of data. To do this, let's get a list of the most popular categories.

    Input:
    // The most popular categories
    dataAov.map { x => x.categoryId } // take the categoryId field
        .countByValue()  // count how often each categoryId occurs
        .toSeq
        .sortBy( - _._2) // sort by frequency in descending order
        .take(10) // take the top 10 records

    At the output we get an array of tuples in the format (categoryId, frequency):
    ArrayBuffer((314,3068), (132,2229), (128,1770), (270,1483), (139,1379), (107,1366), (177,1311), (226,1268), (103,1259), (127,1204))

    For further work I decided to pick category 128.

    Let's prepare the data: we keep only the operating system types we are interested in, so as not to clutter the charts with noise.

    Input:
    val interestedBrowsers = List("Android", "OS X", "iOS", "Linux", "Windows")
    val osAov = dataAov.filter(x => interestedBrowsers.contains(x.osFamily)) // keep only the OSes we need
        .filter(_.categoryId == 128) // filter by category
        .map(x => (x.osFamily, (x.aov, 1.0))) // needed to compute the average order value
        .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
        .map{ case(osFamily, (revenue, orders)) => (osFamily, revenue/orders) }
        .collect()

    The output is an array of tuples in the format (OS, average order value):
    Array((OS X,4859.827586206897), (Linux,3730.4347826086955), (iOS,3964.6153846153848), (Android,3670.8474576271187), (Windows,3261.030993042378))

    Now I want a visualization; let's do it with HighCharts:

    In theory you can use any HighCharts chart type, as long as it is supported in Wisp. All charts are interactive.
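    The charting code is not reproduced above; a minimal sketch, assuming Wisp's charting DSL (com.quantifind.charts), could look like this:

    // A sketch only: plot the average order value per OS family from osAov
    import com.quantifind.charts.Highcharts._

    bar(osAov.map(_._2).toSeq)                 // one bar per OS family
    title("Average order value by OS family")  // category labels are omitted here;
                                               // the exact API for categorical axes may vary by Wisp version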

    Let's try to do the same, but through R.
    Run the R client:
    import org.ddahl.rscala._
    import ru.retailrocket.ispark._
    def connect() = RClient("R", false)
    @transient
    val r = connect()

    Building the chart itself:
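    The original plotting snippet is not reproduced here; a minimal sketch, assuming rscala's eval and R's barplot, could be:

    // A sketch only: build the R command as a string from the (OS, AOV) pairs
    // in osAov and hand it to the R client
    val values = osAov.map(_._2).mkString(", ")
    val labels = osAov.map(x => "\"" + x._1 + "\"").mkString(", ")
    r.eval(s"barplot(c($values), names.arg = c($labels), main = 'Average order value by OS')")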

    So you can build any R charts right in the IPython notebook.

    Mutual information


    The charts show that there is a dependence, but will the metrics confirm this conclusion? There are many ways to check. In our case we will use the mutual information between the values in the table. It measures the mutual dependence between the distributions of two (discrete) random variables.

    For discrete distributions, it is calculated by the formula:

    I(X; Y) = Σ_{x ∈ X} Σ_{y ∈ Y} p(x, y) · log( p(x, y) / (p(x) p(y)) )

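    To make the formula concrete, here is a small illustrative sketch (not from the article; the function name is mine) that estimates mutual information from the empirical frequencies of two paired discrete samples:

    // A sketch only: empirical mutual information (in nats) of two paired samples
    def mutualInformation[A, B](xs: Seq[A], ys: Seq[B]): Double = {
        require(xs.size == ys.size, "samples must have the same length")
        val n = xs.size.toDouble
        val pxy = xs.zip(ys).groupBy(identity).mapValues(_.size / n) // joint frequencies
        val px  = xs.groupBy(identity).mapValues(_.size / n)         // marginal of X
        val py  = ys.groupBy(identity).mapValues(_.size / n)         // marginal of Y
        pxy.map { case ((x, y), p) => p * math.log(p / (px(x) * py(y))) }.sum
    }
    // e.g. perfectly dependent samples give log(2) ≈ 0.693:
    // mutualInformation(Seq("a", "a", "b", "b"), Seq(1, 1, 2, 2))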
    But we are more interested in a practical metric: the Maximal Information Coefficient (MIC), whose calculation for continuous variables requires some tricks. Here is how this parameter is defined.

    Let D = (x, y) be a set of n ordered pairs of elements of the random variables X and Y. This two-dimensional space is partitioned by an X-by-Y grid, grouping the values of x and y into the X and Y cells of the partition, respectively (think of histograms!).


    MIC(D) = max_{|X|·|Y| < B(n)}  I*(D, X, Y) / log min(|X|, |Y|)
    where B(n) is the limit on the grid size and I*(D, X, Y) is the mutual information on the X-by-Y partition. The logarithm in the denominator normalizes MIC to the interval [0, 1]. MIC takes continuous values in [0, 1]: in the extreme cases it is 1 if there is a dependence and 0 if there is none. Further reading on this topic is listed at the end of the article, in the list of references.

    In the book, MIC is called the correlation of the 21st century - and here is why. The figure below shows six dependencies (plots C through H). Pearson correlation and MIC were computed for each of them; they are marked with the corresponding letters in the plot on the left. As we can see, the Pearson correlation is almost zero, while MIC shows a dependence (plots E, F, G).

    Original source: people.cs.ubc.ca

    The table below shows a number of metrics computed on different kinds of dependencies: random, linear, cubic, and so on. It shows that MIC behaves very well, detecting non-linear dependencies:


    Another interesting graph illustrates the effect of noise on the MIC:


    In our case we are computing MIC where the aov variable is continuous and all the others are discrete with unordered values (for example, the browser type). For a correct MIC calculation, aov has to be sampled. We will use a ready-made solution from exploredata.net. There is one problem with this solution: it assumes that both variables are continuous and expressed as Float values. So we will have to trick the code by encoding the values of the discrete variables as Floats and randomly shuffling the order of those codes. To do this we have to run many iterations with random orderings (we will do 100) and take the maximum MIC value as the result.
    import data.VarPairData
    import mine.core.MineParameters
    import analysis.Analysis
    import analysis.results.BriefResult
    import scala.util.Random
    // Encode a discrete variable, randomly shuffling the order of the "codes"
    def encode(col: Array[String]): Array[Double] = {
        val ns = scala.util.Random.shuffle(1 to col.toSet.size)
        val encMap = col.toSet.zip(ns).toMap
        col.map{encMap(_).toDouble}
    }
    // function that computes MIC
    def mic(x: Array[Double], y: Array[Double]) = {
        val data = new VarPairData(x.map(_.toFloat), y.map(_.toFloat))
        val params = new MineParameters(0.6.toFloat, 15, 0, null)
        val res = Analysis.getResult(classOf[BriefResult], data, params)
        res.getMIC
    }
    // for a discrete variable, run many iterations and take the maximum
    def micMax(x: Array[Double], y: Array[Double], n: Int = 100) =
        (for{ i <- 1 to n} yield mic(x, y)).max

    Well, we are close to the finish; now let's run the calculation itself:
    val aov = dataAov.filter(x => interestedBrowsers.contains(x.osFamily)) // keep only the OSes we need
        .filter(_.categoryId == 128) // filter by category
    //osFamily
    var aovMic = aov.map(x => (x.osFamily, x.aov)).collect()
    println("osFamily MIC =" + micMax(encode(aovMic.map(_._1)),  aovMic.map(_._2)))
    //orderId
    aovMic = aov.map(x => (x.orderId, x.aov)).collect()
    println("orderId MIC =" + micMax(encode(aovMic.map(_._1)),  aovMic.map(_._2)))
    //cityId
    aovMic = aov.map(x => (x.cityId, x.aov)).collect()
    println("cityId MIC =" + micMax(encode(aovMic.map(_._1)),  aovMic.map(_._2)))
    //uaName
    aovMic = aov.map(x => (x.uaName, x.aov)).collect()
    println("uaName MIC =" + mic(encode(aovMic.map(_._1)),  aovMic.map(_._2)))
    //aov
    println("aov MIC =" + micMax(aovMic.map(_._2),  aovMic.map(_._2)))
    //random
    println("random MIC =" + mic(aovMic.map(_ => math.random*100.0),  aovMic.map(_._2)))

    Output:
    osFamily MIC =0.06658
    orderId MIC =0.10074
    cityId MIC =0.07281
    aov MIC =0.99999
    uaName MIC =0.05297
    random MIC =0.10599

    For the experiment I added a random variable with a uniform distribution, as well as aov itself.
    As we can see, almost all of the MIC values turned out to be below the random one (random MIC), which can be treated as a "conditional" decision threshold. The aov MIC is almost one, which is natural, since the correlation of a variable with itself equals 1.

    An interesting question arises: why do we see a dependence on the charts while MIC is practically zero? One can come up with many hypotheses, but most likely, for the osFamily case the explanation is simple: the number of Windows machines far exceeds all the others:
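    The supporting chart is not reproduced here; a quick way to check this hypothesis with the RDD API might look like this (a sketch, reusing the filtered RDD aov from above):

    // A sketch only: how many orders in category 128 fall on each OS family
    aov.map(_.osFamily)
       .countByValue()
       .toSeq
       .sortBy(-_._2)
       .foreach(println)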


    Conclusion


    Hopefully, Scala will gain popularity among data scientists. It is very convenient: you can work in the familiar IPython notebook and at the same time get all the capabilities of Spark. The same code can easily work with terabyte-scale datasets; all you need to do is change the configuration line in ISpark, specifying the URI of your cluster.
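    ISpark's own configuration format is not shown here, but as a rough illustration of the idea, if you were creating the SparkContext yourself in plain Spark code, pointing it at a cluster instead of local mode would look roughly like this (a sketch; the master value and app name are assumptions):

    import org.apache.spark.{SparkConf, SparkContext}

    // A sketch only: the same analysis against a YARN cluster instead of local mode
    val conf = new SparkConf()
        .setAppName("aov-analysis")  // hypothetical application name
        .setMaster("yarn-client")    // instead of "local[*]"
    val sc = new SparkContext(conf)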


    Useful links:
    The scientific paper on the basis of which MIC was developed.
    A note on KDnuggets about mutual information (with a video).
    A C library for calculating MIC, with wrappers for Python and MATLAB/OCTAVE.
    The website of the author of the paper in which MIC was developed (it has an R module and a Java library).
