Source: db.lcs.mit.edu/6.830/lectures/lec20_2018.pdf
TRANSCRIPT
SPARK MOTIVATION
MapReduce simplified “big data” analysis on large, unreliable clusters
But as soon as organizations started using it widely, users wanted more:
• More complex, multi-stage applications
• More interactive queries
• More low-latency online processing
SPARK MOTIVATION
Complex jobs, interactive queries and online processing all need one thing that MR lacks:
Efficient primitives for data sharing
[Diagram: an iterative job (Stage 1 → Stage 2 → Stage 3), interactive mining (Query 1, Query 2, Query 3), and stream processing (Job 1 → Job 2 → …) all need to share data between steps]
EXAMPLES
[Diagram: an iterative MR job writes its result to HDFS and reads it back between iter. 1, iter. 2, …; interactive queries 1-3 each re-read the input from HDFS to produce results 1-3]
Problem: in MR, the only way to share data across jobs is stable storage (e.g., a file system), which is slow!
GOAL: IN-MEMORY DATA SHARING
[Diagram: the input is loaded once into distributed memory; queries 1-3 all read from memory, so the one-time processing is not repeated]
10-100× faster than network and disk
SOLUTION: RESILIENT DISTRIBUTED DATASETS (RDDS)
Partitioned collections of records that can be stored in memory across the cluster
Manipulated through a diverse set of transformations (map, filter, join, etc.)
Fault recovery without costly replication
• Remember the series of transformations that built an RDD (its lineage) to recompute lost data
Implemented in the Scala programming language
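Lineage-based recovery can be sketched with toy classes (not Spark's actual implementation): each RDD records how it was derived from its parent, so a lost partition can be recomputed on demand instead of being replicated up front.

```scala
// Toy model of RDD lineage (hypothetical classes, not Spark's API).
sealed trait ToyRDD[A] {
  // Recompute a partition from lineage instead of reading a replica.
  def compute(part: Int): Seq[A]
}

// Source data, partitioned across the cluster.
final case class Source[A](parts: Vector[Seq[A]]) extends ToyRDD[A] {
  def compute(part: Int): Seq[A] = parts(part)
}

// A map transformation remembers its parent and the function applied.
final case class Mapped[A, B](parent: ToyRDD[A], f: A => B) extends ToyRDD[B] {
  def compute(part: Int): Seq[B] = parent.compute(part).map(f)
}

// A filter transformation, likewise.
final case class Filtered[A](parent: ToyRDD[A], p: A => Boolean) extends ToyRDD[A] {
  def compute(part: Int): Seq[A] = parent.compute(part).filter(p)
}
```

If a cached partition of a derived RDD is lost, calling `compute` replays the chain of transformations back to the source, which is why no costly replication is needed.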
EXAMPLE: LOG MINING
Load error messages from a log into memory, then interactively search for various patterns
val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t')(2))
messages.cache()
[Diagram: the driver sends tasks to three workers; each worker reads one block of the file (Block 1-3) and keeps results in its cache (Cache 1-3); lines is the base RDD, messages the transformed RDD]

messages.filter(_.contains("foo")).count
messages.filter(_.contains("bar")).count
. . .
Result: full-text search of Wikipedia in <1 sec (vs 20 sec for on-disk data)
Result: scaled to 1 TB of data in 5-7 sec (vs 170 sec for on-disk data)
Fault Tolerance
Types of Dependencies
Scheduling Stages
[Figure slides: recovery of a not-cached vs. a cached RDD]
EXAMPLE: LINEAR REGRESSION
Find best line separating two sets of points
[Figure: scatter plot of + and – points, a random initial line, and the target separating line]
LINEAR REGRESSION CODE
val data = spark.textFile(...).map(readPoint).cache()
var w = Vector.random(D)
for (i <- 1 to ITERATIONS) {
  val gradient = data.map(p =>
    (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
  ).reduce(_ + _)
  w -= gradient
}
println("Final w: " + w)
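The same loop can be run without Spark to see the arithmetic. In this sketch the `Point` type, the dimension parameter, and the fixed initialization (instead of `Vector.random(D)`) are assumptions, and the data is made up:

```scala
// Spark-free sketch of the gradient loop above, using the slide's
// gradient formula: (1 / (1 + exp(-y * (w . x))) - 1) * y * x.
object GradientDemo {
  final case class Point(x: Array[Double], y: Double)

  def train(data: Seq[Point], iterations: Int, d: Int): Array[Double] = {
    val w = Array.fill(d)(0.5)                  // fixed init instead of Vector.random(D)
    for (_ <- 1 to iterations) {
      val gradient = new Array[Double](d)
      for (p <- data) {
        val dot = w.zip(p.x).map { case (wi, xi) => wi * xi }.sum
        val scale = (1.0 / (1.0 + math.exp(-p.y * dot)) - 1.0) * p.y
        for (i <- 0 until d) gradient(i) += scale * p.x(i)
      }
      for (i <- 0 until d) w(i) -= gradient(i)  // w -= gradient
    }
    w
  }
}
```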
LINEAR REGRESSION PERFORMANCE
[Chart: running time (s) vs. number of iterations (1, 5, 10, 20, 30) for Hadoop and Spark]
Hadoop: 127 s / iteration
Spark: first iteration 174 s, further iterations 6 s
OTHER PROJECTS
Hive on Spark (Spark SQL): SQL engine
Spark Streaming: incremental processing with in-memory state
MLlib: machine learning library
GraphX: Graph processing on top of Spark
OTHER RESOURCES
Hadoop: http://hadoop.apache.org/common
Pig: http://hadoop.apache.org/pig
Hive: http://hadoop.apache.org/hive
Spark: http://spark-project.org
Hadoop video tutorials: www.cloudera.com/hadoop-training
Amazon Elastic MapReduce: http://aws.amazon.com/elasticmapreduce/
PROBLEM WITH EXISTING SYSTEMS
[Slide: Flink and many other systems…]
UNIQUE OPPORTUNITY: ENTERPRISE CLUSTERS
Most important differences:
• Cluster size
  • Median Hadoop cluster: <10 nodes¹
  • 65% of Hadoop clusters: <50 nodes¹
• Memory size: 1-8 TB
• More reliable hardware
• More advanced CPU features
• Fast interconnects (e.g., InfiniBand FDR 4x)
¹ Nadkarni and L. DuBois. Trends in enterprise Hadoop deployments. IDC, 2013.
Can you use Hadoop/Spark/Flink on these clusters?
You can…
… but it's probably not the best idea
TUPLEWARE
- Built from scratch for enterprise clusters
- No runtime, no fault tolerance, no complex resource sharing, …
- Co-design of language, framework, and compiler for a specific type of hardware
- Key ingredient: Tupleware compiles workflows into distributed programs
ARCHITECTURE
1. Programming model with explicit shared state
2. Code generation
3. Distributed program deployment instead of a runtime
TUPLEWARE OPTIMIZATIONS
Compiler/HPC: low-level information, minimized indirection, hardware-dependent
DBMS: high-level rewrites, language semantics, data statistics
New hybrid optimizations:
1) Program structure (H1)
2) Context variables (H2)
3) Selection strategies (H3)
…
Andrew Crotty, Alex Galakatos, Kayhan Dursun, Tim Kraska, Carsten Binnig, Ugur Çetintemel, Stan Zdonik: An Architecture for Compiling UDF-centric Workflows. PVLDB 8(12):1466-1477 (2015)
Tupleware Achieves Orders-Of-Magnitude Performance Improvements
• Amazon EC2 (10 × c3.8xlarge, 600 GB memory); does not include RDMA benefits, to be more fair
• Common machine learning workflows
• Important: (1) same algorithm in all systems; (2) log scale
[VLDB15, CIDR15]
STREAMING
6.830 / 6.814 LECTURE 20 TIM KRASKA
STREAM PROCESSING
[Diagram: DBMS model (stored Data, arriving Query → Result) vs. stream model (standing Query, arriving Data → Result)]
SIMPLE STREAM PROCESSING
STATELESS CONVERSION
STATEFUL CONVERSION
WINDOW TYPES
[Diagram: events e1-e8 at times t0-t9; count-based windows of size 4, step size 1, shown as Sliding, Tumbling, and Landmark variants]
Note: landmark windows are pretty exotic and most commonly used with a size of 1 and a step size of 1.
WINDOW TYPES
[Diagram: events e1-e8 at times t1-t12; time-based windows of size 5, step size 2, shown as Sliding, Tumbling, and Landmark variants]
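The count-based window shapes can be sketched with plain Scala collections: `sliding` gives sliding windows, `grouped` gives tumbling windows (step equal to size), and prefixes give landmark windows. This is a local illustration over a finite sequence, not a streaming API:

```scala
// Count-based windows over a finite event sequence (illustration only).
object Windows {
  val events = Seq("e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8")

  // Sliding: size 4, step 1 (overlapping windows).
  def slidingWindows: List[Seq[String]] = events.sliding(4, 1).toList

  // Tumbling: size 4, step 4 (no overlap).
  def tumblingWindows: List[Seq[String]] = events.grouped(4).toList

  // Landmark: every window starts at the first event.
  def landmarkWindows: List[Seq[String]] =
    (1 to events.size).map(n => events.take(n)).toList
}
```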
STREAM PROCESSING AS CHAINS
BIG DATA STREAM CHALLENGES
• Large amounts of data to process in real time
• Examples
• Social network trends (#trending)
• Intrusion detection systems (networks, datacenters)
• Sensors: Detect earthquakes by correlating vibrations of millions of smartphones
• Fraud detection (e.g., Visa: 2,000 txn/sec on average, peak ~47,000/sec)
PROCESSING
PARALLELIZATION AND FAULT TOLERANCE
How to ensure exactly-once semantics?
JOINS?
SPARK STREAMING
[Diagram: live data stream → Spark Streaming chops it into batches of X seconds → Spark processes each batch → processed results]
▪ Chop up the live stream into batches of X seconds
▪ Spark treats each batch of data as RDDs and processes them using RDD operations
▪ Finally, the processed results of the RDD operations are returned in batches
DISCRETIZED STREAM PROCESSING
Run a streaming computation as a series of very small, deterministic batch jobs
▪ Batch sizes as low as ½ second, latency ~ 1 second
▪ Potential for combining batch processing and streaming processing in the same system
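Chopping a stream into micro-batches can be sketched in plain Scala. The timestamped events and the 2-second batch size in the test are made up; real Spark Streaming does this continuously over a live stream:

```scala
// Group timestamped events into micro-batches of fixed duration.
object Discretize {
  def microBatches[A](events: Seq[(Long, A)], batchMillis: Long): Map[Long, Seq[A]] =
    events
      .groupBy { case (t, _) => t / batchMillis }    // batch index = floor(t / size)
      .map { case (idx, evs) => idx -> evs.map(_._2) }
}
```

Each resulting batch can then be handed to the batch engine as an ordinary RDD, which is the core of the discretized-stream idea.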
EXAMPLE – GET HASHTAGS FROM TWITTER
val tweets = ssc.twitterStream()
val hashTags = tweets.flatMap(status => getTags(status))
hashTags.saveAsHadoopFiles("hdfs://...")
output operation: to push data to external storage
[Diagram: tweets DStream → flatMap → hashTags DStream → save, repeated for batch @ t, t+1, t+2]
Every batch can be saved to HDFS, written to a database, used to update an analytics UI, or whatever is appropriate.
SIMILAR COMPONENTS
Higher throughput than Storm:
• Spark Streaming: 670k records/second/node
• Storm: 115k records/second/node
• Apache S4: 7.5k records/second/node
[Charts: throughput per node (MB/s) vs. record size (100 and 1000 bytes) for WordCount and Grep; Spark outperforms Storm in both]
FAULT TOLERANCE
RDDs remember the sequence of operations that created them from the original fault-tolerant input data
Batches of input data are replicated in the memory of multiple worker nodes and are therefore fault-tolerant
Data lost due to a worker failure can be recomputed from the input data
[Diagram: tweets RDD (input data replicated in memory) → flatMap → hashTags RDD; lost partitions are recomputed on other workers]
TRILL
TRILL – USE CASES
• Real-time
• Monitor app telemetry (e.g., ad clicks)
• Real-time with historical
• Correlate live data stream with historical activity
• Offline
• Develop initial monitoring queries using logs
• Back-testing
• Progressive
• Non-temporal analysis over large data sets. For example, get quick approximate results
Use Cases at Microsoft
- Azure Stream Analytics Cloud service
- Bing Ads
- Halo game monitoring
- ….
EXAMPLE
Define event data type in C#:
struct ClickEvent { long ClickTime; long User; long AdId; }

Define ingress:
var str = Network.ToStream(e => e.ClickTime, Latency(10secs));

Write query (in C# app):
var query =
  str.Where(e => e.User % 100 < 5)
     .Select(e => { e.AdId })
     .GroupApply(e => e.AdId,
       s => s.Window(5min).Aggregate(w => w.Count()));

Subscribe to results:
query.Subscribe(e => Console.Write(e));
LATENCY-THROUGHPUT SPECTRUM
• Data organized as a stream of batches
• User specifies latency (e.g., 10 s)
  • Batch up to 10 s of data
• Small batches → low latency
• Large batches → high throughput
• More load → larger batches
• Punctuation forces batches at the time limit
• Columnar format
• Bit vector to indicate row absence

class DataBatch {
  long[] SyncTime;
  ...
  Bitvector BV;
}
FABRIC & LANGUAGE INTEGRATION
• User view is row-oriented
• Our example filter (Where):
  str.Where(e => e.User % 100 < 5)
• General technique: generate tight loops over batches with inlined expressions
• Avoid method calls within loops
• Timestamps are also columns
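A rough sketch of this idea in Scala (hypothetical types; Trill itself generates C#): the row-level predicate `e.User % 100 < 5` is inlined into one tight loop over the batch's column arrays, and filtered-out rows are only marked in the absence vector rather than copied:

```scala
// Sketch of Trill-style columnar filtering (hypothetical types, not the real Trill API).
object ColumnarFilter {
  // A batch stores each field as a column array; absent(i) marks dropped rows.
  final case class ClickBatch(syncTime: Array[Long],
                              user: Array[Long],
                              adId: Array[Long],
                              absent: Array[Boolean])

  // The Where predicate, inlined into one tight loop: no per-row method
  // calls, no row objects; we only flip the absence bit.
  def applyWhere(b: ClickBatch): Unit = {
    var i = 0
    while (i < b.user.length) {
      if (b.user(i) % 100 >= 5) b.absent(i) = true
      i += 1
    }
  }
}
```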
PERFORMANCE
APACHE STORM
• Architectural components
  • Data: streams of tuples, e.g., Tweet = <Author, Msg, Time>
  • Sources of data: "spouts"
  • Operators to process data: "bolts"
  • Topology: directed graph of spouts & bolts
• Multiple processes (tasks) run per bolt
• Incoming streams are split among tasks
  • Shuffle grouping: round-robin distribution of tuples to tasks
  • Fields grouping: partitioned by key / field
  • All grouping: all tasks receive all tuples (e.g., for joins)
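The three groupings can be sketched as task-routing functions (hypothetical helpers, not the Storm API):

```scala
// Sketch of Storm-style stream groupings for routing tuples to bolt tasks.
object Groupings {
  // Fields grouping: the same key always routes to the same task.
  def fieldsGrouping(key: String, numTasks: Int): Int =
    math.floorMod(key.hashCode, numTasks)

  // Shuffle grouping: round-robin over tasks using a per-stream counter.
  def shuffleGrouping(counter: Long, numTasks: Int): Int =
    (counter % numTasks).toInt

  // All grouping: every task receives the tuple.
  def allGrouping(numTasks: Int): Seq[Int] = 0 until numTasks
}
```

Fields grouping is what makes stateful bolts (e.g., per-key counts) possible, since all tuples for a key land on one task.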
APACHE STORM - FAULT TOLERANCE
FAULT TOLERANCE VIA RECORD ACKNOWLEDGEMENT (APACHE STORM: AT-LEAST-ONCE SEMANTICS)