
Page 1: Data Source API in Spark

Data Source API in Spark

Yin Huai 3/25/2015 - Bay Area Spark Meetup

Page 2: Data Source API in Spark

About Me

•  Spark SQL developer @databricks
•  One of the main developers of Data Source API
•  Used to work on Hive a lot (Hive Committer)

2

Page 3: Data Source API in Spark

Spark: A Unified Platform

3

[Diagram: the Spark unified stack. The Spark Core Engine sits below the DataFrame layer, which supports Spark SQL, Spark Streaming (streaming), MLlib (machine learning), GraphX (graph computation), and SparkR (R on Spark); some components are marked alpha/pre-alpha.]

Page 4: Data Source API in Spark

DataFrames in Spark

•  Distributed collection of data grouped into named columns (i.e. RDD with schema)
•  Domain-specific functions designed for common tasks
   •  Metadata
   •  Sampling
   •  Relational data processing: project, filter, aggregation, join, ...
   •  UDFs
•  Available in Python, Scala, Java, and R (via SparkR)

4
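For illustration, here is a minimal sketch of those operations in Scala (not from the talk; the path, column names, and values are placeholders, Spark 1.3-style API assumed):

// Minimal sketch; placeholder path and columns.
val people = sqlContext.load("/data/people.json", "json")              // DataFrame with inferred schema
people.printSchema()                                                   // metadata
val sampled = people.sample(withReplacement = false, fraction = 0.1)   // sampling
people.filter("age >= 21")                                             // relational: filter
  .select("name", "age")                                               // project
  .groupBy("age").count()                                              // aggregation
  .show()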

Page 5: Data Source API in Spark

5

Every Spark application starts with loading data and ends with saving data.

Page 6: Data Source API in Spark

Datasets Stored in Various Formats/Systems

6

[Diagram: the same Spark stack as before, now surrounded by datasets stored in various formats/systems: { JSON }, JDBC, and more.]

Page 7: Data Source API in Spark

Loading and Saving Data is Not Easy

•  Convert/parse raw data
   •  e.g. parse text records, parse JSON records, deserialize data stored in binary
•  Data format transformation
   •  e.g. convert your Java objects to Avro records/JSON records/Parquet records/HBase rows/…
•  Applications often end up with inflexible input/output logic

7

Page 8: Data Source API in Spark

Data Sources API

8

[Diagram: the Data Source API sits between the Spark stack (Spark Core Engine, DataFrame, Spark SQL, Spark Streaming, MLlib, GraphX, SparkR) and the external formats/systems: { JSON }, JDBC, and more.]

Page 9: Data Source API in Spark

Data Source Libraries

Users can use libraries based on Data Source API to read/write DataFrames from/to a variety of formats/systems.

9

[Diagram: data source libraries, grouped into built-in support and external libraries ({ JSON }, JDBC, and more).]

Page 10: Data Source API in Spark

Goals of Data Source API

Developers: build libraries for various data sources
•  No need to get your code merged into the Spark codebase
•  Share your library with others through Spark Packages

Users: easy loading/saving of DataFrames

Efficient data access powered by the Spark SQL query optimizer
•  Interfaces allow optimizations to be pushed down to the data source, e.g. avoiding reading unnecessary data for a query

10

Page 11: Data Source API in Spark

11

Data Source API: Easy loading/saving data

 

Page 12: Data Source API in Spark

12

Demo 1: Loading/saving data in Spark (Generic load/save functions) (Please see page 26 for code)

 

Page 13: Data Source API in Spark

Demo 1: Summary

sqlContext.load: loads an existing dataset as a DataFrame
•  Data source name: what source we are loading from
•  Options: parameters for a specific data source, e.g. path of data
•  Schema: if a data source accepts a user-specified schema, you can apply one

dataframe.save: saves the contents of the DataFrame to a source
•  Data source name: what source we are saving to
•  Save mode: what we should do when data already exists
•  Options: parameters for a specific data source, e.g. path of data

13
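As a minimal sketch of the two calls described above (Spark 1.3 API; paths and options are placeholders, full demo code starts on page 26):

import org.apache.spark.sql.SaveMode

// load: data source name + options (here, just a path)
val df = sqlContext.load("json", Map("path" -> "/data/people.json"))

// save: data source name + save mode + options
df.save("parquet", SaveMode.Overwrite, Map("path" -> "/data/people.parquet"))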

Page 14: Data Source API in Spark

14

Share data with other Spark applications/users?

Table: DataFrame with persisted metadata + name

Page 15: Data Source API in Spark

Metadata Persistence

Configure the data source once:
•  Data source name
•  Options

You give the DataFrame representing this dataset a name, and we persist the metadata in the Hive Metastore. Anyone can retrieve the dataset by its name:
•  In SQL or with the DataFrame API

15

Page 16: Data Source API in Spark

Data Source Tables in Hive Metastore

Metadata of data source tables is stored in its own representation in the Hive Metastore
•  Not limited by the metastore's internal restrictions (e.g. data types)
•  Data source tables are not Hive tables (note: you can always read/write Hive tables with Spark SQL)

Two table types:
•  Managed tables: users do not specify the location of the data. DROP TABLE will delete the data.
•  External tables: tables with user-specified locations. DROP TABLE will NOT delete the data.

16

Page 17: Data Source API in Spark

createExternalTable and saveAsTable

sqlContext.createExternalTable
•  sqlContext.load + metadata persistence + name

dataframe.saveAsTable
•  dataframe.save + metadata persistence + name

Use sqlContext.table(name) to retrieve the DataFrame, or access the DataFrame by its name in SQL queries.

17
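A minimal sketch of both calls and of retrieving the table afterwards (Spark 1.3 API; table names and paths are placeholders; the full demo appears on pages 32-34):

// External table: persist metadata + name for an existing dataset.
sqlContext.createExternalTable(
  "people_json_table", "json", Map("path" -> "/data/people.json"))

// Managed table: save the DataFrame's contents and persist metadata + name.
df.saveAsTable("people_parquet_table", "parquet")

// Retrieve the dataset by name, with the DataFrame API or in SQL.
val people = sqlContext.table("people_json_table")
sqlContext.sql("SELECT name FROM people_json_table")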

Page 18: Data Source API in Spark

18

Demo 2: createExternalTable and saveAsTable (Please see page 26 for code)

 

Page 19: Data Source API in Spark

19

Performance of data access? Efficient data access powered by the Spark SQL query optimizer¹

¹The data source needs to support optimizations by implementing the corresponding interfaces.

Page 20: Data Source API in Spark

20

events = sqlCtx.load("/data/events", "parquet")
training_data = events
    .where("city = 'New York' and year = 2015")
    .select("timestamp")
    .collect()

[Diagram: three ways to scan the events dataset, partitioned by year (2011-2015).
•  events (many columns), all years: all columns of 5 years' data are read (expensive!!!).
•  events (city, year, timestamp), all years: only the needed columns are read (better).
•  events (city, year, timestamp), filtered: only the needed columns and the needed records (the 2015 partition) are read (much better).]

Column pruning and partition pruning¹

¹Supported for Parquet and Hive; more support coming in Spark 1.4.
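On the library side, this kind of pruning is exposed through scan interfaces richer than TableScan. A minimal sketch, assuming the Spark 1.3 org.apache.spark.sql.sources API (the class name and fields are illustrative, not from the talk):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

// Sketch: a relation that is told which columns and filters the query needs,
// so it can avoid reading unnecessary data.
case class PrunableRelation(dataSchema: StructType)(@transient val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType = dataSchema

  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] = {
    // A real source would read only `requiredColumns` and evaluate as many of
    // the pushed-down `filters` as it can before handing rows back to Spark.
    ???
  }
}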

Page 21: Data Source API in Spark

21

Build A Data Source Library

Page 22: Data Source API in Spark

Build A Data Source Library

Implement three interfaces for reading data from a data source:
•  BaseRelation: the abstraction of a DataFrame loaded from a data source. It provides the schema of the data.
•  RelationProvider: handles users' options and creates a BaseRelation.
•  TableScan (a BaseRelation for reads): reads the data from the data source and constructs rows.

For the write path and for supporting optimizations on data access, take a look at our Scala Doc/Java Doc (a sketch of the write-path hook follows below).

22
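The write path has an analogous hook; here is a minimal sketch, assuming the Spark 1.3 org.apache.spark.sql.sources API (the class name is illustrative, not from the talk):

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}

// Sketch: a provider that dataframe.save can go through for a custom source.
class MyWritableSource extends CreatableRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    // Write `data` out according to `mode` and `parameters`, then return a
    // BaseRelation describing the written dataset.
    ???
  }
}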

Page 23: Data Source API in Spark

23

Demo 3: Build A Data Source Library

(Please see page 26 for code)  

Page 24: Data Source API in Spark

Starting From Here

•  More about the Data Source API: the Data Source section in the Spark SQL programming guide
•  More about how to build a data source library: take a look at Spark Avro
•  Want to share your data source library? Submit it to Spark Packages

24

Page 25: Data Source API in Spark

Thank you!

Page 26: Data Source API in Spark

26

The following slides contain the code for the demos.

Page 27: Data Source API in Spark

Notes about Demo Code

The code is based on Spark 1.3.0. The demos were done in Databricks Cloud. To try the demo code with your own Spark 1.3.0 deployment, just replace display(…) with .show() for showing results, e.g. replace

display(peopleJson.select("name"))

with

peopleJson.select("name").show()

27

Page 28: Data Source API in Spark

28

Demo 1: Loading/saving data in Spark (Generic load/save functions)

 

Page 29: Data Source API in Spark

Demo1_Scala

Load a JSON dataset as a DataFrame.

> val json = sc.textFile("/home/yin/meetup/people.json")
  json.collect().foreach(println)

json: org.apache.spark.rdd.RDD[String] = /home/yin/meetup/people.json MapPartitionsRDD[206] at textFile at <console>:29
{"name":"Cheng"}
{"name":"Michael"}
{"location":{"state":"California"},"name":"Reynold"}
{"location":{"city":"San Francisco","state":"California"},"name":"Yin"}

> val peopleJson = sqlContext.load("/home/yin/meetup/people.json", "json")
  peopleJson.printSchema()

root
 |-- location: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- name: string (nullable = true)
peopleJson: org.apache.spark.sql.DataFrame = [location: struct<city:string,state:string>, name: string]

> display(peopleJson.select("name", "location.state"))

name      state
Cheng     null
Michael   null
Reynold   California
Yin       California

> display(peopleJson
    .filter("location.city = 'San Francisco' and location.state = 'California'")
    .select("name"))

name
Yin

Save peopleJson to Parquet.

> peopleJson.save("/home/yin/meetup/people.parquet", "parquet")

Save peopleJson to Avro.

> peopleJson.save("/home/yin/meetup/people.avro", "com.databricks.spark.avro")

Save peopleJson to CSV.

> peopleJson
    .select("name", "location.city", "location.state")
    .save("/home/yin/meetup/people.csv", "com.databricks.spark.csv")

Page 30: Data Source API in Spark

30

Save people.avro to Parquet.

> val peopleAvro = sqlContext.load("/home/yin/meetup/people.avro", "com.databricks.spark.avro")
  display(peopleAvro)

location                                        name
null                                            Cheng
null                                            Michael
{"city":null,"state":"California"}              Reynold
{"city":"San Francisco","state":"California"}   Yin

> peopleAvro.save("/home/yin/meetup/people.parquet", "parquet")

java.lang.RuntimeException: path /home/yin/meetup/people.parquet already exists.
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:110)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308)
    at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1123)
    at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1083)

A save mode is needed to control the behavior of save when data already exists.

> import org.apache.spark.sql.SaveMode

import org.apache.spark.sql.SaveMode

The default save mode is ErrorIfExists.

> peopleAvro.save("/home/yin/meetup/people.parquet", "parquet", SaveMode.ErrorIfExists)

java.lang.RuntimeException: path /home/yin/meetup/people.parquet already exists.
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:110)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308)
    at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1123)
    at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1094)

Let's overwrite the existing people.parquet (use SaveMode.Overwrite).

> peopleAvro.save("/home/yin/meetup/people.parquet", "parquet", SaveMode.Overwrite)

SaveMode.Append is for appending data (from a single user).

> peopleAvro.save("/home/yin/meetup/people.parquet", "parquet", SaveMode.Append)

> val peopleParquet = sqlContext.load("/home/yin/meetup/people.parquet", "parquet")
  display(peopleParquet)

location                                        name
null                                            Cheng
null                                            Michael

Page 31: Data Source API in Spark

31

(display(peopleParquet) output, continued)

{"city":null,"state":"California"}              Reynold
{"city":"San Francisco","state":"California"}   Yin
null                                            Cheng
null                                            Michael
{"city":null,"state":"California"}              Reynold
{"city":"San Francisco","state":"California"}   Yin

For load, we can infer the schema from JSON, Parquet, and Avro. You can also apply a schema to the data.

> import org.apache.spark.sql.types._

import org.apache.spark.sql.types._

> val schema = StructType(StructField("name", StringType) :: StructField("city", StringType) :: Nil)
  val options = Map("path" -> "/home/yin/meetup/people.csv")
  val peopleJsonWithSchema = sqlContext.load("com.databricks.spark.csv", schema, options)

schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(city,StringType,true))
options: scala.collection.immutable.Map[String,String] = Map(path -> /home/yin/meetup/people.csv)
peopleJsonWithSchema: org.apache.spark.sql.DataFrame = [name: string, city: string]

> peopleJsonWithSchema.printSchema()

root
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)

> display(peopleJsonWithSchema)

name      city
Cheng     null
Michael   null
Reynold   null
Yin       San Francisco

Page 32: Data Source API in Spark

32

Demo 2: createExternalTable and saveAsTable

Page 33: Data Source API in Spark

33

Demo2_Python

Create a table with an existing dataset with sqlContext.createExternalTable.

> sqlContext.createExternalTable(
      tableName="people_json_table",
      path="/home/yin/meetup/people.json",
      source="json")

Out[7]: DataFrame[location: struct<city:string,state:string>, name: string]

> display(sqlContext.table("people_json_table"))

location                                        name
null                                            Cheng
null                                            Michael
{"city":null,"state":"California"}              Reynold
{"city":"San Francisco","state":"California"}   Yin

> %sql SELECT name, location.city FROM people_json_table

name      city
Cheng     null
Michael   null
Reynold   null
Yin       San Francisco

You can also provide a schema to createExternalTable (if your data source supports a user-specified schema).

Save a DataFrame as a table.

> people_json = sqlContext.load(path="/home/yin/meetup/people.json", source="json")
  people_json.saveAsTable(tableName="people_parquet_table", source="parquet")

> display(sqlContext.table("people_parquet_table").select("name"))

name
Cheng
Michael
Reynold
Yin

Save mode can also be used with saveAsTable.

> people_json.saveAsTable(tableName="people_parquet_table", source="parquet", mode="append")

> display(sqlContext.table("people_parquet_table").select("name"))

name
Cheng
Michael
Reynold

Page 34: Data Source API in Spark

34

(output continued)

Yin
Cheng
Michael
Reynold
Yin

Page 35: Data Source API in Spark

35

Demo 3: Build A Data Source Library

 

Page 36: Data Source API in Spark

36

Demo3_Scala

Usually, you want to import the following:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

Write your own BaseRelation and RelationProvider.

IntegerRelation: a relation that generates integer numbers for the range defined by [from, to].

case class IntegerRelation(from: Int, to: Int)(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  // This relation has a single column "integer_num".
  override def schema =
    StructType(StructField("integer_num", IntegerType, nullable = false) :: Nil)

  override def buildScan() =
    sqlContext.sparkContext.parallelize(from to to).map(Row(_))
}

IntegerRelationProvider: handles the user's parameters (from and to) and creates an IntegerRelation.

> class IntegerRelationProvider extends RelationProvider {
    override def createRelation(
        sqlContext: SQLContext,
        parameters: Map[String, String]): BaseRelation = {
      IntegerRelation(parameters("from").toInt, parameters("to").toInt)(sqlContext)
    }
  }

Use sqlContext.load to get a DataFrame for IntegerRelation. The range of integer numbers is [1, 10].

val options = Map("from" -> "1", "to" -> "10")
val df = sqlContext.load("com.databricks.sources.number.IntegerRelationProvider", options)

options: scala.collection.immutable.Map[String,String] = Map(from -> 1, to -> 10)
df: org.apache.spark.sql.DataFrame = [integer_num: int]

> display(df)

integer_num
1
2
3
4
5
6

Page 37: Data Source API in Spark

37

(display(df) output, continued)

7
8

> display(df.select($"integer_num" * 100))

(integer_num * 100)
100
200
300
400
500
600
700
800
900

If the RelationProvider's class name is DefaultSource, users only need to provide the package name (com.databricks.sources.number instead of com.databricks.sources.number.IntegerRelationProvider).
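A minimal sketch of that convention, reusing the demo's IntegerRelation (the file/package layout shown here is assumed, not from the talk):

package com.databricks.sources.number

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider}

// Assumes the demo's IntegerRelation lives in (or is imported into) this package.
// Naming the provider DefaultSource lets users write
//   sqlContext.load("com.databricks.sources.number", options)
// instead of spelling out the full provider class name.
class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    IntegerRelation(parameters("from").toInt, parameters("to").toInt)(sqlContext)
}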