Structuring Spark: DataFrames, Datasets, and Streaming by Michael Armbrust
TRANSCRIPT
Structuring Spark: SQL, DataFrames, Datasets, and Streaming
Michael Armbrust (@michaelarmbrust), Spark Summit East 2016
Background: What is in an RDD?
• Dependencies
• Partitions (with optional locality info)
• Compute function: Partition => Iterator[T]
To Spark, both the compute function (opaque computation) and the records it produces (opaque data) are black boxes, which leaves the engine little room to optimize.
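For example, in plain RDD code (a minimal sketch; the file name and record format are hypothetical), Spark sees only a compiled closure and generic tuples:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object OpaqueRddExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("opaque-rdd").setMaster("local[*]"))

    // Spark sees only a compiled closure (opaque computation) emitting
    // arbitrary tuples (opaque data), so it can optimize neither.
    val parsed = sc.textFile("events.txt").map { line =>
      val fields = line.split(",")
      (fields(0), fields(1).toInt)
    }
    println(parsed.count())
    sc.stop()
  }
}
```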
Struc·ture [ˈstrək(t)SHər]
verb: 1. construct or arrange according to a plan; give a pattern or organization to.
Why structure?
• By definition, structure will limit what can be expressed.
• In practice, we can accommodate the vast majority of computations.
Limiting the space of what can be expressed enables optimizations.
Structured APIs In Spark
|                 | SQL     | DataFrames   | Datasets     |
|-----------------|---------|--------------|--------------|
| Syntax Errors   | Runtime | Compile Time | Compile Time |
| Analysis Errors | Runtime | Runtime      | Compile Time |
Analysis errors are reported before a distributed job starts.
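A sketch of the three failure modes, assuming a registered people table with name and age columns:

```scala
import sqlContext.implicits._

case class Person(name: String, age: Int)
val df = sqlContext.table("people")
val ds = df.as[Person]

sqlContext.sql("SELEC name FROM people") // SQL: the typo surfaces at runtime, when the string is parsed
df.select("nmae")                        // DataFrame: AnalysisException at analysis time, before any job runs
// ds.map(_.nmae)                        // Dataset: rejected by the Scala compiler
```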
Type-safe: operate on domain objects with compiled lambda functions
Datasets API:

```scala
import org.apache.spark.sql.Dataset
import ctx.implicits._ // for the Person encoder

val df = ctx.read.json("people.json")

// Convert data to domain objects.
case class Person(name: String, age: Int)
val ds: Dataset[Person] = df.as[Person]
ds.filter(_.age > 30)

// Compute histogram of age by name.
val hist = ds.groupBy(_.name).mapGroups { (name, people) =>
  val buckets = new Array[Int](10)
  people.map(_.age).foreach { a =>
    buckets(a / 10) += 1
  }
  (name, buckets)
}
```
DataFrame = Dataset[Row]
• Spark 2.0 will unify these APIs
• Stringly-typed methods will downcast to generic Row objects
• Ask Spark SQL to enforce types on generic rows using df.as[MyClass], as sketched below
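A sketch of the unified API (MyClass is a stand-in for any case class):

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import sqlContext.implicits._

case class MyClass(id: Long)

// In Spark 2.0, DataFrame is just an alias: type DataFrame = Dataset[Row]
val df: DataFrame = sqlContext.range(3).toDF("id") // generic Row objects
val ds: Dataset[MyClass] = df.as[MyClass]          // types enforced against the schema
```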
What about ?
Some of the goals of the Dataset API have always been available!
Python: `df.map(lambda x: x.name)`

Scala: `df.map(x => x(0).asInstanceOf[String])`
Shared Optimization & Execution
A SQL AST, DataFrame, or Dataset all become an Unresolved Logical Plan; Analysis resolves it against the Catalog into a Logical Plan; Logical Optimization rewrites it into an Optimized Logical Plan; Physical Planning generates candidate Physical Plans, from which a Cost Model picks the Selected Physical Plan; and Code Generation turns that plan into RDDs.

DataFrames, Datasets and SQL share the same optimization/execution pipeline.
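You can watch a query move through these stages with explain (a sketch reusing the people.json example; output abbreviated):

```scala
val df = sqlContext.read.json("people.json")
df.filter(df("age") > 30).explain(true)
// Prints the parsed (unresolved) logical plan, the analyzed logical plan,
// the optimized logical plan, and the selected physical plan.
```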
Structuring Computation
Columns
col("x") === 1df("x") === 1expr("x = 1")sql("SELECT … WHERE x = 1")
13
New value, computed based on input values.
DSL
SQL Parser
Complex Columns With Functions

• 100+ native functions with optimized codegen implementations
  – String manipulation: concat, format_string, lower, lpad
  – Date/Time: current_timestamp, date_format, date_add, …
  – Math: sqrt, randn, …
  – Other: monotonicallyIncreasingId, sparkPartitionId, …

```python
from pyspark.sql.functions import *
yesterday = date_sub(current_date(), 1)
df2 = df.filter(df.created_at > yesterday)
```

```scala
import org.apache.spark.sql.functions._
val yesterday = date_sub(current_date(), 1)
val df2 = df.filter(df("created_at") > yesterday)
```
Functions vs. Columns

|           | You Type             | Spark Sees                                     |
|-----------|----------------------|------------------------------------------------|
| Functions | `(x: Int) => x == 1` | `class $anonfun$1 { def apply(Int): Boolean }` |
| Columns   | `col("x") === 1`     | `EqualTo(x, Lit(1))`                           |
Columns: Predicate Pushdown

You write:

```scala
import sqlContext.implicits._ // for the $"name" column syntax

sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "people")
  .load()
  .where($"name" === "michael")
```

Spark translates for Postgres:

```sql
SELECT * FROM people WHERE name = 'michael'
```
Columns: Efficient Joins

```python
from pyspark.sql.functions import col

df1.join(df2, col("x") == col("y"))
```

Because Spark understands the equality predicate, it can pick a SortMergeJoin: equal values sort to the same place, so the join costs O(n log n).

```python
from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

myUDF = udf(lambda x, y: x == y, BooleanType())
df1.join(df2, myUDF(col("x"), col("y")))
```

The UDF is opaque, so Spark must fall back to a Cartesian product followed by a Filter: O(n²).
Structuring Data
Spark's Structured Data Model
• Primitives: Byte, Short, Integer, Long, Float, Double, Decimal, String, Binary, Boolean, Timestamp, Date
• Array[Type]: variable length collection
• Struct: fixed # of nested columns with fixed types
• Map[Type, Type]: variable length association (see the sketch below)
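A sketch of a schema combining each kind (field names hypothetical):

```scala
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", LongType),                            // primitive
  StructField("tags", ArrayType(StringType)),             // variable-length collection
  StructField("address", StructType(Seq(                  // fixed nested columns
    StructField("street", StringType),
    StructField("zip", IntegerType)))),
  StructField("scores", MapType(StringType, DoubleType))  // variable-length association
))
```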
Tungsten's Compact Encoding

The tuple (123, "data", "bricks") is stored as one flat binary record:

```
0x0 | 123 | 32L | 48L | 4 "data" | 6 "bricks"
```

First a null bitmap (0x0), then one slot per field: the integer 123 inline, and for each string an offset to its data (32, 48) stored with the field length (4 for "data", 6 for "bricks"), followed by the variable-length bytes themselves.
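A toy ByteBuffer sketch of that layout (illustration only, not Spark's actual UnsafeRow writer; offsets are taken from the figure):

```scala
import java.nio.{ByteBuffer, ByteOrder}

// [ 8-byte null bitmap | one 8-byte slot per field | variable-length data ]
// Fixed-width values are inline; strings pack (offset << 32) | length.
val row = ByteBuffer.allocate(64).order(ByteOrder.LITTLE_ENDIAN)
row.putLong(0, 0x0L)                // null bitmap: no field is null
row.putLong(8, 123L)                // field 0: 123 stored inline
row.putLong(16, (32L << 32) | 4L)   // field 1: "data" at offset 32, length 4
row.putLong(24, (48L << 32) | 6L)   // field 2: "bricks" at offset 48, length 6
row.position(32); row.put("data".getBytes("UTF-8"))
row.position(48); row.put("bricks".getBytes("UTF-8"))

// Reading field 1 back is pure offset arithmetic, no deserialization:
val word = row.getLong(16)
val (off, len) = ((word >>> 32).toInt, word.toInt)
val bytes = new Array[Byte](len)
row.position(off); row.get(bytes)
println(new String(bytes, "UTF-8")) // prints "data"
```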
Encoders
Encoders translate between domain objects and Spark's internal format: the JVM object MyClass(123, "data", "bricks") on one side, the compact internal representation 0x0 | 123 | 32L | 48L | 4 "data" | 6 "bricks" on the other.
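A sketch of an encoder at work (class and field names hypothetical; toDS comes from the SQLContext implicits in Spark 1.6+):

```scala
import sqlContext.implicits._

case class MyClass(a: Int, b: String, c: String)

val ds = Seq(MyClass(123, "data", "bricks")).toDS() // objects -> internal rows
val back = ds.collect()                             // internal rows -> objects
```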
Bridge Objects with Data Sources
```json
{
  "name": "Michael",
  "zip": "94709",
  "languages": ["scala"]
}
```

```scala
case class Person(
  name: String,
  languages: Seq[String],
  zip: Int)
```

Encoders map columns to fields by name, whether the source is JSON or JDBC.
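A sketch of the bridge in action, assuming the document above is stored in people.json:

```scala
import sqlContext.implicits._

val people = sqlContext.read
  .json("people.json") // columns: languages, name, zip
  .as[Person]          // matched to the case class fields by name
```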
Space Efficiency (benchmark chart)

Serialization Performance (benchmark chart)
Operate Directly On Serialized Data
DataFrame code / SQL:

```scala
df.where(df("year") > 2015)
```

Catalyst expression:

```
GreaterThan(year#234, Literal(2015))
```

Generated low-level code (the Platform.getInt JVM intrinsic is JIT-ed to pointer arithmetic):

```java
boolean filter(Object baseObject) {
  int offset = baseOffset + bitSetWidthInBytes + 3 * 8;
  int value = Platform.getInt(baseObject, offset);
  return value > 2015;
}
```
Structured Streaming
The simplest way to perform streaming analytics is not having to reason about streaming.
Spark 1.3 introduced static DataFrames; Spark 2.0 adds continuous DataFrames. A single API!
• High-level streaming API built on the Spark SQL engine
  – Runs the same queries on DataFrames
  – Event time, windowing, sessions, sources & sinks
• Unifies streaming, interactive and batch queries
  – Aggregate data in a stream, then serve using JDBC
  – Change queries at runtime
  – Build and apply ML models
Example: Batch Aggregation

```python
logs = ctx.read.format("json").open("s3://logs")

logs.groupBy(logs.user_id).agg(sum(logs.time)) \
    .write.format("jdbc") \
    .save("jdbc:mysql//...")
```
Example: Continuous Aggregation

```python
logs = ctx.read.format("json").stream("s3://logs")

logs.groupBy(logs.user_id).agg(sum(logs.time)) \
    .write.format("jdbc") \
    .stream("jdbc:mysql//...")
```
Logically: DataFrame operations on static data (i.e. as easy to understand as batch).

Physically: Spark automatically runs the query in streaming fashion (i.e. incrementally and continuously).

DataFrame → Logical Plan → Catalyst optimizer → continuous, incremental execution.
Incrementalized By Spark

Batch: Scan Files → Aggregate → Write to MySQL
Continuous: Scan New Files → Stateful Aggregate → Update MySQL

The batch-to-continuous transformation requires information about the structure of the computation.
What's Coming?

• Spark 2.0
  – Unification of the APIs
  – Basic streaming API
  – Event-time aggregations
• Spark 2.1+
  – Other streaming sources / sinks
  – Machine learning
  – Structure in other libraries: MLlib, GraphFrames
Questions? @michaelarmbrust