Spark on YARN: Best Practices
Adarsh Pannu, IBM Analytics Platform
DRAFT: This is work in progress. Please send comments to [email protected]
Spark and Cluster Management

Spark supports four different cluster managers:
● Local: Useful only for development
● Standalone: Bundled with Spark; doesn't play well with other applications; fine for PoCs
● YARN: Highly recommended for production
● Mesos: Not supported in BigInsights

Each mode has a similar "logical" architecture, although the physical details differ in terms of which processes and threads are launched, and where.
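The choice of cluster manager is largely transparent to application code: it is selected through the master URL, usually supplied via spark-submit --master. A minimal sketch (the application name and host names are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    // The same application can target any of the cluster managers simply by
    // changing the master URL (normally passed on the spark-submit command line).
    val conf = new SparkConf()
      .setAppName("ClusterManagerDemo")    // illustrative application name
      .setMaster("local[4]")               // "local[N]" for development,
                                           // "spark://host:7077" for Standalone,
                                           // "yarn-client" / "yarn-cluster" for YARN (Spark 1.x)
    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 100).sum())
    sc.stop()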
Spark Cluster Architecture: Logical View
● Driver runs the main() function of the application. It can run outside the cluster ("client" deploy mode) or inside it ("cluster" deploy mode).
● SparkContext is the main entry point for Spark functionality and represents the connection to a Spark cluster.
● Executor is a JVM that runs tasks and keeps data in memory or on disk across them. Each application has its own executors spread across the cluster.

[Diagram: the Driver Program (holding the SparkContext) talks to the Cluster Manager, which launches Executors; each Executor holds a cache and runs tasks.]
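A hedged sketch of where code runs under this logical view (the object name is illustrative): the body of main() executes in the driver, while the closures passed to RDD operations execute as tasks inside executors.

    import org.apache.spark.{SparkConf, SparkContext}

    object LogicalViewDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("LogicalViewDemo")
          .setIfMissing("spark.master", "local[2]")   // master normally comes from spark-submit
        val sc = new SparkContext(conf)               // driver-side: connection to the cluster
        val data  = sc.parallelize(1 to 1000, 8)      // driver-side: builds the RDD lineage
        val evens = data.filter(_ % 2 == 0)           // this closure runs in executor tasks
        println(evens.count())                        // action: driver schedules tasks on executors
        sc.stop()
      }
    }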
Spark: What’s Inside an Executor?
[Diagram: a single Executor JVM. Several tasks run concurrently in its free task slots ("cores"), each processing one RDD partition; here, partitions from two different RDDs are being processed by three tasks. The executor also holds cached partitions from yet another RDD and runs shuffle, transport, GC, and other internal system threads.]
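A hedged Scala sketch to make the picture concrete (the path and partition count are illustrative): the number of RDD partitions determines how many tasks a stage produces, and cache() pins partitions in executor memory.

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf()
      .setAppName("ExecutorDemo").setIfMissing("spark.master", "local[4]"))

    // 12 partitions -> 12 tasks per stage; tasks occupy the executors' free slots ("cores").
    val lines  = sc.textFile("hdfs://namenode:8020/data/input", 12)   // illustrative path
    val cached = lines.filter(_.nonEmpty).cache()                     // cached partitions live in executor memory
    println(cached.count())                                           // first action materializes the cache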
Spark: Standalone Cluster Manager
[Diagram: one Master JVM plus one Worker JVM per machine (Machine 1, Machine 2). Each Worker launches Executor JVMs on behalf of the connected applications (Client 1, Client 2), whose drivers run outside the cluster. Inter-process communication is not shown; all orange boxes are JVMs; deploy mode = "client".]
Standalone Mode: Configuration
            Per Worker Node        Per Application    Per Executor
CPU         SPARK_WORKER_CORES     spark.cores.max    --
Memory      SPARK_WORKER_MEMORY    --                 spark.executor.memory

SPARK_WORKER_CORES: # of cores to give to underlying Executors (default: all available cores).
SPARK_WORKER_MEMORY: Total memory to use on the machine, e.g. 1000m, 2g (default: total memory minus 1 GB).
spark.cores.max: Maximum # of cores to request for the application across the cluster (default: all available cores).
spark.executor.memory: Memory per executor (default: 512m).

Standalone mode uses a FIFO scheduler. As applications launch, it tries to balance resource consumption across the cluster. Strangely, cores are specified per application, yet memory is specified per executor!
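A hedged sketch of the application-side settings (values and the master URL are illustrative; SPARK_WORKER_CORES and SPARK_WORKER_MEMORY are set in conf/spark-env.sh on each worker, not in application code):

    import org.apache.spark.{SparkConf, SparkContext}

    // Application-side configuration for Standalone mode (values are illustrative).
    val conf = new SparkConf()
      .setAppName("StandaloneConfigDemo")
      .setMaster("spark://master-host:7077")   // illustrative master URL
      .set("spark.cores.max", "16")            // cores for this application across the whole cluster
      .set("spark.executor.memory", "4g")      // memory per executor JVM
    val sc = new SparkContext(conf)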
Spark on YARN: Architecture
[Diagram: the Client submits the application to the YARN Resource Manager (Machine 0). Node Managers on Machine 1 and Machine 2 launch containers; one container hosts the Spark Application Master, the others host Executors. Inter-process communication is not shown; all orange boxes are JVMs.]
Spark Configuration

Spark has scores of configuration options:
• For many options, the defaults generally work fine
• However, some critical "knobs" should be carefully tuned

Several settings are cluster-manager specific. When running Spark on YARN, you must examine:
• YARN-specific settings: scheduler type and queues
• Spark-specific settings for YARN: # of executors, per-executor memory and cores, and more

Other general techniques will improve your applications on any cluster manager. For example:
• Object serialization schemes (Kryo vs. Java)
• Proper partitioning and parallelism levels
• On-disk data formats (Parquet vs. Avro vs. JSON vs. ...)
• And many more ... (to be covered elsewhere)
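As one example of a manager-agnostic tweak, a hedged sketch of switching serialization to Kryo (the registered classes are illustrative):

    import org.apache.spark.SparkConf

    // Switch object serialization from Java to Kryo; this helps on any cluster manager.
    val conf = new SparkConf()
      .setAppName("KryoDemo")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Registering frequently shuffled/cached classes keeps the serialized form compact.
    conf.registerKryoClasses(Array(classOf[Array[Double]], classOf[String]))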
Spark on YARN: Managing queues

Your cluster may serve different applications/users, each with differing expectations:
• Batch jobs can often wait, but interactive users may not
• Tight SLAs need to be honored, often at the expense of others

There may be more than one instance of the same type of application, and yet they may need to be treated differently, e.g. different Spark jobs may have differing needs.

Step 1: Divide up your cluster resources into "queues" organized by target needs:
• Choose a scheduling strategy: Capacity vs. Fair.
• The Capacity scheduler is best for applications that need guarantees on the availability of cluster resources (although at the cost of elasticity).
• The Fair scheduler is best for applications that want to share resources in some pre-determined proportions.
• (This aspect is not covered in this document as it's adequately documented elsewhere.)

Step 2: Configure resources for Spark jobs based on the queue capacities.
• Described in the next slide.

Step 3: In your Spark application code, designate the right queue via --queue or by setting "spark.yarn.queue" (see the sketch below).
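A hedged sketch of Step 3 (the queue name is illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    // Route this application to a specific YARN queue; equivalent to: spark-submit --queue analytics ...
    val conf = new SparkConf()
      .setAppName("QueueDemo")
      .set("spark.yarn.queue", "analytics")   // "analytics" is an illustrative queue name
    val sc = new SparkContext(conf)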
Spark on YARN: Basic Configuration
                  YARN Settings (Per Node, not Per Queue)    Spark Settings (Per Executor)
Executor Count    --                                         --num-executors OR spark.executor.instances
CPU               yarn.nodemanager.resource.cpu-vcores       --executor-cores OR spark.executor.cores
Memory            yarn.nodemanager.resource.memory-mb        --executor-memory OR spark.executor.memory

You need to specify the Spark settings explicitly.

Spark internally adds an overhead to spark.executor.memory to account for off-heap JVM usage:

    overhead = MAX(384 MB, 10% of spark.executor.memory)   // As of Spark 1.4

YARN further adjusts the requested container size:
1. It ensures memory is a multiple of yarn.scheduler.minimum-allocation-mb. Despite its name, this isn't merely a minimum bound. CAUTION: Setting yarn.scheduler.minimum-allocation-mb too high can over-allocate memory because of rounding up.
2. It ensures the request size is bounded by yarn.scheduler.maximum-allocation-mb.
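A hedged sketch of how a container request might be sized under these rules (the values are illustrative, and the rounding shown is an approximation of YARN's behavior rather than its exact code path):

    // Approximate executor container sizing (all values in MB; illustrative).
    val executorMemoryMb = 8 * 1024                                         // spark.executor.memory = 8g
    val overheadMb       = math.max(384, (0.10 * executorMemoryMb).toInt)   // Spark 1.4 off-heap overhead
    val requestedMb      = executorMemoryMb + overheadMb                    // 8192 + 819 = 9011

    val minAllocationMb  = 1024        // yarn.scheduler.minimum-allocation-mb
    val maxAllocationMb  = 16 * 1024   // yarn.scheduler.maximum-allocation-mb

    // YARN rounds the request up to a multiple of the minimum allocation, then caps it.
    val roundedMb = math.ceil(requestedMb.toDouble / minAllocationMb).toInt * minAllocationMb
    val grantedMb = math.min(roundedMb, maxAllocationMb)
    println(s"Executor container: requested $requestedMb MB, granted $grantedMb MB")   // 9011 -> 9216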
Spark on YARN: Memory Usage Inside an Executor
You may also need to tweak the memory breakdown inside each Executor:

• spark.shuffle.memoryFraction (default = 0.2, i.e. 20%): Used for shuffles. Increase this for shuffle-intensive applications in which spills happen often.
• spark.storage.memoryFraction (default = 0.6, i.e. 60%): Used for cached RDDs; relevant if .cache() or .persist() is called.
• Application objects: What is left after setting the other two. If you're seeing OOMs in your code, you need more memory here!

Guideline: Stick with the defaults, and check execution statistics to tweak these settings.
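A hedged sketch of rebalancing these fractions for a shuffle-heavy job that caches little data (values are illustrative):

    import org.apache.spark.SparkConf

    // Give shuffles more room and shrink the RDD cache to compensate (illustrative values).
    val conf = new SparkConf()
      .setAppName("MemoryFractionDemo")
      .set("spark.executor.memory", "8g")
      .set("spark.shuffle.memoryFraction", "0.4")   // fewer shuffle spills
      .set("spark.storage.memoryFraction", "0.3")   // smaller RDD cache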
Spark on YARN: Sizing up Executors

How many executors? How many cores? How much memory?

Setting spark.executor.memory
• Size up this number first.
• Don't use excessively large executors, as GC pauses become a problem.
• Don't use overly skinny executors, since JVM overhead becomes proportionately higher.
• 10 GB <= spark.executor.memory <= 48 GB could be a good guideline.
• Choose towards the higher end when working with bigger data partitions, using large broadcast variables, etc.

Setting spark.executor.instances
• Given spark.executor.memory, compute spark.executor.instances to saturate the available memory.
• In reality, spark.executor.memory and spark.executor.instances are computed hand-in-hand.
• Don't forget to account for overheads (daemons, application master, driver, etc.).
• spark.executor.instances ~ #nodes * (yarn.nodemanager.resource.memory-mb * queue-fraction / spark.executor.memory)

Setting spark.executor.cores
• Over-request cores by 2 to 3 times the number of actual cores in your cluster.
• Why? Not all tasks are CPU bound at the same time.
Spark on YARN: Sizing up Executors (Example)

Sample cluster configuration:
• 8 nodes, 32 cores/node (256 total), 128 GB/node (1024 GB total)
• Running the YARN Capacity Scheduler
• The Spark queue has 50% of the cluster resources

Naive configuration:
• spark.executor.instances = 8 (one Executor per node)
• spark.executor.cores = 32 * 0.5 = 16 => undersubscribed
• spark.executor.memory = 128 GB * 0.5 = 64 GB => GC pauses

Better configuration:
• spark.executor.memory = 16 GB (just as an example)
• spark.executor.instances = 8 * (128 GB * 0.5 / 16 GB) = 32 total
• spark.executor.cores = total-available-cores * over-subscription-factor / spark.executor.instances
  = (256 * 0.5) * 2.5 / 32 = 10

These calculations aren't perfect -- they don't account for overheads, for the Application Master container, etc. But hopefully you get the idea ☺ Different applications dictate different settings. EXPERIMENT and FINE TUNE!
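The same arithmetic as a hedged Scala sketch (a rough worksheet under the assumptions above, not a substitute for experimentation):

    // Rough executor-sizing worksheet (all figures illustrative; overheads ignored).
    val nodes            = 8
    val coresPerNode     = 32
    val memoryPerNodeGb  = 128
    val queueFraction    = 0.5    // the Spark queue gets 50% of the cluster
    val oversubscription = 2.5    // request 2-3x the physical cores
    val executorMemoryGb = 16     // chosen first, per the guideline above

    val executorInstances = (nodes * memoryPerNodeGb * queueFraction / executorMemoryGb).toInt                  // 32
    val executorCores     = (nodes * coresPerNode * queueFraction * oversubscription / executorInstances).toInt // 10
    println(s"--num-executors $executorInstances --executor-cores $executorCores --executor-memory ${executorMemoryGb}g")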
Spark on YARN: Exploiting Data Locality

• Spark tries to execute tasks on nodes such that there is minimal data movement ("data locality"). Loss of data locality = suboptimal performance.
• Tasks run on executors, which are (usually) launched when a SparkContext is spawned, well before Spark knows what data will be "touched."
• Problem: How does Spark tell YARN where to launch executors?
• Your application can tell Spark the list of nodes that hold the data ("preferred locations"). Using a simple API, you can supply this information when instantiating a SparkContext.
• See the SparkContext constructor (argument preferredNodeLocationData):
  https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext

    import org.apache.spark.scheduler.InputFormatInfo

    // conf is a Hadoop Configuration; sparkConf is the application's SparkConf.
    val hdfspath = "hdfs://..."
    val sc = new SparkContext(sparkConf,
      InputFormatInfo.computePreferredLocations(Seq(
        new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], hdfspath))))
Spark on YARN: Dynamic Allocation

• Prior to Release 1.3, Spark acquired all executors at application startup and held onto them for the lifetime of the application.
• Starting with Release 1.3, Spark supports "dynamic allocation" of executors. This allows applications to launch executors when more tasks are queued up, and to release resources when the application is idle.
• Ideally suited for interactive applications that may see user down-time.
• Major caveat: Spark may release executors holding cached RDDs! Ouch! So if your application uses rdd.cache() or rdd.persist() to materialize expensive computations, you may not want to use dynamic allocation for that application.
• On the other hand, you could consider "caching" expensive computations in HDFS instead (see the sketch below).
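A hedged sketch of that alternative (paths are illustrative, and sc is an existing SparkContext): write the expensive intermediate result to HDFS once and re-read it, instead of relying on executor caches that dynamic allocation may discard.

    // Persist an expensive intermediate result to HDFS instead of rdd.cache(),
    // so it survives executors being released by dynamic allocation.
    val expensive = sc.textFile("hdfs://namenode:8020/data/raw")   // illustrative input path
      .filter(_.nonEmpty)
      .map(_.toUpperCase)                                          // stand-in for a costly transformation
    expensive.saveAsObjectFile("hdfs://namenode:8020/tmp/expensive")   // "cache" in HDFS

    val reused = sc.objectFile[String]("hdfs://namenode:8020/tmp/expensive")   // re-read later
    println(reused.count())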
Spark on YARN: Dynamic Allocation settings
• spark.dynamicAllocation.enabled (default: false)
  Set to true to get elasticity.
• spark.dynamicAllocation.minExecutors (default: 0)
  Lower bound on # of executors. Leave as is.
• spark.dynamicAllocation.maxExecutors (default: <Infinity>)
  Upper bound on # of executors. Set based on the worksheet in the previous slide.
• spark.dynamicAllocation.executorIdleTimeout (default: 600 secs, i.e. 10 mins)
  How long to wait before giving up idle executors. Consider a lower value, say 1 minute.
• spark.dynamicAllocation.schedulerBacklogTimeout and spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (default: 5 secs)
  Control how quickly new executors are launched to meet incoming demand. Executors are launched in waves of exponentially increasing numbers. Leave as is.
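A hedged sketch of enabling dynamic allocation (values are illustrative; note that dynamic allocation on YARN also relies on the external shuffle service, which is not covered on this slide):

    import org.apache.spark.SparkConf

    // Illustrative dynamic allocation settings for a Spark-on-YARN application.
    val conf = new SparkConf()
      .setAppName("DynamicAllocationDemo")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.maxExecutors", "32")          // from the sizing worksheet
      .set("spark.dynamicAllocation.executorIdleTimeout", "60")   // seconds; give up idle executors quickly
      .set("spark.shuffle.service.enabled", "true")               // needed so released executors' shuffle files remain available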