thoughts on efficient spark programming (vancouver spark meetup 03-09-2015)
TRANSCRIPT
Thoughts on Efficient Spark Programming
Niels Hanson Vancouver Apache Spark Meetup
Thursday, September 3 2015
Who am I?
• Niels Hanson, recent PhD graduate in Bioinformatics
• BSc, UBC Computer Science and Statistics
• PhD thesis focused on developing distributed computing methods for processing large metagenomic datasets
  ‣ Compute Canada's WestGrid
  ‣ Pacific Northwest National Lab (PNNL) Cascade
    - Algorithm development on Intel Xeon Phi cards
  ‣ Author on 10+ publications
• Blatant Plug: Looking for a Developer/Data Scientist position!
Apache Spark
• Open-source cluster computing framework
  ‣ Originally based out of the AMPLab at UC Berkeley
  ‣ Implements an in-memory, distributed least-recently-used (LRU) cache at worker nodes
    - In contrast to Hadoop's two-stage, disk-based MapReduce
  ‣ Cluster management via Spark's stand-alone manager, Hadoop YARN, or Apache Mesos
  ‣ Supports distributed storage provided by Hadoop HDFS, Cassandra, OpenStack Swift, and Amazon S3
  ‣ Can also be used stand-alone for thread-like parallelism (see the minimal sketch below)
  ‣ Central paradigm: Resilient Distributed Datasets (RDDs)
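A minimal sketch of that stand-alone, thread-parallel mode, assuming the pyspark shell or a driver script (the app name and data here are illustrative):

    from pyspark import SparkContext

    # local[*] runs Spark inside a single JVM, one worker thread per core
    sc = SparkContext("local[*]", "ThreadLikeParallelism")

    rdd = sc.parallelize(range(1000000))
    print(rdd.map(lambda x: x * x).sum())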
Image: EdX BerkeleyX: CS100.1x Introduction to Big Data with Apache Spark
“Improvements to YARN worker task memory management would allow jobs with large side data distributions like BLAST to be used more efficiently on machines with smaller memory requirements, further reducing cost of cloud-computing services.”
Kim, Konwar, Hanson, and Hallam. Koonkie: An Automated Software Tool for Processing Environmental Sequence Information using Hadoop. 2014 ASE BigData, Harvard University, Cambridge, MA
• RDDs essentially represent an in-memory LRU cache
• What we implemented here using Hadoop 2.3 was a worker-disk LRU cache (check local disk before going to HDFS)
Resilient Distributed Datasets
• Write programs in terms of operations on distributed datasets
• Partitioned collections of objects spread across a cluster, stored in memory or on disk (LRU-cache style)
• RDDs are built and manipulated through a diverse set of parallel transformations (map, filter, join) and actions (count, collect, save)
• RDDs are automatically rebuilt on machine failure
Image: EdX BerkeleyX: CS100.1x Introduction to Big Data with Apache Spark
Zaharia, Matei, et al. "Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing." Proceedings of the 9th USENIX Conference on Networked Systems Design and Implementation. 2012.
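To make the transformation/action split concrete, here is a minimal PySpark sketch (the file path and predicate are illustrative); transformations are lazy, and nothing executes until an action is called:

    lines = sc.textFile("hdfs://…/logs.txt")        # base RDD (lazy)
    errors = lines.filter(lambda l: "ERROR" in l)   # transformation (lazy)
    words = errors.flatMap(lambda l: l.split())     # transformation (lazy)

    print(errors.count())   # action: triggers the actual computation
    sample = errors.take(5) # action: returns 5 records to the driver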
MapReduce Aggregation
• My initial thoughts: "[GroupByKey()] not such a big deal so long as the keys are balanced"
[Figure: MapReduce data flow — Input Splits → Map Tasks → Shuffle → Reduce Tasks → Outputs]
• MapReduce: Map tasks produce (key, value) pairs that are shuffled to Reducer tasks based on the key
  ‣ One reducer may get lots of jobs
  ‣ May be a bottleneck if further map tasks rely on the complete output
GroupByKey()
Image: Databricks Spark Knowledge Base
• The problem still exists in Spark if you use GroupByKey()
• Two issues (see the sketch below):
  ‣ Every (key, value) pair is shuffled over the network
  ‣ Unbalanced keys mean some workers could be doing very little work
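An illustrative word-count-style sketch (data hypothetical) of the groupByKey() pattern to avoid — every pair crosses the network before any summing happens:

    pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1)])

    # Anti-pattern: ships every (key, value) pair across the network,
    # then sums on the reducer side
    counts = pairs.groupByKey().mapValues(lambda vals: sum(vals))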
ReduceByKey()
• Solution: perform a reduction in the "Map" step (i.e., use a combiner), as sketched below
Image: Databricks Spark Knowledge Base
• Bounds "Map" communication to O(number of keys)
• Bounds "Reducer" input to O(number of mappers)
• Unbalanced keys can still mean some workers have unbalanced work
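The same count with reduceByKey(), a sketch of the combiner-style fix — partial sums are computed map-side, so only one partial result per key per partition is shuffled:

    pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1)])

    # Combiner-style: partial sums computed map-side before the shuffle
    counts = pairs.reduceByKey(lambda a, b: a + b)
    print(counts.collect())   # e.g. [('a', 3), ('b', 1)]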
CombineByKey()
• Use CombineByKey() when your return type differs from your input type
  ‣ i.e., when you can't use ReduceByKey()
• Requires three functions:
  ‣ createCombiner: used as the very first aggregation step for each key; its argument is the value in a key-value pair
  ‣ mergeValue: what to do when a combiner is given a new value; its arguments are a combiner and a new value
  ‣ mergeCombiners: how to merge two combiners; its arguments are two combiners
Source: Using CombineByKey in Apache-Spark
Typical Case: Calculate Average per Key

Input (Key, Value), split across two partitions:
  P1: (A,0) (B,3) (B,2)
  P2: (C,3) (A,1) (B,4)

mapValues() — pair each value with a count of 1:
  P1: (A,(0,1)) (B,(3,1)) (B,(2,1))
  P2: (C,(3,1)) (A,(1,1)) (B,(4,1))

reduceByKey() — partial (sum, count) per key within each partition:
  P1: (A,(0,1)) (B,(5,2))
  P2: (C,(3,1)) (A,(1,1)) (B,(4,1))

reduceByKey() — shuffle, then merge the partials across partitions:
  (A,(1,2)) (B,(9,3)) (C,(3,1))

mapValues() — divide sum by count:
  (A, 1/2) (B, 9/3) (C, 3/1)

    dataRDD.mapValues(lambda val: (val, 1)) \
        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
        .mapValues(lambda val: val[0] / val[1])
combineByKey(): Calculate Average per Key

Input (Key, Value), split across two partitions:
  P1: (A,0) (B,3) (B,2)
  P2: (C,3) (A,1) (B,4)

    dataRDD.combineByKey(
        lambda value: (value, 1),                                   # createCombiner
        lambda acc, value: (acc[0] + value, acc[1] + 1),            # mergeValue
        lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))  # mergeCombiners

Per-partition trace:
  P1: accumulators[A] = createCombiner(0)
      accumulators[B] = createCombiner(3)
      accumulators[B] = mergeValue(accumulators[B], 2)
  P2: accumulators[C] = createCombiner(3)
      accumulators[A] = createCombiner(1)
      accumulators[B] = createCombiner(4)
combineByKey(): Calculate Average per Key (continued)

(Same input as above.)

    sumCount = dataRDD.combineByKey(
        lambda value: (value, 1),                                   # createCombiner
        lambda acc, value: (acc[0] + value, acc[1] + 1),            # mergeValue
        lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))  # mergeCombiners

After the per-partition step, each partition holds its own accumulators (P1.accums and P2.accums); keys that appear in both partitions are merged across them during the shuffle, e.g.:
  mergeCombiners(P1.accums[A], P2.accums[A])

    averageByKey = sumCount.mapValues(
        lambda sum_count: sum_count[0] / sum_count[1])
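Putting the two slides together, a self-contained sketch using the same sample data, runnable in the pyspark shell:

    dataRDD = sc.parallelize(
        [("A", 0), ("B", 3), ("B", 2), ("C", 3), ("A", 1), ("B", 4)], 2)

    sumCount = dataRDD.combineByKey(
        lambda value: (value, 1),                                   # createCombiner
        lambda acc, value: (acc[0] + value, acc[1] + 1),            # mergeValue
        lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))  # mergeCombiners

    # float() keeps the division exact under Python 2
    averageByKey = sumCount.mapValues(lambda p: p[0] / float(p[1]))
    print(averageByKey.collectAsMap())   # {'A': 0.5, 'B': 3.0, 'C': 3.0}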
combineByKey(): Bag-of-Words Dictionary

    def createCombiner(word):
        # First occurrence of a key: start a fresh word-count dictionary
        return {word: 1}

    def mergeValue(acc, word):
        # A combiner (a dict) gets a new word: bump its count
        acc[word] = acc.get(word, 0) + 1
        return acc

    def mergeCombiners(acc1, acc2):
        # More complicated combining: fold one dictionary into the other
        for word, count in acc1.items():
            acc2[word] = acc2.get(word, 0) + count
        return acc2

    dataRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)

• The advantage of combineByKey() is that you can use an arbitrarily complicated aggregator
• Assume an RDD of document text data as (doc, word) pairs, and use Python dictionaries as combiners (any Java-serializable / pickle-able data structure works)
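For completeness, a hedged sketch of how the (doc, word) RDD might be built and wired up with the three functions above (the documents here are hypothetical):

    docs = sc.parallelize([("doc1", "to be or not to be"),
                           ("doc2", "to do is to be")])

    # (doc, word) pairs: one record per word occurrence
    doc_words = docs.flatMap(lambda dt: [(dt[0], w) for w in dt[1].split()])

    bags = doc_words.combineByKey(createCombiner, mergeValue, mergeCombiners)
    print(bags.collectAsMap())
    # e.g. {'doc1': {'to': 2, 'be': 2, 'or': 1, 'not': 1}, ...}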
Partitioners
• Partitioners can reduce communication where an RDD is used multiple times in key-oriented operations (e.g., joins, the XByKey() operations, lookup)
• They distribute data in a way that ensures a particular set of keys will appear on the same node
  ‣ HashPartitioner: hashes keys using Java's hashCode() function
  ‣ RangePartitioner: distributes the RDD based on continuous ranges of keys
    - The range partitioner is now the default as of Spark 1.2.0
Partitioners

    val sc = new SparkContext(…)
    val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://…")
      .partitionBy(new HashPartitioner(100))
      .persist()
Partitioners

    val sc = new SparkContext(…)
    val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://…")
      .partitionBy(new HashPartitioner(100))
      .persist()

• HashPartitioner(100): sets the level of parallelism of the RDD to 100 partitions
  ‣ rdd.getNumPartitions() (Python), rdd.partitions.size (Java/Scala)
  ‣ An extra argument in most functions lets you set a particular number of partitions (otherwise spark.default.parallelism is used)
  ‣ You want num_partitions > num_cores
  ‣ Rule of thumb: you probably need 2× num_nodes, because you'll also have to run JVM processes
  ‣ The repartition() and coalesce() functions change an RDD's number of partitions (see the sketch below)
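A small PySpark sketch of inspecting and changing partition counts (the numbers are illustrative):

    rdd = sc.parallelize(range(1000), 8)
    print(rdd.getNumPartitions())       # 8

    wider = rdd.repartition(100)        # full shuffle into 100 partitions
    narrower = wider.coalesce(10)       # merges partitions, avoids a full shuffle
    print(narrower.getNumPartitions())  # 10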
Partitioners
• Partitioners are automatically set by certain operations
• A reduceByKey() following a join() of two RDDs will be faster because the results are already hash-partitioned
• Operations that don't provide guaranteed partitioning, like map(), don't set a partitioner and remove an existing one if present
• mapValues() and flatMapValues() retain the existing partitioner
• Operations that set a partitioner: cogroup(), groupWith(), join() and the other joins, the XByKey() operations, partitionBy(), sort(), and, if the parent RDD has a partitioner, mapValues(), flatMapValues(), and filter() (a quick check follows below)
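A quick sketch of checking these rules from the pyspark shell via the rdd.partitioner attribute (data hypothetical):

    pairs = sc.parallelize([(1, 1), (2, 2), (3, 3)]).partitionBy(4)
    print(pairs.partitioner is not None)                             # True

    # mapValues() keeps the partitioner; map() drops it
    print(pairs.mapValues(lambda v: v + 1).partitioner is not None)  # True
    print(pairs.map(lambda kv: kv).partitioner is not None)          # False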
Partitioners

    import urlparse

    def hash_domain(url):
        # Partition URLs by their domain (network location)
        return hash(urlparse.urlparse(url).netloc)

    partitioned = rdd.partitionBy(20, hash_domain)
    partitioned.persist()

• Example of a custom URL-based partitioner in Python
  ‣ The hash function can't be a lambda, since Spark uses the function's name to track the partitioning of RDDs
Joins
• Fundamental relational algebra operation that combines two structured datasets matched on some shared field
  ‣ Traditionally creates the full Cartesian product over the shared field
• LeftJoin and RightJoin express how you handle missing pairs (see the sketch below)
https://en.wikipedia.org/wiki/Relational_algebra
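A minimal PySpark sketch of an inner versus a left outer join on a shared key (data hypothetical):

    users = sc.parallelize([(1, "alice"), (2, "bob"), (3, "carol")])
    orders = sc.parallelize([(1, "book"), (1, "pen"), (3, "lamp")])

    print(users.join(orders).collect())
    # e.g. [(1, ('alice', 'book')), (1, ('alice', 'pen')), (3, ('carol', 'lamp'))]

    print(users.leftOuterJoin(orders).collect())
    # bob has no orders, so his right side is None:
    # e.g. ..., (2, ('bob', None)), ...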
• Won’t always have a situation with good Sharding (enough keys to spread out results)
Join Large RDD with Small RDD
US RDD Partition 1
US RDD Partition 2
US RDD Partition 2
US RDD Partition 2
US RDD Partition 2
US RDD Partition 2
US RDD Partition n>>50
Small State RDD
All data for CA
All data for RI
• Uneven sharding • Limited parallelism
(50 states)
ShuffledHashJoin
21
Join Large RDD with Small RDD
• If the small state info can fit in RAM, it may be worth broadcasting it to all worker nodes

[Figure: BroadcastHashJoin — a copy of the small State RDD is shipped to each of the US RDD partitions 1 … n (n >> 50)]

• Parallelism of the large RDD is maintained, and no shuffle is needed

    val smallRDDBroadcast = sc.broadcast(smallRDD.collectAsMap())
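In PySpark terms, a sketch of the same broadcast trick (the RDD names and records are hypothetical): collect the small table as a map, broadcast it, and do the "join" as a plain map over the large RDD:

    usRDD = sc.parallelize([("CA", "rec1"), ("RI", "rec2"), ("CA", "rec3")])
    smallStateRDD = sc.parallelize([("CA", "California"), ("RI", "Rhode Island")])

    # Ship one read-only copy of the small table to every executor
    states = sc.broadcast(smallStateRDD.collectAsMap())

    # Map-side join: the large RDD keeps its partitioning, no shuffle needed
    joined = usRDD.map(lambda kv: (kv[0], (kv[1], states.value.get(kv[0]))))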
Join Medium RDD and Large RDD

[Figure: ShuffledHashJoin — the medium All CA RDD and the partitions of the large All World RDD are both shuffled so that all data from both sides for each key lands together, producing the final joined output]

• Good parallelism here
• However, limited by the large table rather than the medium-size table
Join Medium RDD and Large RDD
• Filter the large RDD for ids that appear in the medium RDD, so less shuffle space is needed

[Figure: the All World RDD partitions are first filtered down to a Partial World RDD, which is then shuffled and joined with the All CA RDD to produce the final joined output]
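A hedged sketch of the filter-then-join idea (RDD names and data hypothetical): broadcast the medium RDD's key set, filter the large RDD down to matching keys, then shuffle only the survivors:

    caRDD = sc.parallelize([("CA", "ca-rec1"), ("CA", "ca-rec2")])
    worldRDD = sc.parallelize([("CA", "w1"), ("FR", "w2"), ("JP", "w3")])

    # Broadcast just the keys of the medium RDD
    ca_keys = sc.broadcast(set(caRDD.keys().collect()))

    # Keep only large-RDD records that can actually match
    partialWorld = worldRDD.filter(lambda kv: kv[0] in ca_keys.value)

    joined = caRDD.join(partialWorld)   # much less shuffle space needed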
Resources
• Learning Spark & Advanced Analytics with Spark:
  https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/
• Apache Spark Programming Guide:
  https://spark.apache.org/docs/latest/programming-guide.html
• EdX BerkeleyX (Introduction to Big Data with Apache Spark CS100.1x, Scalable Machine Learning CS190.1x):
  https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x
  https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x
• Alexey Grishchenko's Distributed Systems Architecture Blog:
  http://0x0fff.com/
• Vida Ha & Holden Karau. "Everyday I'm Shuffling: Tips for Writing Better Spark Programs." Strata SJ 2015.
  http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs
• Sameer Farooqui. "DevOps Advanced Class." Spark Summit East 2015.
  https://www.youtube.com/watch?v=7ooZ4S7Ay6Y
  https://spark-summit.org/wp-content/uploads/2015/03/SparkSummitEast2015-AdvDevOps-StudentSlides.pdf
Questions?