Spark on YARN @ Netflix

Posted on 15-Apr-2017

Ashwin Shankar, Nezih Yigitbasi

Productionizing Spark on Yarn for ETL

Scale

81+ million members

Global

1000+ devices supported

125 million hours / day

Netflix Key Business Metrics

40 PB DW

Read 3 PB

Write 300 TB

700B Events

Netflix Key Platform Metrics

Outline

● Big Data Platform Architecture

● Technical Challenges

● ETL

Big Data Platform Architecture

[Diagram: Event data: cloud apps, Kafka, Ursula, S3 (~1 min). Dimension data: Cassandra, SSTables, Aegisthus, S3 (daily).]

Data Pipeline

[Diagram: platform layers (Storage, Compute, Service, Tools, Interface). Labels: S3, Parquet; Execution, Metadata, Metacat; Big Data API, Big Data Portal; Transport, Visualization, Quality, Pig Workflow Vis, Job/Cluster Vis; Notebooks]

• 3000 EC2 nodes on two clusters (d2.4xlarge)

• Multiple Spark versions

• Share the same infrastructure with MapReduce jobs

[Diagram: cluster nodes (16 vcores, 120 GB each) running a mix of Spark (S) and MapReduce (M) containers]

Spark on YARN at Netflix

$ spark-shell --ver 1.6 …

Application

s3://…/spark-1.6.tar.gz
s3://…/spark-2.0.tar.gz
s3://…/spark-2.0-unstable.tar.gz

Configuration

s3://…/1.6/spark-defaults.conf
…
s3://…/prod/yarn-site.xml
s3://../prod/core-site.xml

Multi-version Support
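
The multi-version setup can be pictured as a version flag that selects an application tarball and a matching configuration directory. The following is a minimal sketch under stated assumptions: `resolve_spark`, `SparkDistribution`, the `base_uri` parameter, and the directory layout are hypothetical names for illustration (the real S3 paths are elided on the slide), and none of this is part of Spark or of any Netflix tool.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SparkDistribution:
    tarball: str         # application archive for the requested version
    spark_defaults: str  # version-specific Spark configuration
    yarn_site: str       # environment-wide YARN configuration
    core_site: str       # environment-wide Hadoop configuration


def resolve_spark(base_uri: str, version: str, env: str = "prod") -> SparkDistribution:
    """Map a version string such as "1.6" to the artifacts a launcher would fetch.

    base_uri and the directory layout are hypothetical placeholders.
    """
    base = base_uri.rstrip("/")
    return SparkDistribution(
        tarball=f"{base}/spark-{version}.tar.gz",
        spark_defaults=f"{base}/{version}/spark-defaults.conf",
        yarn_site=f"{base}/{env}/yarn-site.xml",
        core_site=f"{base}/{env}/core-site.xml",
    )
```

Keeping the application archive and the configuration as separate lookups is what would let several Spark versions share one set of cluster-level files.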

Technical Challenges

[Diagram: YARN (ResourceManager, NodeManager, SparkAM); RDD]

Custom Coalescer Support [SPARK-14042]

• coalesce() can only “merge” using the given number of partitions
  – how to merge by size?

• CombineFileInputFormat with Hive

• Support custom partition coalescing strategies
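
To make "merge by size" concrete, here is a minimal pure-Python sketch of one possible size-based strategy: pack consecutive partitions greedily until a byte target is reached. It is an illustration only, not the Spark API introduced by SPARK-14042; the function name `coalesce_by_size` and its parameters are assumptions.

```python
from typing import List, Sequence


def coalesce_by_size(partition_sizes: Sequence[int], target_bytes: int) -> List[List[int]]:
    """Group consecutive partition indices so each group stays within target_bytes.

    A single partition larger than target_bytes ends up in a group of its own.
    """
    if target_bytes <= 0:
        raise ValueError("target_bytes must be positive")
    groups: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for index, size in enumerate(partition_sizes):
        # Close the current group before it would exceed the target.
        if current and current_bytes + size > target_bytes:
            groups.append(current)
            current, current_bytes = [], 0
        current.append(index)
        current_bytes += size
    if current:
        groups.append(current)
    return groups
```

With a count-based coalesce() the caller has to guess the number of output partitions; a size target lets that number follow the data.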

• Parent RDD partitions are listed sequentially

• Slow for tables with lots of partitions

• Parallelize listing of parent RDD partitions

UnionRDD Parallel Listing [SPARK-9926]
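
The idea of listing parent partitions in parallel rather than one after another can be sketched outside Spark with a thread pool. This is a hedged illustration on a local filesystem, not the SPARK-9926 change itself; `list_partitions_parallel` and `max_workers` are hypothetical names.

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Iterable, List


def list_partitions_parallel(partition_dirs: Iterable[str], max_workers: int = 8) -> Dict[str, List[str]]:
    """List every partition directory concurrently instead of sequentially."""
    dirs = list(partition_dirs)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order, so results line up with dirs.
        listings = pool.map(lambda d: sorted(os.listdir(d)), dirs)
        return dict(zip(dirs, listings))
```

Listing is dominated by waiting on the storage system, so threads help even though each individual call is unchanged.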

[Diagram: YARN (ResourceManager, NodeManager, SparkAM); S3 Filesystem; RDD]

• Unnecessary getFileStatus() call

• SPARK-9926 and HADOOP-12810 yield faster startup

• ~20x speedup in input split calculation

Optimize S3 Listing Performance [HADOOP-12810]

• Each task writes output to a temp directory

• Rename first successful task’s temp directory to final destination

• Problems with S3
  • S3 rename => copy + delete
  • S3 is eventually consistent

Hadoop Output Committer

• Each task writes output to local disk

• Copy first successful task’s output to S3

• Advantages
  • avoid redundant S3 copy
  • avoid eventual consistency

S3 Output Committer
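
The difference between the two commit strategies can be shown with a toy model that counts object-store operations. This is a sketch under stated assumptions: `ToyObjectStore` is an in-memory stand-in, not the S3 API, and both commit functions are hypothetical simplifications of the committers described above.

```python
from typing import Dict


class ToyObjectStore:
    """In-memory stand-in for an object store that counts write operations."""

    def __init__(self) -> None:
        self.objects: Dict[str, bytes] = {}
        self.puts = 0
        self.copies = 0

    def put(self, key: str, data: bytes) -> None:
        self.objects[key] = data
        self.puts += 1

    def rename(self, src: str, dst: str) -> None:
        # No native rename: copy the object, then delete the source.
        self.objects[dst] = self.objects[src]
        self.copies += 1
        del self.objects[src]


def commit_via_rename(store: ToyObjectStore, task_output: bytes, dest: str) -> None:
    """Rename-based commit: write to a temp key in the store, then rename."""
    temp_key = f"_temporary/{dest}"
    store.put(temp_key, task_output)
    store.rename(temp_key, dest)


def commit_via_local_disk(store: ToyObjectStore, task_output: bytes, dest: str) -> None:
    """Local-first commit: the output stays local until a single upload."""
    local_copy = bytes(task_output)  # stands in for a file on local disk
    store.put(dest, local_copy)
```

In this model the rename-based path costs one upload plus one copy per output, while the local-first path costs a single upload and never exposes a temporary key in the store.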

[Diagram: YARN (ResourceManager, NodeManager, SparkAM); Dynamic Allocation; S3 Filesystem; RDD]

• Broadcast joins/variables

• Replicas can be removed with dynamic allocation

Poor Broadcast Read Performance [SPARK-13328]

...
16/02/13 01:02:27 WARN BlockManager: Failed to fetch remote block broadcast_18_piece0 (failed attempt 70)
...
16/02/13 01:02:27 INFO TorrentBroadcast: Reading broadcast variable 18 took 1051049 ms

• Refresh replica locations from the driver on multiple failures
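
The fix described above, refreshing replica locations after repeated failures, can be sketched as a retry loop. This is an illustrative model only, not Spark's BlockManager code; `fetch_block`, `fetch_from`, `refresh_locations`, and both thresholds are hypothetical names and values.

```python
from typing import Callable, List


def fetch_block(
    block_id: str,
    locations: List[str],
    fetch_from: Callable[[str, str], bytes],
    refresh_locations: Callable[[str], List[str]],
    max_failures_before_refresh: int = 5,
    max_attempts: int = 20,
) -> bytes:
    """Try known replicas; after repeated failures, ask the driver for a fresh list."""
    failures_since_refresh = 0
    queue = list(locations)
    for _ in range(max_attempts):
        if not queue or failures_since_refresh >= max_failures_before_refresh:
            # Cached locations may point at executors that no longer exist.
            queue = list(refresh_locations(block_id))
            failures_since_refresh = 0
            if not queue:
                break
        host = queue.pop(0)
        try:
            return fetch_from(host, block_id)
        except IOError:
            failures_since_refresh += 1
    raise IOError(f"could not fetch {block_id}")
```

Without the refresh step, a reader keeps retrying executors that dynamic allocation has already removed, which matches the long read time in the log excerpt.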

• Cancel & resend pending container requests
  • if the locality preference is no longer needed
  • if no locality preference is set

• No locality information with S3

• Do not cancel requests without locality preference

Incorrect Locality Optimization [SPARK-13779]
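
The corrected rule, "do not cancel requests without locality preference", can be expressed as a small decision function. This is a simplified sketch, not the YARN allocator code touched by SPARK-13779; `ContainerRequest`, `split_pending_requests`, and `hosts_still_wanted` are hypothetical names.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Optional, Set, Tuple


@dataclass(frozen=True)
class ContainerRequest:
    request_id: int
    preferred_hosts: Optional[FrozenSet[str]] = None  # None: no locality preference


def split_pending_requests(
    pending: List[ContainerRequest], hosts_still_wanted: Set[str]
) -> Tuple[List[ContainerRequest], List[ContainerRequest]]:
    """Return (keep, cancel) lists for pending container requests."""
    keep: List[ContainerRequest] = []
    cancel: List[ContainerRequest] = []
    for request in pending:
        if request.preferred_hosts is None:
            # No locality preference (e.g. input on S3): resending gains nothing.
            keep.append(request)
        elif request.preferred_hosts & hosts_still_wanted:
            keep.append(request)
        else:
            # The locality preference is stale, so cancel and re-request.
            cancel.append(request)
    return keep, cancel
```

When every input lives on S3, all requests take the first branch, so nothing is cancelled and resent.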

[Diagram: YARN (ResourceManager, NodeManager, SparkAM); Parquet R/W; Dynamic Allocation; S3 Filesystem; RDD]

[Diagram: a table with columns A, B, C, D and rows (a1 b1 c1 d1) … (aN bN cN dN), shown next to its column layout A B C D with a dictionary. From “Analytic Data Storage in Hadoop”, Ryan Blue]

Parquet Dictionary Filtering [PARQUET-384*]
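
The principle behind dictionary filtering is that a column's dictionary lists every distinct value in a row group, so a row group whose dictionary lacks the filter value can be skipped without reading its rows. A minimal pure-Python sketch of that check follows; it does not use the Parquet library, and `row_groups_to_read` and its input shape are assumptions for illustration.

```python
from typing import Dict, Iterable, List, Set


def row_groups_to_read(dictionaries: Dict[int, Set[str]], wanted: Iterable[str]) -> List[int]:
    """Keep only row groups whose column dictionary contains a wanted value."""
    wanted_values = set(wanted)
    return [
        row_group
        for row_group, dictionary in sorted(dictionaries.items())
        # An empty intersection proves that no row in the group can match.
        if dictionary & wanted_values
    ]
```

For example, with per-row-group dictionaries for a country column, a filter on a single country only touches the row groups that actually contain it.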

[Chart: average completion time [m] for DF disabled, DF enabled (64MB split), and DF enabled (1G split); annotated speedups: ~8x, ~18x]

Parquet Dictionary Filtering [PARQUET-384*]

Property                                              Value   Description
spark.sql.hive.convertMetastoreParquet                true    enable native Parquet read path
parquet.filter.statistics.enabled                     true    enable stats filtering
parquet.filter.dictionary.enabled                     true    enable dictionary filtering
spark.sql.parquet.filterPushdown                      true    enable Parquet filter pushdown optimization
spark.sql.parquet.mergeSchema                         false   disable schema merging
spark.sql.hive.convertMetastoreParquet.mergeSchema    false   use Hive SerDe instead of built-in Parquet support

How to Enable Dictionary Filtering?

Efficient Dynamic Partition Inserts [SPARK-15420*]

• Parquet buffers row group data for each file during writes

• Spark already sorts before writes, but has some limitations

• Detect if the data is already sorted

• Expose the ability to repartition data before write
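
Why input order matters for dynamic partition inserts can be illustrated by counting how many per-partition write buffers must be open at once. This is a toy model, not Spark's writer; `is_clustered` and `peak_open_writers` are hypothetical names, and the sketch checks the weaker condition that equal partition keys are adjacent rather than fully sorted.

```python
from typing import Hashable, Iterable, List


def is_clustered(keys: Iterable[Hashable]) -> bool:
    """True if equal partition keys are adjacent, so each key appears in one run."""
    seen = set()
    previous = object()  # sentinel that equals no real key
    for key in keys:
        if key != previous:
            if key in seen:
                return False
            seen.add(key)
            previous = key
    return True


def peak_open_writers(keys: List[Hashable]) -> int:
    """Count how many per-partition write buffers must be open at once."""
    if is_clustered(keys):
        # One writer at a time: close it as soon as the key changes.
        return 1 if keys else 0
    # Otherwise a key can reappear later, so every writer stays open.
    return len(set(keys))
```

Since each open Parquet file buffers a row group in memory, keeping the count at one is what makes writes to many partitions affordable.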

[Diagram: YARN (ResourceManager, NodeManager, SparkAM); Parquet R/W; Dynamic Allocation; Spark History Server; S3 Filesystem; RDD]

Spark History Server – Where is My Job?

• A large application can prevent new applications from showing up
  • not uncommon to see event logs of GBs

• SPARK-13988 makes the processing multi-threaded

• GC tuning helps further
  • move from CMS to G1 GC
  • allocate more space to young generation

Extract, Transform, Load

[Diagram: job DAG with operators load + filter (x6), join (x5), group, foreach (x3), and a final foreach + filter + store]

Pig vs. Spark: Job #1

[Chart: average completion time [s] for Pig, Spark, and PySpark; annotated speedups: ~2.4x, ~2x]

Pig vs. Spark (Scala) vs. PySpark

[Diagram: job DAG with operators load + filter (x3), cogroup, join (x2), foreach (x6), and a final order by + store]

Pig vs. Spark: Job #2

[Chart: average completion time [s] for Pig, Spark, and PySpark; annotated speedups: ~3.2x, ~1.6x]

Pig vs. Spark (Scala) vs. PySpark

[Diagram: workflow stages Prototype, Build, Deploy, Run, with S3]

Production Workflow

• A rapid innovation platform for targeting algorithms

• 5 hours (vs. 10s of hours) to compute similarity for all Netflix profiles for 30-day window of new arrival titles

• 10 minutes to score 4M profiles for 14-day window of new arrival titles

Production Spark Application #1: Yogen

• Personalized ordering of rows of titles

• Enrich page/row/title features with play history

• 14 stages, ~10Ks of tasks, several TBs

Production Spark Application #2: ARO

What’s Next?

• Improved Parquet support

• Better visibility

• Explore new use cases

Questions?
