R and Big Data: Using Replyr

Original author: John Mount
replyr is short for REmote PLYing of big data for R (remote processing of big data in R).

Why is replyr worth a try? Because it lets you apply familiar working approaches to remote data (databases or Spark).

You can work with remote data in much the same way as with a local data.frame. replyr provides the following capabilities:

  • Summarizing data: replyr_summary().
  • Combining tables: replyr_union_all().
  • Binding tables by row: replyr_bind_rows().
  • Using the split/apply/combine pattern (dplyr::do()): replyr_split(), replyr::gapply().
  • Pivoting / un-pivoting values (gather/spread): replyr_moveValuesToRows() / replyr_moveValuesToColumns().
  • Tracking intermediate results.
  • A join controller.

You most likely already do all of this with local data, so having these features available for remote data makes working with Spark and sparklyr much easier.

replyr is a product of the collective experience of using R in applied solutions for many clients, collecting feedback, and correcting shortcomings.

Examples are below.

Everything is changing rapidly right now, so the examples use the development versions of the packages.

base::date()
## [1] "Thu Jul  6 15:56:28 2017"
# devtools::install_github('rstudio/sparklyr')
# devtools::install_github('tidyverse/dplyr')
# devtools::install_github('tidyverse/dbplyr')
# install.packages("replyr")
suppressPackageStartupMessages(library("dplyr"))
packageVersion("dplyr")
## [1] '0.7.1.9000'
packageVersion("dbplyr")
## [1] '1.1.0.9000'
library("tidyr")
packageVersion("tidyr")
## [1] '0.6.3'
library("replyr")
packageVersion("replyr")
## [1] '0.4.2'
suppressPackageStartupMessages(library("sparklyr"))
packageVersion("sparklyr")
## [1] '0.5.6.9012'
# more memory than suggested in https://github.com/rstudio/sparklyr/issues/783
config <- spark_config()
config[["sparklyr.shell.driver-memory"]] <- "8G"
sc <- sparklyr::spark_connect(version='2.1.0', 
                              hadoop_version = '2.7',
                              master = "local",
                              config = config)

Summary


Standard summary() and glance() cannot be applied to Spark data.

mtcars_spark <- copy_to(sc, mtcars)
# a summary of the handle, not of the data
summary(mtcars_spark)
##     Length Class          Mode
## src 1      src_spark      list
## ops 2      op_base_remote list
packageVersion("broom")
## [1] '0.4.2'
broom::glance(mtcars_spark)
## Error: glance doesn't know how to deal with data of class tbl_sparktbl_sqltbl_lazytbl

replyr_summary works.

replyr_summary(mtcars_spark) %>%
  select(-lexmin, -lexmax, -nunique, -index)
##    column   class nrows nna    min     max       mean          sd
## 1     mpg numeric    32   0 10.400  33.900  20.090625   6.0269481
## 2     cyl numeric    32   0  4.000   8.000   6.187500   1.7859216
## 3    disp numeric    32   0 71.100 472.000 230.721875 123.9386938
## 4      hp numeric    32   0 52.000 335.000 146.687500  68.5628685
## 5    drat numeric    32   0  2.760   4.930   3.596563   0.5346787
## 6      wt numeric    32   0  1.513   5.424   3.217250   0.9784574
## 7    qsec numeric    32   0 14.500  22.900  17.848750   1.7869432
## 8      vs numeric    32   0  0.000   1.000   0.437500   0.5040161
## 9      am numeric    32   0  0.000   1.000   0.406250   0.4989909
## 10   gear numeric    32   0  3.000   5.000   3.687500   0.7378041
## 11   carb numeric    32   0  1.000   8.000   2.812500   1.6152000

Pivoting / un-pivoting


tidyr works mainly with local data.

mtcars2 <- mtcars %>%
  mutate(car = row.names(mtcars)) %>%
  copy_to(sc, ., 'mtcars2')
# errors
mtcars2 %>% 
  tidyr::gather('fact', 'value')
## Error in UseMethod("gather_"): no applicable method for 'gather_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"
mtcars2 %>%
  replyr_moveValuesToRows(nameForNewKeyColumn= 'fact', 
                          nameForNewValueColumn= 'value', 
                          columnsToTakeFrom= colnames(mtcars),
                          nameForNewClassColumn= 'class') %>%
  arrange(car, fact)
## # Source:     lazy query [?? x 4]
## # Database:   spark_connection
## # Ordered by: car, fact
##            car  fact  value   class
##          <chr> <chr>  <dbl>   <chr>
##  1 AMC Javelin    am   0.00 numeric
##  2 AMC Javelin  carb   2.00 numeric
##  3 AMC Javelin   cyl   8.00 numeric
##  4 AMC Javelin  disp 304.00 numeric
##  5 AMC Javelin  drat   3.15 numeric
##  6 AMC Javelin  gear   3.00 numeric
##  7 AMC Javelin    hp 150.00 numeric
##  8 AMC Javelin   mpg  15.20 numeric
##  9 AMC Javelin  qsec  17.30 numeric
## 10 AMC Javelin    vs   0.00 numeric
## # ... with 342 more rows

Binding rows


dplyr's bind_rows, union, and union_all are currently not usable on Spark. replyr::replyr_union_all() and replyr::replyr_bind_rows() are workable alternatives.

bind_rows()


db1 <- copy_to(sc, 
               data.frame(x=1:2, y=c('a','b'), 
                          stringsAsFactors=FALSE),
               name='db1')
db2 <- copy_to(sc, 
               data.frame(y=c('c','d'), x=3:4, 
                          stringsAsFactors=FALSE),
               name='db2')
# Errors from trying to operate on the handle, not the data
bind_rows(list(db1, db2))
## Error in bind_rows_(x, .id): Argument 1 must be a data frame or a named atomic vector, not a tbl_spark/tbl_sql/tbl_lazy/tbl

union_all


# ignores column names and converts all data to character
union_all(db1, db2)
## # Source:   lazy query [?? x 2]
## # Database: spark_connection
##       x     y
##   <int> <chr>
## 1     1     a
## 2     2     b
## 3     3     c
## 4     4     d

union


# ignores column names and converts all data to character
# will also likely lose duplicate rows
union(db1, db2)
## # Source:   lazy query [?? x 2]
## # Database: spark_connection
##       x     y
##   <int> <chr>
## 1     4     d
## 2     1     a
## 3     3     c
## 4     2     b

replyr_bind_rows


replyr::replyr_bind_rows can bind together a list of data items.

replyr_bind_rows(list(db1, db2))
## # Source:   table [?? x 2]
## # Database: spark_connection
##       x     y
##   <int> <chr>
## 1     1     a
## 2     2     b
## 3     3     c
## 4     4     d

dplyr::do


In our example we simply take a few rows from each group of a grouped dataset. Note: since we do not explicitly specify an order with arrange, we cannot always expect the results to match across different data sources (a database or Spark); one way to pin the order down on local data is sketched after the example below.

dplyr::do on local data


From help('do', package='dplyr'):

by_cyl <- group_by(mtcars, cyl)
do(by_cyl, head(., 2))
## # A tibble: 6 x 11
## # Groups:   cyl [3]
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 2  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 3  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 4  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4
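
If reproducible row selection matters, one option (a minimal sketch on local data, not part of the original example) is to order each group before taking rows:

by_cyl <- group_by(mtcars, cyl)
# order within each group before taking the first two rows,
# so the selection no longer depends on the incoming row order
do(by_cyl, head(arrange(., mpg), 2))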

dplyr::do on Spark


by_cyl <- group_by(mtcars_spark, cyl)
do(by_cyl, head(., 2))
## # A tibble: 3 x 2
##     cyl     V2
##   <dbl> <list>
## 1     6 
## 2     4 
## 3     8 

We do not get back results we can really use.

replyr split / bind


mtcars_spark %>%
  replyr_split('cyl', 
               partitionMethod = 'extract') %>%
  lapply(function(di) head(di, 2)) %>%
  replyr_bind_rows()
## # Source:   table [?? x 11]
## # Database: spark_connection
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 2  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 3  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 4  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4

replyr gapply


mtcars_spark %>%
  gapply('cyl',
         partitionMethod = 'extract',
         function(di) head(di, 2))
## # Source:   table [?? x 11]
## # Database: spark_connection
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 2  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 3  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 4  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4

replyr::replyr_apply_f_mapped


What we would like: data with column names that match what the code expects (i.e., change the data, not the code).

On reflection, this can be achieved by tying the change to the data to the execution of a function rather than to the data itself; that is, the data is altered only while the controlling function runs. In our case that function is replyr::replyr_apply_f_mapped(), and it works as follows.

Suppose the operation we want to use is a function that decrements a rank column, obtained from a source we do not control (for example, a package). It may be a simple function (as below), but suppose we want to use it unchanged (without even small modifications, such as introducing wrapr::let()).

# an external function with hard-coded column names
DecreaseRankColumnByOne <- function(d) {
  d$RankColumn <- d$RankColumn - 1
  d
}

To apply this function to d (whose column names are not what the function expects!), we use replyr::replyr_apply_f_mapped() to create a new parameterized adapter:

# our data
d <- data.frame(Sepal_Length = c(5.8,5.7),
                Sepal_Width = c(4.0,4.4),
                Species = 'setosa',
                rank = c(1,2))
# wrapper that introduces the column-name parameter
DecreaseRankColumnByOneNamed <- function(d, ColName) {
  replyr::replyr_apply_f_mapped(d, 
                                f = DecreaseRankColumnByOne, 
                                nmap = c(RankColumn = ColName),
                                restrictMapIn = FALSE, 
                                restrictMapOut = FALSE)
}
# usage
dF <- DecreaseRankColumnByOneNamed(d, 'rank')
print(dF)
##   Sepal_Length Sepal_Width Species rank
## 1          5.8         4.0  setosa    0
## 2          5.7         4.4  setosa    1

replyr::replyr_apply_f_mapped() renames the columns to what DecreaseRankColumnByOne expects (the mapping is given in nmap), applies DecreaseRankColumnByOne, and maps the names back to the originals before returning the result.
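
The same adapter can be reused for any column name. A minimal sketch with hypothetical example data (d2 and its column names are not from the original article); it simply decrements the mapped column:

d2 <- data.frame(x = c(10, 20),
                 position = c(3, 4))
# map the function's expected RankColumn onto the 'position' column
DecreaseRankColumnByOneNamed(d2, 'position')
##    x position
## 1 10        2
## 2 20        3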

Tracking intermediate results


Many sparklyr tasks involve creating intermediate or temporary tables. This can be done with dplyr::copy_to() and dplyr::compute(), but these methods can be resource-intensive.

replyr provides functions that help keep this process under control: temporary name generators that do not alter the data itself (they are also used inside the package).

The function itself is pretty simple:

print(replyr::makeTempNameGenerator)
## function (prefix, suffix = NULL) 
## {
##     force(prefix)
##     if ((length(prefix) != 1) || (!is.character(prefix))) {
##         stop("repyr::makeTempNameGenerator prefix must be a string")
##     }
##     if (is.null(suffix)) {
##         alphabet <- c(letters, toupper(letters), as.character(0:9))
##         suffix <- paste(base::sample(alphabet, size = 20, replace = TRUE), 
##             collapse = "")
##     }
##     count <- 0
##     nameList <- list()
##     function(..., peek = FALSE, dumpList = FALSE, remove = NULL) {
##         if (length(list(...)) > 0) {
##             stop("replyr::makeTempNameGenerator tempname generate unexpected argument")
##         }
##         if (peek) {
##             return(names(nameList))
##         }
##         if (dumpList) {
##             v <- names(nameList)
##             nameList <<- list()
##             return(v)
##         }
##         if (!is.null(remove)) {
##             victims <- intersect(remove, names(nameList))
##             nameList[victims] <<- NULL
##             return(victims)
##         }
##         nm <- paste(prefix, suffix, sprintf("%010d", count), 
##             sep = "_")
##         nameList[[nm]] <<- 1
##         count <<- count + 1
##         nm
##     }
## }
## 
## 
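
Based on the source above, here is a small usage sketch (the generated names are illustrative; the random suffix will differ on each run):

nameGen <- replyr::makeTempNameGenerator('EXAMPLE')
nameGen()                  # returns something like "EXAMPLE_<randomsuffix>_0000000000"
nameGen()                  # "EXAMPLE_<randomsuffix>_0000000001"
nameGen(peek = TRUE)       # list the names generated so far, without adding a new one
nameGen(dumpList = TRUE)   # return all generated names and clear the internal list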

For example, when joining several tables, a good practice for some data sources is to call compute after each join (otherwise the resulting SQL can become long, and hard to understand and maintain). The code looks something like this:

# build example data
names <- paste('table', 1:5, sep='_')
tables <- lapply(names, 
                 function(ni) {
                   di <- data.frame(key= 1:3)
                   di[[paste('val',ni,sep='_')]] <- runif(nrow(di))
                   copy_to(sc, di, ni)
                 })
# our own temporary name generator
tmpNamGen <- replyr::makeTempNameGenerator('JOINTMP')
# left-join the tables in sequence
joined <- tables[[1]]
for(i in seq(2, length(tables))) {
  ti <- tables[[i]]
  if(i < length(tables)) {
    # intermediate results get temporary names
    joined <- compute(left_join(joined, ti, by = 'key'),
                      name = tmpNamGen())
  } else {
    # the final result gets a non-temporary name
    joined <- compute(left_join(joined, ti, by = 'key'),
                      name = 'joinres')
  }
}
# list and drop the temporary tables once the final result no longer depends on them
temps <- tmpNamGen(dumpList = TRUE)
for(ti in temps) {
  db_drop_table(sc, ti)
}

Careful creation and management of temporary data can save resources (both time and space) and noticeably improve results. We consider it good practice to explicitly create a temporary name generator, pass it to all sparklyr transformations, and then clear the temporary tables all together once the results no longer depend on them.

Conclusion


If you want to carefully control data processing in Spark or a database from R, it is worth considering replyr in addition to dplyr and sparklyr.

sparklyr::spark_disconnect(sc)
rm(list=ls())
gc()
##           used (Mb) gc trigger (Mb) max used (Mb)
## Ncells  821292 43.9    1442291 77.1  1168576 62.5
## Vcells 1364897 10.5    2552219 19.5  1694265 13.0
