galvanise nyc - scaling r with hadoop & spark. v1.0
Post on 15-Apr-2017
© 2016 IBM Corporation
Scaling R using Hadoop and Spark
Virender S. Thakur – Big Data Specialist @IBM
Open Source Analytics Meetup – NYC
March 8th, 2016
IBM’s Framework for Getting Value out of Big Data
All agree on Big Data’s potential, but there is wide divergence on how to exploit it
Pioneers who have started to harness Big Data have benefited greatly
We see Big Data adoption as a continual process – maturity levels
IBM’s approach enables faster adoption of Big Data technologies
Open source innovation (Hadoop, Spark)
Standards-based technologies (ODP, SQL, R)
Familiar interfaces and integration with established tools (IBM innovations)
Advanced analytics (IBM innovations)
IBM’s commitment to continued innovation
IBM is Committed to Open Source
Open source technologies are the base for IBM software and solutions
IBM’s long history of deep open source commitment
Apache Software Foundation: Founding member in 1999
Cloud Foundry: #1 contributor; Basis for Bluemix
OpenStack: #4 contributor; Basis for IBM’s IaaS
Linux: #3 contributor; IBM first enterprise backer of Linux
Hadoop/Spark: Extensive investment in open source contribution; Integration with
Analytics software
Infrastructure
Systems
Application
IBM has the largest investment in Spark of any company in the world
Visit www.spark.tc for more information
IBM Spark Technology Center
Top committer/contributor
300+ inventors
Commitment to educate 1 million data scientists
Contributed SystemML
Founding member of AMPLab
Partnerships in the ecosystem
IBM is all-in on its commitment to Spark
Foster Community
Educate 1M+ data scientists and engineers via online courses
Sponsor AMPLab, creators and evangelists of Spark
Infuse the Portfolio
Integrate Spark throughout portfolio
3,500 employees working on Spark-related topics
Spark however customers want it – standalone, platform or products
Contribute to the Core
Launch Spark Technology Center (STC), 300 engineers
Open source SystemML
Partner with Databricks
Source: https://www-03.ibm.com/press/us/en/pressrelease/47107.wss
"It’s like Spark just got blessed by the enterprise rabbi."
Ben Horowitz, Andreessen Horowitz
Open Data Platform Initiative
Why is IBM involved?
Strong history of leadership in open source & standards
Supports our commitment to open source currency in all future releases
Accelerates our innovation within Hadoop & surrounding applications
Open Data Platform (ODP) vs. Apache Software Foundation (ASF)
ODP supports the ASF mission
ASF provides a governance model around individual projects without looking at the ecosystem
ODP aims to provide a vendor-led, consistent packaging model for core Apache components as an ecosystem
All Standard Apache Open Source Components: HDFS, YARN, MapReduce, Ambari, HBase, Spark, Flume, Hive, Pig, Sqoop, HCatalog, Solr/Lucene
[Diagram label: ODP]
IBM BigInsights for Apache Hadoop
IBM Open Platform with Apache Hadoop – all open source: HDFS, YARN, MapReduce, Ambari, HBase, Spark, Flume, Hive, Pig, Sqoop, HCatalog, Solr/Lucene, Zookeeper, Oozie, Knox, Slider
IBM BigInsights Analyst: Big SQL, BigSheets
IBM BigInsights Data Scientist: Big SQL, BigSheets, Big R, Machine Learning on Big R, Text Analytics
IBM BigInsights Enterprise Management: POSIX Distributed Filesystem, Multi-workload, multi-tenant scheduling
[Diagram annotation: Initial ODP Scope]
Deployment: Cloud, On Prem, Appliance
What Makes Us Different?
Open: Open Data Platform
Insight: Tools and accelerators to visualize, filter, and analyze large data sets
Anywhere: Cloud, On Premise, Appliance
Key Benefits
» Big SQL: makes Hive faster and more secure
» BigSheets for visualization and exploration
» Text Analytics from IBM Research
IBM Hadoop Distribution
ODP Core: 100% open source Apache Hadoop distribution, including Spark
IBM Hadoop Ecosystem: SQL, Security, Business Intelligence, Predictive Analytics, Streaming, Text Analytics, Data Management, MDM, Visualization, Workload Optimization, Big SQL, GPFS, HA
Integration with IBM Portfolio
» Analytics: SPSS, Cognos, Streams
» Data Warehouse: Netezza, DB2
» Governance + Security: Optim, Guardium, Information Governance
» Also: Data Integration, Security Intelligence, Watson Explorer, MDM, Data Replication
Big R Overview: Scalable in-Hadoop Analytics
Challenges with Running Large-Scale Analytics
Traditional approach: analyze small subsets of information (only part of all available information is analyzed)
Big Data approach: analyze all information (all available information is analyzed)
What is Big R?
[Diagram: R Clients, R Packages, Scalable Statistics Engine, Embedded R Execution, Data Sources; callouts 1, 2, 3]
1. Explore, visualize, transform, and model big data using familiar R syntax and paradigm (no MapReduce code)
2. Scale out R
• Partitioning of large data (“divide”)
• Parallel cluster execution of pushed-down R code (“conquer”)
• All of this from within the R environment (Jaql and MapReduce are hidden from you)
• Almost any R package can run in this environment
3. Scalable machine learning
• A scalable statistics engine that provides canned algorithms, and the ability to author new ones, all via R
“End-to-end integration of R-Project with BigInsights”
Pull data (summaries) to the R client, or push R functions right onto the data
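To make the “divide” and “conquer” steps concrete, here is a minimal Python sketch of the idea behind groupApply: rows are partitioned by a key, a user function runs on each partition in parallel, and one result per key is collected. It assumes a local thread pool as a stand-in for cluster workers; partition_by_key and group_apply are hypothetical names, not part of the Big R API.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from statistics import mean


def partition_by_key(rows, key):
    """'Divide': split rows into groups that share the same key value."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    return groups


def group_apply(rows, key, fn, max_workers=4):
    """'Conquer': run fn on every group in parallel and collect one result per key.

    A local thread pool stands in for cluster workers in this sketch.
    """
    groups = partition_by_key(rows, key)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {k: pool.submit(fn, part) for k, part in groups.items()}
        return {k: f.result() for k, f in futures.items()}


# Hypothetical toy rows, not data from the slides.
rows = [
    {"region": "east", "sales": 10.0},
    {"region": "west", "sales": 4.0},
    {"region": "east", "sales": 20.0},
]
result = group_apply(rows, "region", lambda part: mean(r["sales"] for r in part))
print(result)  # {'east': 15.0, 'west': 4.0}
```

The user function only ever sees one partition, which is what lets the same code run unchanged whether there are three groups or three million.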
User Experience for Big R
Connect to BigInsights (BI) cluster
Data frame proxy to large data file
Data transformation step
Run scalable linear regression on cluster
Job Summary (Ambari)
Rich Functionality in Big R
Connection: connect, disconnect, …
HDFS: listfs, rmfs
Types & Functions
  Types: bigr.frame, bigr.vector
  Functions: dim, nrow, colnames, coltypes, head, tail, na.string, na.omit, sort, summary
  Coercion and Casting: as.bigr.frame, as.data.frame, ….vector, as.integer, as.logical, as.numeric
Built-in Functions
  Arithmetic: +, -, *, /, ^
  Mathematical: abs, acos, asin, atan, ceiling, floor, exp, …
  String: grepl, substr
  Statistical: cor, cov, mean, sd
  Miscellaneous: attach, pull, random, sample, ifelse
  Visualization: histogram
Apply R functions: groupApply, tableApply, rowApply
Run scalable algorithms: bigr.lm, bigr.svm, bigr. … (see subsequent slide)
How Big R compares with other R solutions
Other solutions offer an R API for writing MapReduce from R.
Example: Compute the mean departure delay for each airline on a monthly basis*.
[Code screenshots: RHIPE implementation, RHadoop implementation, Big R implementation]
*Dataset: “airline”. Scheduled flights in the US, 1987-2009.
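The slide’s code screenshots are not reproduced here. As a rough illustration of the MapReduce pattern that RHIPE and RHadoop ask you to write by hand, the Python sketch below computes the mean departure delay per airline and month with explicit map, shuffle, and reduce phases. The toy records are hypothetical, and the field names UniqueCarrier, Month, and DepDelay are assumed to follow the public airline dataset.

```python
from collections import defaultdict


def map_phase(record):
    """Emit ((carrier, month), (delay, 1)) so sums and counts can be combined later."""
    carrier, month, dep_delay = record
    if dep_delay is None:  # drop missing delays, much like na.omit
        return []
    return [((carrier, month), (dep_delay, 1))]


def shuffle(pairs):
    """Group mapper output by key, as the framework does between map and reduce."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_phase(values):
    """Combine partial (sum, count) pairs into one mean."""
    total = sum(v for v, _ in values)
    count = sum(c for _, c in values)
    return total / count


def mean_delay_by_carrier_month(records):
    pairs = [kv for rec in records for kv in map_phase(rec)]
    return {key: reduce_phase(vals) for key, vals in shuffle(pairs).items()}


# Hypothetical toy rows: (UniqueCarrier, Month, DepDelay in minutes).
records = [
    ("AA", 1, 10.0),
    ("AA", 1, 20.0),
    ("UA", 1, 5.0),
    ("AA", 2, None),
    ("AA", 2, 8.0),
]
print(mean_delay_by_carrier_month(records))
# {('AA', 1): 15.0, ('UA', 1): 5.0, ('AA', 2): 8.0}
```

Emitting (sum, count) pairs rather than per-record means keeps the reduce step correct when values for one key arrive from several mappers: sums and counts combine associatively, means do not.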
Machine learning with Big R
Based on SystemML (IBM Research, now open source)
Scalability for large data sets
R API inspired by R’s ML libraries
Big R function | Inspired by R’s | Algorithm
bigr.lm() | lm() | Linear regression
bigr.glm() | glm() | Generalized Linear Models
… | … | …
bigr.kmeans() | kmeans() | K-means clustering
bigr.naive.bayes() | naiveBayes() | Naïve Bayes classifier
bigr.sample() | sample() | Uniform sample by percentage, exact number of samples, or partitioned sampling
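The three sampling modes listed for bigr.sample() can be illustrated on an in-memory list. The Python sketch below is a hypothetical single-machine analogy: the function names are invented, and drawing percentage samples row by row (Bernoulli sampling, as here) rather than to an exact size is an assumption, not something the slide states.

```python
import random


def sample_fraction(rows, fraction, seed=0):
    """Percentage sampling: keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


def sample_exact(rows, n, seed=0):
    """Exact-size sampling: n rows drawn uniformly without replacement."""
    return random.Random(seed).sample(list(rows), n)


def sample_partitioned(rows, fractions, seed=0):
    """Partitioned sampling: assign every row to exactly one partition.

    `fractions` such as [0.7, 0.3] gives the expected share of each partition.
    """
    rng = random.Random(seed)
    bounds, running = [], 0.0
    for f in fractions:
        running += f
        bounds.append(running)
    parts = [[] for _ in fractions]
    for row in rows:
        u = rng.random() * running  # uniform in [0, running)
        index = next((i for i, b in enumerate(bounds) if u < b), len(parts) - 1)
        parts[index].append(row)
    return parts


rows = list(range(1000))
train, test = sample_partitioned(rows, [0.7, 0.3])
print(len(sample_exact(rows, 10)))  # 10
print(len(train) + len(test))       # 1000
```

Partitioned sampling is the usual way to get disjoint train and test sets, such as the train frame passed to bigr.glm later in the deck.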
Arbitrarily Large Data Structures: bigr.frame
> data <- bigr.frame (dataPath = "/user/bigr/email_data_virender.csv",
+ dataSource="DEL",
+ delimiter=",",
+ header=TRUE,
+ coltypes=ifelse(1:10 %in% c(2,4,7,9), "numeric", "character"),
+ useMapReduce=TRUE)
> eval(parse(text=paste0(paste0("data$newColumn",1:100),"<- as.integer(bigr.random() * 100) + 1")))
> dim(data)
[1] 7897921 110
> summary(data[,c("TITLE_CD","newColumn99")])
TITLE_CD newColumn99
A Min. :JC Min. :1
B Max. :MW Max. :100
C Mean :50
Open Source R: glm2 Logistic Regression
> modelBuildTime1 <- proc.time()
> lg <- glm2(Target_JC_oo ~.,
+ data=data2,
+ family=binomial,
+ x=FALSE,
+ y=TRUE)
> modelBuildTime2 <- proc.time()
> resultTimeModel1 <- modelBuildTime2 - modelBuildTime1
> print(resultTimeModel1)
user system elapsed
1.90 0.19 2.09
Big R: bigr.glm Logistic Regression
> modelBuildTime0 <- proc.time()
> glmModel <- bigr.glm(Target_JC_oo ~ .,
+ data=train,
+ family=binomial(logit),
+ neg.binomial.class=1,
+ intercept=TRUE,
+ shiftAndRescale=TRUE,
+ directory="/user/bigr/glm/glm.model")
> modelBuildTime1 <- proc.time()
> resultTimeModel0 <- modelBuildTime1 - modelBuildTime0
> print(resultTimeModel0)
user system elapsed
4.06 0.05 20.66
Big R’s Scalability Beyond R and Aggregate Cluster Memory
Scaling Beyond Aggregate Memory
Automatically “spills to disk” if there is more data than fits into memory
HDFS instead of memory: spills to disk over the course of a job
e.g., IBM Research 4 TB dataset: 6.6X more data than RAM
[Chart: PDL 500 GB Dataset]
Arbitrarily Large Data Structures
bf1 <- bigr.frame(...arbitrarily_large_data...)   # proxy to data that stays on the cluster
df1 <- as.data.frame(…small_data…)                 # pull small data into a local R data.frame
bf2 <- as.bigr.frame(…data.frame…)                 # push a local data.frame to the cluster
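The slide does not show how spilling works internally. As a loose illustration of the principle that makes data larger than memory tractable (stream it in bounded chunks and keep only small running aggregates), here is a hypothetical Python sketch that computes a column mean chunk by chunk. It is an analogy, not how Big R or SystemML implement spilling; the CSV contents and column names are made up.

```python
import csv
import io
from itertools import islice


def iter_chunks(rows, chunk_size):
    """Yield lists of at most chunk_size rows, so only one chunk is held at a time."""
    rows = iter(rows)  # islice must keep consuming one shared iterator
    while True:
        chunk = list(islice(rows, chunk_size))
        if not chunk:
            return
        yield chunk


def streaming_mean(handle, column, chunk_size=2):
    """Mean of a numeric column, folded from per-chunk partial sums and counts."""
    total, count = 0.0, 0
    for chunk in iter_chunks(csv.DictReader(handle), chunk_size):
        values = [float(row[column]) for row in chunk if row[column] != ""]  # skip blanks
        total += sum(values)
        count += len(values)
    return total / count if count else float("nan")


# A small in-memory CSV standing in for a file too large to load at once (hypothetical).
data = io.StringIO("id,score\n1,10\n2,20\n3,\n4,30\n5,40\n")
print(streaming_mean(data, "score"))  # 25.0
```

Memory use depends on chunk_size, not on file size; only the running total and count survive between chunks.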
Scalable Machine Learning Algorithms in Big R
Category: Description: Big R Function
Descriptive Statistics
  Univariate: bigr.univariateStats()
  Bivariate: bigr.bivariateStats()
  Stratified Bivariate: bigr.bivariateStats()
Classification
  Logistic Regression (multinomial): bigr.logistic.regression()
  Multi-Class SVM: bigr.svm()
  Naïve Bayes (multinomial): bigr.naive.bayes()
Clustering
  k-Means: bigr.kmeans()
Regression
  Linear Regression, system of equations: bigr.lm()
  Linear Regression, CG (conjugate gradient): bigr.lm()
  Generalized Linear Models (GLM), distributions Gaussian, Poisson, Gamma, Inverse Gaussian, Binomial and Bernoulli: bigr.glm()
  GLM links for all distributions (identity, log, sq. root, inverse, 1/μ²): bigr.glm()
  GLM links for Binomial / Bernoulli (logit, probit, cloglog, cauchit): bigr.glm()
Predict
  Scoring: bigr.predict()
Transformation
  Dummy coding, binning, scaling, missing value imputation: bigr.transform()
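bigr.transform() is listed as covering dummy coding, binning, scaling, and missing value imputation. The Python sketch below shows what each of those four transformations does to a single in-memory column. The helper names are hypothetical, and choices such as mean imputation, equal-width bins, and population standard deviation are assumptions rather than documented Big R defaults.

```python
from statistics import mean, pstdev


def impute_mean(values):
    """Missing value imputation: replace None with the mean of the observed values."""
    fill = mean(v for v in values if v is not None)
    return [fill if v is None else v for v in values]


def scale(values):
    """Scaling: shift to zero mean, rescale to unit (population) standard deviation."""
    mu, sigma = mean(values), pstdev(values)
    return [(v - mu) / sigma if sigma else 0.0 for v in values]


def equal_width_bins(values, n_bins):
    """Binning: assign each value to one of n_bins equal-width bins, numbered 1..n_bins."""
    lo, hi = min(values), max(values)
    width = (hi - lo) / n_bins or 1.0  # guard against a constant column
    return [min(int((v - lo) / width) + 1, n_bins) for v in values]


def dummy_code(categories):
    """Dummy coding: one 0/1 indicator column per distinct level, levels sorted."""
    levels = sorted(set(categories))
    return levels, [[1 if c == lvl else 0 for lvl in levels] for c in categories]


ages = impute_mean([20.0, None, 40.0])
print(ages)                                # [20.0, 30.0, 40.0]
print(scale(ages))                         # roughly [-1.22, 0.0, 1.22]
print(equal_width_bins([0, 5, 10], 2))     # [1, 2, 2]
print(dummy_code(["red", "blue", "red"]))  # (['blue', 'red'], [[0, 1], [1, 0], [0, 1]])
```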
Big R Machine Learning -- Scalability and Performance
bigr.lm
Performance (data fits in memory): 28X speedup
Scalability (data larger than aggregate memory): R runs out of memory; Big R scales beyond cluster memory
Apache Spark R
Spark includes a set of core libraries that enable various analytic methods which can process data from many sources
Spark Core: general compute engine; handles distributed task dispatching, scheduling and basic I/O functions
Spark SQL: executes SQL statements
Spark Streaming: performs streaming analytics using micro-batches
MLlib (machine learning): common machine learning and statistical algorithms
GraphX (graph): distributed graph processing framework
A large variety of data sources and formats can be supported, both on-premise and in the cloud: BigInsights (HDFS), Cloudant, dashDB, Object Storage, SQL DB, …many others
[Diagram: IBM Cloud, Other Cloud, Cloud Apps, On-Premise]
Relationship to Hadoop – Spark both competes and coexists with Hadoop MapReduce
Standalone on HDFS: run side-by-side with MapReduce, leverages current Hadoop stack
Hadoop YARN deployment: simply run on YARN without any admin access required
Spark in MapReduce: launch Spark jobs inside of MapReduce
Spark is a superset of MapReduce and is based on similar distributed computing principles
Source: https://databricks.com/blog/2014/01/21/spark-and-hadoop.html
Spark Technology Center
Focal point for IBM investment in Spark
Code contributions to Apache Spark project
Build industry solutions using Spark
Evangelize Spark technology inside/outside IBM
Agile engagement across IBM divisions
Systems: contribute enhancements to Spark core, and optimized infrastructure (hardware/software) for Spark
Analytics: IBM Analytics software will exploit Spark processing
Research: build innovations above (solutions that use Spark), inside (improvements to Spark core), and below (improve systems that execute Spark) the Spark stack
Goal: To be the #1 contributor and adopter in the Spark ecosystem
SparkR: R on Spark
Opens Apache Spark to the world of R users
Exposes Spark functionality in R-friendly syntax via DataFrames API
v1.4 supports operations like selection, filtering, aggregation, etc.
Roadmap for SparkR ~v1.5 (v1.6 is already available)
Extend machine learning capabilities into SparkR (e.g., SparkML, MLlib)
library(SparkR)
# Start a local Spark context that uses all available cores (SparkR 1.x API)
sc <- sparkR.init(master = "local[*]")
# SparkR 1.x reads DataFrames through an SQLContext built from the Spark context
sqlContext <- sparkRSQL.init(sc)
demo <- read.df(sqlContext, "/pathtofile/customerdata.json", source = "json")
printSchema(demo)
head(demo)
Spark DataFrame
DataFrame = RDD + Schema + DSL
A tabular data structure
Distributed collection of rows organized into named columns
Unified interface for reading and writing data (I/O)
Abstractions for filtering, slicing & dicing, aggregation, visualization
Catalyst optimizer: plan optimization + execution
Simple data structure. Less code. Language performance parity.
Code Execution (1)
// Create RDD (‘spark-shell’ provides the Spark context as ‘sc’)
val quotes = sc.textFile("hdfs:/sparkdata/sparkQuotes.txt")
// Transformations: lazily define new RDDs; nothing is computed yet
val danQuotes = quotes.filter(_.startsWith("DAN"))
val danSpark = danQuotes.map(_.split(" ")).map(x => x(1))
// Action: triggers the computation (returns 1 for the file below)
danSpark.filter(_.contains("Spark")).count()
File: sparkQuotes.txt
DAN Spark is cool
BOB Spark is fun
BRIAN Spark is great
DAN Scala is awesome
BOB Scala is flexible
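For readers without a Spark shell, this plain-Python sketch mimics the same chain using lazy generator expressions as a stand-in for RDD transformations. It runs locally on the five lines of sparkQuotes.txt and is only an analogy for the lazy-transformation/eager-action split, not Spark’s execution model; count_dan_spark is a hypothetical name.

```python
# The five lines of sparkQuotes.txt from the slide, held in memory.
QUOTES = [
    "DAN Spark is cool",
    "BOB Spark is fun",
    "BRIAN Spark is great",
    "DAN Scala is awesome",
    "BOB Scala is flexible",
]


def count_dan_spark(lines):
    # Transformations: generator expressions are lazy, like RDD transformations;
    # nothing is evaluated until the final sum() pulls values through the chain.
    dan_quotes = (line for line in lines if line.startswith("DAN"))
    second_words = (line.split(" ")[1] for line in dan_quotes)
    # Action: forces evaluation of the whole chain and returns a number.
    return sum(1 for word in second_words if "Spark" in word)


print(count_dan_spark(QUOTES))  # 1
```

Only “DAN Spark is cool” survives both filters; “DAN Scala is awesome” passes the first filter but its second word is “Scala”.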
Spark DataFrame Execution
Scala/Java DF, Python DF, R DF → Logical Plan → Catalyst optimizer → Physical Execution
Language front ends are simple wrappers that create the logical plan
The logical plan is an intermediate representation for the computation
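The flow above (thin language wrappers build one logical plan, an optimizer rewrites it, then it executes) can be sketched in miniature. The Python below is a toy, not Catalyst: the node classes and the single rewrite rule (collapsing stacked filters into one) are hypothetical simplifications chosen to show why every language front end benefits from the same optimizer.

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple, Union


@dataclass
class Scan:
    rows: List[dict]


@dataclass
class Filter:
    child: "Plan"
    predicates: Tuple[Callable[[dict], bool], ...]


@dataclass
class Project:
    child: "Plan"
    columns: Tuple[str, ...]


Plan = Union[Scan, Filter, Project]


def optimize(plan):
    """One Catalyst-style rewrite rule: collapse stacked Filter nodes into one."""
    if isinstance(plan, Filter):
        child = optimize(plan.child)
        if isinstance(child, Filter):
            return Filter(child.child, child.predicates + plan.predicates)
        return Filter(child, plan.predicates)
    if isinstance(plan, Project):
        return Project(optimize(plan.child), plan.columns)
    return plan


def execute(plan):
    """'Physical execution' of a logical plan by walking the tree."""
    if isinstance(plan, Scan):
        return list(plan.rows)
    if isinstance(plan, Filter):
        return [r for r in execute(plan.child) if all(p(r) for p in plan.predicates)]
    return [{c: r[c] for c in plan.columns} for r in execute(plan.child)]


rows = [{"name": "DAN", "age": 40}, {"name": "BOB", "age": 25}, {"name": "ANN", "age": 35}]
# A thin language wrapper would only build this tree; here it is built by hand.
older = Filter(Scan(rows), (lambda r: r["age"] > 30,))
not_ann = Filter(older, (lambda r: r["name"] != "ANN",))
plan = Project(not_ann, ("name",))
optimized = optimize(plan)
print(type(optimized.child).__name__, len(optimized.child.predicates))  # Filter 2
print(execute(optimized))  # [{'name': 'DAN'}]
```

Because the rewrite works on the shared plan rather than on Scala, Python, or R code, every front end gets the optimization for free, which is the point of the slide that follows.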
Spark DataFrames have uniform performance across all languages
MLlib
Library providing machine learning primitives on top of Apache Spark
Ease of use
Use any Hadoop data source (e.g. HDFS, HBase, or local files)
Build single application with other Spark components
(e.g., GraphX, Spark Streaming, SparkSQL, etc.)
Performance
Contains high-quality iterative algorithms
MLlib runs up to 100X faster than MapReduce
Easy to Deploy
Runs on existing Hadoop clusters
Standalone mode, EC2, Mesos
MLlib “off-the-shelf algorithms”
Basic Statistics: summary statistics, correlations, stratified sampling, hypothesis testing, random data generation
Classification and Regression: linear models (SVMs, logistic regression, linear regression), Naïve Bayes, decision trees, ensembles of trees (Random Forests and Gradient-Boosted Trees), isotonic regression
Collaborative Filtering: alternating least squares (ALS)
Clustering: K-means, Gaussian mixture, power iteration clustering (PIC), latent Dirichlet allocation (LDA), streaming k-means
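MLlib’s K-means is a distributed implementation (its default initialization is k-means||); the plain-Python sketch below shows only the underlying Lloyd iteration on one machine. The naive “first k points” initialization and the toy points are assumptions chosen so that the result is deterministic.

```python
import math


def kmeans(points, k, iterations=20):
    """Lloyd's algorithm: alternate assignment and centroid-update steps."""
    centroids = [tuple(p) for p in points[:k]]  # naive init: first k points
    for _ in range(iterations):
        # Assignment step: index of the nearest centroid for every point.
        labels = [min(range(k), key=lambda j: math.dist(p, centroids[j])) for p in points]
        # Update step: each centroid moves to the mean of its assigned points.
        new_centroids = []
        for j in range(k):
            members = [p for p, lab in zip(points, labels) if lab == j]
            if members:
                new_centroids.append(tuple(sum(c) / len(members) for c in zip(*members)))
            else:
                new_centroids.append(centroids[j])  # keep an empty cluster's centroid
        if new_centroids == centroids:  # converged: assignments no longer move centroids
            break
        centroids = new_centroids
    return centroids, labels


points = [(1.0, 1.0), (9.0, 9.0), (1.5, 2.0), (8.0, 8.5), (1.0, 0.5), (9.5, 8.0)]
centroids, labels = kmeans(points, k=2)
print(labels)  # [0, 1, 0, 1, 0, 1]
```

In a distributed setting the assignment step parallelizes trivially over partitions, and only per-cluster sums and counts need to be combined for the update step.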
MLlib “off-the-shelf algorithms” (cont.)
Dimensionality Reduction: singular value decomposition (SVD), principal component analysis (PCA)
Feature Extraction and Transformation: term frequency-inverse document frequency (TF-IDF), Word2Vec, StandardScaler, Normalizer, ChiSqSelector, ElementwiseProduct
Frequent Pattern Mining: FP-growth
Optimization: stochastic gradient descent, limited-memory BFGS (L-BFGS)
PMML Model Export: KMeansModel, LinearRegressionModel, LassoModel, SVMModel, BinaryLogisticRegressionModel
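TF-IDF, listed above under feature extraction, weights a term’s count in a document by how rare the term is across the corpus. The Python sketch below uses the smoothed inverse document frequency idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) the number of documents containing t, which is the form given in the Spark MLlib feature-extraction guide. Whitespace tokenization, raw term counts, and the toy documents are simplifying assumptions.

```python
import math
from collections import Counter


def tf_idf(docs):
    """TF-IDF with smoothed IDF: idf(t) = log((m + 1) / (df(t) + 1))."""
    tokenized = [doc.lower().split() for doc in docs]
    m = len(tokenized)
    # Document frequency: count each term at most once per document.
    df = Counter(term for tokens in tokenized for term in set(tokens))
    idf = {t: math.log((m + 1) / (n + 1)) for t, n in df.items()}
    # Term frequency is the raw count of the term within each document.
    return [{t: tf * idf[t] for t, tf in Counter(tokens).items()} for tokens in tokenized]


docs = ["spark is fast", "spark is fun", "hadoop is big"]
weights = tf_idf(docs)
print(weights[0]["is"])              # 0.0, because "is" appears in every document
print(round(weights[0]["fast"], 4))  # 0.6931, i.e. log(4 / 2)
```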
SparkML Pipelines API
Robust systems for end-to-end machine learning pipelines
Specify pipeline, inspect and debug, re-run on new data, tune parameters
KeystoneML is a research complement to SparkML
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
# 'training' is assumed to be a DataFrame with "text" and "label" columns
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
Pipeline stages: Tokenizer → HashingTF → Logistic Regression → Logistic Regression Model
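To show what each stage does without a Spark cluster, here is a pure-Python sketch with the same stage order (Tokenizer, HashingTF, LogisticRegression). The classes only mirror the PySpark names and are hypothetical stand-ins: zlib.crc32 replaces Spark’s hash function, the model is fitted with plain batch gradient descent rather than Spark’s optimizer, and the training texts are made up.

```python
import math
import zlib


class Tokenizer:
    """Lowercase each text and split it on whitespace."""

    def transform(self, texts):
        return [text.lower().split() for text in texts]


class HashingTF:
    """Hashing trick: count tokens into a fixed number of buckets."""

    def __init__(self, num_features=256):
        self.num_features = num_features

    def transform(self, token_lists):
        vectors = []
        for tokens in token_lists:
            vec = [0.0] * self.num_features
            for tok in tokens:
                # crc32 is deterministic across runs, unlike Python's built-in hash().
                vec[zlib.crc32(tok.encode("utf-8")) % self.num_features] += 1.0
            vectors.append(vec)
        return vectors


class LogisticRegressionModel:
    def __init__(self, weights, bias):
        self.weights, self.bias = weights, bias

    def predict(self, vectors):
        scores = (sum(w * x for w, x in zip(self.weights, v)) + self.bias for v in vectors)
        return [1 if s > 0 else 0 for s in scores]


class LogisticRegression:
    """Binary logistic regression, batch gradient descent with an L2 penalty."""

    def __init__(self, max_iter=100, reg_param=0.01, step=0.5):
        self.max_iter, self.reg_param, self.step = max_iter, reg_param, step

    def fit(self, vectors, labels):
        n, dim = len(vectors), len(vectors[0])
        weights, bias = [0.0] * dim, 0.0
        for _ in range(self.max_iter):
            grad_w = [self.reg_param * w for w in weights]  # gradient of the L2 term
            grad_b = 0.0
            for v, y in zip(vectors, labels):
                z = sum(w * x for w, x in zip(weights, v)) + bias
                err = 1.0 / (1.0 + math.exp(-z)) - y  # sigmoid(z) - label
                for j, x in enumerate(v):
                    if x:
                        grad_w[j] += err * x / n
                grad_b += err / n
            weights = [w - self.step * g for w, g in zip(weights, grad_w)]
            bias -= self.step * grad_b
        return LogisticRegressionModel(weights, bias)


class Pipeline:
    """Apply the transformer stages in order, then fit the final estimator."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, texts, labels):
        data = texts
        for stage in self.stages[:-1]:
            data = stage.transform(data)
        return PipelineModel(self.stages[:-1], self.stages[-1].fit(data, labels))


class PipelineModel:
    def __init__(self, transformers, model):
        self.transformers, self.model = transformers, model

    def predict(self, texts):
        data = texts
        for stage in self.transformers:
            data = stage.transform(data)
        return self.model.predict(data)


# Hypothetical toy training data: label 1 for Spark quotes, 0 otherwise.
training = ["spark is great", "spark rocks", "hadoop mapreduce", "mapreduce jobs"]
labels = [1, 1, 0, 0]
pipeline = Pipeline([Tokenizer(), HashingTF(num_features=256), LogisticRegression()])
model = pipeline.fit(training, labels)
print(model.predict(training))
```

The fitted PipelineModel replays the same transformer stages on new text before scoring, which is what makes “re-run on new data” a one-line call.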
Project Tungsten: Hardware Exploitation for Apache Spark
Language frontend: SQL, Python, Scala/Java, R, …
Intermediate representation: DataFrame, Logical Plan
Tungsten backend: JVM, LLVM, GPU, NVRAM, …
Asynchronous Design Patterns
Sophisticated communication patterns for sophisticated machine learning
DeepDist “Lightning-Fast Deep Learning on Spark”
• Training deep belief networks requires extensive data and computation
• Asynchronous stochastic gradient descent (convergence happens more quickly)
• Based on Google’s DistBelief project
Exciting machine learning ecosystem developing on and around Spark:
• GitHub projects
• Simple APIs
• Exploiting hardware
Where to go next?
Spark will be infused throughout IBM products and will be delivered however the customer wants to access its power
Standalone: Spark as a Service (Bluemix)
Within Platforms: IBM Open Platform (w/ Spark), BigInsights on Cloud (w/ Spark), IBM Streams, …many others underway
Within Solutions: Analytics, Commerce, Watson Health, …many others underway
IBM’s vision for IBM Analytics for Apache Spark (Spark-as-a-Service)
We make Spark ACCESSIBLE, INTEGRATED and POWERFUL
IBM Bluemix (https://console.ng.bluemix.net/)
Data Scientist Workbench (http://www.datascientistworkbench.com/)
Big Data University (http://bigdatauniversity.com/)
IBM Big Data and analytics sample architecture
All Data Sources: Streaming Data, Text Data, Applications Data, Time Series, Geo Spatial, Relational, Social Network, Video & Image
Zones: Landing and Archive Zone; Ingestion and Operational Information; Real-time Analytics Zone; Enterprise Warehouse and Mart Zone; Analytic Appliances
Big Data Platform Capabilities:
• Information Ingest
• Real Time Analytics
• Warehouse & Data Marts
• Analytic Appliances
Information Governance, Security and Business Continuity
Advanced Analytics / New Insights:
• Cognitive: Learn Dynamically?
• Prescriptive: Best Outcomes?
• Predictive: What Could Happen?
• Descriptive: What Has Happened?
• Exploration and Discovery: What Do You Have?
New / Enhanced Applications: Automated Process, Case Management, Analytic Applications, Watson, Cloud Services, ISV Solutions, Alerts