Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung
TRANSCRIPT
![Page 1: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/1.jpg)
SCALABLE DATA SCIENCE WITH SPARKR
Felix Cheung Principal Engineer - Spark @ Microsoft & Apache Spark Committer
![Page 2: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/2.jpg)
![Page 3: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/3.jpg)
Disclaimer: Apache Spark community contributions
![Page 4: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/4.jpg)
Spark in 5 seconds
• General-purpose cluster computing system
• Spark SQL + DataFrame/Dataset + data sources
• Streaming/Structured Streaming
• ML
• GraphX
![Page 5: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/5.jpg)
R
• A programming language for statistical computing and graphics
• S – 1975
• S4 - advanced object-oriented features
• R – 1993
• S + lexical scoping
• Interpreted
• Matrix arithmetic
• Comprehensive R Archive Network (CRAN) – 10k+ packages
![Page 6: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/6.jpg)
![Page 7: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/7.jpg)
SparkR
• R language APIs for Spark and Spark SQL
• Exposes Spark functionality in R-friendly DataFrame APIs
• Runs as its own REPL: sparkR
• or as an R package loaded in IDEs like RStudio
library(SparkR)
sparkR.session()
![Page 8: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/8.jpg)
Architecture
• Native R classes and methods
• RBackend
• Scala “helper” methods (ML pipeline etc.)
www.slideshare.net/SparkSummit/07-venkataraman-sun
![Page 9: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/9.jpg)
Advantages
• JVM processing, full access to DAG capabilities and Catalyst optimizer, predicate pushdown, code generation, etc.
databricks.com/blog/2015/06/09/announcing-sparkr-r-on-spark.html
![Page 10: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/10.jpg)
Features - What’s new in SparkR
• SQL
• Data source (JSON, CSV, PostgreSQL, libsvm)
• SparkSession & default session (streamlined parameter)
as.DataFrame(iris)
• Catalog (external data table management)
• Spark packages, spark.addFile()
• ML
• R-native UDF
• Cluster support (YARN, Mesos, standalone)
![Page 11: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/11.jpg)
SparkR for Data Science
![Page 12: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/12.jpg)
Decisions, decisions?
[Decision diagram: "Distributed?" with Yes/No branches leading to Native R UDF or Spark.ml]
![Page 13: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/13.jpg)
Spark ML Pipeline
• Pre-processing, feature extraction, model fitting, validation stages
• Transformer
• Estimator
• Cross-validation/hyperparameter tuning
[Pipeline diagram: Tokenizer → HashingTF → Logistic Regression]
![Page 14: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/14.jpg)
SparkR API for ML Pipeline
spark.lda(data = text, k = 20, maxIter = 25, optimizer = "em")
[Diagram: the R call above builds a JVM pipeline: RegexTokenizer → StopWordsRemover → CountVectorizer → LDA]
Single-entrypoint R API builds JVM ML Pipeline
![Page 15: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/15.jpg)
Model Operations
• summary - print a summary of the fitted model
• predict - make predictions on new data
• write.ml/read.ml - save/load fitted models (slight layout difference: pipeline model plus R metadata)
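A rough sketch of these operations used together, assuming a local Spark installation is available. The master setting, the temporary model path, and the variable names are illustrative assumptions, not taken from the talk.

```r
library(SparkR)

# Assumption: a local Spark installation is available; the master is illustrative.
sparkR.session(master = "local[2]")

# as.DataFrame replaces "." in column names with "_"
# (Sepal.Length becomes Sepal_Length) and warns about it.
df <- suppressWarnings(as.DataFrame(iris))

# Fit a Gaussian GLM through the R API.
model <- spark.glm(df, Sepal_Length ~ Sepal_Width + Species, family = "gaussian")

# summary: print a summary of the fitted model.
print(summary(model))

# predict: returns a SparkDataFrame with an added "prediction" column.
preds <- predict(model, df)
head(select(preds, "Sepal_Length", "prediction"))

# write.ml / read.ml: save and reload the fitted model.
modelPath <- file.path(tempdir(), "sparkr-glm-model")  # hypothetical location
write.ml(model, modelPath, overwrite = TRUE)
reloaded <- read.ml(modelPath)
```

The reloaded object can be passed to summary and predict in the same way as the original model.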
![Page 16: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/16.jpg)
Spark.ml in SparkR 2.0.0
• Generalized Linear Model (GLM)
• Naive Bayes Model
• k-means Clustering
• Accelerated Failure Time (AFT) Survival Model
![Page 17: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/17.jpg)
Spark.ml in SparkR 2.1.0
• Generalized Linear Model (GLM)
• Naive Bayes Model
• k-means Clustering
• Accelerated Failure Time (AFT) Survival Model
• Isotonic Regression Model
• Gaussian Mixture Model (GMM)
• Latent Dirichlet Allocation (LDA)
• Alternating Least Squares (ALS)
• Multilayer Perceptron Model (MLP)
• Kolmogorov-Smirnov Test (K-S test)
• Multiclass Logistic Regression
• Random Forest
• Gradient Boosted Tree (GBT)
![Page 18: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/18.jpg)
RFormula
• Specify modeling in symbolic form
y ~ f0 + f1
response y is modeled linearly by f0 and f1
• Support a subset of R formula operators: ~ , . , : , + , -
• Implemented as feature transformer in core Spark, available to Scala/Java, Python
• String label column is indexed
• String term columns are one-hot encoded
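For intuition only, base R expands a formula in a comparable way on a local data.frame: `model.matrix` turns a categorical term into indicator columns. This is a local analogy using the built-in iris data, not Spark's RFormula transformer, and the exact columns Spark produces may differ.

```r
# Local base-R analogy (not SparkR): expand a formula with a categorical term.
# iris$Species is a factor with levels setosa, versicolor, virginica.
mm <- model.matrix(Sepal.Length ~ Sepal.Width + Species, data = iris)

# The categorical term becomes 0/1 indicator columns;
# the first level (setosa) serves as the baseline and gets no column.
print(colnames(mm))
print(head(mm, 3))
```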
![Page 19: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/19.jpg)
Generalized Linear Model
# R-like
glm(Sepal_Length ~ Sepal_Width + Species, gaussianDF, family = "gaussian")
spark.glm(binomialDF, Species ~ Sepal_Length + Sepal_Width, family = "binomial")
• “binomial” output string label, prediction
![Page 20: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/20.jpg)
Multilayer Perceptron Model
spark.mlp(df, label ~ features, blockSize = 128, layers = c(4, 5, 4, 3), solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1)
![Page 21: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/21.jpg)
Multiclass Logistic Regression
spark.logit(df, label ~ ., regParam = 0.3, elasticNetParam = 0.8, family = "multinomial", thresholds = c(0, 1, 1))
• binary or multiclass
![Page 22: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/22.jpg)
Random Forest
spark.randomForest(df, Employed ~ ., type = "regression", maxDepth = 5, maxBins = 16)
spark.randomForest(df, Species ~ Petal_Length + Petal_Width, "classification", numTrees = 30)
• “classification” index label, predicted label to string
![Page 23: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/23.jpg)
Gradient Boosted Tree
spark.gbt(df, Employed ~ ., type = "regression", maxDepth = 5, maxBins = 16)
spark.gbt(df, IndexedSpecies ~ ., type = "classification", stepSize = 0.1)
• “classification” index label, predicted label to string
• Binary classification
![Page 24: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/24.jpg)
Modeling Parameters
spark.randomForest
function(data, formula, type = c("regression", "classification"),
maxDepth = 5, maxBins = 32, numTrees = 20, impurity = NULL,
featureSubsetStrategy = "auto", seed = NULL, subsamplingRate = 1.0,
minInstancesPerNode = 1, minInfoGain = 0.0, checkpointInterval = 10,
maxMemoryInMB = 256, cacheNodeIds = FALSE)
![Page 25: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/25.jpg)
Spark.ml Challenges
• Limited API sets
• Non-trivial to map spark.ml API to R API
• Keeping up to changes
• Almost all (except One vs Rest)
• Simple API, but fixed ML pipeline
• Debugging is hard
• Not a ML specific problem
• Getting better?
![Page 26: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/26.jpg)
Native-R UDF
• User-Defined Functions - custom transformation
• Apply by Partition
• Apply by Group
[Diagram: data.frame → UDF → data.frame]
![Page 27: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/27.jpg)
Parallel Processing By Partition
[Diagram: three partitions, each passed as a data.frame to a UDF in its own R process, each returning a data.frame]
![Page 28: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/28.jpg)
UDF: Apply by Partition
• Similar to R apply
• Function to process each partition of a DataFrame
• Mapping of Spark/R data types
dapply(carsSubDF, function(x) { x <- cbind(x, x$mpg * 1.61) }, schema)
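The `schema` argument above is not defined on the slide. A minimal sketch of one way to build it, assuming a local Spark installation and an input derived from the built-in mtcars data; the input construction and the `kmpg` column name are assumptions for illustration.

```r
library(SparkR)

# Assumption: a local Spark installation is available; the master is illustrative.
sparkR.session(master = "local[2]")

# Hypothetical input: car model names and miles per gallon from mtcars.
cars <- data.frame(model = rownames(mtcars), mpg = mtcars$mpg,
                   stringsAsFactors = FALSE)
carsSubDF <- as.DataFrame(cars)

# The schema describes the data.frame the UDF returns:
# the two input columns plus the new converted column.
schema <- structType(structField("model", "string"),
                     structField("mpg", "double"),
                     structField("kmpg", "double"))

# The function runs once per partition; x is an ordinary R data.frame.
out <- dapply(carsSubDF, function(x) { x <- cbind(x, x$mpg * 1.61) }, schema)

# collect brings the distributed result back as a local data.frame.
result <- collect(out)
head(result)
```

The column names in the result come from the schema, so the unnamed column produced by `cbind` is returned as `kmpg`.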
![Page 29: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/29.jpg)
UDF: Apply by Partition + Collect
• No schema
out <- dapplyCollect( carsSubDF, function(x) { x <- cbind(x, "kmpg" = x$mpg*1.61) })
![Page 30: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/30.jpg)
Example - UDF
results <- dapplyCollect(train,
  function(x) {
    model <- randomForest::randomForest(as.factor(dep_delayed_15min) ~ Distance + night + early, data = x, importance = TRUE, ntree = 20)
    predictions <- predict(model, t)
    data.frame(UniqueCarrier = t$UniqueCarrier, delayed = predictions)
  })
closure capture - serialize & broadcast “t”
access package “randomForest::” at each invocation
![Page 31: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/31.jpg)
UDF: Apply by Group
• By grouping columns
gapply(carsDF, "cyl", function(key, x) { y <- data.frame(key, max(x$mpg)) }, schema)
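As with dapply, the `schema` here has to describe the data.frame returned for each group. A minimal sketch, assuming a local Spark installation and `carsDF` built from the built-in mtcars data (an assumption; the slide does not show how `carsDF` is created).

```r
library(SparkR)

# Assumption: a local Spark installation is available; the master is illustrative.
sparkR.session(master = "local[2]")

# Hypothetical input built from mtcars.
carsDF <- as.DataFrame(mtcars)

# One output row per group: the grouping key and the maximum mpg in that group.
schema <- structType(structField("cyl", "double"),
                     structField("max_mpg", "double"))

# key holds the grouping value(s); x is the R data.frame for that group.
result <- gapply(carsDF, "cyl",
                 function(key, x) { y <- data.frame(key, max(x$mpg)) },
                 schema)

# Collect locally and sort, since group order is not guaranteed.
maxMpg <- collect(result)
maxMpg <- maxMpg[order(maxMpg$cyl), ]
print(maxMpg)
```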
![Page 32: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/32.jpg)
UDF: Apply by Group + Collect
• No Schema
out <- gapplyCollect(carsDF, "cyl",
  function(key, x) {
    y <- data.frame(key, max(x$mpg))
    names(y) <- c("cyl", "max_mpg")
    y
  })
![Page 33: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/33.jpg)
UDF: data type mapping (* not a complete list)

| R | Spark |
|---|---|
| byte | byte |
| integer | integer |
| float | float |
| double, numeric | double |
| character, string | string |
| binary, raw | binary |
| logical | boolean |
| POSIXct, POSIXlt | timestamp |
| Date | date |
| array, list | array |
| env | map |
![Page 34: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/34.jpg)
UDF Challenges
• “struct”
• No support for nested structures as columns
• Scaling up / data skew
• What if partition or group too big for single R process?
• Not enough data variety to run model?
• Performance costs
• Serialization/deserialization, data transfer
• esp. beware of closure capture
![Page 35: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/35.jpg)
UDF: lapply
• Like R lapply or doParallel
• Good for “embarrassingly parallel” tasks
• Such as hyperparameter tuning
![Page 36: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/36.jpg)
UDF: lapply
• Take a native R list, distribute it
• Run the UDF in parallel
[Diagram: vector/list → element → UDF → *anything* → list]
![Page 37: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/37.jpg)
UDF: parallel distributed processing
• Output is a list - needs to fit in memory at the driver
costs <- exp(seq(from = log(1), to = log(1000), length.out = 5))
train <- function(cost) {
  model <- e1071::svm(Species ~ ., iris, cost = cost)
  summary(model)
}
summaries <- spark.lapply(costs, train)
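The cost grid in this example is evenly spaced on a log scale. The sketch below checks that in base R and shows the list-in, list-out shape locally with plain `lapply`; the `describe` function is a hypothetical stand-in for the training function, used so the snippet runs without Spark or the e1071 package.

```r
# Base R only: five values evenly spaced on a log scale between 1 and 1000.
costs <- exp(seq(from = log(1), to = log(1000), length.out = 5))
print(signif(costs, 4))

# Hypothetical stand-in for train(): returns a small summary per cost value.
describe <- function(cost) list(cost = cost, log10_cost = log10(cost))

# Plain lapply runs locally; one list element comes back per input element.
local_results <- lapply(costs, describe)
print(length(local_results))
```

Because every element is processed independently, this shape of computation is the kind of task the slide describes as embarrassingly parallel.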
![Page 38: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/38.jpg)
Walkthrough
![Page 40: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/40.jpg)
One last thing…
![Page 41: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/41.jpg)
SparkR as a Package (target 2.1.1)
• Goal: simple one-line installation of SparkR from CRAN
install.packages("SparkR")
• Spark Jar downloaded from official release and cached automatically, or manually with install.spark() since Spark 2.0.0
• R vignettes
• Community can write packages that depend on the SparkR package
• Advanced Spark JVM interop APIs
sparkR.newJObject
sparkR.callJMethod
sparkR.callJStatic
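A small sketch of how the three interop functions fit together, assuming a SparkR release that exports them and a local Spark installation. The Java classes used here are ordinary JDK classes chosen for illustration.

```r
library(SparkR)

# Assumption: a local Spark installation and a SparkR version that exports
# the JVM interop functions; the master is illustrative.
sparkR.session(master = "local[2]")

# Call a static JVM method: java.lang.System.currentTimeMillis()
millis <- sparkR.callJStatic("java.lang.System", "currentTimeMillis")

# Create a JVM object, then call instance methods on it.
jlist <- sparkR.newJObject("java.util.ArrayList")
sparkR.callJMethod(jlist, "add", 42L)
size <- sparkR.callJMethod(jlist, "size")
print(size)
```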
![Page 42: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/42.jpg)
Ecosystem
• RStudio sparklyr
• RevoScaleR/RxSpark, R Server
• H2O R
• Apache SystemML (R-like API)
• Renjin (not Spark)
• IBM BigInsights Big R (not Spark!)
![Page 43: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/43.jpg)
Recap: SparkR 2.0.0, 2.1.0
• SparkSession
• ML
• UDF
![Page 44: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/44.jpg)
What’s coming in SparkR 2.1.1
• Fix Gamma family with GLM, optimizer in LDA (SPARK-19133, SPARK-19066)
• Partitioning DataFrame (SPARK-18335, SPARK-18788)
df <- as.DataFrame(cars, numPartitions = 10)
getNumPartitions(df)
• Setting column R-friendly shortcuts (SPARK-19130, SPARK-18823)
df$foo <- 1
df[[myname]] <- 1; df[[2]] <- df$eruptions / 60
• Spark UI URL sparkR.uiWebUrl (SPARK-18903)
• install.spark better download error handling (SPARK-19231)
![Page 45: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/45.jpg)
What’s coming in SparkR 2.2.0
• More, richer ML - Bisecting K-means
More in-planning and not committed - feedback appreciated!
• Tweedie GLM
• collect performance (SPARK-18924)
• ML Pipeline in SparkR (SPARK-18822)
• Richer RFormula support (SPARK-18570, SPARK-18569)
• Better tree ensemble summary (SPARK-18348)
• ML persistence format (SPARK-15572)
![Page 46: Scalable Data Science with SparkR: Spark Summit East talk by Felix Cheung](https://reader031.vdocuments.us/reader031/viewer/2022022200/58abb4431a28ab04618b4d39/html5/thumbnails/46.jpg)
Thank You.
https://github.com/felixcheung
linkedin: http://linkd.in/1OeZDb7
blog: http://bit.ly/1E2z6OI