anatomy of spark catalyst

Post on 08-Jan-2017

Category:

Technology

TRANSCRIPT

Anatomy of Spark Catalyst - Part 1

Deep dive into Spark Catalyst API

https://github.com/phatak-dev/anatomy-of-spark-catalyst

● Madhukara Phatak

● Technical Lead at Tellius

● Consultant and Trainer at datamantra.io

● Consult in Hadoop, Spark and Scala

● www.madhukaraphatak.com

Agenda
● DataFrame Internals
● Introduction to Catalyst
● Catalyst Trees
● Expressions
● Datatypes
● Row and Internal Row
● Custom expressions
● Codegen in expressions
● Janino

DataFrame internals
● Each DataFrame is internally represented as a logical plan in Spark
● These logical plans are converted into physical plans, which execute on the RDD abstraction
● All plans are represented using a tree structure
● These trees, and the optimization of them, come from Catalyst
● Ex : DFExample.scala

Code organization of Spark SQL
● The code of Spark SQL is organized into the projects below
○ Catalyst
○ Core
○ Hive
○ Hive-Thrift server
○ unsafe (external to Spark SQL)
● We are focusing on the Catalyst code in this session
● Core depends on Catalyst
● Catalyst depends upon unsafe for Tungsten code

Introduction to Catalyst
● An implementation-agnostic framework for manipulating trees of relational operators and expressions
● It defines all the expression, logical plan and optimization APIs for Spark SQL
● The Catalyst API is independent of RDD evaluation, so Catalyst operators and expressions can be evaluated without the RDD abstraction
● Introduced in Spark 1.3

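Catalyst in Spark SQL
[Figure: three front ends feed into Catalyst: Hive queries (HiveQL) through the Hive parser, Spark SQL queries (SparkQL) through the SparkSQL Parser, and the Dataframe DSL through DataFrame. Catalyst then produces Spark RDD code.]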

Tree API

Catalyst Trees
● Everything in Catalyst is a tree
● A tree is a very powerful abstraction for defining hierarchical operators and expressions
● The important abstraction defined is TreeNode
● Every TreeNode has its own type
● Catalyst defines many helper functions, like forall and transform, which provide collection-like APIs for TreeNode
● These trees can be used in user programs
● Ex: org.apache.spark.sql.catalyst.trees.TreeNode

Expression trees
● We can define an arithmetic expression tree using the Catalyst tree abstraction
● Each node in the tree extends the trait ArithmeticExpression
● We will have the following kinds of nodes
○ LeafValue
○ ADD
○ Divide
○ MUL
● com.madhukaraphatak.spark.catalyst.trees

Creating Expression Trees
● In this example, we see how to use the tree defined earlier
● Let’s take the expression 10+20
● It can be represented using the tree ADD(LeafValue(10), LeafValue(20))
● Calling evaluate on the root of the tree gives us the result
● Ex : ExpressionExample
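
The tree described on this slide can be sketched in plain Scala without any Spark dependency. The node names (ArithmeticExpression, LeafValue, ADD, MUL, Divide) and the evaluate method come from the slides; the field names, the Int value type and the wrapper object TreeSketch are assumptions made for illustration and may differ from the code in the talk's repository.

```scala
// A minimal sketch of the arithmetic tree from the slides.
// Field names and the Int value type are assumptions.
object TreeSketch {
  sealed trait ArithmeticExpression {
    def evaluate: Int
  }

  // A leaf simply holds a value.
  case class LeafValue(value: Int) extends ArithmeticExpression {
    def evaluate: Int = value
  }

  // Inner nodes evaluate their children first, then combine the results.
  case class ADD(left: ArithmeticExpression, right: ArithmeticExpression)
      extends ArithmeticExpression {
    def evaluate: Int = left.evaluate + right.evaluate
  }

  case class MUL(left: ArithmeticExpression, right: ArithmeticExpression)
      extends ArithmeticExpression {
    def evaluate: Int = left.evaluate * right.evaluate
  }

  case class Divide(left: ArithmeticExpression, right: ArithmeticExpression)
      extends ArithmeticExpression {
    def evaluate: Int = left.evaluate / right.evaluate
  }

  // The expression 10+20 as a tree; evaluating the root gives the result.
  val tree: ArithmeticExpression = ADD(LeafValue(10), LeafValue(20))
  val result: Int = tree.evaluate
}
```

Here TreeSketch.result is 30, because evaluate on the ADD root recurses into both leaves before adding.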

Traversing trees
● Catalyst provides collection-like methods for traversing trees
● Using these methods we can easily traverse our expression trees
● The collection-like APIs allow us to use Scala pattern matching to work with the defined nodes
● In our example, we will use the transform operation to change the evaluation of the divide-by-0 operation
● Ex : ExpressionTransformation
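
A rough sketch of the idea: transform takes a partial function, and pattern matching picks out the nodes to rewrite. The transform method below is a hand-written stand-in that imitates the shape of Catalyst's TreeNode.transform, not the Catalyst implementation itself. The transcript does not say what the talk's example substitutes for a division by zero, so replacing it with LeafValue(0) is purely an assumption.

```scala
// Sketch of rewriting a tree with a partial function.
// transform here is a hypothetical stand-in for Catalyst's method.
object TransformSketch {
  sealed trait ArithmeticExpression {
    def evaluate: Int

    // Try the rule on this node first, then recurse into the children
    // of whatever node the rule produced.
    def transform(
        rule: PartialFunction[ArithmeticExpression, ArithmeticExpression]
    ): ArithmeticExpression = {
      val afterRule = rule.applyOrElse(this, identity[ArithmeticExpression])
      afterRule match {
        case ADD(l, r)    => ADD(l.transform(rule), r.transform(rule))
        case Divide(l, r) => Divide(l.transform(rule), r.transform(rule))
        case leaf         => leaf
      }
    }
  }

  case class LeafValue(value: Int) extends ArithmeticExpression {
    def evaluate: Int = value
  }

  case class ADD(left: ArithmeticExpression, right: ArithmeticExpression)
      extends ArithmeticExpression {
    def evaluate: Int = left.evaluate + right.evaluate
  }

  case class Divide(left: ArithmeticExpression, right: ArithmeticExpression)
      extends ArithmeticExpression {
    def evaluate: Int = left.evaluate / right.evaluate
  }

  // Evaluating this tree directly would throw an ArithmeticException.
  val risky: ArithmeticExpression =
    ADD(LeafValue(5), Divide(LeafValue(10), LeafValue(0)))

  // Assumption: a division by a literal zero is replaced with LeafValue(0).
  val safe: ArithmeticExpression = risky.transform {
    case Divide(_, LeafValue(0)) => LeafValue(0)
  }

  val result: Int = safe.evaluate
}
```

After the rewrite the tree is ADD(LeafValue(5), LeafValue(0)), so TransformSketch.result is 5 and no exception is raised.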

Optimising expressions
● One of the nice properties of the collection-like APIs is the ability to optimize trees even before they are evaluated
● In our example, we will use transform and transformUp to optimize the tree in the case of multiplication
● In this example, we optimize whenever there is a multiplication by zero
● Ex : ExpressionOptimization

Understanding transformUp
[Figure: MUL(LeafValue(19), MUL(LeafValue(19), LeafValue(0))) → transformUp → MUL(LeafValue(19), LeafValue(0)) → transformUp → LeafValue(0)]
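
The difference between a top-down and a bottom-up rewrite can be sketched on the same tree as in the figure. In Catalyst, transform is an alias for transformDown. The transformDown and transformUp methods below are simplified hand-written versions, and mapChildren, Rule and multiplyByZero are hypothetical helper names introduced only for this sketch.

```scala
// Sketch contrasting pre-order (transformDown) and post-order
// (transformUp) rewrites on the multiplication-by-zero rule.
object TransformUpSketch {
  type Rule = PartialFunction[ArithmeticExpression, ArithmeticExpression]

  sealed trait ArithmeticExpression {
    // Rebuild this node with f applied to each child; leaves are unchanged.
    def mapChildren(
        f: ArithmeticExpression => ArithmeticExpression
    ): ArithmeticExpression = this match {
      case MUL(l, r) => MUL(f(l), f(r))
      case leaf      => leaf
    }

    // Pre-order: apply the rule to this node, then to its children.
    def transformDown(rule: Rule): ArithmeticExpression =
      rule
        .applyOrElse(this, identity[ArithmeticExpression])
        .mapChildren(_.transformDown(rule))

    // Post-order: rewrite the children first, then apply the rule here.
    def transformUp(rule: Rule): ArithmeticExpression =
      rule.applyOrElse(
        mapChildren(_.transformUp(rule)),
        identity[ArithmeticExpression]
      )
  }

  case class LeafValue(value: Int) extends ArithmeticExpression
  case class MUL(left: ArithmeticExpression, right: ArithmeticExpression)
      extends ArithmeticExpression

  // Anything multiplied by a literal zero collapses to zero.
  val multiplyByZero: Rule = {
    case MUL(_, LeafValue(0)) | MUL(LeafValue(0), _) => LeafValue(0)
  }

  val tree = MUL(LeafValue(19), MUL(LeafValue(19), LeafValue(0)))

  val topDown: ArithmeticExpression = tree.transformDown(multiplyByZero)
  val bottomUp: ArithmeticExpression = tree.transformUp(multiplyByZero)
}
```

In a single pass, topDown is MUL(LeafValue(19), LeafValue(0)): the root did not match when it was visited, because its right child was still a MUL. bottomUp is LeafValue(0), because the inner MUL is collapsed before the root is examined.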

Catalyst Expressions

Catalyst Expression
● Like the simple expressions we have discussed, Catalyst defines expressions for all the different kinds of SQL operations
● The base class is Expression, which extends TreeNode
● The different kinds of expressions are
○ LeafExpression
○ UnaryExpression
○ BinaryExpression
○ MathExpression
● org.apache.spark.sql.catalyst.expressions.Expression

Simple Expressions
● In this example, we look at how to represent some simple Spark SQL expressions in the Catalyst API
● We are going to look at the ones below
○ Literals (Leaf expression)
○ Unary Minus (Unary expression)
○ ADD expression (Binary expression)
○ Cos expression (Math expression)
● We can evaluate all these expressions without a SparkContext or RDD
● Ex : SimpleExpression

Datatypes of Catalyst
● All expressions in Catalyst take typed inputs and return typed output
● All these types are represented using DataTypes
● The supported data types are
○ Native (Long, Double, String etc)
○ Complex (Struct Field, Array, Map)
○ Container (Struct Type)
● All of these are internally mapped to Java datatypes
● org.apache.spark.sql.types.DataType
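
To make the idea of typed expressions concrete, here is a toy model in plain Scala. It is not the Catalyst API: every name prefixed with Mini is hypothetical, and only two data types are modelled. It only illustrates how each expression can report a data type and be evaluated directly, with no SparkContext or RDD involved.

```scala
// Toy model of typed expressions. All Mini* names are hypothetical
// and only imitate the shape of the ideas on the slides.
object TypedExpressionSketch {
  sealed trait MiniType
  case object MiniInt extends MiniType
  case object MiniDouble extends MiniType

  sealed trait MiniExpression {
    def dataType: MiniType // the type of the value eval() produces
    def eval(): Any
  }

  // Leaf expression: a constant together with its type.
  case class MiniLiteral(value: Any, dataType: MiniType)
      extends MiniExpression {
    def eval(): Any = value
  }

  // Unary expression: the output type follows the child's type.
  case class MiniUnaryMinus(child: MiniExpression) extends MiniExpression {
    def dataType: MiniType = child.dataType
    def eval(): Any = dataType match {
      case MiniInt    => -child.eval().asInstanceOf[Int]
      case MiniDouble => -child.eval().asInstanceOf[Double]
    }
  }

  // Binary expression: both inputs must share one type.
  case class MiniAdd(left: MiniExpression, right: MiniExpression)
      extends MiniExpression {
    require(left.dataType == right.dataType, "operand types must match")
    def dataType: MiniType = left.dataType
    def eval(): Any = dataType match {
      case MiniInt =>
        left.eval().asInstanceOf[Int] + right.eval().asInstanceOf[Int]
      case MiniDouble =>
        left.eval().asInstanceOf[Double] + right.eval().asInstanceOf[Double]
    }
  }

  // Math expression: takes a double and returns a double.
  case class MiniCos(child: MiniExpression) extends MiniExpression {
    require(child.dataType == MiniDouble, "cos expects a double input")
    def dataType: MiniType = MiniDouble
    def eval(): Any = math.cos(child.eval().asInstanceOf[Double])
  }

  val sum: Any = MiniAdd(MiniLiteral(1, MiniInt), MiniLiteral(2, MiniInt)).eval()
  val negated: Any = MiniUnaryMinus(MiniLiteral(5, MiniInt)).eval()
  val cosine: Any = MiniCos(MiniLiteral(0.0, MiniDouble)).eval()
}
```

The type checks run when the tree is built, so a mismatched MiniAdd fails before anything is evaluated.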

Row
● Row is the abstraction that represents the data operated on by expressions
● A Row represents a sequence of values in the order in which the schema is defined
● There are multiple implementations of Row available, each optimized for specific use cases
● Row plays a central role in moving from the DataFrame world to the RDD world, as a DataFrame is represented as RDD[Row]
● Ex : RowExample

InternalRow and Unsafe Row
● InternalRow is the representation of a DataFrame row for internal usage
● The separation between the user-facing Row and InternalRow was made in version 1.4, to support custom memory management
● In the normal implementation, Row and InternalRow behave similarly
● But under custom memory management, they differ a lot
● Ex : InternalRowExample

Complex expressions
● Our earlier expressions only worked on literal values
● Now that we know what an expression works on, we can understand more complex expressions
● Some of the expressions we are going to look at
○ BoundReference
○ Add with BoundReference
○ Alias
● Ex : ComplexExpressions
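
The step from literals to row-based expressions can be sketched as follows. This is a toy model, not the Catalyst classes: the Mini-prefixed names are hypothetical, values are restricted to Int, and data types and nullability are left out. The point is only that a bound reference reads a column by its ordinal position in the input row, so one expression tree can be evaluated against many rows.

```scala
// Toy model of expressions evaluated against a row.
// All Mini* names are hypothetical; types and nulls are ignored.
object BoundReferenceSketch {
  // A row is an ordered sequence of values, read by position.
  final case class MiniRow(values: Any*) {
    def get(ordinal: Int): Any = values(ordinal)
  }

  sealed trait MiniExpression {
    def eval(input: MiniRow): Int
  }

  // Ignores the row and returns a constant.
  case class MiniLiteral(value: Int) extends MiniExpression {
    def eval(input: MiniRow): Int = value
  }

  // Reads the value at a fixed position in the input row.
  case class MiniBoundReference(ordinal: Int) extends MiniExpression {
    def eval(input: MiniRow): Int = input.get(ordinal).asInstanceOf[Int]
  }

  case class MiniAdd(left: MiniExpression, right: MiniExpression)
      extends MiniExpression {
    def eval(input: MiniRow): Int = left.eval(input) + right.eval(input)
  }

  // Attaches a name to a child expression without changing its value.
  case class MiniAlias(child: MiniExpression, name: String)
      extends MiniExpression {
    def eval(input: MiniRow): Int = child.eval(input)
  }

  // column 0 + column 1, named "total"
  val total: MiniExpression =
    MiniAlias(MiniAdd(MiniBoundReference(0), MiniBoundReference(1)), "total")

  val first: Int = total.eval(MiniRow(1, 2))
  val second: Int = total.eval(MiniRow(10, 20))
}
```

The same tree yields 3 for the first row and 30 for the second, which is what separates these expressions from the literal-only ones earlier in the talk.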

Custom Expression example
● Not only can we use the existing expressions, we can add our own custom expressions too
● As of now, adding custom expressions is a private API, so your code has to live in the org.apache.spark.sql package
● In our example, we can see how to add our own custom add implementation
● Ex: CustomAddExpression

Catalyst CodeGen

Expression Evaluation
● Every expression has to be evaluated at runtime to produce the appropriate value. We have used eval till now
● Most of the time in Spark SQL is spent on expression evaluation
● Expression evaluation is compute-bound work
● As individual cores are not getting faster, speeding up this evaluation improves the overall time
● So from 1.4, Spark started generating code to evaluate expressions

CodeGen
● Each expression comes with a method called genCode(), which produces the Java code to be generated to evaluate it
● This generated code can use primitive types and mutable values rather than boxed values and immutable data structures
● These expressions are compiled at runtime using the Janino compiler
● We can use the CodegenFallback trait to skip codegen
● Ex : ExpressionCodeGenExample
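
The contrast between interpreting a tree and generating code for it can be sketched with a toy model. Nothing here is the Catalyst codegen API: the Mini-prefixed classes and generateMethod are hypothetical, and genCode() in this sketch returns a plain Java source string, whereas the real method works with a codegen context and returns more than a string. The sketch stops at producing source text and does not compile it.

```scala
// Toy model of interpreted evaluation versus code generation.
// All names are hypothetical; no compilation step is included.
object CodeGenSketch {
  sealed trait MiniExpression {
    def eval(): Int       // interpreted: walks the tree on every call
    def genCode(): String // generated: Java source for the same value
  }

  case class MiniLiteral(value: Int) extends MiniExpression {
    def eval(): Int = value
    def genCode(): String = value.toString
  }

  case class MiniAdd(left: MiniExpression, right: MiniExpression)
      extends MiniExpression {
    def eval(): Int = left.eval() + right.eval()
    def genCode(): String = s"(${left.genCode()} + ${right.genCode()})"
  }

  case class MiniMultiply(left: MiniExpression, right: MiniExpression)
      extends MiniExpression {
    def eval(): Int = left.eval() * right.eval()
    def genCode(): String = s"(${left.genCode()} * ${right.genCode()})"
  }

  // Wrap the generated expression in a Java method that works on
  // primitive ints, which a runtime compiler could then compile.
  def generateMethod(expr: MiniExpression): String =
    s"public int eval() { return ${expr.genCode()}; }"

  val expr: MiniExpression =
    MiniMultiply(MiniAdd(MiniLiteral(1), MiniLiteral(2)), MiniLiteral(3))

  val interpreted: Int = expr.eval()
  val source: String = generateMethod(expr)
}
```

Here interpreted is 9, and source is the single flat method `public int eval() { return ((1 + 2) * 3); }`, which has no tree traversal or virtual calls left in it.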

Janino
● Janino is a super-small, super-fast Java compiler
● It’s a runtime library for compiling Java code
● Janino is used by many frameworks like Groovy, LogBack etc
● Spark uses Janino to compile the expression code into bytecode and uses that bytecode to evaluate the expression
● In 2.0, this code generation is applied to a whole stage rather than a single expression at a time
● Ex : JaninoExample

Recap
● Trees
● Expressions
● Data types
● Row and Internal Row
● Eval
● CodeGen
● Janino
