visualapi spark


TRANSCRIPT

TRANSFORMATIONS AND ACTIONS
A Visual Guide of the API
http://training.databricks.com/visualapi.pdf

Databricks would like to give a special thanks to Jeff Thompson for contributing 67 visual diagrams depicting the Spark API under the MIT license to the Spark community.

Jeff's original, creative work can be found here and you can read more about Jeff's project in his blog post.

After talking to Jeff, Databricks commissioned Adam Breindel to further evolve Jeff's work into the diagrams you see in this deck.

Links:
• Notebook: http://nbviewer.ipython.org/github/jkthompson/pyspark-pictures/blob/master/pyspark-pictures.ipynb
• Blog (data-frack): http://data-frack.blogspot.com/2015/01/visual-mnemonics-for-pyspark-api.html
• LinkedIn: https://www.linkedin.com/profile/view?id=8052185
• LinkedIn: https://www.linkedin.com/profile/view?id=128303555

making big data simple

Databricks Cloud:
"A unified platform for building Big Data pipelines, from ETL to Exploration and Dashboards, to Advanced Analytics and Data Products."

• Founded in late 2013
• by the creators of Apache Spark
• Original team from UC Berkeley AMPLab
• Raised $47 Million in 2 rounds
• ~55 employees
• We're hiring! (http://databricks.workable.com)
• Level 2/3 support partnerships with
  • Hortonworks
  • MapR
  • DataStax

Legend (key)

• RDD elements: original, transformed (type), object on driver
• RDD partition(s)
• user functions
• user input / input
• emitted value

Legend (operation types)

• Randomized operation
• Set Theory / Relational operation
• Numeric calculation

Operations = TRANSFORMATIONS + ACTIONS
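To make that split concrete, a minimal PySpark sketch (assuming an existing SparkContext sc): a transformation such as map only describes a new RDD, while an action such as collect runs the job and brings a result back to the driver.

rdd = sc.parallelize([1, 2, 3])
doubled = rdd.map(lambda n: n * 2)   # transformation: describes a new RDD, nothing runs yet
print(doubled.collect())             # action: executes the job, [2, 4, 6] returns to the driver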

Essential Core & Intermediate Spark Operations

TRANSFORMATIONS

General:
• map
• filter
• flatMap
• mapPartitions
• mapPartitionsWithIndex
• groupBy
• sortBy

Math / Statistical:
• sample
• randomSplit

Set Theory / Relational:
• union
• intersection
• subtract
• distinct
• cartesian
• zip

Data Structure / I/O:
• keyBy
• zipWithIndex
• zipWithUniqueId
• zipPartitions
• coalesce
• repartition
• repartitionAndSortWithinPartitions
• pipe

ACTIONS

General:
• reduce
• collect
• aggregate
• fold
• first
• take
• foreach
• top
• treeAggregate
• treeReduce
• foreachPartition
• collectAsMap

Math / Statistical:
• count
• takeSample
• max
• min
• sum
• histogram
• mean
• variance
• stdev
• sampleVariance
• countApprox
• countApproxDistinct

Set Theory / Relational:
• takeOrdered

Data Structure / I/O:
• saveAsTextFile
• saveAsSequenceFile
• saveAsObjectFile
• saveAsHadoopDataset
• saveAsHadoopFile
• saveAsNewAPIHadoopDataset
• saveAsNewAPIHadoopFile

(the original deck color-codes each operation as = easy or = medium; a sketch of two list entries follows)
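Most of these operations get their own diagram later in the deck; two that do not, sortBy and takeOrdered, behave as in this minimal PySpark sketch (assuming an existing SparkContext sc):

x = sc.parallelize([5, 1, 4, 2, 3])
y = x.sortBy(lambda n: n)      # transformation: returns a new RDD sorted by the key function
print(y.collect())             # [1, 2, 3, 4, 5]
print(x.takeOrdered(3))        # action: the 3 smallest items, [1, 2, 3], returned to the driver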

Essential Core & Intermediate PairRDD Operations

TRANSFORMATIONS

General:
• flatMapValues
• groupByKey
• reduceByKey
• reduceByKeyLocally
• foldByKey
• aggregateByKey
• sortByKey
• combineByKey

Math / Statistical:
• sampleByKey

Set Theory / Relational:
• cogroup (=groupWith)
• join
• subtractByKey
• fullOuterJoin
• leftOuterJoin
• rightOuterJoin

Data Structure:
• partitionBy

ACTIONS

General:
• keys
• values

Math / Statistical:
• countByKey
• countByValue
• countByValueApprox
• countApproxDistinctByKey
• countByKeyApprox
• sampleByKeyExact

(the original deck color-codes each operation as = easy or = medium)

narrow vs wide dependencies

• narrow: each partition of the parent RDD is used by at most one partition of the child RDD
• wide: multiple child RDD partitions may depend on a single parent RDD partition

"One of the challenges in providing RDDs as an abstraction is choosing a representation for them that can track lineage across a wide range of transformations."

"The most interesting question in designing this interface is how to represent dependencies between RDDs."

"We found it both sufficient and useful to classify dependencies into two types:
• narrow dependencies, where each partition of the parent RDD is used by at most one partition of the child RDD
• wide dependencies, where multiple child partitions may depend on it."

narrow vs wide examples

• narrow (each partition of the parent RDD is used by at most one partition of the child RDD): map, filter, union, join with co-partitioned inputs
• wide (multiple child RDD partitions may depend on a single parent RDD partition): groupByKey
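A quick way to see where a wide dependency forces a shuffle is an RDD's lineage debug string; a minimal PySpark sketch (assuming an existing SparkContext sc):

x = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
narrow = x.map(lambda kv: (kv[0], kv[1] * 10))   # narrow: no shuffle in the lineage
wide = x.groupByKey()                            # wide: the lineage shows a shuffle boundary
# toDebugString() returns bytes in PySpark
print(narrow.toDebugString().decode())
print(wide.toDebugString().decode())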

TRANSFORMATIONS: Core

MAP

(diagram: RDD x holds 3 items; the user function is applied item by item, producing RDD y)

map(f, preservesPartitioning=False)

Return a new RDD by applying a function to each element of this RDD.

x = sc.parallelize(["b", "a", "c"])
y = x.map(lambda z: (z, 1))
print(x.collect())
print(y.collect())

x: ['b', 'a', 'c']
y: [('b', 1), ('a', 1), ('c', 1)]

val x = sc.parallelize(Array("b", "a", "c"))
val y = x.map(z => (z,1))
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))

FILTER

(diagram: RDD x holds 3 items; the user function is applied to each item, and items for which it returns True are kept in RDD y)

filter(f)

Return a new RDD containing only the elements that satisfy a predicate.

x = sc.parallelize([1,2,3])
y = x.filter(lambda x: x%2 == 1)  # keep odd values
print(x.collect())
print(y.collect())

x: [1, 2, 3]
y: [1, 3]

val x = sc.parallelize(Array(1,2,3))
val y = x.filter(n => n%2 == 1)
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))

FLATMAP

(diagram: RDD x holds 3 items; the user function is applied to each item and the resulting sequences are flattened into RDD y)

flatMap(f, preservesPartitioning=False)

Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

x = sc.parallelize([1,2,3])
y = x.flatMap(lambda x: (x, x*100, 42))
print(x.collect())
print(y.collect())

x: [1, 2, 3]
y: [1, 100, 42, 2, 200, 42, 3, 300, 42]

val x = sc.parallelize(Array(1,2,3))
val y = x.flatMap(n => Array(n, n*100, 42))
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))

GROUPBY

(diagram: RDD x holds "John", "Fred", "Anna", "James"; the user function emits each name's first letter, and RDD y collects the names under those keys: J -> ["John", "James"], F -> ["Fred"], A -> ["Anna"])

groupBy(f, numPartitions=None)

Group the data in the original RDD. Create pairs where the key is the output of a user function, and the value is all items for which the function yields this key.

x = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
y = x.groupBy(lambda w: w[0])
print([(k, list(v)) for (k, v) in y.collect()])

x: ['John', 'Fred', 'Anna', 'James']
y: [('A', ['Anna']), ('J', ['John', 'James']), ('F', ['Fred'])]

val x = sc.parallelize(
  Array("John", "Fred", "Anna", "James"))
val y = x.groupBy(w => w.charAt(0))
println(y.collect().mkString(", "))

GROUPBYKEY

(diagram: pair RDD x holds (B,5), (B,4), (A,3), (A,2), (A,1); RDD y groups the values by key: A -> [2, 3, 1], B -> [5, 4])

groupByKey(numPartitions=None)

Group the values for each key in the original RDD. Create a new pair where the original key corresponds to this collected group of values.

x = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)])
y = x.groupByKey()
print(x.collect())
print(list((j[0], list(j[1])) for j in y.collect()))

x: [('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)]
y: [('A', [2, 3, 1]), ('B', [5, 4])]

val x = sc.parallelize(
  Array(('B',5),('B',4),('A',3),('A',2),('A',1)))
val y = x.groupByKey()
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))

MAPPARTITIONS

(diagram: the user function is applied once per partition of RDD x, A and B, producing the partitions of RDD y)

REDUCEBYKEY VS GROUPBYKEY

val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)
  .collect()

val wordCountsWithGroup = wordPairsRDD.groupByKey()
  .map(t => (t._1, t._2.sum))
  .collect()
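The next two diagrams show why reduceByKey is usually preferred: it combines values within each partition before the shuffle, while groupByKey ships every (word, 1) pair across the network. A minimal PySpark version of the same comparison (assuming an existing SparkContext sc):

words = sc.parallelize(["one", "two", "two", "three", "three", "three"])
pairs = words.map(lambda w: (w, 1))
# reduceByKey pre-aggregates on each partition before shuffling
with_reduce = pairs.reduceByKey(lambda a, b: a + b).collect()
# groupByKey shuffles every pair, then sums on the reduce side
with_group = pairs.groupByKey().mapValues(sum).collect()
print(sorted(with_reduce))  # [('one', 1), ('three', 3), ('two', 2)]
print(sorted(with_group))   # same result, more shuffle traffic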

REDUCEBYKEY

(diagram: (a,1) and (b,1) pairs are first combined within each partition, e.g. (a,1)+(a,1) -> (a,2), and only the partial sums are shuffled, ending in (a,6) and (b,5))

GROUPBYKEY

(diagram: every (a,1) and (b,1) pair is shuffled across the network before being combined into (a,6) and (b,5))

MAPPARTITIONS

mapPartitions(f, preservesPartitioning=False)

Return a new RDD by applying a function to each partition of this RDD.

x = sc.parallelize([1,2,3], 2)
def f(iterator): yield sum(iterator); yield 42
y = x.mapPartitions(f)
# glom() flattens elements on the same partition
print(x.glom().collect())
print(y.glom().collect())

x: [[1], [2, 3]]
y: [[1, 42], [5, 42]]

mapPartitions(f, preservesPartitioning=False)

Return a new RDD by applying a function to each partition of this RDD.

val x = sc.parallelize(Array(1,2,3), 2)
def f(i:Iterator[Int]) = { (i.sum, 42).productIterator }
val y = x.mapPartitions(f)
// glom() flattens elements on the same partition
val xOut = x.glom().collect()
val yOut = y.glom().collect()

xOut: Array(Array(1), Array(2, 3))
yOut: Array(Array(1, 42), Array(5, 42))
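mapPartitions is typically used when per-element work has a fixed setup cost, since the setup can run once per partition instead of once per item; a minimal PySpark sketch of that pattern (assuming an existing SparkContext sc; the parse_partition helper is illustrative):

import re

def parse_partition(lines):
    pattern = re.compile(r"\d+")      # compiled once per partition, reused for every line
    for line in lines:
        yield pattern.findall(line)

x = sc.parallelize(["a1 b22", "c333", "no digits"], 2)
print(x.mapPartitions(parse_partition).collect())
# [['1', '22'], ['333'], []]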

MAPPARTITIONSWITHINDEX

(diagram: the user function is applied once per partition of RDD x and also receives the partition index as input)

mapPartitionsWithIndex(f, preservesPartitioning=False)

Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

x = sc.parallelize([1,2,3], 2)
def f(partitionIndex, iterator): yield (partitionIndex, sum(iterator))
y = x.mapPartitionsWithIndex(f)
# glom() flattens elements on the same partition
print(x.glom().collect())
print(y.glom().collect())

x: [[1], [2, 3]]
y: [[(0, 1)], [(1, 5)]]

mapPartitionsWithIndex(f, preservesPartitioning=False)

Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

val x = sc.parallelize(Array(1,2,3), 2)
def f(partitionIndex:Int, i:Iterator[Int]) = {
  (partitionIndex, i.sum).productIterator
}
val y = x.mapPartitionsWithIndex(f)
// glom() flattens elements on the same partition
val xOut = x.glom().collect()
val yOut = y.glom().collect()

xOut: Array(Array(1), Array(2, 3))
yOut: Array(Array(0, 1), Array(1, 5))

SAMPLE

(diagram: RDD x holds 1 through 5; RDD y holds a random sample of those items, here 1 and 3)

sample(withReplacement, fraction, seed=None)

Return a new RDD containing a statistical sample of the original RDD.

x = sc.parallelize([1, 2, 3, 4, 5])
y = x.sample(False, 0.4, 42)
print(x.collect())
print(y.collect())

x: [1, 2, 3, 4, 5]
y: [1, 3]

val x = sc.parallelize(Array(1, 2, 3, 4, 5))
val y = x.sample(false, 0.4)
// omitting seed will yield different output
println(y.collect().mkString(", "))

UNION

(diagram: RDD x holds 1, 2, 3 and RDD y holds 3, 4; RDD z contains all items from both, keeping duplicates)

union(otherRDD)

Return a new RDD containing all items from two original RDDs.

x = sc.parallelize([1,2,3], 2)
y = sc.parallelize([3,4], 1)
z = x.union(y)
print(z.glom().collect())

x: [1, 2, 3]
y: [3, 4]
z: [[1], [2, 3], [3, 4]]

val x = sc.parallelize(Array(1,2,3), 2)
val y = sc.parallelize(Array(3,4), 1)
val z = x.union(y)
val zOut = z.glom().collect()

JOIN

(diagram: pair RDD x holds (A,1), (B,2) and pair RDD y holds (A,3), (A,4), (B,5); RDD z pairs values that share a key: (A,(1,3)), (A,(1,4)), (B,(2,5)))

join(otherRDD, numPartitions=None)

Return a new RDD containing all pairs of elements having the same key in the original RDDs.

x = sc.parallelize([("a", 1), ("b", 2)])
y = sc.parallelize([("a", 3), ("a", 4), ("b", 5)])
z = x.join(y)
print(z.collect())

x: [("a", 1), ("b", 2)]
y: [("a", 3), ("a", 4), ("b", 5)]
z: [('a', (1, 3)), ('a', (1, 4)), ('b', (2, 5))]

val x = sc.parallelize(Array(("a", 1), ("b", 2)))
val y = sc.parallelize(Array(("a", 3), ("a", 4), ("b", 5)))
val z = x.join(y)
println(z.collect().mkString(", "))

DISTINCT

(diagram: RDD x holds 1, 2, 3, 3, 4; RDD y keeps only one copy of each distinct item: 1, 2, 3, 4)

distinct(numPartitions=None)

Return a new RDD containing distinct items from the original RDD (omitting all duplicates).

x = sc.parallelize([1,2,3,3,4])
y = x.distinct()
print(y.collect())

x: [1, 2, 3, 3, 4]
y: [1, 2, 3, 4]

val x = sc.parallelize(Array(1,2,3,3,4))
val y = x.distinct()
println(y.collect().mkString(", "))

    COALESCE

(diagram: RDD x has three partitions A, B, C; coalesce(2) combines them into two partitions without a full shuffle)

coalesce(numPartitions, shuffle=False)

Return a new RDD which is reduced to a smaller number of partitions.

x = sc.parallelize([1, 2, 3, 4, 5], 3)
y = x.coalesce(2)
print(x.glom().collect())
print(y.glom().collect())

x: [[1], [2, 3], [4, 5]]
y: [[1], [2, 3, 4, 5]]

val x = sc.parallelize(Array(1, 2, 3, 4, 5), 3)
val y = x.coalesce(2)
val xOut = x.glom().collect()
val yOut = y.glom().collect()
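With the default shuffle=False, coalesce only merges partitions, so it cannot increase the partition count; repartition (listed in the operations table earlier) always shuffles and can go either way. A minimal PySpark sketch of the difference (assuming an existing SparkContext sc):

x = sc.parallelize(range(10), 4)
print(x.coalesce(2).getNumPartitions())     # 2: partitions merged, no shuffle
print(x.coalesce(8).getNumPartitions())     # still 4: coalesce will not grow without a shuffle
print(x.repartition(8).getNumPartitions())  # 8: repartition shuffles to the requested count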


    KEYBY

(diagram: RDD x holds "John", "Fred", "Anna", "James"; keyBy emits each name's first letter as the key, producing the pairs (J,"John"), (F,"Fred"), (A,"Anna"), (J,"James") in RDD y)

keyBy(f)

Create a Pair RDD, forming one pair for each item in the original RDD. The pair's key is calculated from the value via a user-supplied function.

x = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
y = x.keyBy(lambda w: w[0])
print(y.collect())

x: ['John', 'Fred', 'Anna', 'James']
y: [('J', 'John'), ('F', 'Fred'), ('A', 'Anna'), ('J', 'James')]

val x = sc.parallelize(
  Array("John", "Fred", "Anna", "James"))
val y = x.keyBy(w => w.charAt(0))
println(y.collect().mkString(", "))

    PARTITIONBY

(diagram: pair RDD x holds (J,"John"), (A,"Anna"), (F,"Fred"), (J,"James") spread over three partitions; partitionBy(2) moves each pair to the partition chosen by the user-supplied function, here partition 0 for keys before 'H' and partition 1 otherwise)

partitionBy(numPartitions, partitioner)

Return a new RDD with the specified number of partitions, placing original items into the partition returned by a user-supplied function.

x = sc.parallelize([('J','James'),('F','Fred'),
                    ('A','Anna'),('J','John')], 3)
y = x.partitionBy(2, lambda w: 0 if w[0] < 'H' else 1)
print(x.glom().collect())
print(y.glom().collect())

x: [[('J', 'James')], [('F', 'Fred')], [('A', 'Anna'), ('J', 'John')]]
y: [[('A', 'Anna'), ('F', 'Fred')], [('J', 'James'), ('J', 'John')]]

import org.apache.spark.Partitioner

val x = sc.parallelize(Array(('J',"James"),('F',"Fred"),
                             ('A',"Anna"),('J',"John")), 3)
val y = x.partitionBy(new Partitioner() {
  val numPartitions = 2
  def getPartition(k:Any) = {
    if (k.asInstanceOf[Char] < 'H') 0 else 1
  }
})
val yOut = y.glom().collect()

yOut: Array(Array((A,Anna), (F,Fred)), Array((J,James), (J,John)))

    ZIP

(diagram: RDD x holds 1, 2, 3 and RDD y holds their squares 1, 4, 9 in the same partitions; RDD z pairs items by position: (1,1), (2,4), (3,9))

zip(otherRDD)

Return a new RDD containing pairs whose key is the item in the original RDD, and whose value is that item's corresponding element (same partition, same index) in a second RDD.

x = sc.parallelize([1, 2, 3])
y = x.map(lambda n: n*n)
z = x.zip(y)
print(z.collect())

x: [1, 2, 3]
y: [1, 4, 9]
z: [(1, 1), (2, 4), (3, 9)]

val x = sc.parallelize(Array(1,2,3))
val y = x.map(n => n*n)
val z = x.zip(y)
println(z.collect().mkString(", "))

ACTIONS: Core

distributed vs driver

• distributed: occurs across the cluster
• driver: result must fit in the driver JVM
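Because an action's result has to fit in the driver JVM, it is worth bounding what comes back; a minimal PySpark sketch of that idea (assuming an existing SparkContext sc):

x = sc.parallelize(range(1000000))
print(x.count())   # computed across the cluster; only a single number returns
print(x.take(5))   # [0, 1, 2, 3, 4]: a small, bounded sample for the driver
# x.collect() would pull all one million items into the driver JVM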

    GETNUMPARTITIONS

(diagram: RDD x has two partitions A and B; getNumPartitions returns 2 to the driver)

getNumPartitions()

Return the number of partitions in this RDD.

x = sc.parallelize([1,2,3], 2)
y = x.getNumPartitions()
print(x.glom().collect())
print(y)

x: [[1], [2, 3]]
y: 2

val x = sc.parallelize(Array(1,2,3), 2)
val y = x.partitions.size
val xOut = x.glom().collect()
println(y)

    COLLECT

(diagram: items from both partitions of RDD x are returned to the driver as a single list)

collect()

Return all items in the RDD to the driver in a single list.

x = sc.parallelize([1,2,3], 2)
y = x.collect()
print(x.glom().collect())
print(y)

x: [[1], [2, 3]]
y: [1, 2, 3]

val x = sc.parallelize(Array(1,2,3), 2)
val y = x.collect()
val xOut = x.glom().collect()
println(y)

    REDUCE

(diagram: the user function is applied pairwise to elements and partial results: 1 and 2 give 3, folding in 3 gives 6, folding in 4 gives 10, and 10 is returned to the driver)

reduce(f)

Aggregate all the elements of the RDD by applying a user function pairwise to elements and partial results, and return a result to the driver.

x = sc.parallelize([1,2,3,4])
y = x.reduce(lambda a,b: a+b)
print(x.collect())
print(y)

x: [1, 2, 3, 4]
y: 10

val x = sc.parallelize(Array(1,2,3,4))
val y = x.reduce((a,b) => a+b)
println(x.collect.mkString(", "))
println(y)

    AGGREGATE

(diagram: starting from the zero value ([], 0) in each partition, seqOp folds in the items one at a time, giving ([1,2], 3) for one partition and ([3,4], 7) for the other; combOp then merges the per-partition results into ([1,2,3,4], 10), which is returned to the driver)

aggregate(identity, seqOp, combOp)

Aggregate all the elements of the RDD by:
- applying a user function to combine elements with user-supplied objects,
- then combining those user-defined results via a second user function,
- and finally returning a result to the driver.

seqOp = lambda data, item: (data[0] + [item], data[1] + item)
combOp = lambda d1, d2: (d1[0] + d2[0], d1[1] + d2[1])
x = sc.parallelize([1,2,3,4])
y = x.aggregate(([], 0), seqOp, combOp)
print(y)

x: [1, 2, 3, 4]
y: ([1, 2, 3, 4], 10)

def seqOp = (data:(Array[Int], Int), item:Int) =>
  (data._1 :+ item, data._2 + item)
def combOp = (d1:(Array[Int], Int), d2:(Array[Int], Int)) =>
  (d1._1.union(d2._1), d1._2 + d2._2)

val x = sc.parallelize(Array(1,2,3,4))
val y = x.aggregate((Array[Int](), 0))(seqOp, combOp)
println(y)

x: [1, 2, 3, 4]
y: (Array(1, 2, 3, 4), 10)
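A common use of this pattern is computing an average in one pass: the zero value carries a running (sum, count), seqOp folds in each element, and combOp merges the per-partition results. A minimal PySpark sketch (assuming an existing SparkContext sc):

x = sc.parallelize([1, 2, 3, 4])
# zero value: (running sum, running count)
total, count = x.aggregate((0, 0),
                           lambda acc, v: (acc[0] + v, acc[1] + 1),   # seqOp
                           lambda a, b: (a[0] + b[0], a[1] + b[1]))   # combOp
print(total / count)  # 2.5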

    MAX

(diagram: RDD holds 2, 4, 1; max returns 4 to the driver)

max()

Return the maximum item in the RDD.

x = sc.parallelize([2,4,1])
y = x.max()
print(x.collect())
print(y)

x: [2, 4, 1]
y: 4

val x = sc.parallelize(Array(2,4,1))
val y = x.max
println(x.collect().mkString(", "))
println(y)

    SUM

(diagram: RDD holds 2, 4, 1; sum returns 7 to the driver)

sum()

Return the sum of the items in the RDD.

x = sc.parallelize([2,4,1])
y = x.sum()
print(x.collect())
print(y)

x: [2, 4, 1]
y: 7

val x = sc.parallelize(Array(2,4,1))
val y = x.sum
println(x.collect().mkString(", "))
println(y)

    MEAN

(diagram: RDD holds 2, 4, 1; mean returns 2.3333333 to the driver)

mean()

Return the mean of the items in the RDD.

x = sc.parallelize([2,4,1])
y = x.mean()
print(x.collect())
print(y)

x: [2, 4, 1]
y: 2.3333333

val x = sc.parallelize(Array(2,4,1))
val y = x.mean
println(x.collect().mkString(", "))
println(y)

    STDEV

(diagram: RDD holds 2, 4, 1; stdev returns 1.2472191 to the driver)

stdev()

Return the standard deviation of the items in the RDD.

x = sc.parallelize([2,4,1])
y = x.stdev()
print(x.collect())
print(y)

x: [2, 4, 1]
y: 1.2472191

val x = sc.parallelize(Array(2,4,1))
val y = x.stdev
println(x.collect().mkString(", "))
println(y)

    COUNTBYKEY

(diagram: pair RDD holds (J,"John"), (A,"Anna"), (F,"Fred"), (J,"James"); countByKey returns {'A': 1, 'J': 2, 'F': 1} to the driver)

countByKey()

Return a map of keys and counts of their occurrences in the RDD.

x = sc.parallelize([('J', 'James'), ('F','Fred'),
                    ('A','Anna'), ('J','John')])
y = x.countByKey()
print(y)

x: [('J', 'James'), ('F', 'Fred'), ('A', 'Anna'), ('J', 'John')]
y: {'A': 1, 'J': 2, 'F': 1}

val x = sc.parallelize(Array(('J',"James"),('F',"Fred"),
                             ('A',"Anna"),('J',"John")))
val y = x.countByKey()
println(y)

    SAVEASTEXTFILE


saveAsTextFile(path, compressionCodecClass=None)

Save the RDD to the filesystem indicated in the path.

dbutils.fs.rm("/temp/demo", True)
x = sc.parallelize([2,4,1])
x.saveAsTextFile("/temp/demo")
y = sc.textFile("/temp/demo")
print(y.collect())

x: [2, 4, 1]
y: [u'2', u'4', u'1']

dbutils.fs.rm("/temp/demo", true)
val x = sc.parallelize(Array(2,4,1))
x.saveAsTextFile("/temp/demo")
val y = sc.textFile("/temp/demo")
println(y.collect().mkString(", "))

    LAB


    Q&A
