Lessons from Running Large Scale Spark Workloads
Reynold Xin, Matei Zaharia
Feb 19, 2015 @ Strata


TRANSCRIPT

Page 1: Lessons from Running Large Scale Spark Workloads

Lessons from Running Large Scale Spark Workloads
Reynold Xin, Matei Zaharia
Feb 19, 2015 @ Strata

Page 2: Lessons from Running Large Scale Spark Workloads

About Databricks

– Founded by the creators of Spark in 2013
– Largest organization contributing to Spark
– End-to-end hosted service: Databricks Cloud


Page 3: Lessons from Running Large Scale Spark Workloads

A slide from 2013 …


Page 4: Lessons from Running Large Scale Spark Workloads


Does Spark scale?

Page 5: Lessons from Running Large Scale Spark Workloads


Does Spark scale? Yes!

Page 6: Lessons from Running Large Scale Spark Workloads


On-Disk Sort Record: Time to sort 100 TB

2013 Record (Hadoop): 2,100 machines, 72 minutes
2014 Record (Spark): 207 machines, 23 minutes

Also sorted 1 PB in 4 hours

Source: Daytona GraySort benchmark, sortbenchmark.org

Page 7: Lessons from Running Large Scale Spark Workloads

Agenda

– Spark “hall of fame”
– Architectural improvements for scalability
– Q&A


Page 8: Lessons from Running Large Scale Spark Workloads


Spark “Hall of Fame”

LARGEST CLUSTER: Tencent (8,000+ nodes)
LARGEST SINGLE-DAY INTAKE: Tencent (1 PB+/day)
LONGEST-RUNNING JOB: Alibaba (1 week on 1 PB+ data)
LARGEST SHUFFLE: Databricks PB Sort (1 PB)
MOST INTERESTING APP: Jeremy Freeman, Mapping the Brain at Scale (with lasers!)

Based on Reynold’s personal knowledge

Page 9: Lessons from Running Large Scale Spark Workloads

Largest Cluster & Daily Intake


800 million+ active users

8000+ nodes

150 PB+ total data

1 PB+ per day

Page 10: Lessons from Running Large Scale Spark Workloads

Spark at Tencent

Images courtesy of Lianhui Wang

– Ads CTR prediction
– Similarity metrics on billions of nodes
– Spark SQL for BI and ETL
– …

[Diagram: Spark, with SQL, streaming, machine learning, graph, and ETL workloads, running over HDFS, Hive, HBase, MySQL, PostgreSQL, and other data sources]

See Lianhui’s session this afternoon

Page 11: Lessons from Running Large Scale Spark Workloads

Longest Running Spark Job

Alibaba Taobao uses Spark to perform image feature extraction for product recommendation: 1 week of runtime on petabytes of images!

– Largest e-commerce site (800M products, 48K items sold per minute)
– Workloads: ETL, machine learning, graph computation, streaming


Page 12: Lessons from Running Large Scale Spark Workloads

Alibaba Taobao: Extensive Use of GraphX


– clustering (community detection)
– belief propagation (influence & credibility)
– collaborative filtering (recommendation)

Images courtesy of Andy Huang, Joseph Gonzalez

See upcoming talk at Spark Summit on streaming graphs

Page 13: Lessons from Running Large Scale Spark Workloads


Mapping the brain at scale

Images courtesy of Jeremy Freeman

Page 14: Lessons from Running Large Scale Spark Workloads

Images courtesy of Jeremy Freeman

Page 15: Lessons from Running Large Scale Spark Workloads

Images courtesy of Jeremy Freeman

Page 16: Lessons from Running Large Scale Spark Workloads

Architectural Improvements

– Elastic Scaling: improve resource utilization (1.2)
– Revamped Shuffle: shuffle at scale (1.2)
– DataFrames: easier high performance at scale (1.3)

Page 17: Lessons from Running Large Scale Spark Workloads

Static Resource Allocation


[Chart: allocated vs. used resources (CPU / memory) over time; new jobs arrive in bursts and stragglers hold resources while actual usage stays low]

Page 18: Lessons from Running Large Scale Spark Workloads

Static Resource Allocation


[Chart: allocated vs. used resources over time]

More resources are allocated than are used!

Page 19: Lessons from Running Large Scale Spark Workloads

Static Resource Allocation


[Chart: allocated vs. used resources over time]

Resources are wasted!

Page 20: Lessons from Running Large Scale Spark Workloads

Elastic Scaling


[Chart: with elastic scaling, allocated resources closely track actual usage over time]

Page 21: Lessons from Running Large Scale Spark Workloads

Elastic Scaling

spark.dynamicAllocation.enabled        true
spark.dynamicAllocation.minExecutors   3
spark.dynamicAllocation.maxExecutors   20

Optional:
spark.dynamicAllocation.executorIdleTimeout
spark.dynamicAllocation.schedulerBacklogTimeout
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout

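As a hedged illustration (not from the talk), the same settings can be applied programmatically through SparkConf; the application name below is hypothetical. Removing executors at runtime generally also requires the external shuffle service, so shuffle files outlive the executors that wrote them:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("elastic-scaling-demo")                // hypothetical name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "3")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.shuffle.service.enabled", "true")      // keeps shuffle files available when executors are removed
val sc = new SparkContext(conf)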

Page 22: Lessons from Running Large Scale Spark Workloads

Shuffle at Scale

– Sort-based shuffle
– Netty-based transport


Page 23: Lessons from Running Large Scale Spark Workloads

Sort-based Shuffle

Old hash-based shuffle: requires R (the number of reduce tasks) concurrent streams, each with its own buffer, which limits how large R can be. New sort-based shuffle: sort records by partition first, then write them out; only one stream is active at a time.

Went up to 250,000 reduce tasks in PB sort!
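To make the contrast concrete, here is a minimal, self-contained sketch of the sort-based idea; it is illustrative only, not Spark’s implementation (the real shuffle records per-partition byte offsets and can spill and merge sorted runs on disk):

import java.io.{BufferedWriter, FileWriter}

// Write shuffle output for one map task: records tagged with their reduce
// partition id are sorted so each partition's data is contiguous, then
// written through a single active stream, no matter how many reduce tasks
// (R) there are. Returns a per-partition record count as a toy "index".
def sortBasedShuffleWrite(records: Seq[(Int, String)],
                          numPartitions: Int,
                          dataFile: String): Array[Int] = {
  val sorted = records.sortBy(_._1)          // group records by partition id
  val counts = new Array[Int](numPartitions)
  sorted.foreach { case (pid, _) => counts(pid) += 1 }
  val out = new BufferedWriter(new FileWriter(dataFile))
  try sorted.foreach { case (_, rec) => out.write(rec); out.newLine() }
  finally out.close()
  counts
}

A hash-based writer would instead hold numPartitions open streams at once, which is exactly the limit on R that the sort-based design removes.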

Page 24: Lessons from Running Large Scale Spark Workloads

Netty-based Network Transport

– Zero-copy network sends
– Explicitly managed memory buffers for shuffle (lowering GC pressure)
– Automatic retries on transient network failures

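For reference (a hedged note; defaults can differ across versions), this transport shipped in Spark 1.2 and is selected by the spark.shuffle.blockTransferService property, with "nio" as the legacy alternative:

import org.apache.spark.SparkConf

// Netty is the default in 1.2; set explicitly, or switch to "nio" to fall back.
val conf = new SparkConf().set("spark.shuffle.blockTransferService", "netty")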

Page 25: Lessons from Running Large Scale Spark Workloads

Network Transport

Sustaining 1.1 GB/s per node in shuffle

Page 26: Lessons from Running Large Scale Spark Workloads

DataFrames in Spark

Current Spark API is based on Java / Python objects:
–  Hard for the engine to store compactly
–  Cannot understand the semantics of user functions

DataFrames make it easy to leverage structured data

–  Compact, column-oriented storage
–  Rich optimization via Spark SQL’s Catalyst optimizer


Page 27: Lessons from Running Large Scale Spark Workloads

DataFrames in Spark

Distributed collection of data with known schema, plus domain-specific functions for common tasks (see the sketch below):

– Metadata
– Sampling
– Project, filter, aggregation, join, …
– UDFs

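A brief hedged sketch of these operations in the Spark 1.3 Scala API (the file name and column names are hypothetical); joins appear on the slides that follow:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

val sqlContext = new SQLContext(sc)              // sc: an existing SparkContext
val users = sqlContext.jsonFile("users.json")    // schema inferred from JSON

users.printSchema()                              // metadata: inspect the schema
val sample = users.sample(false, 0.1)            // sampling: 10%, no replacement
val adults = users.filter(users("age") > 21)     // filter
                  .select("name", "age")         // projection
val byAge  = adults.groupBy("age").count()       // aggregation

val toUpper = udf((s: String) => s.toUpperCase)  // UDF
users.select(toUpper(users("name")))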

Page 28: Lessons from Running Large Scale Spark Workloads


DataFrames

Similar API to data frames in R and Pandas

Optimized and compiled to bytecode by Spark SQL

Read/write to Hive, JSON, Parquet, ORC, Pandas

df = jsonFile("tweets.json")

df[df.user == "matei"]
  .groupBy("date")
  .sum("retweets")

[Chart: running time of the same computation in Python, Scala, and with DataFrames]

Page 29: Lessons from Running Large Scale Spark Workloads


joined = users.join(events, users.id == events.uid)
filtered = joined.filter(events.date >= "2015-01-01")

logical plan (this join is expensive: every event is joined before any filtering happens):

filter
  join
    scan (users)
    scan (events)

physical plan (the filter is pushed below the join):

join
  scan (users)
  filter
    scan (events)

Page 30: Lessons from Running Large Scale Spark Workloads


logical plan:

filter
  join
    scan (users)
    scan (events)

optimized plan (Catalyst pushes the filter below the join):

join
  scan (users)
  filter
    scan (events)

optimized plan with intelligent data sources (the filter is pushed into the data source scan itself):

join
  scan (users)
  filter scan (events)

joined = users.join(events, users.id == events.uid)
filtered = joined.filter(events.date >= "2015-01-01")

DataFrames arrive in Spark 1.3!
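As a hedged aside, the effect of these rewrites can be inspected directly: DataFrame.explain(true) prints the logical, optimized, and physical plans, so the filter pushdown above is visible in the output.

// Continues the example above; prints logical, optimized, and physical plans.
filtered.explain(true)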

Page 31: Lessons from Running Large Scale Spark Workloads

From MapReduce to Spark

public static class WordCountMapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output,
                  Reporter reporter) throws IOException {
    String line = value.toString();
    StringTokenizer itr = new StringTokenizer(line);
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      output.collect(word, one);
    }
  }
}

public static class WordCountReduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output,
                     Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}

val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

Page 32: Lessons from Running Large Scale Spark Workloads


Page 33: Lessons from Running Large Scale Spark Workloads

Thank you! Questions?