Spark on YARN @ Netflix
Posted on 15-Apr-2017
TRANSCRIPT
Ashwin Shankar, Nezih Yigitbasi
Productionizing Spark on YARN for ETL
Scale
• 81+ million members
• 1000+ devices supported globally
• 125 million hours watched / day
Netflix Key Business Metrics
• 40 PB data warehouse
• 3 PB read, 300 TB written
• 700B events
Netflix Key Platform Metrics
Outline
●Big Data Platform Architecture
●Technical Challenges
●ETL
Big Data Platform Architecture
[Diagram: Data Pipeline. Event data: cloud apps → Kafka → Ursula → S3 (~1 min). Dimension data: Cassandra SSTables → Aegisthus → S3 (daily)]
[Diagram: Platform stack. Storage: S3, Parquet. Compute. Service: Metacat, Execution Metadata. Tools/Interface: Big Data API, Big Data Portal, Transport, Visualization, Quality, Pig Workflow Vis, Job/Cluster Vis, Notebooks]
• 3000 EC2 nodes on two clusters (d2.4xlarge)
• Multiple Spark versions
• Share the same infrastructure with MapReduce jobs
[Diagram: YARN nodes with 16 vcores and 120 GB each, running a mix of Spark (S) and MapReduce (M) containers]
Spark on YARN at Netflix
$ spark-shell --ver 1.6 …
Application:
s3://…/spark-1.6.tar.gz
s3://…/spark-2.0.tar.gz
s3://…/spark-2.0-unstable.tar.gz
Configuration:
s3://…/1.6/spark-defaults.conf
…
s3://…/prod/yarn-site.xml
s3://…/prod/core-site.xml
Multi-version Support
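The multi-version scheme can be sketched as a small resolver: the `--ver` flag selects a tarball and that version's `spark-defaults.conf`, while Hadoop configs are shared per environment. The bucket name and exact layout below are hypothetical, patterned loosely on the truncated paths in the slide.

```python
# Hypothetical resolver for "spark-shell --ver <version>"; the bucket name
# and path layout are illustrative, not Netflix's actual locations.
SPARK_BUCKET = "s3://example-bucket/spark"  # made-up bucket

def resolve(ver: str, env: str = "prod") -> dict:
    """Return the artifacts a versioned spark-shell launch would download."""
    return {
        # one tarball per deployed Spark version
        "tarball": f"{SPARK_BUCKET}/spark-{ver}.tar.gz",
        # spark-defaults.conf is kept per version
        "spark_defaults": f"{SPARK_BUCKET}/{ver}/spark-defaults.conf",
        # Hadoop configs are shared per environment, not per Spark version
        "yarn_site": f"{SPARK_BUCKET}/{env}/yarn-site.xml",
        "core_site": f"{SPARK_BUCKET}/{env}/core-site.xml",
    }

print(resolve("1.6")["tarball"])  # s3://example-bucket/spark/spark-1.6.tar.gz
```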
Technical Challenges
[Diagram: YARN ResourceManager and NodeManager, Spark AM, RDD]
Custom Coalescer Support [SPARK-14042]
• coalesce() can only “merge” using the given number of partitions: how to merge by size?
• CombineFileInputFormat with Hive
• Support custom partition coalescing strategies
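As a sketch of what merging by size (rather than by count) means, the greedy packing below groups parent partitions until a target byte size is reached. The strategy and names are illustrative, not the actual PartitionCoalescer API that SPARK-14042 introduced.

```python
# Illustrative size-based coalescing: pack parent partition indices into
# groups whose total size stays under target_bytes (an oversized partition
# gets a group of its own).
def coalesce_by_size(part_sizes, target_bytes):
    groups, current, current_size = [], [], 0
    for idx, size in enumerate(part_sizes):
        if current and current_size + size > target_bytes:
            groups.append(current)           # close the full group
            current, current_size = [], 0
        current.append(idx)
        current_size += size
    if current:
        groups.append(current)
    return groups

# Six parent partitions packed toward a 128 MB target:
sizes = [90, 40, 130, 10, 60, 70]  # MB
print(coalesce_by_size(sizes, 128))  # [[0], [1], [2], [3, 4], [5]]
```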
UnionRDD Parallel Listing [SPARK-9926]
• Parent RDD partitions are listed sequentially
• Slow for tables with lots of partitions
• Parallelize listing of parent RDD partitions
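A minimal sketch of the parallel-listing idea, with a thread pool standing in for the driver-side listing of each parent RDD; `list_partitions` is a made-up placeholder for whatever per-parent listing call is actually made.

```python
# Sketch of SPARK-9926: list each parent's partitions in a thread pool
# instead of one parent after another. All names here are illustrative.
from concurrent.futures import ThreadPoolExecutor

def list_partitions(parent):
    # placeholder for a slow, I/O-bound per-parent listing call
    return [f"{parent}/part-{i}" for i in range(3)]

def union_partitions(parents, parallelism=8):
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        # map() preserves parent order, so the union's partition order
        # matches what sequential listing would produce
        results = pool.map(list_partitions, parents)
    return [p for parts in results for p in parts]

print(union_partitions(["tableA", "tableB"]))
```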
[Diagram: YARN, Spark AM, RDD, S3 Filesystem]
Optimize S3 Listing Performance [HADOOP-12810]
• Unnecessary getFileStatus() call
• SPARK-9926 and HADOOP-12810 yield faster startup
• ~20x speedup in input split calculation
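The redundant-call problem can be shown with a toy S3 model that charges one request per call: a directory listing already returns each file's status, so split calculation need not issue a getFileStatus() per file. FakeS3 and both split functions are illustrative, not Hadoop's API.

```python
# Toy model of the HADOOP-12810 fix: reuse the statuses a LIST already
# returned instead of one extra HEAD round trip per file.
class FakeS3:
    def __init__(self, files):
        self.files = files      # path -> size
        self.requests = 0
    def list_status(self, prefix):
        self.requests += 1      # one LIST call returns (path, size) pairs
        return [(p, s) for p, s in self.files.items() if p.startswith(prefix)]
    def get_file_status(self, path):
        self.requests += 1      # one HEAD call per file
        return self.files[path]

def splits_naive(s3, prefix):
    # re-fetches the status of every file the listing just returned
    return [(p, s3.get_file_status(p)) for p, _ in s3.list_status(prefix)]

def splits_fast(s3, prefix):
    # reuses the sizes the listing already carries
    return s3.list_status(prefix)

store = FakeS3({f"s3://bucket/tbl/part-{i}": 100 for i in range(1000)})
splits_naive(store, "s3://bucket/tbl")   # costs 1001 requests
print(store.requests)
store.requests = 0
splits_fast(store, "s3://bucket/tbl")    # costs 1 request
print(store.requests)
```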
Hadoop Output Committer
• Each task writes output to a temp directory
• The first successful task’s temp directory is renamed to the final destination
• Problems with S3:
  • S3 rename => copy + delete
  • S3 is eventually consistent
S3 Output Committer
• Each task writes output to local disk
• The first successful task’s output is copied to S3
• Advantages:
  • avoid redundant S3 copy
  • avoid eventual consistency
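The payoff is easy to see in a toy cost model (illustrative, not Hadoop's committer API): rename-on-S3 moves the job's output across S3 twice, while the direct committer uploads the locally written output once.

```python
# Toy byte-count comparison of the two commit strategies above.
def rename_committer_bytes(output_bytes):
    task_write = output_bytes    # tasks write temp output directly to S3
    commit_copy = output_bytes   # "rename" on S3 = copy to final path (+ delete)
    return task_write + commit_copy

def s3_committer_bytes(output_bytes):
    # tasks write to local disk; only the first successful attempt uploads
    return output_bytes

out = 500 * 1024**3  # e.g. 500 GB of job output
print(rename_committer_bytes(out) // s3_committer_bytes(out))  # 2
```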
[Diagram: YARN, Spark AM, Dynamic Allocation, RDD, S3 Filesystem]
Poor Broadcast Read Performance [SPARK-13328]
• Broadcast joins/variables
• Replicas can be removed with dynamic allocation

...
16/02/13 01:02:27 WARN BlockManager: Failed to fetch remote block broadcast_18_piece0 (failed attempt 70)
...
16/02/13 01:02:27 INFO TorrentBroadcast: Reading broadcast variable 18 took 1051049 ms

• Fix: refresh replica locations from the driver on multiple failures
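The fix amounts to a fetch loop that re-asks the driver for replica locations after repeated failures, instead of retrying a stale list forever (the stale replicas may be executors that dynamic allocation already removed). A sketch with made-up names, not Spark's BlockManager API:

```python
# Illustrative retry loop: refresh replica locations from the driver
# every few failed attempts rather than hammering a stale list.
def fetch_block(block_id, get_locations, fetch_from,
                refresh_every=3, max_attempts=9):
    locations = get_locations(block_id)
    for attempt in range(1, max_attempts + 1):
        for loc in locations:
            try:
                return fetch_from(loc, block_id)
            except ConnectionError:
                continue  # try the next known replica
        if attempt % refresh_every == 0:
            # every replica failed repeatedly: the cached location list
            # is likely stale, so ask the driver for a fresh one
            locations = get_locations(block_id)
    raise IOError(f"could not fetch block {block_id}")
```

With a driver whose second answer points at a live executor, the loop recovers after the first refresh instead of timing out.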
Incorrect Locality Optimization [SPARK-13779]
• Cancel & resend pending container requests
  • if the locality preference is no longer needed
  • if no locality preference is set
• No locality information with S3
• Fix: do not cancel requests without a locality preference
[Diagram: YARN, Spark AM, Dynamic Allocation, RDD, S3 Filesystem, Parquet R/W]
Parquet Dictionary Filtering [PARQUET-384*]
[Diagram: column values (a1…aN, b1…bN, c1…cN, d1…dN for columns A–D) encoded against a per-column dictionary; from “Analytic Data Storage in Hadoop”, Ryan Blue]
Parquet Dictionary Filtering [PARQUET-384*]
[Chart: Avg. Completion Time [m] with dictionary filtering disabled vs. enabled: ~8x faster with 64 MB splits, ~18x faster with 1 GB splits]
How to Enable Dictionary Filtering?

Property                                            | Value | Description
----------------------------------------------------|-------|--------------------------------------------
spark.sql.hive.convertMetastoreParquet              | true  | enable native Parquet read path
parquet.filter.statistics.enabled                   | true  | enable stats filtering
parquet.filter.dictionary.enabled                   | true  | enable dictionary filtering
spark.sql.parquet.filterPushdown                    | true  | enable Parquet filter pushdown optimization
spark.sql.parquet.mergeSchema                       | false | disable schema merging
spark.sql.hive.convertMetastoreParquet.mergeSchema  | false | use Hive SerDe instead of built-in Parquet support
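Collected into a spark-defaults.conf fragment, the settings above would look like this (a sketch; depending on how your deployment injects Hadoop-level properties, the parquet.* entries may need a spark.hadoop. prefix):

```
spark.sql.hive.convertMetastoreParquet              true
parquet.filter.statistics.enabled                   true
parquet.filter.dictionary.enabled                   true
spark.sql.parquet.filterPushdown                    true
spark.sql.parquet.mergeSchema                       false
spark.sql.hive.convertMetastoreParquet.mergeSchema  false
```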
Efficient Dynamic Partition Inserts [SPARK-15420*]
• Parquet buffers row group data for each file during writes
• Spark already sorts before writes, but has some limitations
• Detect if the data is already sorted
• Expose the ability to repartition data before write
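Why sorting matters can be seen in a toy writer model: Parquet keeps a row-group buffer per open file, and only input sorted by partition key lets the writer close a partition's file as soon as the key changes. Everything below is illustrative, not Spark's actual write path.

```python
# Peak number of simultaneous row-group buffers for a writer that can
# close a partition's file only when it knows no more rows will arrive
# for it. Sorted input gives that signal on every key change.
def peak_buffers(rows, key, assume_sorted):
    open_bufs, peak, prev = set(), 0, None
    for row in rows:
        k = key(row)
        if assume_sorted and prev is not None and k != prev:
            open_bufs.discard(prev)  # sorted: the previous partition is done
        open_bufs.add(k)
        peak = max(peak, len(open_bufs))
        prev = k
    return peak

# 90 rows spread over 30 date partitions:
rows = [("2017-04-%02d" % (i % 30 + 1), i) for i in range(90)]
print(peak_buffers(rows, lambda r: r[0], assume_sorted=False))         # 30
print(peak_buffers(sorted(rows), lambda r: r[0], assume_sorted=True))  # 1
```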
[Diagram: YARN, Spark AM, Dynamic Allocation, RDD, S3 Filesystem, Parquet R/W, Spark History Server]
Spark History Server – Where is My Job?
• A large application can prevent new applications from showing up
  • not uncommon to see event logs of GBs
• SPARK-13988 makes the processing multi-threaded
• GC tuning helps further
  • move from CMS to G1 GC
  • allocate more space to young generation
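As a sketch, the GC tuning above could land in the history server's JVM options like this (the flag values are illustrative, not Netflix's; on JDK 8, G1NewSizePercent requires unlocking experimental VM options):

```
# spark-env.sh (illustrative values)
export SPARK_HISTORY_OPTS="$SPARK_HISTORY_OPTS \
  -XX:+UseG1GC \
  -XX:+UnlockExperimentalVMOptions \
  -XX:G1NewSizePercent=40"
```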
Extract, Transform, Load
Pig vs. Spark: Job #1
[DAG: six load + filter inputs feeding a tree of joins, then group, foreach, and foreach + filter + store]
Pig vs. Spark (Scala) vs. PySpark
[Chart: Avg. Completion Time [s] for Job #1: Spark (Scala) ~2.4x faster than Pig, PySpark ~2x faster]
Pig vs. Spark: Job #2
[DAG: three load + filter inputs, a cogroup and joins, several foreach steps, then order by + store]
Pig vs. Spark (Scala) vs. PySpark
[Chart: Avg. Completion Time [s] for Job #2: Spark (Scala) ~3.2x faster than Pig, PySpark ~1.6x faster]
Production Workflow
[Diagram: Prototype → Build → Deploy → Run, with artifacts stored in S3]
Production Spark Application #1: Yogen
• A rapid innovation platform for targeting algorithms
• 5 hours (vs. tens of hours) to compute similarity for all Netflix profiles over a 30-day window of new-arrival titles
• 10 minutes to score 4M profiles for a 14-day window of new-arrival titles
Production Spark Application #2: ARO
• Personalized ordering of rows of titles
• Enrich page/row/title features with play history
• 14 stages, ~10Ks of tasks, several TBs of data
What’s Next?
• Improved Parquet support
• Better visibility
• Explore new use cases
Questions?