TRANSCRIPT
Anatomy of Spark Catalyst - Part 1
Deep dive into Spark Catalyst API
https://github.com/phatak-dev/anatomy-of-spark-catalyst
● Madhukara Phatak
● Technical Lead at Tellius
● Consultant and Trainer at datamantra.io
● Consults on Hadoop, Spark and Scala
● www.madhukaraphatak.com
Agenda
● DataFrame Internals
● Introduction to Catalyst
● Catalyst Trees
● Expressions
● Datatypes
● Row and Internal Row
● Custom expressions
● Codegen in expressions
● Janino
DataFrame internals
● Each DataFrame is internally represented as a logical plan in Spark
● These logical plans are converted into physical plans that execute on the RDD abstraction
● All plans are represented using a tree structure
● These trees, and the optimization of trees, come from Catalyst
● Ex: DFExample.scala
Code organization of Spark SQL
● The code of Spark SQL is organized into the following projects:
○ Catalyst
○ Core
○ Hive
○ Hive-Thrift server
○ unsafe (external to Spark SQL)
● We are focusing on the Catalyst code in this session
● Core depends on Catalyst
● Catalyst depends on unsafe for the Tungsten code
Introduction to Catalyst
● An implementation-agnostic framework for manipulating trees of relational operators and expressions
● It defines all the expression, logical plan and optimization APIs for Spark SQL
● The Catalyst API is independent of RDD evaluation, so Catalyst operators and expressions can be evaluated without the RDD abstraction
● Introduced in Spark version 1.3
Catalyst in Spark SQL
● Hive queries (HiveQL) go through the Hive parser
● Spark SQL queries (SparkQL) go through the SparkSQL parser
● The DataFrame DSL builds a DataFrame directly
● All of these produce trees that feed into Catalyst (the Tree API), which generates Spark RDD code
Catalyst Trees
● Everything in Catalyst is a tree
● A tree is a very powerful abstraction for defining hierarchical operators and expressions
● The important abstraction defined is TreeNode
● Every TreeNode has its own type
● Catalyst defines many helper functions, like forall and transform, which provide collection-like APIs for TreeNode
● These trees can be used in user programs
● Ex: org.apache.spark.sql.catalyst.trees.TreeNode
Expression trees
● We can define an arithmetic expression tree using the Catalyst tree abstraction
● Each node in the tree extends from a trait ArithmeticExpression
● We will have the following kinds of nodes:
○ LeafValue
○ ADD
○ Divide
○ MUL
● Ex: com.madhukaraphatak.spark.catalyst.trees
Creating Expression Trees
● In this example, we see how to use the tree defined earlier
● Let's take the expression 10 + 20
● It can be represented using the tree ADD(LeafValue(10), LeafValue(20))
● Calling evaluate on the root of the tree gives us the result
● Ex: ExpressionExample
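The tree above can be sketched as a small, self-contained Scala model. The names mirror the slides (LeafValue, ADD, MUL, Divide), but the details are an assumption; the talk's real version lives in com.madhukaraphatak.spark.catalyst.trees.

```scala
// Simplified, self-contained model of the arithmetic expression tree
// from the slides. Evaluation is a recursive walk over the tree.
sealed trait ArithmeticExpression {
  def evaluate: Double = this match {
    case LeafValue(v) => v
    case ADD(l, r)    => l.evaluate + r.evaluate
    case MUL(l, r)    => l.evaluate * r.evaluate
    case Divide(l, r) => l.evaluate / r.evaluate
  }
}
case class LeafValue(value: Double) extends ArithmeticExpression
case class ADD(left: ArithmeticExpression, right: ArithmeticExpression) extends ArithmeticExpression
case class MUL(left: ArithmeticExpression, right: ArithmeticExpression) extends ArithmeticExpression
case class Divide(left: ArithmeticExpression, right: ArithmeticExpression) extends ArithmeticExpression

// 10 + 20 represented as a tree; evaluating the root gives the result
val expr = ADD(LeafValue(10), LeafValue(20))
println(expr.evaluate) // 30.0
```

Because the nodes are plain case classes, the tree is just data: it can be built, inspected and evaluated without any Spark machinery.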
Traversing trees
● Catalyst provides collection-like methods for traversing trees
● Using these methods we can easily traverse our expression trees
● The collection-like APIs allow us to use Scala pattern matching to work with specific nodes
● In our example, we use the transform operation to change the evaluation of the divide-by-zero case
● Ex: ExpressionTransformation
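A minimal sketch of that idea, loosely modelled on Catalyst's transform family (here the rule is applied bottom-up for simplicity; names and details are assumptions, not the talk's exact code):

```scala
// Self-contained expression tree with a transform method that takes a
// partial function, so callers can use Scala pattern matching as a rule.
sealed trait Expr {
  def evaluate: Double = this match {
    case Leaf(v)      => v
    case Add(l, r)    => l.evaluate + r.evaluate
    case Divide(l, r) => l.evaluate / r.evaluate
  }
  def transform(rule: PartialFunction[Expr, Expr]): Expr = {
    // Rebuild the children first, then apply the rule where it matches
    val withNewChildren = this match {
      case Leaf(v)      => Leaf(v)
      case Add(l, r)    => Add(l.transform(rule), r.transform(rule))
      case Divide(l, r) => Divide(l.transform(rule), r.transform(rule))
    }
    rule.applyOrElse(withNewChildren, (e: Expr) => e)
  }
}
case class Leaf(value: Double) extends Expr
case class Add(left: Expr, right: Expr) extends Expr
case class Divide(left: Expr, right: Expr) extends Expr

// Rewrite any divide-by-zero into Leaf(0) before evaluation
val expr = Divide(Leaf(10), Leaf(0))
val safe = expr.transform { case Divide(_, Leaf(0.0)) => Leaf(0.0) }
println(safe.evaluate) // 0.0
```

The rule is an ordinary partial function, so only the nodes it matches are rewritten; everything else is rebuilt unchanged.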
Optimising expressions
● One of the nice properties of collection-like APIs is the ability to optimize trees even before they are evaluated
● In our example, we use transform and transformUp to optimize the tree in the case of multiplication
● In this example, we optimize whenever there is a multiplication by zero
● Ex: ExpressionOptimization
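A self-contained sketch of the multiplication-by-zero optimization (the structure mirrors Catalyst's transformUp, which rewrites children before their parent; the concrete classes here are assumptions):

```scala
// Minimal tree with transformUp: children are rewritten first, so a
// rule can collapse nested multiplications in a single pass.
sealed trait Expr {
  def evaluate: Double = this match {
    case Leaf(v)   => v
    case Mul(l, r) => l.evaluate * r.evaluate
  }
  def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
    val rewritten = this match {
      case Leaf(v)   => Leaf(v)
      case Mul(l, r) => Mul(l.transformUp(rule), r.transformUp(rule))
    }
    rule.applyOrElse(rewritten, (e: Expr) => e)
  }
}
case class Leaf(value: Double) extends Expr
case class Mul(left: Expr, right: Expr) extends Expr

// x * 0 == 0, so the whole tree collapses without evaluating anything
val expr = Mul(Leaf(19), Mul(Leaf(19), Leaf(0)))
val optimized = expr.transformUp {
  case Mul(_, Leaf(0.0)) => Leaf(0.0)
  case Mul(Leaf(0.0), _) => Leaf(0.0)
}
println(optimized) // Leaf(0.0)
```

Because the inner Mul is rewritten to Leaf(0.0) first, the outer Mul then matches the same rule, which is exactly the two-step collapse the next slide's diagram shows.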
Understanding transformUp

MUL(LeafValue(19), MUL(LeafValue(19), LeafValue(0)))
→ transformUp → MUL(LeafValue(19), LeafValue(0))
→ transformUp → LeafValue(0)
Catalyst Expressions
Catalyst Expression
● Beyond the simple expressions we have discussed, Catalyst defines expressions for all the different kinds of SQL operations
● The base trait is Expression, which extends TreeNode
● The different kinds of expressions are:
○ LeafExpression
○ UnaryExpression
○ BinaryExpression
○ MathExpression
● Ex: org.apache.spark.sql.catalyst.expressions.Expression
Simple Expressions
● In this example, we look at how to represent some simple Spark SQL expressions in the Catalyst API
● We are going to look at the following:
○ Literals (leaf expression)
○ Unary Minus (unary expression)
○ ADD expression (binary expression)
○ Cos expression (math expression)
● We can evaluate all these expressions without a SparkContext or RDD
● Ex: SimpleExpression
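A simplified, self-contained mirror of those four expression kinds. The real classes (Literal, UnaryMinus, Add, Cos) live in org.apache.spark.sql.catalyst.expressions and carry data types and null handling; this sketch only models their shape and shows that evaluation needs no SparkContext or RDD.

```scala
// Each case class stands in for one kind of Catalyst expression.
sealed trait Expression { def eval(): Any }
case class Literal(value: Double) extends Expression {        // leaf
  def eval(): Any = value
}
case class UnaryMinus(child: Expression) extends Expression { // unary
  def eval(): Any = -child.eval().asInstanceOf[Double]
}
case class Add(left: Expression, right: Expression) extends Expression { // binary
  def eval(): Any =
    left.eval().asInstanceOf[Double] + right.eval().asInstanceOf[Double]
}
case class Cos(child: Expression) extends Expression {        // math
  def eval(): Any = math.cos(child.eval().asInstanceOf[Double])
}

println(Add(Literal(1), UnaryMinus(Literal(3))).eval()) // -2.0
println(Cos(Literal(0)).eval())                         // 1.0
```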
Datatypes of Catalyst
● All expressions in Catalyst take typed inputs and return typed outputs
● All these types are represented using DataTypes
● The supported data types are:
○ Native (Long, Double, String, etc.)
○ Complex (StructField, Array, Map)
○ Container (StructType)
● All these are internally mapped to Java datatypes
● Ex: org.apache.spark.sql.types.DataType
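The three groups above can be sketched as a small algebraic data type. The names mirror org.apache.spark.sql.types, but the fields are trimmed down, so treat this as an illustrative model rather than Spark's real hierarchy.

```scala
// Native, complex and container types as one sealed hierarchy.
sealed trait DataType
case object LongType extends DataType                                // native
case object DoubleType extends DataType
case object StringType extends DataType
case class ArrayType(elementType: DataType) extends DataType         // complex
case class MapType(keyType: DataType, valueType: DataType) extends DataType
case class StructField(name: String, dataType: DataType)
case class StructType(fields: Seq[StructField]) extends DataType     // container

// A schema is simply a StructType value built from fields
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("tags", ArrayType(StringType))))
println(schema.fields.map(_.name)) // List(id, tags)
```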
Row
● Row is the abstraction that represents the data operated on by expressions
● A Row represents a sequence of values, ordered as the schema is defined
● There are multiple implementations of Row, each optimized for specific use cases
● Row plays a central role in moving from the DataFrame world to the RDD world, as a DataFrame is represented as RDD[Row]
● Ex: RowExample
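A minimal sketch of the idea: a Row is just an ordered sequence of values whose positions line up with the schema's field order. The real trait is org.apache.spark.sql.Row; the class and schema below are illustrative assumptions.

```scala
// Toy Row: positional access over a value sequence. Type information
// comes from the schema, so getAs casts to the caller's expected type.
class Row(values: Seq[Any]) {
  def get(i: Int): Any = values(i)
  def getAs[T](i: Int): T = values(i).asInstanceOf[T]
  def length: Int = values.size
}

// Values ordered as (name: String, age: Int) per an assumed schema
val row = new Row(Seq("alice", 30))
println(row.getAs[String](0)) // alice
println(row.getAs[Int](1))    // 30
```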
InternalRow and UnsafeRow
● InternalRow is the representation of a DataFrame row for internal usage
● The separation between the user-facing Row and InternalRow was done in version 1.4 to support custom memory management
● In the normal implementation, Row and InternalRow behave similarly
● But with custom memory management, they differ a lot
● Ex: InternalRowExample
Complex expressions
● Our earlier expressions only worked on literal values
● Now that we know what an expression works on, we can understand more complex expressions
● Some of the expressions we are going to look at:
○ BoundReference
○ Add with BoundReference
○ Alias
● Ex: ComplexExpressions
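A sketch of how BoundReference connects expressions to row data: its eval reads the value at a fixed ordinal from the input row, so other expressions can operate on real column values instead of literals. The real classes live in org.apache.spark.sql.catalyst.expressions; this model is simplified and assumes Int-valued columns.

```scala
// eval now takes the row being processed; BoundReference indexes into it.
type Row = Seq[Any]
sealed trait Expression { def eval(row: Row): Any }
case class Literal(value: Any) extends Expression {
  def eval(row: Row): Any = value
}
case class BoundReference(ordinal: Int) extends Expression {
  def eval(row: Row): Any = row(ordinal)
}
case class Add(left: Expression, right: Expression) extends Expression {
  def eval(row: Row): Any =
    left.eval(row).asInstanceOf[Int] + right.eval(row).asInstanceOf[Int]
}

// col0 + 5 evaluated over the row (10, "x")
val expr = Add(BoundReference(0), Literal(5))
println(expr.eval(Seq(10, "x"))) // 15
```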
Custom Expression example
● Not only can we use the existing expressions, we can add our own custom expressions too
● As of now, adding custom expressions is a private API, so your code has to live in the org.apache.spark.sql package
● In our example, we see how to add our own custom add implementation
● Ex: CustomAddExpression
Catalyst CodeGen
Expression Evaluation
● Every expression has to be evaluated at runtime to produce the appropriate value; we have used eval until now
● Most of the time in Spark SQL is spent on expression evaluation
● Expression evaluation is compute-bound work
● As the speed of individual cores is no longer increasing, speeding up this evaluation improves the overall time
● So, from 1.4 on, Spark started generating code to evaluate expressions
CodeGen
● Each expression comes with a method called genCode() which produces the Java code to be generated for its evaluation
● This generated code can use primitive types and mutable values rather than boxed values and immutable data structures
● These expressions are compiled at runtime using the Janino compiler
● We can use the CodegenFallback trait to skip codegen
● Ex: ExpressionCodeGenExample
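A toy version of the idea: instead of interpreting the tree, each node emits a fragment of Java-style source. Spark's real genCode is considerably richer (it threads a codegen context and produces mutable-state-aware code), so the strings below are only a conceptual sketch.

```scala
// Each node contributes a source fragment; composing them yields the
// code for the whole tree, ready to be handed to a compiler like Janino.
sealed trait Expression { def genCode: String }
case class Literal(value: Int) extends Expression {
  def genCode: String = value.toString
}
case class Add(left: Expression, right: Expression) extends Expression {
  def genCode: String = s"(${left.genCode} + ${right.genCode})"
}
case class Mul(left: Expression, right: Expression) extends Expression {
  def genCode: String = s"(${left.genCode} * ${right.genCode})"
}

// (1 + 2) * 3 as generated Java-style source
val code = Mul(Add(Literal(1), Literal(2)), Literal(3)).genCode
println(code) // ((1 + 2) * 3)
```

Compiling such generated code once and reusing it per row avoids the per-node interpretation overhead of eval, which is where the speedup comes from.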
Janino
● Janino is a super-small, super-fast Java compiler
● It's a runtime library for compiling Java code
● Janino is used by many frameworks, like Groovy, Logback, etc.
● Spark uses Janino to compile the expression code into bytecode, and uses that bytecode to evaluate the expression
● In 2.0, this code generation is applied to a whole stage rather than a single expression at a time
● Ex: JaninoExample
Recap
● Trees
● Expressions
● Data types
● Row and Internal Row
● Eval
● CodeGen
● Janino
References
● Dataframe Anatomy: https://www.youtube.com/watch?v=iKOGBr-kOks
● Tungsten Anatomy: https://www.youtube.com/watch?v=7nIMpD5TyNs
● Janino: http://janino-compiler.github.io/janino/