Data Source API in Spark
Yin Huai 3/25/2015 - Bay Area Spark Meetup
About Me
Spark SQL developer @databricks
One of the main developers of the Data Source API
Used to work on Hive a lot (Hive Committer)
2
Spark: A Unified Platform
3
[Architecture diagram: DataFrame, Spark SQL, Spark Streaming (streaming), MLlib (machine learning), GraphX (graph computation), and SparkR (R on Spark; alpha/pre-alpha) on top of the Spark Core Engine]
DataFrames in Spark
Distributed collection of data grouped into named columns (i.e. RDD with schema)
Domain-specific functions designed for common tasks:
• Metadata
• Sampling
• Relational data processing: project, filter, aggregation, join, ...
• UDFs
Available in Python, Scala, Java, and R (via SparkR)
4
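In code, loading a dataset as a DataFrame and applying a few of these functions looks roughly like this (a minimal Scala sketch against the Spark 1.3 API; the sqlContext value and the /data/people.json path are assumptions):

val people = sqlContext.load("/data/people.json", "json")   // DataFrame with an inferred schema
people.printSchema()                                         // inspect the schema (metadata)
people.filter("age > 21").select("name").show()              // relational processing: filter + project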
5
Every Spark application starts with loading data and ends with saving data.
Datasets Stored in Various Formats/Systems
6
[Diagram: the same Spark stack (Spark Core Engine, DataFrame, Spark SQL, Spark Streaming, MLlib, GraphX, SparkR) reading and writing datasets stored in various formats/systems, e.g. { JSON }, JDBC, and more…]
Loading and Saving Data is Not Easy
Convert/parse raw data
• e.g. parse text records, parse JSON records, deserialize data stored in binary
Data format transformation
• e.g. convert your Java objects to Avro records/JSON records/Parquet records/HBase rows/…
Applications often end up with inflexible input/output logic
7
Data Sources API
8
[Diagram: the Data Source API sits between the Spark stack (Spark Core Engine, DataFrame, Spark SQL, Spark Streaming, MLlib, GraphX, SparkR) and the external formats/systems ({ JSON }, JDBC, and more…)]
Data Source Libraries
Users can use libraries based on Data Source API to read/write DataFrames from/to a variety of formats/systems.
9
[Diagram: built-in support ({ JSON }, JDBC, and more…) alongside external libraries]
Goals of Data Source API
Developers: build libraries for various data sources
• No need to get your code merged into the Spark codebase
• Share your library with others through Spark Packages
Users: easy loading/saving of DataFrames
Efficient data access powered by the Spark SQL query optimizer
• Interfaces allow optimizations to be pushed down to the data source, e.g. avoid reading unnecessary data for a query
10
11
Data Source API: Easy loading/saving data
12
Demo 1: Loading/saving data in Spark (Generic load/save functions) (Please see page 26 for code)
Demo 1: Summary
sqlContext.load: loads an existing dataset as a DataFrame
• Data source name: what source we are loading from
• Options: parameters for a specific data source, e.g. path of the data
• Schema: if a data source accepts a user-specified schema, you can apply one
dataframe.save: saves the contents of the DataFrame to a source
• Data source name: what source we are saving to
• Save mode: what we should do when data already exists
• Options: parameters for a specific data source, e.g. path of the data
13
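A condensed sketch of these two calls (Spark 1.3 Scala API; the paths are hypothetical, and the full demo code is in the appendix):

import org.apache.spark.sql.SaveMode

val df = sqlContext.load("/data/people.json", "json")                          // data source name + path
df.save("/data/people.parquet", "parquet")                                     // default save mode is ErrorIfExists
df.save("/data/people.csv", "com.databricks.spark.csv", SaveMode.Overwrite)    // explicit save mode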
14
Share data with other Spark applications/users?
Table: DataFrame with persisted metadata + name
Metadata Persistence
Configure the data source once:
• Data source name
• Options
You give the DataFrame representing this dataset a name, and we persist its metadata in the Hive Metastore
Anyone can retrieve the dataset by its name
• In SQL or with the DataFrame API
15
Data Source Tables in Hive Metastore
Metadata of data source tables is stored in its own representation in the Hive Metastore
• Not limited by the metastore's internal restrictions (e.g. data types)
• Data source tables are not Hive tables (note: you can always read/write Hive tables with Spark SQL)
Two table types:
• Managed tables: users do not specify the location of the data. DROP TABLE will delete the data.
• External tables: tables with user-specified locations. DROP TABLE will NOT delete the data.
16
createExternalTable and saveAsTable
sqlContext.createExternalTable
• sqlContext.load + metadata persistence + name
dataframe.saveAsTable
• dataframe.save + metadata persistence + name
Use sqlContext.table(name) to retrieve the DataFrame, or access the DataFrame by its name in SQL queries
17
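A minimal sketch of both paths (Spark 1.3 Scala API; the table names, paths, and the df DataFrame are hypothetical):

sqlContext.createExternalTable("people_json_table", "/data/people.json", "json")  // load + metadata persistence + name
df.saveAsTable("people_parquet_table", "parquet")                                 // save + metadata persistence + name
val people = sqlContext.table("people_json_table")                                // retrieve the DataFrame by name
sqlContext.sql("SELECT name FROM people_json_table")                              // or refer to it by name in SQL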
18
Demo 2: createExternalTable and saveAsTable (Please see page 26 for code)
19
Performance of data access?
Efficient data access powered by the Spark SQL query optimizer¹
¹ The data source needs to support these optimizations by implementing the corresponding interfaces
20
events = sqlCtx.load("/data/events", "parquet")
training_data = (events
  .where("city = 'New York' and year = 2015")
  .select("timestamp")
  .collect())
[Diagram: three ways to scan the events dataset (partitioned by year, 2011-2015) for this query]
• Read all columns of 5 years' data (Expensive!!!)
• Read only the needed columns (Better): column pruning
• Read only the needed columns and records (Much better): column pruning + partition pruning¹
¹ Supported for Parquet and Hive; more support coming in Spark 1.4
21
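On the library side, a data source opts into these optimizations by implementing the corresponding scan interfaces. A minimal sketch, assuming the Spark 1.3 org.apache.spark.sql.sources API (here PrunedFilteredScan); the EventsRelation class and its columns are hypothetical, and a real implementation would actually read the data:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types._

case class EventsRelation(path: String)(@transient val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  override def schema = StructType(
    StructField("city", StringType) ::
    StructField("year", IntegerType) ::
    StructField("timestamp", LongType) :: Nil)

  // requiredColumns: the columns the query actually needs (column pruning).
  // filters: predicates Spark SQL pushes down (e.g. EqualTo("year", 2015)) so the source can skip data.
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // A real implementation would use `filters` to skip files/partitions and
    // materialize only `requiredColumns`; this stub just returns an empty RDD.
    sqlContext.sparkContext.emptyRDD[Row]
  }
}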
Build A Data Source Library
Build A Data Source Library
Implement three interfaces for reading data from a data source
• BaseRelation: the abstraction of a DataFrame loaded from a data source. It provides the schema of the data.
• RelationProvider: handles users' options and creates a BaseRelation
• TableScan (a BaseRelation for reads): reads the data from the data source and constructs rows
For the write path and for supporting optimizations on data access, take a look at our Scala Doc/Java Doc
22
23
Demo 3: Build A Data Source Library
(Please see page 26 for code)
Starting From Here
More about the Data Source API: the Data Sources section in the Spark SQL programming guide
More about how to build a Data Source library: take a look at Spark Avro
Want to share your data source library? Submit it to Spark Packages
24
Thank you!
26
The following slides contain the code for the demos.
Notes about Demo Code
The code is based on Spark 1.3.0. Demos were done in Databricks Cloud.
To try the demo code with your own Spark 1.3.0 deployment, just replace display(…) with .show() for showing results, e.g. replace
  display(peopleJson.select("name"))
with
  peopleJson.select("name").show()
27
28
Demo 1: Loading/saving data in Spark (Generic load/save functions)
Demo1_Scala

Load a JSON dataset as a DataFrame.

val json = sc.textFile("/home/yin/meetup/people.json")
json.collect().foreach(println)

json: org.apache.spark.rdd.RDD[String] = /home/yin/meetup/people.json MapPartitionsRDD[206] at textFile at <console>:29
{"name":"Cheng"}
{"name":"Michael"}
{"location":{"state":"California"},"name":"Reynold"}
{"location":{"city":"San Francisco","state":"California"},"name":"Yin"}

val peopleJson = sqlContext.load("/home/yin/meetup/people.json", "json")
peopleJson.printSchema()

root
|-- location: struct (nullable = true)
|    |-- city: string (nullable = true)
|    |-- state: string (nullable = true)
|-- name: string (nullable = true)
peopleJson: org.apache.spark.sql.DataFrame = [location: struct<city:string,state:string>, name: string]

display(peopleJson.select("name", "location.state"))

name      state
Cheng     null
Michael   null
Reynold   California
Yin       California
29
display(peopleJson
  .filter("location.city = 'San Francisco' and location.state = 'California'")
  .select("name"))

name
Yin

Save peopleJson to Parquet.

peopleJson.save("/home/yin/meetup/people.parquet", "parquet")

Save peopleJson to Avro.

peopleJson.save("/home/yin/meetup/people.avro", "com.databricks.spark.avro")

Save peopleJson to CSV.

peopleJson
  .select("name", "location.city", "location.state")
  .save("/home/yin/meetup/people.csv", "com.databricks.spark.csv")
30
Save people.avro to Parquet.

val peopleAvro = sqlContext.load("/home/yin/meetup/people.avro",
  "com.databricks.spark.avro")
display(peopleAvro)

location                                        name
null                                            Cheng
null                                            Michael
{"city":null,"state":"California"}              Reynold
{"city":"San Francisco","state":"California"}   Yin

peopleAvro.save("/home/yin/meetup/people.parquet", "parquet")

java.lang.RuntimeException: path /home/yin/meetup/people.parquet already exists.
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:110)
        at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308)
        at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1123)
        at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1083)

Save mode is needed to control the behavior of save when data already exists.

import org.apache.spark.sql.SaveMode

The default save mode is ErrorIfExists.

peopleAvro.save("/home/yin/meetup/people.parquet", "parquet", SaveMode.ErrorIfExists)

java.lang.RuntimeException: path /home/yin/meetup/people.parquet already exists.
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:110)
        at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308)
        at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1123)
        at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1094)

Let's overwrite the existing people.parquet (use SaveMode.Overwrite).

peopleAvro.save("/home/yin/meetup/people.parquet",
  "parquet", SaveMode.Overwrite)

SaveMode.Append is for appending data (from a single user).

peopleAvro.save("/home/yin/meetup/people.parquet", "parquet", SaveMode.Append)

31

val peopleParquet = sqlContext.load("/home/yin/meetup/people.parquet",
  "parquet")
display(peopleParquet)

location                                        name
null                                            Cheng
null                                            Michael
{"city":null,"state":"California"}              Reynold
{"city":"San Francisco","state":"California"}   Yin
null                                            Cheng
null                                            Michael
{"city":null,"state":"California"}              Reynold
{"city":"San Francisco","state":"California"}   Yin
For load, we can infer the schema from JSON, Parquet, and Avro.

You can also apply a schema to the data.

import org.apache.spark.sql.types._

val schema = StructType(StructField("name", StringType) :: StructField("city", StringType) :: Nil)
val options = Map("path" -> "/home/yin/meetup/people.csv")
val peopleJsonWithSchema = sqlContext.load("com.databricks.spark.csv", schema, options)

schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(city,StringType,true))
options: scala.collection.immutable.Map[String,String] = Map(path -> /home/yin/meetup/people.csv)
peopleJsonWithSchema: org.apache.spark.sql.DataFrame = [name: string, city: string]

peopleJsonWithSchema.printSchema()

root
|-- name: string (nullable = true)
|-- city: string (nullable = true)

display(peopleJsonWithSchema)

name      city
Cheng     null
Michael   null
Reynold   null
Yin       San Francisco
32
Demo 2: createExternalTable and saveAsTable
33
Demo2_Python

Create a table from an existing dataset with sqlContext.createExternalTable.

sqlContext.createExternalTable(
    tableName="people_json_table",
    path="/home/yin/meetup/people.json",
    source="json")

Out[7]: DataFrame[location: struct<city:string,state:string>, name: string]

display(sqlContext.table("people_json_table"))

location                                        name
null                                            Cheng
null                                            Michael
{"city":null,"state":"California"}              Reynold
{"city":"San Francisco","state":"California"}   Yin

%sql SELECT name, location.city FROM people_json_table

name      city
Cheng     null
Michael   null
Reynold   null
Yin       San Francisco

You can also provide a schema to createExternalTable (if your data source supports user-specified schemas).
Save a DataFrame as a Table

people_json = sqlContext.load(path="/home/yin/meetup/people.json",
                              source="json")
people_json.saveAsTable(tableName="people_parquet_table",
                        source="parquet")

display(sqlContext.table("people_parquet_table").select("name"))

name
Cheng
Michael
Reynold
Yin

Save mode can also be used with saveAsTable.

people_json.saveAsTable(tableName="people_parquet_table",
                        source="parquet", mode="append")

34

display(sqlContext.table("people_parquet_table").select("name"))

name
Cheng
Michael
Reynold
Yin
Cheng
Michael
Reynold
Yin
35
Demo 3: Build A Data Source Library
36
Demo3_Scala

Usually, you want to import the following ...

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

Write your own BaseRelation and RelationProvider.

IntegerRelation: a relation that generates integer numbers for the range defined by [from, to].

case class IntegerRelation(from: Int, to: Int)(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  // This relation has a single column "integer_num".
  override def schema =
    StructType(StructField("integer_num", IntegerType, nullable = false) :: Nil)

  override def buildScan() =
    sqlContext.sparkContext.parallelize(from to to).map(Row(_))
}

IntegerRelationProvider: handles the user's parameters (from and to) and creates an IntegerRelation.

class IntegerRelationProvider extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    IntegerRelation(parameters("from").toInt, parameters("to").toInt)(sqlContext)
  }
}
Use sqlContext.load to get a DataFrame for IntegerRelation. The range of integer numbers is [1, 10].

val options = Map("from" -> "1", "to" -> "10")
val df = sqlContext.load("com.databricks.sources.number.IntegerRelationProvider", options)

options: scala.collection.immutable.Map[String,String] = Map(from -> 1, to -> 10)
df: org.apache.spark.sql.DataFrame = [integer_num: int]

display(df)

integer_num
1
2
3
4
5
6
7
8

37

display(df.select($"integer_num" * 100))

(integer_num * 100)
100
200
300
400
500
600
700
800
900
If the RelationProvider's class name is DefaultSource, users only need to provide the package name (com.databricks.sources.number instead of com.databricks.sources.number.IntegerRelationProvider).
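For illustration, a hypothetical sketch of that convention, reusing the IntegerRelation defined above (the package and class names are assumptions):

package com.databricks.sources.number

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider}

// Naming the provider class DefaultSource lets users refer to the data source
// by its package name alone. Assumes IntegerRelation is compiled into this package
// (or imported from wherever it lives).
class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    IntegerRelation(parameters("from").toInt, parameters("to").toInt)(sqlContext)
}

// Usage: val df = sqlContext.load("com.databricks.sources.number", Map("from" -> "1", "to" -> "10"))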