Tuning Tips for Apache Spark Jobs


Page 1: Tuning tips for Apache Spark Jobs

Spark Tuning @samklr

Page 2: Tuning tips for Apache Spark Jobs

Memory Management

- Be careful with executor and driver memory layout:

- If your job runs multiple large shuffles, make sure to allocate spark.shuffle.memoryFraction accordingly, at the expense of spark.storage.memoryFraction (see the sketch below)

- Consequently this might induce frequent GC pauses: watch the GC time / task duration ratio
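
A minimal sketch of that trade-off on the pre-1.6 (legacy) memory manager; the values are purely illustrative, not a recommendation:

import org.apache.spark.SparkConf

// Illustrative only: shift memory toward shuffle at the expense of the
// storage (cache) fraction for a shuffle-heavy job (Spark < 1.6 settings).
val conf = new SparkConf()
  .setAppName("shuffle-heavy-job")
  .set("spark.shuffle.memoryFraction", "0.4")  // default 0.2
  .set("spark.storage.memoryFraction", "0.4")  // default 0.6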

Page 3: Tuning tips for Apache Spark Jobs

Garbage Collection

Might require some tuning.

First plug your job into your monitoring tool and check for GC spikes.

Most of the time, for long-running jobs, G1 works well, but sometimes a properly tuned Concurrent Mark Sweep (CMS) collector, guided by the right metrics, works better.

Most of the time, especially with Spark versions >= 1.6.0, use a fixed-size heap,

e.g. -Xms4g -Xmx4g

Page 4: Tuning tips for Apache Spark Jobs

Garbage Collection

--conf spark.executor.extraJavaOptions="-server -XX:+AggressiveOpts -XX:-UseBiasedLocking
  -XX:NewSize=4g -XX:MaxNewSize=4g -XX:+UseParNewGC -XX:MaxTenuringThreshold=2 -XX:SurvivorRatio=4
  -XX:+UnlockDiagnosticVMOptions -XX:ParGCCardsPerStrideChunk=32768 -XX:+UseConcMarkSweepGC
  -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled -XX:+CMSClassUnloadingEnabled
  -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch
  -XX:+PrintGCDetails -XX:+PrintAdaptiveSizePolicy -XX:+PrintTenuringDistribution -XX:+PrintGCDateStamps"

--driver-java-options "-server -XX:+AggressiveOpts -XX:-UseBiasedLocking -XX:NewSize=4g
  -XX:MaxNewSize=4g -XX:+UseParNewGC -XX:MaxTenuringThreshold=2 -XX:SurvivorRatio=4
  -XX:+UnlockDiagnosticVMOptions -XX:ParGCCardsPerStrideChunk=32768 -XX:+UseConcMarkSweepGC
  -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled -XX:+CMSClassUnloadingEnabled
  -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch
  -XX:+PrintGCDetails -XX:+PrintAdaptiveSizePolicy -XX:+PrintTenuringDistribution -XX:+PrintGCDateStamps
  -Xloggc:gc.log"

Page 5: Tuning tips for Apache Spark Jobs

Shuffle

Use groupByKey only if you have to, and preferably only in the last stage of the job.

Prefer reduceByKey, foldByKey, aggregateByKey or, better, combineByKey (see the sketch below)

Increase your number of partitions to gain more parallelism, either via spark.default.parallelism or when reading, e.g. textFile(..., 100).
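
A minimal sketch (a hypothetical word count; sc is the usual SparkContext, the input path and partition count are illustrative): reduceByKey combines values inside each partition before the shuffle, which groupByKey cannot do.

val pairs = sc.textFile("hdfs:///data/input.txt", 100)
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))

// groupByKey ships every (word, 1) pair across the network, then sums on the reducer.
val slowCounts = pairs.groupByKey().mapValues(_.sum)

// reduceByKey sums partial counts map-side before shuffling.
val fastCounts = pairs.reduceByKey(_ + _)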

Page 8: Tuning tips for Apache Spark Jobs

Shuffle

Or adjust your number of partitions with:

repartition(n): mostly to increase the number of partitions. This induces shuffling your data all over your cluster.

coalesce(): mostly to reduce your number of partitions without a shuffle step

repartition(n) = coalesce(n, shuffle = true)

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
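
A minimal sketch of the two calls; raw, expensiveTransform and the paths are hypothetical:

// Widen before an expensive stage, then narrow before writing output.
val widened  = raw.repartition(400)              // full shuffle, redistributes data evenly
val crunched = widened.map(expensiveTransform)   // expensiveTransform is a hypothetical function
val compact  = crunched.coalesce(20)             // merges existing partitions, no shuffle step
compact.saveAsTextFile("hdfs:///out/run-01")     // illustrative output path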

Page 9: Tuning tips for Apache Spark Jobs

Tuning

- Be careful with HDFS IOPS: too much concurrency can badly decrease your throughput, so do not use too many cores per executor. Depending on the size of your job and cluster, do some math. Rule of thumb: for maximum throughput, use at most 5 cores per executor, then optimize.

- A Spark shuffle block can't be larger than 2 GB (the ByteBuffer max size), so be careful with your shuffle step and tune the shuffle memory fraction. Use an appropriate storage level.

- Prefer persist(StorageLevel.MEMORY_AND_DISK_SER_2) to cache()
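
A minimal sketch, assuming records is an RDD that several downstream jobs reuse:

import org.apache.spark.storage.StorageLevel

// Serialized, disk-backed, replicated persistence instead of cache() (= MEMORY_ONLY).
val hot = records.filter(_ != null)
hot.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
hot.count()   // the first action materializes the persisted partitions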

Page 10: Tuning tips for Apache Spark Jobs

Tuning

- Use Kryo and register your classes (see the config sketch after this list).

- With Spark SQL tune spark.sql.shuffle.partitions to increase parallelism of shuffle steps.

- Tune your partition size: try to have something close to one partition per HDFS block. Depending on your install, one partition ~ 128 MB

- Give your driver some love, and avoid reduce(), collect() or sc.accumulator() as much as possible

- Look at treeReduce and map-side aggregation (check Algebird monoids)
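
A minimal config sketch covering the Kryo and spark.sql.shuffle.partitions points above; MyKey and MyRecord stand in for your own (hypothetical) application classes, and the partition count is illustrative:

import org.apache.spark.SparkConf

case class MyKey(id: Long)                        // hypothetical application classes
case class MyRecord(key: MyKey, payload: String)

val conf = new SparkConf()
  .setAppName("tuned-job")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.sql.shuffle.partitions", "400")     // illustrative, tune to your cluster
  .registerKryoClasses(Array(classOf[MyRecord], classOf[MyKey]))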

Page 12: Tuning tips for Apache Spark Jobs

Join and skews

- Be careful with data skew: it is a performance killer in Cartesian and join operations

- Salt your keys: key + random.nextInt(factor) (see the sketch after this list)

- Avoid Cartesian joins: use windowing, skip comparisons via filtering, or nested structures

- Use a broadcast join, broadcasting the smaller dataset
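
A minimal key-salting sketch; skewedPairs and the salting factor are hypothetical toy values:

import scala.util.Random

// Spread each hot key over `factor` sub-keys, aggregate per salted key,
// then strip the salt and aggregate again.
val factor = 16
val skewedPairs = sc.parallelize(Seq(("hot", 1), ("hot", 1), ("cold", 1)))   // toy data
val salted  = skewedPairs.map { case (k, v) => ((k, Random.nextInt(factor)), v) }
val partial = salted.reduceByKey(_ + _)
val totals  = partial.map { case ((k, _), v) => (k, v) }.reduceByKey(_ + _)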

Page 13: Tuning tips for Apache Spark Jobs

Tuning the number and size of partitions

Read this deep dive (thanks, Oleksandr):

http://www.bigsynapse.com/spark-input-output

Page 14: Tuning tips for Apache Spark Jobs

Speculation

- Restart tasks when they’re running behind the others

--conf spark.speculation=true

--conf spark.speculation.quantile=0.75 (default)

--conf spark.speculation.multiplier=1.5 (default)

Page 15: Tuning tips for Apache Spark Jobs

Monitoring

- Use metrics.properties to export your Spark job metrics

- Monitor your DAGScheduler stages and task behaviour

- Check the JVM for GC behaviour and core placement

- Add custom metrics via accumulators (see the sketch after this list)

- Check spark internals counters for proper debugging, via custom listener e.g https://gist.github.com/squito/2f7cc02c313e4c9e7df4

- Play with https://github.com/holdenk/spark-validator
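
A minimal accumulator sketch; lines is assumed to be an existing RDD[String]:

// Count malformed lines observed inside tasks without collecting them to the driver.
// Note: accumulator updates inside transformations can over-count on task retries.
val badLines = sc.accumulator(0L, "badLines")

val lengths = lines.flatMap { line =>
  if (line.trim.isEmpty) { badLines += 1L; None }
  else Some(line.length)
}
lengths.count()                                  // an action triggers the updates
println(s"bad lines seen: ${badLines.value}")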

Page 16: Tuning tips for Apache Spark Jobs

Notebooks and spark-shell

- Spark Notebook: http://spark-notebook.io

- Zeppelin: http://zeppelin.apache.org

- Jupyter

- spark-shell: a great tool to get started

Page 17: Tuning tips for Apache Spark Jobs

Testing

- Unit Testing : spark-testing-base

test(" simple transformation with rdd - rdd comparision") {

val inputList = List("hi", "hi hello", "bye")

val inputRDD = tokenize(sc.parallelize(inputList)) val expectedList = List(List("hi"), List("hi", "hello"), List("bye")) val expectedRDD = sc.parallelize(expectedList) assert(None === RDDComparisons.compare(expectedRDD, inputRDD)) }


Page 19: Tuning tips for Apache Spark Jobs

Testing

- Property testing with scalacheck

class SampleScalaCheckTest extends FunSuite with SharedSparkContext with Checkers {

// A trivial property that the map doesn't change the number of elements

test("map should not change number of elements") { val property = forAll(RDDGenerator.genRDD[String](sc)(Arbitrary.arbitrary[String])) { rdd => rdd.map(_.length).count() == rdd.count() } check(property) }

Page 20: Tuning tips for Apache Spark Jobs

Testing

- Property testing with scalacheck

test("assert dataframes created correctly") { val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType))) val sqlContext = new SQLContext(sc) val dataframeGen = DataframeGenerator.genDataFrame(sqlContext, schema) val property = forAll(dataframeGen.arbitrary) { dataframe => dataframe.schema === schema && dataframe.count >= 0 }

check(property) }
