spark on yarn

16
Spark on YARN Best practices Adarsh Pannu IBM Analytics Platform DRAFT: This is work in progress. Please send comments to [email protected]

Upload: crabtunescribd

Post on 10-Apr-2016

36 views

Category:

Documents


3 download

DESCRIPTION

"Spark on YARN"

TRANSCRIPT

Page 1: Spark on YARN

Spark on YARN Best practices

Adarsh Pannu IBM Analytics Platform

DRAFT: This is work in progress. Please send comments to [email protected]

Page 2: Spark on YARN

Spark and Cluster Management Spark supports four different cluster managers: ●  Local: Useful only for development

●  Standalone: Bundled with Spark, doesn’t play well with other applications, fine for PoCs

●  YARN: Highly recommended for production

●  Mesos: Not supported in BigInsights

Each mode has a similar “logical” architecture although physical details differ in terms of which/where processes and threads are launched.

Page 3: Spark on YARN

Spark Cluster Architecture: Logical View

Driver runs the main() function of the application. This can run outside (“client”) or inside the cluster (“cluster”) SparkContext is the main entry point for Spark functionality. Represents the connection to a Spark cluster. Executor is a JVM that runs tasks and keeps data in memory or disk storage across them. Each application has its own executors spread across a cluster.

Task Driver Program

SparkContext

Cluster Manager

Executor

Cache

Task

Page 4: Spark on YARN

Spark: What’s Inside an Executor?

Task

Task

Task

RDD P1

RDD P2

RDD P3

Internal Threads

Single JVM

Partitions from 2 different RDDs being processed by 3 tasks

RDD P2

RDD P1

Cached RDD partitions from yet another RDD

Shuffle, Transport, GC, and other system threads

Free Task Slots (“Cores”)

Executor

Page 5: Spark on YARN

Spark: Standalone Cluster Manager

Worker

(Client 1)

(Client 2)

Worker

(Client 1)

Client 2

Executor

Executor

Machine 1 Machine 2

•  Inter-process communication not shown. •  All orange boxes are JVMs •  Deploy mode = “Client”

Executor

Master

Client 1

Page 6: Spark on YARN

Standalone Mode: Configuration

Per Worker Node Per Application Per Executor

CPU SPARK_WORKER_CORES

spark.cores.max

Memory SPARK_WORKER_MEMORY spark.executor.memory

SPARK_WORKER_CORES # of cores to give to underlying Executors (default: all available cores). SPARK_WORKER_MEMORY Total memory to use on the machine, e.g. 1000m, 2g

(default: total memory minus 1 GB) spark.cores.max maximum # of cores to request for the application

across the cluster (default: all available cores) spark.executor.memory Memory per executor (default: 512m) Standalone mode uses as FIFO scheduler. As applications launch, it will try to balance the resource consumption across the cluster. Strangely, cores are specified per application, yet memory is per executor!

Page 7: Spark on YARN

Spark on YARN: Architecture

Resource Manager

Node Manager

Container

Container

Node Manager

Container

Client

Spark Application

Master

Executor

Executor

Machine 1 Machine 2

Machine 0 •  Inter-process communication

not shown. •  All orange boxes are JVMs

Page 8: Spark on YARN

Spark Configuration Spark has scores of configuration options: •  For many options, defaults generally work alright •  However, there are some critical “knobs” that should be carefully tuned Several settings are cluster manager specific. When running Spark on YARN, you must examine: •  Yarn-specific settings: scheduler type and queues •  Spark specific settings for YARN: # of executors, per-executor memory and cores, and more Other general techniques will improve your applications on any cluster manager. For example: •  Java object serialization schemes (Kryo vs Java) •  Proper partitioning and parallelism levels •  On-disk data formats (Parquet vs AVRO vs JSON vs ...) •  And many more ... (to be covered elsewhere)

Page 9: Spark on YARN

Spark on YARN: Managing queues Your cluster may serve different applications/users, each with differing expectations: •  Batch jobs could possibly wait but interactive users may not •  Tight SLAs need to be honored often at the expense of others There may be more than one instance of the same type of application, and yet, they may need to be treated differently. E.g. different Spark jobs may have differing needs. Step 1: Divide up your cluster resources into “queues” that are organized by target needs: •  Choose scheduling strategy: Capacity vs. Fair. •  Capacity scheduler is best for applications that need guarantees on availability of cluster resources

(although at the cost of elasticity) •  Fair scheduler is best for applications that want to share resources in some pre-determined

proportions. •  (This aspect is not covered in this document as it’s adequately documented elsewhere) Step 2: Configure resources for Spark jobs based on the queue capacities. •  Described in the next slide Step 3: In your Spark application code, designate the right via –queue or by setting“spark.yarn.queue”

Page 10: Spark on YARN

Spark on YARN: Basic Configuration

YARN Settings (Per Node, not Per Queue) Spark Settings (Per Executor)

Executor Count

--num-executors OR spark.executor.instances

CPU yarn.nodemanager.resource.memory-mb --executor-cores OR spark.executor.cores

Memory yarn.nodemanager.resource.cpu-vcores --executor-memory OR spark.executor.memory

Spark internally adds an overhead to spark.executor.memory to account for off-heap JVM usage: overhead = MIN(384 MB, 10% of spark.executor.memory) // As of Spark 1.4 Yarn further adjusts requested container size: 1.  Ensures memory is a multiple of yarn.scheduler.minimum-allocation-mb. Unlike its name, this

isn’t merely a minimum bound. CAUTION: Setting yarn.scheduler.minimum-allocation-mb too high can over-allocate memory because of rounding up.

2.  Ensures request size is bounded by yarn.scheduler.maximum-allocation-mb

Need to specify these

Page 11: Spark on YARN

Spark on YARN: Memory Usage Inside an Executor

App Objects

Shuffle Cache

spark.shuffle.memoryFraction Default = 0.2 (20%) Used for shuffles. Increase this for shuffle-intensive applications wherein spills happen often.

spark.storage.memoryFraction Default = 0.6 (60%) Used for cached RDDs, useful if .cache() or .persist() called.

This is the memory for application objects. It is what is left after setting the other two. If you’re seeing OOMs in your code, you need more memory here!

Guideline: Stick with defaults, and check execution statistics to tweak settings.

May need to tweak Executor memory breakdowns too:

Page 12: Spark on YARN

Spark on YARN: Sizing up Executors How many Executors? How many cores? How much memory? Setting spark.executor.memory

!  Size up this number first •  Don’t use excessively large executors as GC pauses become a problem. •  Don’t use overly skinny executors since JVM overhead becomes proportionately higher •  10GB <= spark.executor.memory <= 48GB could be a good guideline? •  Choose towards the higher end when working with bigger data partitions, using large broadcast

variables, etc.

Setting spark.executor.instances !  Given spark.executor.memory, compute spark.executor.instances to saturate available memory. !  In reality, spark.executor.memory and spark.executor.instances are computed hand-in-hand. !  Don’t forget to account for overheads (daemons, application master, driver, etc.) •  spark.executor.instances ~ #nodes * (yarn.nodemanager.resource.memory-mb * queue-fraction /

spark.executor.memory)

Setting spark.executor.cores •  Over-request cores by 2 to 3 times the number of actual cores in your cluster. •  Why? Not all tasks are CPU bound at the same time.

Page 13: Spark on YARN

Spark on YARN: Sizing up Executors (Example) Sample Cluster Configuration:

8 nodes, 32 cores/node (256 total), 128 GB/node (1024 GB total) Running YARN Capacity Scheduler Spark queue has 50% of the cluster resources

Naive Configuration:

spark.executor.instances = 8 (one Executor per node) spark.executor.cores = 32 * 0.5 = 16 => Undersubscribed spark.executor.memory = 64 MB => GC pauses

Better Configuration:

spark.executor.memory = 16 GB (just as an example) spark.executor.instances = 8 * (128 GB * 0.5 / 16 GB) = 32 total spark.executor.cores = total-available-cores * over-subscription-factor / spark.executor.instances = (256 * 0.5) * 2.5 / 32 = 10

These calculations aren’t perfect -- they don’t account for overheads, for the Application Master container, etc. But hopefully you get the idea ☺ Different applications dictate different settings. EXPERIMENT and FINE TUNE!

Page 14: Spark on YARN

Spark on YARN: Exploiting Data Locality •  Spark tries to execute tasks on nodes such that there will be minimal data movement (data locality)

!  Loss of data locality = suboptimal performance

•  These tasks are run on executors, which are (usually) launched when a SparkContext is spawned, and well before Spark knows what data will be “touched.”

•  Problem: How does Spark tell YARN where to launch Executors? •  Your application can tell Spark the list of nodes that hold data (“preferred locations”). Using a simple

API, you can supply this information when instantiating a SparkContext •  See SparkContext constructor (argument preferredNodeLocationData)

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext

val hdfspath = “hdfs://...”val sc = new SparkContext(sparkConf,

InputFormatInfo.computePreferredLocations( Seq(new InputFormatInfo(conf,

classOf[org.apache.hadoop.mapred.TextInputFormat], hdfspath ))

Page 15: Spark on YARN

Spark on YARN: Dynamic Allocation •  Prior to Release 1.3, Spark acquired all executors at application startup and held onto them for the

lifetime of an application.

•  Starting Release 1.3, Spark supports “dynamic allocation” of executors. This allows applications to launch executors when more tasks are queued up, and release resources when the application is idle.

•  Ideally suited for many interactive applications that might have see user down-time. •  Major caveat: Spark may release executors with cached RDDs! Ouch! So if you’re application uses

rdd.cache() or rdd.persist() to materialize expensive computations, you may not want to use dynamic allocation for that application.

•  On the other hand, you could consider “caching” expensive computations in HDFS.

Page 16: Spark on YARN

Spark on YARN: Dynamic Allocation settings

Configuration option Default Description

spark.dynamicAllocation.enabled false Set to true to get elasticity

spark.dynamicAllocation.minExecutors 0 Lower bound on # executors. Leave as is.

spark.dynamicAllocation.maxExecutors

<Infinity> Upper bound on # executors. Set based on worksheet in previous slide.

spark.dynamicAllocation.executorIdleTimeout 600 secs (10 mins)

How long to wait before giving up idle executors? Set to lower value, say 1 minute?

spark.dynamicAllocation.schedulerBacklogTimeout spark.dynamicAllocation.sustainedSchedulerBacklogTimeout

5 secs How to launch new executors to meet incoming demand? Executors are launched in waves of exponentially increasing numbers. Leave as is.