Introduction
The R programming language, together with RStudio, has become one of the most popular tools for data analysis, thanks to the large collection of open-source packages developed by a community of statisticians. However, R and RStudio are not ideal for big-data analysis, because the data often does not fit into R's memory. Spark, on the other hand, has become the leading platform for big-data analytics: it distributes data across a cluster, processes it in parallel, and provides native bindings for languages such as Java, Python, Scala, and R.
sparklyr is an R package that lets us analyze data in Spark from R. It supports dplyr, a popular tool for working with data-frame-like objects both in memory and out of memory, and it exposes many of Spark's machine learning algorithms for classification, regression, and so on. It is also extensible: you can create R packages that depend on sparklyr to call the full Spark API, such as H2O's rsparkling, an R package that works with H2O's machine learning algorithms. With sparklyr and rsparkling, we have access to all of H2O's tools for analysis with R and Spark.
Connect to Spark
Suppose that sparklyr has been successfully installed in your R environment. To get started with Spark using sparklyr and a local cluster, run
library(sparklyr)
spark_install()
sc <- spark_connect(master = "local")
or, if a Spark cluster has already been made available to you,
sc <- spark_connect(master = "<cluster-master>")
When I ran spark_connect(master = "local"), I got the error message
"Java 9 is currently unsupported in Spark distributions unless you manually install Hadoop 2.8 and manually configure Spark. Please consider uninstalling Java 9 and reinstalling Java 8. To override this failure set 'options(sparklyr.java9 = TRUE)'."
That is because on my Windows 10 laptop, the JAVA_HOME system environment variable was set to C:\Java\jdk, which points to Java version 11. I changed it to
JAVA_HOME = "C:\Java\jre1.8.0_151"
Note that my Java folder was originally under C:\Program Files (x86)\.., which also caused an issue when connecting to Spark, so I moved it directly under C:\.. to solve that problem. However, yet another error was triggered when connecting to Spark:
---- Output Log ----
Error occurred during initialization of VM
Could not reserve enough space for 2097152KB object heap
The JVM cannot allocate 2 GB for the object heap, which seems to be a common issue on Windows with a 32-bit (x86) Java installation. To solve it, we can either install 64-bit Java or reduce the default driver memory:
config <- spark_config()
config[["sparklyr.shell.driver-memory"]] <- "512m"
sc <- spark_connect(master = "local", config = config)
Finally, I am connected to Spark! Now I can run analyses and build models using Spark from R.
To monitor and analyze execution, we can open Spark's web interface [1]:
spark_web(sc)
Once we are done with the analysis, we can disconnect from Spark:
spark_disconnect(sc)
Data Analysis
Copy data to Spark
The data set mtcars is a data frame available in R; run ?mtcars to see more details. To copy the data set into Apache Spark,
cars <- copy_to(sc, mtcars)
# or
cars <- sdf_copy_to(sc, mtcars)
Now we can access the data that was copied into Spark from R using the cars reference.
To read data from existing data sources in csv format and copy to Spark,
cars <- spark_read_csv(sc, "cars.csv")
To export data as a csv file,
spark_write_csv(cars, "cars.csv")
Other formats like plain text, JSON, JDBC are supported as well.
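For example, the JSON reader and writer work the same way. This is a sketch; the file name cars.json is hypothetical, and analogous spark_read_*/spark_write_* functions exist for plain text and JDBC sources:

spark_write_json(cars, "cars.json")                         # export the Spark table as JSON
cars_json <- spark_read_json(sc, "cars_json", "cars.json")  # read it back into a new Spark reference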
Exploratory data analysis
When using Spark from R to analyze data, most regular R functions, such as nrow, do not work directly on the Spark reference cars. Instead, we can either use SQL through the DBI package or use dplyr (strongly preferred). Most of the data transformations that dplyr provides for local data frames are also available with a Spark connection, so a general approach to learning dplyr also builds proficiency in data exploration and preparation with Spark. For example, to count how many records are available in cars,
dplyr::count(cars) # = nrow(mtcars)
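The same count can also be obtained with a SQL query through the DBI package. Note that copy_to(sc, mtcars) registers the table under the name mtcars by default; adjust the table name if you chose a different one:

library(DBI)
dbGetQuery(sc, "SELECT count(*) AS n FROM mtcars")  # SQL query executed by Spark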
To select columns, sample rows, and collect data from Spark,
df_in_r <- dplyr::select(cars, hp, mpg) %>%
  dplyr::sample_n(100) %>%
  dplyr::collect()
Then we can apply regular R functions to the data frame df_in_r, for example
dim(df_in_r)
plot(df_in_r)
If a particular functionality is not available in Spark and no extension has been developed, we can distribute the R code across the Spark cluster. For example,
cars %>% spark_apply(nrow)
This is a powerful tool, but it comes with additional complexity, so it should only be used as a last resort. We should learn how to do proper data analysis and modeling without having to distribute custom R code across our cluster!
The corrr package specializes in correlations. It contains friendly functions to prepare and visualize the results.
library(corrr)
cars %>%
  correlate(use = "pairwise.complete.obs", method = "pearson") %>%
  shave() %>%
  rplot()
The sparklyr package also provides functions for data transformation and exploratory data analysis; those functions usually have the sdf_ prefix.
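For example, two of these helpers (a brief sketch):

sdf_nrow(cars)                              # row count computed inside Spark
sdf_describe(cars, cols = c("hp", "mpg"))   # basic summary statistics per column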
Modeling
Spark MLlib is the component of Spark that lets you write high-level code to perform machine learning tasks on distributed data. sparklyr provides an interface to the MLlib algorithms that should feel familiar to R users. For example, you can run a linear regression as follows:
model <- ml_linear_regression(cars, mpg ~ hp)
model %>%
  ml_predict(copy_to(sc, data.frame(hp = 250 + 10 * 1:10))) %>%
  transmute(hp = hp, mpg = prediction) %>%
  full_join(select(cars, hp, mpg)) %>%
  collect() %>%
  plot()
To retrieve additional statistics from the model,
broom::glance(model)
Spark provides a wide range of algorithms and feature transformers; the corresponding functions usually have the ml_ or ft_ prefix.
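As a small illustration (a sketch, not taken from the original workflow), a feature transformer can be chained with an ML algorithm in a single pipe; here ft_binarizer() creates a hypothetical high_hp indicator column that is then used in a logistic regression on the am column:

cars %>%
  ft_binarizer("hp", "high_hp", threshold = 100) %>%   # feature transformer (ft_ prefix)
  ml_logistic_regression(am ~ high_hp)                 # ML algorithm (ml_ prefix)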
Extensions
Many extensions to sparklyr have been made available, such as sparklyr.nested, rsparkling, and MLeap. The sparklyr.nested package helps you manage values that contain nested information, for example JSON files. MLeap enables Spark pipelines in production. The rsparkling package provides H2O support: for example, you can convert a Spark data frame to an H2O frame and then use the h2o package to run H2O commands from R.
h2o_df <- rsparkling::as_h2o_frame(sc, cars)
model <- h2o::h2o.glm(x = "hp", y = "mpg", training_frame = h2o_df, alpha = 0, lambda = 0)
summary(model)
You can use the invoke family of functions to create the objects you need and to write extension functions that call methods in Java or Scala. For example,
count_lines <- function(sc, file) {
  spark_context(sc) %>%
    invoke("textFile", file, 1L) %>%
    invoke("count")
}
The count_lines function calls the textFile().count() method in Java [2].
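A quick usage example, assuming a text or CSV file (such as the cars.csv written earlier) exists at the given path:

count_lines(sc, "cars.csv")  # number of lines in the file, counted by Spark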
References
[1] The R in Spark: Learning Apache Spark with R. https://therinspark.com/starting.html#starting-spark-web-interface.
[2] sparklyr from RStudio. https://spark.rstudio.com/extensions/.