Evolution From Shark To Spark SQL: Preliminary Analysis and Qualitative Evaluation
Xinhui Tian and Xiexuan Zhou
Institute of Computing Technology, Chinese Academy of Sciences
and University of Chinese Academy of Sciences
BPOE-6 Sep 4, 2015
Outline
Background
Evolution from Shark to Spark SQL
Evaluation
Conclusion
Background
The explosive growth of the market for big data analytics systems
A wide variety of big data analytics systems
Fast evolution of each system
Hadoop’s versions
[Timeline: Hadoop releases, 2009-2014. Trunk development (the source of new features) runs through 0.20.x, 0.21.0, 0.22.0, and the 0.23.x line, branching into the 1.x releases (0.20.203, 0.20.204, 1.0.0 through 1.2.0) and the 2.x line (2.0.0-alpha through 2.6.0).]
Spark’s versions
[Timeline: Spark releases, 2012-2015, from 0.6.0 through 1.4.1, alongside the Shark releases 0.2.0, 0.7.0, 0.8.0, and 0.9.0. Library milestones: Streaming (alpha, then stable), MLlib, GraphX (alpha, then stable), and Spark SQL.]
Background
The fast evolution poses new challenges:
For users, difficult to tune the configurations
For developers, difficult to program
For researchers, difficult to evaluate and analyze
What are we supposed to do?
Motivation
To investigate the version updates of big data systems
Users:
• To understand how the additional configurations or features affect the execution
Developers:
• To study how to achieve better performance and reliability
• To develop new systems
Outline
Background
Evolution from Shark to Spark SQL
Evaluation
Conclusion
Our Target System: Spark
A general-purpose engine based on the abstraction of resilient distributed datasets (RDD)
Developed by the AMPLab of UC Berkeley
Apache top-level project since 2014
RDD:
Data objects reside in memory
In-memory data sharing
Lineage-based fault tolerance
Matei Zaharia, et al. Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. NSDI, 2012.
Programming Model of Spark
Two types of RDD operations:
Transformations
• Each transformation creates a new RDD
• map, mapPartitions, groupByKey, sortByKey…
Actions
• reduce, collect, count…
Lazy scheduling
A job does not start until an action is invoked
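Lazy scheduling can be illustrated without Spark itself. The following is a minimal stdlib Scala sketch (an Iterator stands in for an RDD's partition; this is not Spark's API): transformations only describe work, and nothing runs until an action forces the lineage.

```scala
// Not Spark itself: a stdlib sketch of lazy scheduling. An Iterator stands in
// for an RDD's partition: map only *describes* work, and nothing runs until an
// "action" (sum) pulls the data through the whole lineage.
var touched = 0
val partition = Iterator.from(1).take(5)                 // pretend RDD of 1..5
val doubled = partition.map { x => touched += 1; x * 2 } // "transformation": lazy
val touchedBeforeAction = touched                        // still 0: no job started
val total = doubled.sum                                  // "action": runs the job
```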
Spark Scheduling
A job consists of multiple stages
Stage boundaries depend on the dependencies between RDDs
[Diagram: an example job DAG. RDDs A through G, connected by map, union, groupBy, and join, are split into Stage 1, Stage 2, and Stage 3; shaded boxes mark cached data partitions.]
Matei Zaharia, et al. Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. NSDI, 2012.
Example: PageRank
1. Start each page with a rank of 1
2. On each iteration, update each page's rank to Σ_{i ∈ neighbors} rank_i / |neighbors_i|

links = // RDD of (url, neighbors) pairs
ranks = // RDD of (url, rank) pairs
for (i <- 1 to ITERATIONS) {
  ranks = links.join(ranks).flatMap {
    case (url, (links, rank)) =>
      links.map(dest => (dest, rank / links.size))
  }.reduceByKey(_ + _)
}
[Diagram: PageRank lineage. The Links RDD (url, neighbors) is repeatedly joined with Ranks_i (url, rank); each join feeds a flatMap producing Contribs_i, which a reduce turns into Ranks_{i+1}.]
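The loop above can be sketched as a runnable, single-machine Scala program, with plain collections standing in for the links and ranks RDDs (a simplification of the slide's code, with no damping factor):

```scala
// A runnable single-machine sketch of the PageRank loop above: plain Scala
// collections stand in for the links/ranks RDDs (no damping factor, as on the slide).
val links: Map[String, Seq[String]] =
  Map("a" -> Seq("b", "c"), "b" -> Seq("c"), "c" -> Seq("a"))
var ranks: Map[String, Double] = links.map { case (url, _) => (url, 1.0) }

for (_ <- 1 to 10) {
  // join + flatMap: each page sends rank / |neighbors| to every neighbor
  val contribs = links.toSeq.flatMap { case (url, dests) =>
    dests.map(dest => (dest, ranks(url) / dests.size))
  }
  // reduceByKey(_ + _): sum the contributions arriving at each page
  ranks = contribs.groupMapReduce(_._1)(_._2)(_ + _)
}
val totalRank = ranks.values.sum // total rank mass is conserved across iterations
```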
The Fast Evolution of Spark
Rapid growth of the code base
• 600+ developers
• LOC: from 16,000+ in Spark 0.5.0 to 80,000+ in 1.4.1
Increasing support of various workloads:
• Machine Learning, Graph, Streaming, Database …
http://spark.apache.org
Database Support on Spark
The Spark community cares about it!
Two layers:
SQL parser: translates a query into a Spark job
Spark engine: processes the job submitted by the parser
What did we do?
Analyzing the evolution of query processing on Spark at two layers:
Query parser
• From Hive-based Shark to Spark SQL – the differences in optimization rules
Spark core components
• The evolution of the Spark engine – how it impacts the execution of query processing
Shark vs. Spark SQL
Evolution of the query parser:
Shark: a SQL parser based on Hive
Spark SQL: Spark's query processing component, based on a new SQL parser called Catalyst
Query processing procedure: Query -> Abstract Syntax Tree -> Logical Plan -> Optimized Plan -> Physical Plan -> RDD
Shark: HQL -> Hive Parser -> Hive Logical Planner -> Hive Optimizer -> Spark Work Generator -> RDD Generator
Spark SQL: SQL (via CatalystSqlParser) or HQL (via HiveQlParser) -> Catalyst Logical Planner -> Catalyst Optimizer -> SparkPlan Generator -> RDD Generator
An example: a Join query

A Join query statement:

SELECT sourceIP, sum(adRevenue) AS totalRevenue, avg(pageRank) AS pageRank
FROM rankings R
JOIN (SELECT sourceIP, destURL, adRevenue
      FROM uservisits UV
      WHERE UV.visitDate > "X" AND UV.visitDate < "Y") NUV
ON (R.pageURL = NUV.destURL)
GROUP BY sourceIP
ORDER BY totalRevenue DESC LIMIT 1;

Explanations of each step:
1. Select the required columns from table uservisits and filter rows on the WHERE condition
2. Select the required columns from table rankings
3. Join the data from the two tables
4. Group by sourceIP and compute the sum and avg
5. Return the row with the largest totalRevenue
Pavlo et al. A Comparison of Approaches to Large-Scale Data Analysis. SIGMOD 2009.
From SQL to Spark plans

Shark:
Stage0: TableScanOperator -> FilterOperator -> SelectOperator -> ReduceSinkOperator
Stage1: TableScanOperator -> ReduceSinkOperator
Stage2: JoinOperator -> SelectOperator -> GroupByPreShuffleOperator -> ReduceSinkOperator
Stage3: GroupByPostShuffleOperator -> SelectOperator -> ReduceSinkOperator
Stage4: ExtractOperator -> LimitOperator -> FileSinkOperator

Spark SQL:
Stage0: HiveTableScan -> Filter -> Project -> Exchange
Stage1: HiveTableScan -> Exchange
Stage2: ShuffleHashJoin -> Project -> Partial Aggregate -> Exchange
Stage3: Aggregate -> TakeOrdered & Limit
Major differences (SQL -> Spark plans)
Stage separation
Shark: ReduceSink operator
• Originally designed for MapReduce; cannot be avoided for aggregation and join
Spark SQL: Exchange operator
• Not added if two adjacent operations use the same partitioning
Terminal operator
Shark: FileSinkOperator
• Has to write the result into HDFS
Spark SQL: actions
• Can return results to the driver or save them as an RDD
Additional optimization rules
Spark SQL:
• Aggregation over a small data size can be executed on the driver
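The Exchange rule can be sketched as follows; the Partitioning types and the needsExchange helper are hypothetical illustrations, not Spark SQL's actual classes:

```scala
// Hypothetical sketch (not Spark SQL's real classes) of the Exchange rule: a
// shuffle is inserted only when the child's output partitioning differs from
// what the parent operator requires, whereas Shark's ReduceSink always shuffled.
sealed trait Partitioning
case class HashPartitioned(keys: Seq[String]) extends Partitioning
case object Unpartitioned extends Partitioning

def needsExchange(childOutput: Partitioning, required: Partitioning): Boolean =
  childOutput != required

// A join already hash-partitioned on "url" feeding an aggregation on "url":
val exchangeSkipped = !needsExchange(HashPartitioned(Seq("url")), HashPartitioned(Seq("url")))
// An unpartitioned table scan feeding the same aggregation:
val exchangeAdded = needsExchange(Unpartitioned, HashPartitioned(Seq("url")))
```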
From plan to RDD (Join)
Shark: shuffle operation in ReduceSinkOp
• Serializes the values to reduce their size
• Each (key, value) pair is serialized as bytes
Creates a new RDD, CoGroupedRDD, for the JoinOp
• Specialized shuffle fetch
Shark lineage: HadoopRDD1 / HadoopRDD2 -> RDD {FilterOp.processPartition} -> RDD {ReduceSinkOp.processPartition} -> ShuffledRDD (repartition) -> CoGroupedRDD -> MapPartitionsRDD (compute join)
Spark SQL: all operations are fitted into existing RDDs
• Just like a normal Spark job
• But may lack low-level execution optimizations for query processing
Spark SQL lineage: HadoopRDD1 / HadoopRDD2 -> MapPartitionsRDD (FilterFunc) -> MapPartitionsRDD (ProjectFunc) -> MapPartitionsRDD (Exchange) -> ShuffledRDD (Exchange) -> ZippedPartitionsRDD (ShuffleHashJoin)
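The CoGroupedRDD idea behind the join can be sketched with plain Scala collections, where groupMap stands in for the shuffle that co-locates records by key (a simplification, not Shark's actual code):

```scala
// Stdlib sketch of the CoGroupedRDD idea behind Shark's join (a simplification,
// not Shark's code): group both inputs by key -- the stand-in for the shuffle
// that co-locates records -- then emit the per-key cross product.
def cogroupJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] = {
  val l = left.groupMap(_._1)(_._2)   // "shuffle": co-locate left records by key
  val r = right.groupMap(_._1)(_._2)  // "shuffle": co-locate right records by key
  (l.keySet & r.keySet).toSeq.flatMap { k =>
    for (a <- l(k); b <- r(k)) yield (k, (a, b))
  }
}

val rankings   = Seq(("u1", 10), ("u2", 20))                // (url, pageRank)
val uservisits = Seq(("u1", 1.5), ("u1", 2.5), ("u3", 9.0)) // (url, adRevenue)
val joined     = cogroupJoin(rankings, uservisits)
```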
The Real Execution on Spark
Query -> Abstract Syntax Tree -> Logical Plan -> Optimized Logical Plan -> Physical Plan -> Optimized Physical Plan -> RDDs -> Spark Job
Evolution of the Spark Core: From 0.9.1 to 1.3.1
[Diagram: the scheduling path. The job DAG of RDD operators is split into stages by the DAGScheduler; submitMissingTasks emits a TaskSet per stage, TaskScheduler.submitTasks hands each TaskSet to a TaskSetManager, SchedulerBackend.reviveOffers launches the tasks (as Seq[Seq[Task]]), and Executor.launchTask runs each one in a TaskRunner.]

Changes to the scheduler, communication, and GC (0.9.1 -> 1.3.1):
DAGScheduler: actor-based -> thread-based, for better scalability
Task submit: no serialization -> serialize before sending, to optimize communication
Block transfer: Java NIO -> Netty-based, for better network performance
Memory GC: a MetadataCleaner that periodically cleans up state -> a daemon-thread ContextCleaner with weak references, for faster memory garbage collection
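The "serialize before sending" change on the task-submit path can be illustrated with plain Java serialization; the Task case class here is a hypothetical stand-in for Spark's internal task objects:

```scala
// Illustration of the task-submit change above: serializing a task before
// sending turns it into the byte payload that actually crosses the wire, and
// fails fast on non-serializable state. Task is a hypothetical stand-in.
import java.io._

case class Task(stageId: Int, partition: Int) extends Serializable

def toBytes(task: Task): Array[Byte] = {
  val buf = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buf)
  out.writeObject(task)
  out.close()
  buf.toByteArray
}

def fromBytes(bytes: Array[Byte]): Task = {
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
  try in.readObject().asInstanceOf[Task] finally in.close()
}

val sent     = Task(stageId = 2, partition = 7)
val received = fromBytes(toBytes(sent)) // what the executor side reconstructs
```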
Evolution of the Spark Core: From 0.9.1 to 1.3.1
Changes to shuffle and storage (0.9.1 -> 1.3.1):
Default shuffle manager: hash -> sort, adding map-side sort & spill of data
Shuffle block manager: file blocks -> adds an index block manager, for sort-based shuffle support
Cache manager: always puts data into memory first -> puts large data onto disk, for better memory management
Disk storage: memory-mapped reads -> direct reads for small files, for better memory usage
Tachyon support: no -> yes, to separate computation and storage

[Diagram: Stage 0 and Stage 1 sharing cached blocks through the Block Manager's in-memory block store.]
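The hash-to-sort shuffle change in the first two rows can be sketched with plain collections; this is an illustration of the idea, not Spark's shuffle code:

```scala
// Sketch of the hash-vs-sort shuffle rows above (an illustration, not Spark's
// shuffle code). Hash shuffle writes one bucket (file) per reducer; sort shuffle
// writes a single run sorted by reducer id, plus an index of slice offsets.
val records     = Seq((3, "c"), (1, "a"), (2, "b"), (1, "d"))
val numReducers = 2
def reducerOf(key: Int): Int = key % numReducers

// Hash shuffle manager: one output group per reducer.
val hashBuckets: Map[Int, Seq[(Int, String)]] =
  records.groupBy { case (k, _) => reducerOf(k) }

// Sort shuffle manager: one run sorted by reducer id; the index block records
// where each reducer's contiguous slice begins.
val sortedRun = records.sortBy { case (k, _) => reducerOf(k) }
val index: Map[Int, Int] =
  (0 until numReducers).map { r =>
    (r, sortedRun.indexWhere { case (k, _) => reducerOf(k) == r })
  }.toMap
```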
Outline
Background
Evolution from Shark to Spark SQL
Evaluation
Conclusion
Benchmarks
Versions we investigated:
Spark 0.9.1 with Shark 0.9.1
Spark 1.3.1 with Spark SQL
Three queries from Pavlo's paper:
Table Scan
Aggregation
Complex Join
Queries are run with different conditions for data filtering
Pavlo et al. A Comparison of Approaches to Large-Scale Data Analysis. SIGMOD 2009.
Spark Configuration
spark.shuffle.manager = hash: no sort operation
spark.shuffle.compress = true: compress shuffle data to decrease its size
spark.shuffle.memoryFraction = 0.2: memory fraction for shuffle aggregation
spark.storage.memoryFraction = 0.2: memory fraction for cache (not needed here)
spark.default.parallelism = 300: default partition number for shuffles; increased to decrease the memory usage of each shuffle task
spark.serializer = JavaSerializer: the default serializer
Performance comparison
9 nodes, each with 16 GB memory
Data size: rankings, 90 million rows, 5 GB; uservisits, 700 million rows, 100 GB
Each query runs with 3 different WHERE conditions, producing different sizes of filtered data (in ascending order)
[Charts: run time (s) of Spark SQL vs. Shark for the Select, Aggregation, and Join queries under the three filter conditions.]
Observations
But… Spark SQL performs worse than Shark when the size of the filtered data becomes large
The huge increase in GC time!
The huge data size of shuffle fetching
[Charts: GC time (s) and total shuffle read data size (GB) of Shark vs. Spark SQL for Aggregation3 and Join3.]
Conclusions (1)
Spark SQL achieves better performance when memory is sufficient
Users should design their queries carefully based on the size of the data
Spark SQL needs more optimization of its data filtering and join mechanisms
The low-level execution should have mechanisms specific to query processing
"One size fits all" is difficult
Conclusions (2)
Garbage collection remains a main factor impacting overall performance
Lack of resource awareness and effective memory management
Spark mainly relies on the JVM for memory management and thread scheduling
Complex computations produce lots of intermediate data
• Due to the read-only design of RDDs
Thank you!