TRANSCRIPT
Making Sense at Scale with the Berkeley Data Analytics Stack
UC BERKELEY
Michael Franklin
February 3, 2015
WSDM 2015 Shanghai
Agenda
• A Little Bit on Big Data
• AMPLab Overview
• BDAS Philosophy: Unification not Specialization
• Spark, GraphX, and other BDAS components
• Wrap Up: Thoughts on Big Data Software
Sources Driving Big Data
It's All Happening Online
Every: click, ad impression, billing event, fast-forward/pause, friend request, transaction, network message, fault, …
User Generated (Web & Mobile)
Internet of Things / M2M
Scientific Computing
Big Data – A Bad Definition
Data sets, typically consisting of billions or trillions of records, that are so vast and complex that they require new and powerful computational resources to process.
- Dictionary.com
Big Data as a Problem!
Big Data as a Resource
"For a long time, we thought that Tamoxifen was roughly 80% effective for breast cancer patients. But now we know much more:
we know that it’s 100% effective in 70% to 80% of the patients, and ineffective in the rest.”
- Tim O’Reilly et al. “How Data Science is Transforming Health Care”
With enough of the right data, you can determine precisely whom the treatment will work for.
Thus, even a 1% effective drug could save lives.
A Big Data Pattern
750,000+ downloads
AMPLab: Integrating 3 Resources
Algorithms
• Machine Learning, Statistical Methods • Prediction, Business Intelligence
Machines
• Clusters and Clouds • Warehouse Scale Computing
People
• Crowdsourcing, Human Computation • Data Scientists, Analysts
AMPLab Overview
• 70+ Students, Postdocs, Faculty and Staff from: Databases, Machine Learning, Systems, Security, and Networking
• 50/50 Split: Industry Sponsors and Government (White House Big Data Program: NSF CISE Expeditions in Computing and DARPA XData)
• Fixed Timeline (ends Dec 2016); Collaborative Working Space
See Dave Patterson “How to Build a Bad Research Center”, CACM March, 2014
"… Berkeley's AMPLab has already left an indelible mark on the world of information technology, and even the web. But we haven't yet experienced the full impact of the group … Not even close." – Derrick Harris, GigaOM, Aug 2, 2014
Franklin, Jordan, Stoica, Patterson, Shenker, Recht, Katz, Joseph, Goldberg, Culler
A Nexus of Industrial Engagement
• Open Source Software – Industrial-Strength
• Twice-yearly 3-day offsite retreats; AMPCamp training
Open Source Community Building
MeetUp on MLBase @Twitter (Aug 6, 2013)
Spark Summit SF (June 30, 2014)
Big Data Ecosystem Evolution
General batch processing: MapReduce
Specialized systems (iterative, interactive and streaming apps): Pregel, Dremel, GraphLab, Storm, Giraph, Drill, Tez, Impala, S4, …
AMPLab Unification Philosophy
Don't specialize MapReduce – Generalize it!
Two additions to Hadoop MR can enable all the models shown earlier!
1. General Task DAGs
2. Data Sharing
For Users: Fewer Systems to Use, Less Data Movement
Components: Spark Streaming, GraphX, Spark SQL, MLbase, …
Berkeley Data Analytics Stack (Apache and BSD open source)
Layers (bottom up): Resource Virtualization; Storage; Processing Engine (an in-memory dataflow system); Access and Interfaces; In-house Apps
M. Zaharia, M. Chowdhury, M. Franklin, I. Stoica, S. Shenker, "Spark: Cluster Computing with Working Sets", USENIX HotCloud, 2010.
“It’s only September but it’s already clear that 2014 will be the year of Apache Spark”
-- Datanami, 9/15/14
• Developed in AMPLab and its predecessor the RADLab
• Alternative to Hadoop MapReduce
• 10-100x speedup for ML and interactive queries
• Central component of the BDAS Stack
• "Graduated" to Apache Foundation -> Apache Spark
Apache Spark Contributors:
[Chart: number of contributors per release, growing steadily from 2011 to 2014]
400+ contributors to current release
Apache Spark: Compared to Other Projects
[Charts: commits and lines of code changed over the past 6 months for MapReduce, YARN, HDFS, Storm, and Spark]
2-3x more activity than: Hadoop, Storm, MongoDB, NumPy, D3, Julia, …
Iteration in Map-Reduce
[Diagram: training data flows through Map and Reduce stages; the model is refined from an initial model w(0) through iterates w(1), w(2), w(3) to the learned model]
Cost of Iteration in Map-Reduce
[Same diagram, annotated: each iteration repeatedly loads the same training data]
Cost of Iteration in Map-Reduce (continued)
[Same diagram, annotated: output is redundantly saved between stages]
Dataflow View
[Diagram: Training Data (HDFS) → Map → Reduce → Map → Reduce → Map → Reduce]
Memory-Optimized Dataflow
[Same diagram: the training data is loaded once and cached in memory across iterations]
Memory-Optimized Dataflow View
[Same diagram, annotated: efficiently move data between stages]
Spark: 10-100× faster than Hadoop MapReduce
Resilient Distributed Datasets (RDDs)
API: coarse-grained transformations (map, group-by, join, sort, filter, sample, …) on immutable collections
» Collections of objects that can be stored in memory or on disk across a cluster
» Built via parallel transformations (map, filter, …)
» Automatically rebuilt on failure
Rich enough to capture many models:
» Dataflow models: MapReduce, Dryad, SQL, …
» Specialized models: Pregel, Hama, …
M. Zaharia, et al., "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing", NSDI 2012.
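To make the abstraction concrete, here is a minimal sketch in Spark's Scala API, assuming a Spark shell with sc in scope (the log path and filter strings are illustrative):

val lines  = sc.textFile("hdfs://.../logs/*")            // illustrative path
val errors = lines.filter(_.contains("error")).cache()   // kept in memory across the cluster

errors.count()                                  // first action materializes and caches the RDD
errors.filter(_.contains("timeout")).count()    // later queries reuse the cached data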
Abstraction: Dataflow Operators
map, filter, groupBy, sort, union, join, leftOuterJoin, rightOuterJoin, reduce, count, fold, reduceByKey, groupByKey, cogroup, cross, zip, sample, take, first, partitionBy, mapWith, pipe, save, ...
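As a quick illustration (not from the slides; the datasets and values are made up), a few of these operators compose naturally in Spark's Scala API, again assuming sc from a Spark shell:

val clicks   = sc.parallelize(Seq(("u1", 3), ("u2", 1), ("u1", 2)))
val profiles = sc.parallelize(Seq(("u1", "US"), ("u2", "CN")))

val totals = clicks.reduceByKey(_ + _)      // ("u1", 5), ("u2", 1)
val joined = totals.join(profiles)          // ("u1", (5, "US")), ("u2", (1, "CN"))
joined.collect().foreach(println)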
Fault Tolerance with RDDs
RDDs track the series of transformations used to build them (their lineage)
» Log one operation to apply to many elements
» No cost if nothing fails
Enables per-node recomputation of lost data:
messages = textFile(...).filter(_.contains("error"))
                        .map(_.split('\t')(2))
Lineage: HadoopRDD (path = hdfs://…) → FilteredRDD (func = _.contains(...)) → MappedRDD (func = _.split(…))
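In the released Spark API, the recorded lineage can be inspected directly; a small sketch (assuming a Spark shell with sc, and an illustrative HDFS path):

val messages = sc.textFile("hdfs://.../logs")
  .filter(_.contains("error"))
  .map(_.split('\t')(2))

println(messages.toDebugString)   // prints the lineage chain shown above; no data is copied, only operations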
A Unified System for SQL & ML

def logRegress(points: RDD[Point]): Vector = {
  var w = Vector(D, _ => 2 * rand.nextDouble - 1)
  for (i <- 1 to ITERATIONS) {
    val gradient = points.map { p =>
      val denom = 1 + exp(-p.y * (w dot p.x))
      (1 / denom - 1) * p.y * p.x
    }.reduce(_ + _)
    w -= gradient
  }
  w
}

val users = sql2rdd("SELECT * FROM user u JOIN comment c ON c.uid = u.uid")

val features = users.mapRows { row =>
  new Vector(extractFeature1(row.getInt("age")),
             extractFeature2(row.getStr("country")), ...)
}

val trainedVector = logRegress(features.cache())
Deep integration of SQL and Spark
Both share the same set of workers and caches
Can move seamlessly between SQL and Machine Learning worlds
R. Xin, J. Rosen, M. Zaharia, M. Franklin, S. Shenker, I. Stoica, "Shark: SQL and Rich Analytics at Scale", SIGMOD 2013.
Spark Streaming
Microbatch approach provides lower latency
Additional operators provide windowed operations
M. Zaharia, et al., "Discretized Streams: Fault-Tolerant Streaming Computation at Scale", SOSP 2013.
Batch/Streaming Unification
Batch and streaming code is virtually the same
» Easy to develop and maintain consistency
// count words from a file (batch)
val file = sc.textFile("hdfs://.../pagecounts-*.gz")
val words = file.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

// count words from a network stream, every 10s (streaming)
val ssc = new StreamingContext(args(0), "NetCount", Seconds(10), ..)
val lines = ssc.socketTextStream("localhost", 3456)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
Apache Spark v1.2 (12/14)
Includes:
» Spark (core)
» Spark Streaming
» GraphX (alpha release)
» MLlib
» Spark SQL – query processing
Wide range of interfaces:
» Python / interactive IPython
» Scala / interactive Scala shell
» R / interactive R shell
» Java
Now included in all major Hadoop Distributions
Graph Analytics
[Pipeline diagram: raw Wikipedia XML is parsed into a link table (Title, Link); hyperlinks feed PageRank to produce the top 20 pages (Title, PR); an editor graph feeds community detection to produce user communities (User, Com.), which join with a discussion table (User, Disc.) to produce the top communities (Com., PR)]
Separate Systems
[Diagram: dataflow systems operate on tables (rows in, result out); graph systems operate on dependency graphs]
Difficult to Use
Users must learn, deploy, and manage multiple systems
Leads to brittle and often complex interfaces
[Chart: PageRank runtime, 10 iterations on the LiveJournal graph, in seconds: GraphLab 22, Spark 354, Hadoop 1340]
Efficiencies are Possible
A specialized graph system can be faster than general MapReduce computation
But
Extensive data movement and duplication across the network and file system
[Diagram: XML → HDFS → HDFS → HDFS → HDFS]
Limited reuse of internal data structures across stages
GraphX – Adding Graphs to the Mix
GraphX Unified Representation: Graph View and Table View
Tables and Graphs are composable views of the same physical data
Each view has its own operators that exploit the semantics of the view to achieve efficient execution
J. Gonzalez, R. Xin, A. Dave, D. Crankshaw, M. Franklin, I. Stoica, "GraphX: Graph Processing in a Distributed Dataflow Framework", OSDI, Oct 2014.
Representation and Optimizations
[Diagram: distributed graphs are represented as horizontally partitioned tables; vertex programs compile down to joins and dataflow operators]
Advances in Graph Processing Systems
» Distributed join optimization
» Materialized view maintenance
Property Graph Data Model
[Diagram: a property graph over vertices A–F]
Vertex Property:
• User Profile
• Current PageRank Value
Edge Property:
• Weights
• Timestamps
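As a hedged sketch using the open-source GraphX API (the user names and timestamps are made up, and sc is assumed from a Spark shell), a property graph with profile strings on vertices and timestamps on edges, viewable as tables:

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

val users: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))

val follows: RDD[Edge[Long]] =                    // edge property: a timestamp
  sc.parallelize(Seq(Edge(1L, 2L, 1422921600L), Edge(2L, 3L, 1422925200L)))

val graph = Graph(users, follows)

graph.vertices.collect().foreach(println)         // table view of the vertices
graph.triplets.collect().foreach(t => println(s"${t.srcAttr} -> ${t.dstAttr}"))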
Encoding Property Graphs as Tables
[Diagram: the property graph (vertices A–F) is split across two machines with a vertex cut. A Vertex Table (RDD) stores the vertex properties, an Edge Table (RDD) stores the edges co-partitioned by the cut, and a Routing Table (RDD) records which partitions hold the edges of each vertex]
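In the released GraphX API the vertex-cut strategy can be chosen explicitly; a one-line sketch reusing the graph value from the earlier sketch:

import org.apache.spark.graphx.PartitionStrategy

val partitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D)  // a 2D vertex-cut strategy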
Graph Operators (Scala)

class Graph[V, E] {
  def Graph(vertices: Table[(Id, V)], edges: Table[(Id, Id, E)])

  // Table Views -----------------
  def vertices: Table[(Id, V)]
  def edges: Table[(Id, Id, E)]
  def triplets: Table[((Id, V), (Id, V), E)]

  // Transformations ------------------------------
  def reverse: Graph[V, E]
  def subgraph(pV: (Id, V) => Boolean,
               pE: Edge[V, E] => Boolean): Graph[V, E]
  def mapV(m: (Id, V) => T): Graph[T, E]
  def mapE(m: Edge[V, E] => T): Graph[V, T]

  // Joins ----------------------------------------
  def joinV(tbl: Table[(Id, T)]): Graph[(V, T), E]
  def joinE(tbl: Table[(Id, Id, T)]): Graph[V, (E, T)]

  // Computation ----------------------------------
  def mrTriplets(mapF: (Edge[V, E]) => List[(Id, T)],
                 reduceF: (T, T) => T): Graph[T, E]
}
def mrTriplets(mapF: (Edge[V, E]) => List[(Id, T)],
               reduceF: (T, T) => T): Graph[T, E]

mrTriplets captures the Gather-Scatter pattern from GraphLab.
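In released GraphX this pattern surfaces as aggregateMessages (the successor of mrTriplets / mapReduceTriplets). A sketch of one PageRank-style step, assuming a Graph[Double, Double] named rankGraph whose edge attribute is the 1/outDegree weight:

import org.apache.spark.graphx._

def pageRankStep(g: Graph[Double, Double]): VertexRDD[Double] =
  g.aggregateMessages[Double](
    ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr),  // gather: each edge forwards the source's rank share
    _ + _                                          // sum the contributions arriving at each vertex
  )

val contribs = pageRankStep(rankGraph)
val newRanks = contribs.mapValues((c: Double) => 0.15 + 0.85 * c)  // apply damping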
Graph System Optimizations
» Specialized data structures
» Vertex-cut partitioning
» Remote caching / mirroring
» Message combiners
» Active set tracking
PageRank Benchmark
[Chart: runtime in seconds on the Twitter graph (42M vertices, 1.5B edges) and the UK web graph (106M vertices, 3.7B edges), with 7x and 18x annotations; EC2 cluster of 16 m2.4xlarge nodes + 1GigE]
GraphX performs comparably to state-of-the-art graph processing systems.
The GraphX Stack (Lines of Code)
» Algorithms: PageRank (20), Connected Comp. (20), K-core (60), Triangle Count (50), LDA (220), SVD++ (110)
» GraphLab/Pregel API (34)
» GraphX (2,500)
» Spark (30,000)
Some algorithms are more naturally expressed using the GraphX primitive operators
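For the rest, the library ships them as ready-made calls; a short sketch using real GraphX entry points, reusing the graph and partitioned values from the earlier sketches:

val ranks      = graph.pageRank(0.0001).vertices        // PageRank, tolerance-based convergence
val components = graph.connectedComponents().vertices   // Connected Components
val triangles  = partitioned.triangleCount().vertices   // Triangle Count (expects a partitioned graph)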
Graphs are just one stage… What about a pipeline?
[Diagram: HDFS → Spark preprocess → compute → Spark post-processing → HDFS]
A Small Pipeline in GraphX
Timed end-to-end, GraphX is the fastest
[Chart: total runtime in seconds for the raw-Wikipedia-XML → hyperlinks → PageRank → top-20-pages pipeline: Spark 1492, Giraph + Spark 605, GraphLab + Spark 375, GraphX 342]
MLBase: Distributed ML Made Easy
DB Query Language Analogy: Specify What, not How
MLBase chooses:
• Algorithms/Operators
• Ordering and Physical Placement
• Parameter and Hyperparameter Settings
• Featurization
Leverages Spark for Speed and Scale
T. Kraska, A. Talwalkar, J. Duchi, R. Griffith, M. Franklin, M. Jordan, “MLBase: A Distributed Machine Learning System”, CIDR 2013.
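The flavor of "what, not how" can be sketched in plain Scala (this is illustrative pseudologic, not MLBase's real interface; doClassify, the stand-in trainer, and the candidate grid are all assumptions):

case class Model(name: String, predict: Array[Double] => Double)

def train(algo: String, lr: Double, data: Seq[(Array[Double], Double)]): Model =
  Model(s"$algo(lr=$lr)", _ => 0.0)                 // stand-in for real training

def score(m: Model, holdout: Seq[(Array[Double], Double)]): Double =
  holdout.count { case (x, y) => m.predict(x) == y }.toDouble / holdout.size

def doClassify(data: Seq[(Array[Double], Double)]): Model = {
  val (trainSet, holdout) = data.splitAt(data.size * 4 / 5)
  val candidates = for {
    algo <- Seq("logreg", "svm")                    // operator choice
    lr   <- Seq(0.01, 0.1, 1.0)                     // hyperparameter grid
  } yield train(algo, lr, trainSet)
  candidates.maxBy(score(_, holdout))               // the optimizer picks the winner
}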
ML Pipeline Generation & Optimization
MLBase Optimizer Focus:
• Better Resource Utilization
• Algorithmic Speedups
• Reduced Model Search Time
Ex: Image Classification Pipeline
E. Sparks, S. Venkataraman, M. Franklin, B. Recht, "ML Pipelines", in progress (see the AMPLab blog for an overview).
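A minimal sketch of the pipeline idea (hypothetical stage names, not the cited ML Pipelines API): stages are typed functions, and composing them yields an end-to-end pipeline the optimizer can reason about as a whole:

type Stage[A, B] = A => B

val resize:    Stage[Array[Byte], Array[Byte]]   = img => img            // stand-in transform
val featurize: Stage[Array[Byte], Array[Double]] = img => img.map(_.toDouble)
val classify:  Stage[Array[Double], Int]         = feats => if (feats.sum > 0) 1 else 0

val pipeline: Stage[Array[Byte], Int] = resize andThen featurize andThen classify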
Data → Model: Where do models go?
[Diagram: training turns data into models, which end up in conference papers, sales reports, or driving actions]
Introducing Velox: Model Serving
Driving Actions:
• Suggesting Items at Checkout
• Fraud Detection
• Cognitive Assistance
• Internet of Things
Requirements: Low-Latency, Personalized, Rapidly Changing
Problem: Separate Systems
[Diagram: offline analytics systems do sophisticated ML on static data; online serving systems (e.g., MongoDB) provide low-latency data serving]
How do we serve low-latency predictions and train on live data?
Velox Model Serving System [CIDR'15]
Decompose personalized predictive models:
[Diagram: split each model into a Feature Model trained in batch (with feature caching and approximate features) and a Personalization Model maintained online (with online updates and active learning)]
Order-of-magnitude reductions in prediction latencies.
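A small sketch of the decomposition (all names illustrative, not Velox's API): the batch-trained feature model is shared across users, a per-user weight vector is updated online, and a prediction is their dot product, so personalization stays both cheap and current:

def features(item: String): Array[Double] =             // stand-in batch-trained feature model
  Array(item.length.toDouble, item.count(_ == ' ').toDouble)

val userWeights = scala.collection.mutable.Map(         // online personalization model
  "u1" -> Array(0.3, -0.1)
)

def predict(user: String, item: String): Double = {
  val f = features(item)
  val w = userWeights.getOrElse(user, Array.fill(f.length)(0.0))
  f.zip(w).map { case (a, b) => a * b }.sum             // dot product
}

def onlineUpdate(user: String, item: String, label: Double, lr: Double = 0.1): Unit = {
  val f = features(item)
  val w = userWeights.getOrElseUpdate(user, Array.fill(f.length)(0.0))
  val err = label - predict(user, item)
  for (i <- w.indices) w(i) += lr * err * f(i)          // nudge weights toward the observation
}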
Berkeley Data Analytics Stack (Apache and BSD open source)
Layers (bottom up): Resource Virtualization; Storage; Processing Engine; Access and Interfaces; In-house Apps
Big Data Architecture Open Questions & Research Issues
• What is the role of a unified stack in a fast-changing software landscape?
• Single-node vs. Cluster; Elastic Cloud vs. HPC
• GPUs, FPGAs, XYZs, ???
• New memory hierarchies: SSDs, RDMA, …
• Serving vs. Analytics Workloads
• What correctness guarantees are really needed?
Summary
• Big Data – yes, there's hype, but it is a Big Deal
• AMPLab project
  • 6 years, cross-disciplinary team, industry engagement
  • Open source development and community building
• BDAS philosophy: Unification
  • Spark + SQL + Graphs + ML + …
• The Big Data landscape continues to evolve
  • Open source enables academic research to play a huge role
To find out more or get involved:
amplab.berkeley.edu
UC BERKELEY
Thanks to NSF CISE Expeditions in Computing, DARPA XData, Founding Sponsors: Amazon Web Services, Google, and SAP, the Thomas and Stacy Siebel Foundation, all our industrial sponsors and partners, and all the members of the AMPLab Team.