R and Big Data: Using Replyr
replyr is short for REmote PLYing of big data for R (remote processing of big data in R). Why is it worth a try? Because it lets you apply your standard working approaches to remote data (databases or Spark) and work the same way you would with a local data.frame. replyr provides the following capabilities:
- Summarizing data: replyr_summary().
- Combining tables: replyr_union_all().
- Binding tables by rows: replyr_bind_rows().
- Using the split/apply/combine pattern (dplyr::do()): replyr_split(), replyr::gapply().
- Pivot / un-pivot: replyr_moveValuesToRows() / replyr_moveValuesToColumns().
- Tracking intermediate results.
- Handle management.
Most likely you already do all of this with local data, so these capabilities make working with Spark and sparklyr much easier. replyr is a product of the collective experience of using R in applied solutions for many clients, collecting feedback, and correcting deficiencies. Examples follow below.
Everything is changing rapidly at the moment, so the examples use development versions of the packages.
base::date()
## [1] "Thu Jul 6 15:56:28 2017"
# devtools::install_github('rstudio/sparklyr')
# devtools::install_github('tidyverse/dplyr')
# devtools::install_github('tidyverse/dbplyr')
# install.packages("replyr")
suppressPackageStartupMessages(library("dplyr"))
packageVersion("dplyr")
## [1] '0.7.1.9000'
packageVersion("dbplyr")
## [1] '1.1.0.9000'
library("tidyr")
packageVersion("tidyr")
## [1] '0.6.3'
library("replyr")
packageVersion("replyr")
## [1] '0.4.2'
suppressPackageStartupMessages(library("sparklyr"))
packageVersion("sparklyr")
## [1] '0.5.6.9012'
# more memory than suggested in https://github.com/rstudio/sparklyr/issues/783
config <- spark_config()
config[["sparklyr.shell.driver-memory"]] <- "8G"
sc <- sparklyr::spark_connect(version='2.1.0',
hadoop_version = '2.7',
master = "local",
config = config)
Summary
The standard summary() and glance() cannot be executed on Spark data.
mtcars_spark <- copy_to(sc, mtcars)
# a summary of the handle, not of the data
summary(mtcars_spark)
## Length Class Mode
## src 1 src_spark list
## ops 2 op_base_remote list
packageVersion("broom")
## [1] '0.4.2'
broom::glance(mtcars_spark)
## Error: glance doesn't know how to deal with data of class tbl_sparktbl_sqltbl_lazytbl
replyr_summary works.
replyr_summary(mtcars_spark) %>%
select(-lexmin, -lexmax, -nunique, -index)
## column class nrows nna min max mean sd
## 1 mpg numeric 32 0 10.400 33.900 20.090625 6.0269481
## 2 cyl numeric 32 0 4.000 8.000 6.187500 1.7859216
## 3 disp numeric 32 0 71.100 472.000 230.721875 123.9386938
## 4 hp numeric 32 0 52.000 335.000 146.687500 68.5628685
## 5 drat numeric 32 0 2.760 4.930 3.596563 0.5346787
## 6 wt numeric 32 0 1.513 5.424 3.217250 0.9784574
## 7 qsec numeric 32 0 14.500 22.900 17.848750 1.7869432
## 8 vs numeric 32 0 0.000 1.000 0.437500 0.5040161
## 9 am numeric 32 0 0.000 1.000 0.406250 0.4989909
## 10 gear numeric 32 0 3.000 5.000 3.687500 0.7378041
## 11 carb numeric 32 0 1.000 8.000 2.812500 1.6152000
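The columns dropped above (lexmin, lexmax, nunique) become informative for character data. A minimal sketch using only replyr_summary() itself (the table name 'chr_example' is our own):
# a sketch: character columns fill in the lexical summary fields
# (lexmin, lexmax, nunique) that were dropped above
chr_tbl <- copy_to(sc,
                   data.frame(s = c('a', 'b', 'b'),
                              stringsAsFactors = FALSE),
                   'chr_example')
replyr_summary(chr_tbl)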
Pivot / un-pivot
tidyr works mainly with local data.
mtcars2 <- mtcars %>%
mutate(car = row.names(mtcars)) %>%
copy_to(sc, ., 'mtcars2')
# errors
mtcars2 %>%
tidyr::gather('fact', 'value')
## Error in UseMethod("gather_"): no applicable method for 'gather_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"
mtcars2 %>%
replyr_moveValuesToRows(nameForNewKeyColumn= 'fact',
nameForNewValueColumn= 'value',
columnsToTakeFrom= colnames(mtcars),
nameForNewClassColumn= 'class') %>%
arrange(car, fact)
## # Source: lazy query [?? x 4]
## # Database: spark_connection
## # Ordered by: car, fact
## car fact value class
## <chr> <chr> <dbl> <chr>
## 1 AMC Javelin am 0.00 numeric
## 2 AMC Javelin carb 2.00 numeric
## 3 AMC Javelin cyl 8.00 numeric
## 4 AMC Javelin disp 304.00 numeric
## 5 AMC Javelin drat 3.15 numeric
## 6 AMC Javelin gear 3.00 numeric
## 7 AMC Javelin hp 150.00 numeric
## 8 AMC Javelin mpg 15.20 numeric
## 9 AMC Javelin qsec 17.30 numeric
## 10 AMC Javelin vs 0.00 numeric
## # ... with 342 more rows
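The inverse operation (spreading the key/value rows back into columns) is replyr_moveValuesToColumns(). A sketch, not run here, with argument names assumed from the replyr 0.4.x documentation:
# a sketch of the inverse (back to the wide form);
# argument names assumed from the replyr 0.4.x documentation
mtcars2 %>%
  replyr_moveValuesToRows(nameForNewKeyColumn = 'fact',
                          nameForNewValueColumn = 'value',
                          columnsToTakeFrom = colnames(mtcars),
                          nameForNewClassColumn = 'class') %>%
  select(-class) %>%   # drop the bookkeeping column before spreading
  replyr_moveValuesToColumns(columnToTakeKeysFrom = 'fact',
                             columnToTakeValuesFrom = 'value',
                             rowKeyColumns = 'car')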
Binding rows
dplyr's bind_rows, union, and union_all are currently not usable on Spark. replyr::replyr_union_all() and replyr::replyr_bind_rows() are workable alternatives.
bind_rows()
db1 <- copy_to(sc,
data.frame(x=1:2, y=c('a','b'),
stringsAsFactors=FALSE),
name='db1')
db2 <- copy_to(sc,
data.frame(y=c('c','d'), x=3:4,
stringsAsFactors=FALSE),
name='db2')
# Errors: the operation is attempted on the handle, not on the data
bind_rows(list(db1, db2))
## Error in bind_rows_(x, .id): Argument 1 must be a data frame or a named atomic vector, not a tbl_spark/tbl_sql/tbl_lazy/tbl
union_all
# ignores column names and coerces all data to strings
union_all(db1, db2)
## # Source: lazy query [?? x 2]
## # Database: spark_connection
## x y
## <int> <chr>
## 1 1 a
## 2 2 b
## 3 3 c
## 4 4 d
union
# ignores column names and coerces all data to strings
# most likely will also lose duplicate rows
union(db1, db2)
## # Source: lazy query [?? x 2]
## # Database: spark_connection
## x y
## <int> <chr>
## 1 4 d
## 2 1 a
## 3 3 c
## 4 2 b
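replyr's own name-respecting union is replyr_union_all(). A minimal sketch, assuming the replyr_union_all(sc, tabA, tabB) signature from the replyr 0.4.x documentation:
# a sketch: union that matches columns by name rather than position
# (signature assumed from the replyr 0.4.x documentation)
replyr_union_all(sc, db1, db2)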
replyr_bind_rows
replyr::replyr_bind_rows can bind together a list of data frames.
replyr_bind_rows(list(db1, db2))
## # Source: table [?? x 2]
## # Database: spark_connection
## x y
## <int> <chr>
## 1 1 a
## 2 2 b
## 3 3 c
## 4 4 d
dplyr::do
In our example we simply take a couple of rows from each group of the grouped dataset. Note that, since we do not specify an explicit order with arrange, we cannot always expect the results to match across data sources (database or Spark).
dplyr::do on local data
From help('do', package='dplyr'):
by_cyl <- group_by(mtcars, cyl)
do(by_cyl, head(., 2))
## # A tibble: 6 x 11
## # Groups: cyl [3]
## mpg cyl disp hp drat wt qsec vs am gear carb
## <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1
## 2 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2
## 3 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4
## 4 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4
## 5 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2
## 6 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4
dplyr::do on Spark
by_cyl <- group_by(mtcars_spark, cyl)
do(by_cyl, head(., 2))
## # A tibble: 3 x 2
##     cyl                V2
##   <dbl>            <list>
## 1     6 <tibble [2 x 11]>
## 2     4 <tibble [2 x 11]>
## 3     8 <tibble [2 x 11]>
The result is not quite something we can use.
replyr split / merge
mtcars_spark %>%
replyr_split('cyl',
partitionMethod = 'extract') %>%
lapply(function(di) head(di, 2)) %>%
replyr_bind_rows()
## # Source: table [?? x 11]
## # Database: spark_connection
## mpg cyl disp hp drat wt qsec vs am gear carb
## <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4
## 2 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4
## 3 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1
## 4 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2
## 5 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2
## 6 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4
replyr gapply
mtcars_spark %>%
gapply('cyl',
partitionMethod = 'extract',
function(di) head(di, 2))
## # Source: table [?? x 11]
## # Database: spark_connection
## mpg cyl disp hp drat wt qsec vs am gear carb
## <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4
## 2 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4
## 3 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1
## 4 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2
## 5 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2
## 6 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4
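Besides 'extract', the replyr documentation also describes a 'group_by' partitionMethod, which pushes the grouping into the backend instead of pulling out per-partition handles. A sketch under that assumption; here the function must be written in terms of grouped dplyr verbs:
# a sketch, assuming the 'group_by' partitionMethod described
# in the replyr documentation
mtcars_spark %>%
  gapply('cyl',
         partitionMethod = 'group_by',
         function(di) summarize(di, mean_hp = mean(hp)))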
replyr::replyr_apply_f_mapped
What we would like: data whose column names match the code (that is, to change the data, not the code).
On reflection, this can be achieved by tying the renaming to the execution of the function rather than to the data itself: the data is renamed only while the controlling function runs. In our case that controlling function is replyr::replyr_apply_f_mapped(), and it works as follows. Suppose the operation we want to use is a rank-decrementing function obtained from some source we do not control (for example, a package). It may be a simple function (as below), but suppose we want to use it unchanged (without even very small modifications, such as introducing wrapr::let()).
# an external function with hard-coded column names
DecreaseRankColumnByOne <- function(d) {
d$RankColumn <- d$RankColumn - 1
d
}
To apply this function to d (whose column names are not as expected!), we use replyr::replyr_apply_f_mapped() to create a new parameterized adapter:
# our data
d <- data.frame(Sepal_Length = c(5.8,5.7),
Sepal_Width = c(4.0,4.4),
Species = 'setosa',
rank = c(1,2))
# adapter that introduces the parameters
DecreaseRankColumnByOneNamed <- function(d, ColName) {
replyr::replyr_apply_f_mapped(d,
f = DecreaseRankColumnByOne,
nmap = c(RankColumn = ColName),
restrictMapIn = FALSE,
restrictMapOut = FALSE)
}
# usage
dF <- DecreaseRankColumnByOneNamed(d, 'rank')
print(dF)
## Sepal_Length Sepal_Width Species rank
## 1 5.8 4.0 setosa 0
## 2 5.7 4.4 setosa 1
replyr::replyr_apply_f_mapped() renames the columns as expected by DecreaseRankColumnByOne (the mapping is given in nmap), applies DecreaseRankColumnByOne, and restores the original names before returning the result.
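The same adapter idea carries over to remote handles. Since replyr_apply_f_mapped() performs the renaming through dplyr, a function written with dplyr verbs (rather than $<-) should also be applicable to a Spark table. A sketch under that assumption (DecreaseRankColumnByOneDplyr and the table name 'd_remote' are our own):
# our own dplyr-verb variant of the external function; unlike $<-,
# mutate() also works on remote handles such as Spark tables
DecreaseRankColumnByOneDplyr <- function(d) {
  dplyr::mutate(d, RankColumn = RankColumn - 1)
}
d_remote <- copy_to(sc, d, 'd_remote')
replyr::replyr_apply_f_mapped(d_remote,
                              f = DecreaseRankColumnByOneDplyr,
                              nmap = c(RankColumn = 'rank'),
                              restrictMapIn = FALSE,
                              restrictMapOut = FALSE)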
Tracking intermediate results
Many Sparklyr tasks involve creating intermediate or temporary tables. This can be done with dplyr::copy_to() and dplyr::compute(), but these methods can be resource-intensive. replyr provides functions that help keep the process under control: generators of temporary names that do not change the data itself (they are also used inside the package). The function itself is quite simple:
print(replyr::makeTempNameGenerator)
## function (prefix, suffix = NULL)
## {
## force(prefix)
## if ((length(prefix) != 1) || (!is.character(prefix))) {
## stop("repyr::makeTempNameGenerator prefix must be a string")
## }
## if (is.null(suffix)) {
## alphabet <- c(letters, toupper(letters), as.character(0:9))
## suffix <- paste(base::sample(alphabet, size = 20, replace = TRUE),
## collapse = "")
## }
## count <- 0
## nameList <- list()
## function(..., peek = FALSE, dumpList = FALSE, remove = NULL) {
## if (length(list(...)) > 0) {
## stop("replyr::makeTempNameGenerator tempname generate unexpected argument")
## }
## if (peek) {
## return(names(nameList))
## }
## if (dumpList) {
## v <- names(nameList)
## nameList <<- list()
## return(v)
## }
## if (!is.null(remove)) {
## victims <- intersect(remove, names(nameList))
## nameList[victims] <<- NULL
## return(victims)
## }
## nm <- paste(prefix, suffix, sprintf("%010d", count),
## sep = "_")
## nameList[[nm]] <<- 1
## count <<- count + 1
## nm
## }
## }
##
##
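A quick illustration of the interface shown in the source above (the prefix 'TMPTBL' is arbitrary):
# a small illustration of the generator printed above
gen <- replyr::makeTempNameGenerator('TMPTBL')
nm1 <- gen()          # something like "TMPTBL_<random suffix>_0000000000"
nm2 <- gen()          # the counter increments for each new name
gen(peek = TRUE)      # inspect the names handed out so far
gen(dumpList = TRUE)  # return the accumulated names and clear the list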
For example, when joining several tables, a good solution for some data sources is to call compute after each join (otherwise the resulting SQL can become long, hard to understand, and hard to maintain). The code looks something like this:
# create example data
names <- paste('table', 1:5, sep='_')
tables <- lapply(names,
function(ni) {
di <- data.frame(key= 1:3)
di[[paste('val',ni,sep='_')]] <- runif(nrow(di))
copy_to(sc, di, ni)
})
# our own temporary name generator
tmpNamGen <- replyr::makeTempNameGenerator('JOINTMP')
# sequential left joins of the tables
joined <- tables[[1]]
for(i in seq(2,length(tables))) {
ti <- tables[[i]]
  if(i < length(tables)) {
    # intermediate results get generated temporary names
    joined <- compute(left_join(joined, ti, by = 'key'),
                      name = tmpNamGen())
  } else {
    # the final result gets a persistent (non-temporary) name
    joined <- compute(left_join(joined, ti, by = 'key'),
                      name = 'joinres')
  }
}
Careful introduction and management of temporary data can save resources (both time and space) and significantly improve results. We find it good practice to define a temporary name generator explicitly, pass it into all Sparklyr transformations, and then clean up all the temporary values together once the results no longer depend on them.
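For instance, once the final joinres above is materialized, the temporaries recorded by tmpNamGen can be dropped in one pass. A sketch using dplyr's db_drop_table():
# fetch the accumulated temporary names (clearing the generator's list)
temps <- tmpNamGen(dumpList = TRUE)
# drop the now-unneeded intermediate tables
for(ti in temps) {
  dplyr::db_drop_table(sc, ti)
}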
Conclusion
If you want to carefully control data processing in Spark or a database from R, it is worth considering replyr in addition to dplyr and sparklyr.
sparklyr::spark_disconnect(sc)
rm(list=ls())
gc()
## used (Mb) gc trigger (Mb) max used (Mb)
## Ncells 821292 43.9 1442291 77.1 1168576 62.5
## Vcells 1364897 10.5 2552219 19.5 1694265 13.0