visualapi spark
TRANSCRIPT
TRANSFORMATIONS AND ACTIONS: A Visual Guide of the API
http://training.databricks.com/visualapi.pdf
Databricks would like to give a special thanks to Jeff Thompson for contributing 67 visual diagrams depicting the Spark API under the MIT license to the Spark community.
Jeff’s original, creative work can be found here and you can read more about Jeff’s project in his blog post.
After talking to Jeff, Databricks commissioned Adam Breindel to further evolve Jeff’s work into the diagrams you see in this deck.
LinkedIn: https://www.linkedin.com/profile/view?id=8052185, https://www.linkedin.com/profile/view?id=128303555
Blog (data-frack): http://data-frack.blogspot.com/2015/01/visual-mnemonics-for-pyspark-api.html
Notebook: http://nbviewer.ipython.org/github/jkthompson/pyspark-pictures/blob/master/pyspark-pictures.ipynb
making big data simple
Databricks Cloud:
“A unified platform for building Big Data pipelines, from ETL to Exploration and Dashboards, to Advanced Analytics and Data Products.”
• Founded in late 2013
• by the creators of Apache Spark
• Original team from UC Berkeley AMPLab
• Raised $47 Million in 2 rounds
• ~55 employees
• We’re hiring!
• Level 2/3 support partnerships with
• Hortonworks
• MapR
• DataStax
(http://databricks.workable.com)
Legend
• RDD elements (original, transformed type)
• RDD partition(s) (A, B)
• object on driver
• user functions
• user input
• emitted value
Legend
• Randomized operation
• Set Theory / Relational operation
• Numeric calculation
Operations = TRANSFORMATIONS + ACTIONS
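A minimal sketch of that split, assuming a live SparkContext named sc as in the examples later in this deck: transformations are lazy and only describe a new RDD, while an action triggers the computation and returns a result to the driver.

x = sc.parallelize([1, 2, 3, 4])
doubled = x.map(lambda n: n * 2)              # transformation: nothing runs yet
total = doubled.reduce(lambda a, b: a + b)    # action: executes the job, returns 20
print(total)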
Essential Core & Intermediate Spark Operations
(The original slide color-codes each operation as easy or medium.)

TRANSFORMATIONS
General
• map
• filter
• flatMap
• mapPartitions
• mapPartitionsWithIndex
• groupBy
• sortBy
Math / Statistical
• sample
• randomSplit
Set Theory / Relational
• union
• intersection
• subtract
• distinct
• cartesian
• zip
Data Structure / I/O
• keyBy
• zipWithIndex
• zipWithUniqueID
• zipPartitions
• coalesce
• repartition
• repartitionAndSortWithinPartitions
• pipe

ACTIONS
General
• reduce
• collect
• aggregate
• fold
• first
• take
• forEach
• top
• treeAggregate
• treeReduce
• forEachPartition
• collectAsMap
Math / Statistical
• count
• takeSample
• max
• min
• sum
• histogram
• mean
• variance
• stdev
• sampleVariance
• countApprox
• countApproxDistinct
Set Theory / Relational
• takeOrdered
Data Structure / I/O
• saveAsTextFile
• saveAsSequenceFile
• saveAsObjectFile
• saveAsHadoopDataset
• saveAsHadoopFile
• saveAsNewAPIHadoopDataset
• saveAsNewAPIHadoopFile
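As a hedged sketch (assuming a live SparkContext sc), here are two entries from the table above that have no dedicated slide later in the deck, the sortBy transformation and the takeOrdered action:

x = sc.parallelize([5, 1, 4, 2, 3])
y = x.sortBy(lambda n: n, ascending=False)   # transformation: sort by a key function
print(y.collect())                           # [5, 4, 3, 2, 1]
print(x.takeOrdered(3))                      # action: the 3 smallest items, [1, 2, 3], to the driver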
Essential Core & Intermediate PairRDD Operations
(The original slide color-codes each operation as easy or medium.)

TRANSFORMATIONS
General
• flatMapValues
• groupByKey
• reduceByKey
• reduceByKeyLocally
• foldByKey
• aggregateByKey
• sortByKey
• combineByKey
Math / Statistical
• sampleByKey
Set Theory / Relational
• cogroup (=groupWith)
• join
• subtractByKey
• fullOuterJoin
• leftOuterJoin
• rightOuterJoin
Data Structure
• partitionBy

ACTIONS
General
• keys
• values
Math / Statistical
• countByKey
• countByValue
• countByValueApprox
• countApproxDistinctByKey
• countApproxDistinctByValue
• countByKeyApprox
• sampleByKeyExact
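A hedged sketch (again assuming sc) of two pair-RDD entries from the table above that have no dedicated slide, flatMapValues and sortByKey:

pairs = sc.parallelize([('B', [5, 4]), ('A', [3, 2, 1])])
flat = pairs.flatMapValues(lambda vs: vs)   # one (key, value) pair per list element
print(flat.collect())                       # [('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)]
print(flat.sortByKey().collect())           # keys come back in ascending order: the A pairs, then the B pairs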
narrow vs wide dependencies
narrow: each partition of the parent RDD is used by at most one partition of the child RDD
wide: multiple child RDD partitions may depend on a single parent RDD partition
“One of the challenges in providing RDDs as an abstraction is choosing a representation for them that can track lineage across a wide range of transformations.”
“The most interesting question in designing this interface is how to represent dependencies between RDDs.”
“We found it both sufficient and useful to classify dependencies into two types:
• narrow dependencies, where each partition of the parent RDD is used by at most one partition of the child RDD
• wide dependencies, where multiple child partitions may depend on it.”
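One way to see this distinction in practice is to inspect an RDD's lineage; a hedged sketch, assuming a live SparkContext sc: a narrow transformation such as map stays within a stage, while a wide one such as groupByKey introduces a shuffle boundary that shows up in toDebugString().

x = sc.parallelize([('a', 1), ('b', 1), ('a', 1)], 2)
narrow = x.map(lambda kv: (kv[0], kv[1] * 2))   # narrow dependency: no shuffle
wide = x.groupByKey()                           # wide dependency: requires a shuffle
print(narrow.toDebugString())                   # lineage stays in a single stage
print(wide.toDebugString())                     # lineage shows a shuffle (ShuffledRDD)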
narrow: each partition of the parent RDD is used by at most one partition of the child RDD. Examples: map, filter, union, join with co-partitioned inputs.
wide: multiple child RDD partitions may depend on a single parent RDD partition. Examples: groupByKey, join with inputs not co-partitioned.
CORE TRANSFORMATIONS
MAP
map(f, preservesPartitioning=False)
Return a new RDD by applying a function to each element of this RDD.
The user function is applied item by item: each of the 3 items in RDD x yields one item in RDD y.

x = sc.parallelize(["b", "a", "c"])
y = x.map(lambda z: (z, 1))
print(x.collect())
print(y.collect())

x: ['b', 'a', 'c']
y: [('b', 1), ('a', 1), ('c', 1)]

val x = sc.parallelize(Array("b", "a", "c"))
val y = x.map(z => (z,1))
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))
FILTER
filter(f)
Return a new RDD containing only the elements that satisfy a predicate.
The user function is applied to each item; an item is kept when the function returns true and dropped when it returns false.

x = sc.parallelize([1,2,3])
y = x.filter(lambda x: x%2 == 1)  # keep odd values
print(x.collect())
print(y.collect())

x: [1, 2, 3]
y: [1, 3]

val x = sc.parallelize(Array(1,2,3))
val y = x.filter(n => n%2 == 1)
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))
FLATMAP
flatMap(f, preservesPartitioning=False)
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
The user function is applied item by item, and every element it emits becomes an item of RDD y.

x = sc.parallelize([1,2,3])
y = x.flatMap(lambda x: (x, x*100, 42))
print(x.collect())
print(y.collect())

x: [1, 2, 3]
y: [1, 100, 42, 2, 200, 42, 3, 300, 42]

val x = sc.parallelize(Array(1,2,3))
val y = x.flatMap(n => Array(n, n*100, 42))
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))
GROUPBY
groupBy(f, numPartitions=None)
Group the data in the original RDD. Create pairs where the key is the output of a user function, and the value is all items for which the function yields this key.
Each of the 4 items ('John', 'Fred', 'Anna', 'James') emits its first letter, and items sharing a letter are collected under that key in RDD y.

x = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
y = x.groupBy(lambda w: w[0])
print([(k, list(v)) for (k, v) in y.collect()])

x: ['John', 'Fred', 'Anna', 'James']
y: [('A', ['Anna']), ('J', ['John', 'James']), ('F', ['Fred'])]

val x = sc.parallelize(
  Array("John", "Fred", "Anna", "James"))
val y = x.groupBy(w => w.charAt(0))
println(y.collect().mkString(", "))
GROUPBYKEY
groupByKey(numPartitions=None)
Group the values for each key in the original RDD. Create a new pair where the original key corresponds to this collected group of values.
The 5 pairs in pair RDD x are grouped by key: A collects [2, 3, 1] and B collects [5, 4] in RDD y.

x = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)])
y = x.groupByKey()
print(x.collect())
print(list((j[0], list(j[1])) for j in y.collect()))

x: [('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)]
y: [('A', [2, 3, 1]), ('B', [5, 4])]

val x = sc.parallelize(
  Array(('B',5),('B',4),('A',3),('A',2),('A',1)))
val y = x.groupByKey()
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))
MAPPARTITIONS
RDD x is transformed into RDD y one partition at a time: the user function receives all items of a partition (A or B) rather than a single item.
REDUCEBYKEY VS GROUPBYKEY

val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)
  .collect()

val wordCountsWithGroup = wordPairsRDD.groupByKey()
  .map(t => (t._1, t._2.sum))
  .collect()
REDUCEBYKEY
Pairs with the same key are combined inside each partition first (for example (a, 1) and (a, 1) become (a, 2)), and only these per-partition partial results are shuffled and then merged into the final (a, 6) and (b, 5).
GROUPBYKEY
Every (a, 1) and (b, 1) pair is shuffled across the network before any combining happens; only after all pairs for a key arrive at one partition are they summed into (a, 6) and (b, 5). This moves far more data than reduceByKey.
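The same comparison can be sketched in PySpark (a hedged rendering of the Scala example above; sc and the word list are assumed):

words = ["one", "two", "two", "three", "three", "three"]
wordPairsRDD = sc.parallelize(words).map(lambda word: (word, 1))

# reduceByKey combines pairs within each partition before the shuffle
wordCountsWithReduce = wordPairsRDD.reduceByKey(lambda a, b: a + b).collect()

# groupByKey shuffles every (word, 1) pair, then sums on the receiving side
wordCountsWithGroup = (wordPairsRDD.groupByKey()
                       .map(lambda kv: (kv[0], sum(kv[1])))
                       .collect())

print(wordCountsWithReduce)   # e.g. [('two', 2), ('three', 3), ('one', 1)]
print(wordCountsWithGroup)    # same counts, produced with a much larger shuffle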
MAPPARTITIONS
mapPartitions(f, preservesPartitioning=False)
Return a new RDD by applying a function to each partition of this RDD.

x = sc.parallelize([1,2,3], 2)
def f(iterator): yield sum(iterator); yield 42
y = x.mapPartitions(f)

# glom() flattens elements on the same partition
print(x.glom().collect())
print(y.glom().collect())

x: [[1], [2, 3]]
y: [[1, 42], [5, 42]]
MAPPARTITIONS
mapPartitions(f, preservesPartitioning=false)
Return a new RDD by applying a function to each partition of this RDD.

val x = sc.parallelize(Array(1,2,3), 2)
def f(i:Iterator[Int])={ (i.sum,42).productIterator }
val y = x.mapPartitions(f)

// glom() flattens elements on the same partition
val xOut = x.glom().collect()
val yOut = y.glom().collect()

x: Array(Array(1), Array(2, 3))
y: Array(Array(1, 42), Array(5, 42))
MAPPARTITIONSWITHINDEX
Like mapPartitions, but the user function also receives the index of the partition it is processing (A or B) along with that partition's items.
MAPPARTITIONSWITHINDEX
mapPartitionsWithIndex(f, preservesPartitioning=False)
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

x = sc.parallelize([1,2,3], 2)
def f(partitionIndex, iterator): yield (partitionIndex, sum(iterator))
y = x.mapPartitionsWithIndex(f)

# glom() flattens elements on the same partition
print(x.glom().collect())
print(y.glom().collect())

x: [[1], [2, 3]]
y: [[(0, 1)], [(1, 5)]]
MAPPARTITIONSWITHINDEX
mapPartitionsWithIndex(f, preservesPartitioning=false)
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

val x = sc.parallelize(Array(1,2,3), 2)
def f(partitionIndex:Int, i:Iterator[Int]) = {
  (partitionIndex, i.sum).productIterator
}
val y = x.mapPartitionsWithIndex(f)

// glom() flattens elements on the same partition
val xOut = x.glom().collect()
val yOut = y.glom().collect()

x: Array(Array(1), Array(2, 3))
y: Array(Array(0, 1), Array(1, 5))
SAMPLE
A random subset of RDD x's items (here 1 and 3 out of 1 through 5) becomes RDD y.
SAMPLE
sample(withReplacement, fraction, seed=None)
Return a new RDD containing a statistical sample of the original RDD.

x = sc.parallelize([1, 2, 3, 4, 5])
y = x.sample(False, 0.4, 42)
print(x.collect())
print(y.collect())

x: [1, 2, 3, 4, 5]
y: [1, 3]

val x = sc.parallelize(Array(1, 2, 3, 4, 5))
val y = x.sample(false, 0.4)
// omitting seed will yield different output
println(y.collect().mkString(", "))
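randomSplit, listed next to sample in the earlier table but without its own slide, can be sketched the same way (hedged, assuming sc):

x = sc.parallelize(range(10))
train, test = x.randomSplit([0.8, 0.2], seed=42)   # weights are treated as relative fractions
print(train.collect())   # roughly 80% of the items
print(test.collect())    # the remaining items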
UNION
RDD x (partitions A and B, holding 1, 2, 3) and a second RDD (partition C, holding 3, 4) are concatenated: RDD z keeps all of the original partitions and items, duplicates included.
UNION
union(otherRDD)
Return a new RDD containing all items from two original RDDs.

x = sc.parallelize([1,2,3], 2)
y = sc.parallelize([3,4], 1)
z = x.union(y)
print(z.glom().collect())

x: [1, 2, 3]
y: [3, 4]
z: [[1], [2, 3], [3, 4]]

val x = sc.parallelize(Array(1,2,3), 2)
val y = sc.parallelize(Array(3,4), 1)
val z = x.union(y)
val zOut = z.glom().collect()
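Two related set-theory transformations from the earlier table, intersection and subtract, have no slide of their own; a hedged sketch (assuming sc):

x = sc.parallelize([1, 2, 3])
y = sc.parallelize([3, 4])
print(x.intersection(y).collect())   # [3]
print(x.subtract(y).collect())       # [1, 2] (ordering may vary)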
JOIN
Pair RDD x holds (A, 1) and (B, 2); a second pair RDD holds (A, 3), (A, 4) and (B, 5). Matching keys are combined pair by pair, producing (A, (1, 3)), (A, (1, 4)) and (B, (2, 5)) in RDD z.
JOIN
join(otherRDD, numPartitions=None)
Return a new RDD containing all pairs of elements having the same key in the original RDDs.

x = sc.parallelize([("a", 1), ("b", 2)])
y = sc.parallelize([("a", 3), ("a", 4), ("b", 5)])
z = x.join(y)
print(z.collect())

x: [("a", 1), ("b", 2)]
y: [("a", 3), ("a", 4), ("b", 5)]
z: [('a', (1, 3)), ('a', (1, 4)), ('b', (2, 5))]

val x = sc.parallelize(Array(("a", 1), ("b", 2)))
val y = sc.parallelize(Array(("a", 3), ("a", 4), ("b", 5)))
val z = x.join(y)
println(z.collect().mkString(", "))
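The outer-join variants listed earlier (leftOuterJoin, rightOuterJoin, fullOuterJoin) behave like join but keep unmatched keys; a hedged sketch (assuming sc):

x = sc.parallelize([("a", 1), ("b", 2)])
y = sc.parallelize([("a", 3), ("b", 5), ("c", 6)])
print(x.leftOuterJoin(y).collect())   # every key of x; a missing match would appear as None
print(x.fullOuterJoin(y).collect())   # every key of either RDD, e.g. ('c', (None, 6))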
DISTINCT
RDD x holds 1, 2, 3, 3, 4; duplicate items are removed, leaving 1, 2, 3, 4 in RDD y.
DISTINCT
distinct(numPartitions=None)
Return a new RDD containing distinct items from the original RDD (omitting all duplicates).

x = sc.parallelize([1,2,3,3,4])
y = x.distinct()
print(y.collect())

x: [1, 2, 3, 3, 4]
y: [1, 2, 3, 4]

val x = sc.parallelize(Array(1,2,3,3,4))
val y = x.distinct()
println(y.collect().mkString(", "))
COALESCE
RDD x has three partitions (A, B, C). Coalescing to two partitions merges existing partitions (here A and B into AB) rather than shuffling every item, so RDD y ends up with partitions AB and C.
COALESCE
coalesce(numPartitions, shuffle=False)
Return a new RDD which is reduced to a smaller number of partitions.

x = sc.parallelize([1, 2, 3, 4, 5], 3)
y = x.coalesce(2)
print(x.glom().collect())
print(y.glom().collect())

x: [[1], [2, 3], [4, 5]]
y: [[1], [2, 3, 4, 5]]

val x = sc.parallelize(Array(1, 2, 3, 4, 5), 3)
val y = x.coalesce(2)
val xOut = x.glom().collect()
val yOut = y.glom().collect()
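repartition, coalesce's counterpart in the earlier table, always performs a full shuffle and can increase as well as decrease the partition count; a hedged sketch (assuming sc):

x = sc.parallelize([1, 2, 3, 4, 5], 3)
y = x.repartition(2)          # shuffles the data into 2 partitions
print(y.getNumPartitions())   # 2
print(sorted(y.collect()))    # [1, 2, 3, 4, 5]; which items share a partition depends on the shuffle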
KEYBY
Each item of RDD x ('John', 'Fred', 'Anna', 'James') emits its first letter, which becomes the key of a pair in RDD y: (J, 'John'), (F, 'Fred'), (A, 'Anna'), (J, 'James').
KEYBY
keyBy(f)
Create a Pair RDD, forming one pair for each item in the original RDD. The pair's key is calculated from the value via a user-supplied function.

x = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
y = x.keyBy(lambda w: w[0])
print(y.collect())

x: ['John', 'Fred', 'Anna', 'James']
y: [('J', 'John'), ('F', 'Fred'), ('A', 'Anna'), ('J', 'James')]

val x = sc.parallelize(
  Array("John", "Fred", "Anna", "James"))
val y = x.keyBy(w => w.charAt(0))
println(y.collect().mkString(", "))
PARTITIONBY
The pairs of RDD x, (J, 'John'), (F, 'Fred'), (A, 'Anna') and (J, 'James'), start out spread across three partitions. A user-supplied function on the key assigns each pair to one of two new partitions: keys before 'H' go to partition 0 and the rest to partition 1, so the F and A pairs end up together and the two J pairs end up together.
PARTITIONBY
partitionBy(numPartitions, partitionFunc)
Return a new RDD with the specified number of partitions, placing original items into the partition returned by a user-supplied function.

x = sc.parallelize([('J','James'),('F','Fred'),
                    ('A','Anna'),('J','John')], 3)
y = x.partitionBy(2, lambda w: 0 if w[0] < 'H' else 1)
print(x.glom().collect())
print(y.glom().collect())

x: [[('J', 'James')], [('F', 'Fred')], [('A', 'Anna'), ('J', 'John')]]
y: [[('A', 'Anna'), ('F', 'Fred')], [('J', 'James'), ('J', 'John')]]
PARTITIONBY
partitionBy(numPartitions, partitioner)
Return a new RDD with the specified number of partitions, placing original items into the partition returned by a user-supplied partitioner.

import org.apache.spark.Partitioner

val x = sc.parallelize(Array(('J',"James"),('F',"Fred"),
                             ('A',"Anna"),('J',"John")), 3)
val y = x.partitionBy(new Partitioner() {
  val numPartitions = 2
  def getPartition(k:Any) = {
    if (k.asInstanceOf[Char] < 'H') 0 else 1
  }
})
val yOut = y.glom().collect()

x: Array(Array((J,James)), Array((F,Fred)), Array((A,Anna), (J,John)))
y: Array(Array((A,Anna), (F,Fred)), Array((J,James), (J,John)))
ZIP
RDD x holds 1, 2, 3 across partitions A and B; a second RDD holds their squares 1, 4, 9 in the same partitions. Items at the same position in the same partition are paired up, producing (1, 1), (2, 4), (3, 9) in RDD z.
ZIP
zip(otherRDD)
Return a new RDD containing pairs whose key is the item in the original RDD, and whose value is that item's corresponding element (same partition, same index) in a second RDD.

x = sc.parallelize([1, 2, 3])
y = x.map(lambda n: n*n)
z = x.zip(y)
print(z.collect())

x: [1, 2, 3]
y: [1, 4, 9]
z: [(1, 1), (2, 4), (3, 9)]

val x = sc.parallelize(Array(1,2,3))
val y = x.map(n=>n*n)
val z = x.zip(y)
println(z.collect().mkString(", "))
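zipWithIndex, listed in the earlier table, pairs each item with its position instead of with another RDD; a hedged sketch (assuming sc):

x = sc.parallelize(["b", "a", "c"])
print(x.zipWithIndex().collect())   # [('b', 0), ('a', 1), ('c', 2)]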
CORE ACTIONS
distributed vs driver
distributed: occurs across the cluster
driver: the result must fit in the driver JVM
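A hedged sketch of what this means in practice (assuming sc): transformations stay distributed, and the result of an action comes back to, and must fit on, the driver.

big = sc.parallelize(range(1000000)).map(lambda n: n * 2)   # still distributed; nothing computed yet
print(big.count())    # action: a single number comes back to the driver
print(big.take(5))    # action: only five elements are shipped back
# big.collect() would pull all one million elements into the driver JVM,
# so prefer count()/take() when the full dataset is not needed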
GETNUMPARTITIONS
RDD x has two partitions (A and B), so getNumPartitions() returns 2.
getNumPartitions()
Return the number of partitions in the RDD.

x = sc.parallelize([1,2,3], 2)
y = x.getNumPartitions()
print(x.glom().collect())
print(y)

x: [[1], [2, 3]]
y: 2

val x = sc.parallelize(Array(1,2,3), 2)
val y = x.partitions.size
val xOut = x.glom().collect()
println(y)
COLLECT
All items from partitions A and B are returned to the driver as a single list.
collect()
Return all items in the RDD to the driver in a single list.

x = sc.parallelize([1,2,3], 2)
y = x.collect()
print(x.glom().collect())
print(y)

x: [[1], [2, 3]]
y: [1, 2, 3]

val x = sc.parallelize(Array(1,2,3), 2)
val y = x.collect()
val xOut = x.glom().collect()
println(y)
REDUCE
The user function is applied pairwise to items and partial results: 1 and 2 give 3, then 3 and 3 give 6, then 6 and 4 give 10, which is returned to the driver.
reduce(f)
Aggregate all the elements of the RDD by applying a user function pairwise to elements and partial results, and return a result to the driver.

x = sc.parallelize([1,2,3,4])
y = x.reduce(lambda a,b: a+b)
print(x.collect())
print(y)

x: [1, 2, 3, 4]
y: 10

val x = sc.parallelize(Array(1,2,3,4))
val y = x.reduce((a,b) => a+b)
println(x.collect.mkString(", "))
println(y)
AGGREGATE
Within each partition (A holds 1 and 2, B holds 3 and 4), seqOp folds the items into a per-partition accumulator that starts as ([], 0): partition A builds ([1], 1) and then ([1, 2], 3), while partition B builds ([3], 3) and then ([3, 4], 7). combOp then merges the two partial results into ([1, 2, 3, 4], 10), which is returned to the driver.
aggregate(identity, seqOp, combOp)
Aggregate all the elements of the RDD by:
- applying a user function to combine elements with user-supplied objects,
- then combining those user-defined results via a second user function,
- and finally returning a result to the driver.

seqOp = lambda data, item: (data[0] + [item], data[1] + item)
combOp = lambda d1, d2: (d1[0] + d2[0], d1[1] + d2[1])
x = sc.parallelize([1,2,3,4])
y = x.aggregate(([], 0), seqOp, combOp)
print(y)

x: [1, 2, 3, 4]
y: ([1, 2, 3, 4], 10)
aggregate(identity, seqOp, combOp)
Aggregate all the elements of the RDD by:
- applying a user function to combine elements with user-supplied objects,
- then combining those user-defined results via a second user function,
- and finally returning a result to the driver.

def seqOp = (data:(Array[Int], Int), item:Int) =>
  (data._1 :+ item, data._2 + item)
def combOp = (d1:(Array[Int], Int), d2:(Array[Int], Int)) =>
  (d1._1.union(d2._1), d1._2 + d2._2)
val x = sc.parallelize(Array(1,2,3,4))
val y = x.aggregate((Array[Int](), 0))(seqOp, combOp)
println(y)

x: [1, 2, 3, 4]
y: (Array(1, 2, 3, 4), 10)
MAX
max() over the items 2, 4, 1 returns 4 to the driver.
max()
Return the maximum item in the RDD.

x = sc.parallelize([2,4,1])
y = x.max()
print(x.collect())
print(y)

x: [2, 4, 1]
y: 4

val x = sc.parallelize(Array(2,4,1))
val y = x.max
println(x.collect().mkString(", "))
println(y)
SUM
sum() over the items 2, 4, 1 returns 7 to the driver.
sum()
Return the sum of the items in the RDD.

x = sc.parallelize([2,4,1])
y = x.sum()
print(x.collect())
print(y)

x: [2, 4, 1]
y: 7

val x = sc.parallelize(Array(2,4,1))
val y = x.sum
println(x.collect().mkString(", "))
println(y)
MEAN
mean() over the items 2, 4, 1 returns 2.3333333 to the driver.
mean()
Return the mean of the items in the RDD.

x = sc.parallelize([2,4,1])
y = x.mean()
print(x.collect())
print(y)

x: [2, 4, 1]
y: 2.3333333

val x = sc.parallelize(Array(2,4,1))
val y = x.mean
println(x.collect().mkString(", "))
println(y)
STDEV
stdev() over the items 2, 4, 1 returns 1.2472191 to the driver.
stdev()
Return the standard deviation of the items in the RDD.

x = sc.parallelize([2,4,1])
y = x.stdev()
print(x.collect())
print(y)

x: [2, 4, 1]
y: 1.2472191

val x = sc.parallelize(Array(2,4,1))
val y = x.stdev
println(x.collect().mkString(", "))
println(y)
COUNTBYKEY
For the pairs (J, 'John'), (A, 'Anna'), (F, 'Fred') and (J, 'James'), countByKey() returns the map {'A': 1, 'J': 2, 'F': 1} to the driver.
countByKey()
Return a map of keys and counts of their occurrences in the RDD.

x = sc.parallelize([('J', 'James'), ('F','Fred'),
                    ('A','Anna'), ('J','John')])
y = x.countByKey()
print(y)

x: [('J', 'James'), ('F', 'Fred'), ('A', 'Anna'), ('J', 'John')]
y: {'A': 1, 'J': 2, 'F': 1}

val x = sc.parallelize(Array(('J',"James"),('F',"Fred"),
                             ('A',"Anna"),('J',"John")))
val y = x.countByKey()
println(y)
SAVEASTEXTFILE
saveAsTextFile(path, compressionCodecClass=None)
Save the RDD to the filesystem indicated in the path.

dbutils.fs.rm("/temp/demo", True)
x = sc.parallelize([2,4,1])
x.saveAsTextFile("/temp/demo")
y = sc.textFile("/temp/demo")
print(y.collect())

x: [2, 4, 1]
y: [u'2', u'4', u'1']

dbutils.fs.rm("/temp/demo", true)
val x = sc.parallelize(Array(2,4,1))
x.saveAsTextFile("/temp/demo")
val y = sc.textFile("/temp/demo")
println(y.collect().mkString(", "))
LAB
Q&A