Anatomy of Spark SQL Catalyst - Part 2
Anatomy of Spark Catalyst - Part 2
Journey from DataFrame to RDD
https://github.com/phatak-dev/anatomy-of-spark-catalyst
● Madhukara Phatak
● Technical Lead at Tellius
● Consultant and Trainer at datamantra.io
● Consults in Hadoop, Spark and Scala
● www.madhukaraphatak.com
Agenda
● Recap of Part 1
● Query Plan
● Logical Plans
● Analysis
● Optimization
● Spark Plan
● Spark Strategies
● Custom strategies
Code organization of Spark SQL
● The code of Spark SQL is organized into the following projects:
  ○ Catalyst
  ○ Core
  ○ Hive
  ○ Hive-Thrift server
  ○ Unsafe (external to Spark SQL)
● We are focusing on the Catalyst code in this session
● Core depends on Catalyst
● Catalyst depends on Unsafe for the Tungsten code
Introduction to Catalyst
● An implementation-agnostic framework for manipulating trees of relational operators and expressions
● It defines all the expression, logical plan and optimization APIs for Spark SQL
● The Catalyst API is independent of RDD evaluation, so catalyst operators and expressions can be evaluated without the RDD abstraction
● Introduced in Spark version 1.3
Catalyst in Spark SQL
[Diagram: Hive queries go through the Hive parser, Spark SQL queries go through the Spark SQL parser, and the DataFrame and Dataset DSLs build plans directly; all of these produce DataFrames, which Catalyst turns into Spark RDD code.]
Recap of Part 1
● Trees
● Expressions
● Data types
● Row and InternalRow
● Eval
● CodeGen
● Janino
DataFrame Internals
● Each DataFrame is internally represented as a logical plan in Spark
● These logical plans are converted into physical plans to execute on the RDD abstraction
● The building blocks of plans, such as expressions and operators, come from the Catalyst library
● We need to understand the internals of Catalyst in order to understand how a given query is formed and executed
● Ex: DFExample.scala (see the sketch below)
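The plan stages are visible on any DataFrame through its query execution. A minimal sketch, assuming a spark-shell style sqlContext and a sample people.json file (queryExecution is a developer API):

```scala
val df = sqlContext.read.json("people.json")
val qe = df.queryExecution
println(qe.logical)       // logical plan as built by the DSL, before analysis
println(qe.analyzed)      // after analysis: attributes resolved
println(qe.optimizedPlan) // after the catalyst optimizer rules
println(qe.executedPlan)  // the physical (Spark) plan
```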
Query Plan API
● Root of all plans
● Plan types: LogicalPlan and SparkPlan
● outputSet signifies the attributes output by this plan
● inputSet signifies the attributes input to this plan
● schema signifies the StructType associated with the output of the plan
● Provides special functions like transformExpressions and transformExpressionsUp to manipulate the expressions in a plan
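These properties can be inspected on any plan. A small sketch, continuing with the `df` from the previous example:

```scala
val plan = df.queryExecution.analyzed
println(plan.outputSet) // the AttributeSet this plan produces
println(plan.inputSet)  // the AttributeSet coming in from its children
println(plan.schema)    // the same output viewed as a StructType
```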
Logical Plan
Logical Plan
● One type of query plan, which focuses on building plans of catalyst operators and expressions
● Independent of the RDD abstraction
● Focuses on analysis of the plan for correctness
● Also responsible for resolving the attributes before they are evaluated
● Three default types of logical plan:
  ○ LeafNode, UnaryNode and BinaryNode
● Ex: LogicalPlanExample
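Logical plans can also be constructed directly, with no DataFrame or RDD involved. A minimal sketch using the catalyst DSL (the same DSL Spark's own test suites use):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// A leaf relation with two attributes, wrapped in Filter and Project
// nodes. Nothing is evaluated at this point; it is just a tree.
val relation = LocalRelation('id.int, 'name.string)
val plan = relation.where('id > 1).select('name)
println(plan.treeString)
```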
Tree manipulation of Logical Plan
● As we did earlier with expression trees, we can manipulate logical plans using the tree APIs
● All these manipulations are represented as a Rule
● These rules take a plan and give you a new plan
● Rather than transform and transformUp, we use transformExpressions and transformExpressionsUp for manipulating the expressions inside these trees
● Ex: FilterLogicalPlanManipulation
Understanding Plan Manipulation
[Diagram: a LocalRelation under a Filter whose condition is EqualTo(AttributeReference(id), Literal(true)). transformExpressionsUp rewrites the condition to Literal(true); a subsequent transform then drops the always-true Filter, leaving only the LocalRelation.]
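The last step of the diagram could be written as a catalyst Rule. A minimal sketch (the rule and its name are hypothetical, for illustration):

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule: remove Filter nodes whose condition is the
// literal `true`, as in the final step of the diagram above.
object RemoveTrivialFilter extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(Literal(true, _), child) => child
  }
}
```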
Analysis of Logical Plan
Analysing
● Analysis of a logical plan is a step that includes:
  ○ Resolving relations (Spark SQL)
  ○ Resolving attributes
  ○ Resolving functions
  ○ Analyzing the structure for correctness
● Analysis makes sure all information is extracted before a logical plan can be executed
● Analyzer is the interface which implements the analysis
● Ex: AnalysisExample
Analysis in SQL
● Whenever we use the SQL API for manipulating dataframes we work with UnresolvedRelation
● UnresolvedRelation is a logical relation which needs to be resolved from the catalog
● The catalog is a dictionary of all registered tables
● Part of the analysis is to resolve these unresolved relations and provide the appropriate relation types
● Ex: UnResolvedRelationExample
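The resolution step is easy to observe by comparing the plan before and after analysis. A sketch, assuming a table named `people` is already registered:

```scala
val df = sqlContext.sql("select name from people")
// Before analysis: the plan contains an UnresolvedRelation for `people`
println(df.queryExecution.logical)
// After analysis: the relation has been looked up in the catalog
println(df.queryExecution.analyzed)
```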
Logical Plan Optimization
Optimizer
● One of the important parts of Spark Catalyst is implementing the optimizations on logical plans
● All these optimizations are represented using Rules which transform the logical plans
● All the optimization code resides in the Optimizer.scala file
● In our example, we see how filter push-down works for a logical plan
● For more information on optimization, refer to the anatomy of dataframe talk [1] from the references
● Ex: PushPredicateExample
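Filter push-down can be seen directly in the optimized plan. A small sketch, reusing the people.json sample from earlier:

```scala
val query = sqlContext.read.json("people.json")
  .select("name", "age")
  .filter("age > 21")
// In the optimized plan the Filter sits below the Project,
// so rows are filtered before the projection runs.
println(query.queryExecution.optimizedPlan)
```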
Providing custom optimizations
● Until Spark 2.0, we needed to change the Spark source code to change the optimizations
● As Dataset becomes the core abstraction in 2.0, the ability to tweak catalyst optimization becomes important
● So from Spark 2.0, Spark exposes the ability to add user-defined rules at runtime, which makes the Spark optimizer more configurable
● For more information about defining and adding custom rules, refer to the Spark 2.0 talk [2] from the references
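The hook for this is the experimental methods object. A minimal sketch (Spark 2.0+), reusing the hypothetical rule from the plan manipulation section:

```scala
// Register a user-defined optimizer rule at runtime.
sqlContext.experimental.extraOptimizations = Seq(RemoveTrivialFilter)
```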
Spark Plan
SparkPlan
● The physical plan of Spark SQL, which lives in the core package
● Defines two methods for subclasses to override:
  ○ doPrepare
  ○ doExecute
● Specifies helper collect methods like:
  ○ executeCollect, executeTake
● Three node types: LeafNode, UnaryNode, BinaryNode
● org.apache.spark.sql.execution.SparkPlan
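To make the contract concrete, here is a bare-bones sketch of a hypothetical physical operator against the Spark 2.0-era API (a real operator would do actual work in doExecute):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical pass-through operator: doExecute hands back the
// child's RDD[InternalRow] unchanged.
case class PassThroughExec(child: SparkPlan) extends SparkPlan {
  override def output: Seq[Attribute] = child.output
  override def children: Seq[SparkPlan] = child :: Nil
  protected override def doExecute(): RDD[InternalRow] = child.execute()
}
```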
Logical Plan to Spark Plan
● Let's look at converting our logical plans to Spark plans
● On sqlContext, there is a SparkPlanner which helps us do the conversion
● A single logical plan can result in multiple physical plans
● Every physical plan has an execute method which consumes RDD[InternalRow] and produces RDD[InternalRow]
● Ex: LogicalToPhysicalExample
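The chosen physical plan and its RDD are reachable through the query execution as well. A sketch:

```scala
val qe = df.queryExecution
println(qe.sparkPlan) // the physical plan selected by the planner
// execute() returns the underlying RDD[InternalRow]
val rows = qe.executedPlan.execute()
println(rows.count())
```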
QueryPlanner
● Interface for converting logical plans to physical plans
● Holds the list of strategies applied for the conversion
● Each strategy has a plan method which is chained, like the rules on the logical plan side
● QueryPlanner operates over plan types that extend TreeNode, which supports all the tree traversals
● PlanLater is a strategy which gives a lazy effect
● org.apache.spark.sql.catalyst.planning.QueryPlanner
Spark Strategies
● A set of strategies which implement the query planner to turn logical plans into Spark plans
● The different strategies are:
  ○ BasicOperators
  ○ Aggregation
  ○ DefaultJoin
● These strategies are executed in sequence to generate the final result
● Ex: org.apache.spark.sql.core.physicalplan.SparkStrategyExample
SparkPlanner
● User-facing API for converting logical plans to Spark plans
● It lists all the strategies to execute on a given logical plan
● Calling the plan method generates all the physical plans using the above strategies
● These physical plans can be executed using the execute method on a physical plan
● org.apache.spark.sql.execution.SparkPlanner
Understanding Filter Strategy
● All the code for the filter strategy lives in basicOperators.scala
● It uses mapPartitionsInternal for filtering data over RDD[InternalRow]
● A comparison expression is converted into a predicate using the newPredicate method, which uses code generation
● Once we have the predicate, we can use a plain Scala filter to filter the data from the RDD
● The filtered RDD[InternalRow] is returned from the strategy
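In outline, the physical filter operator does something like the following (a paraphrased sketch of the body of FilterExec.doExecute in the Spark 2.0 source, with metrics bookkeeping omitted):

```scala
// Build a predicate from the condition once per partition,
// then apply a plain Scala filter over the InternalRows.
protected override def doExecute(): RDD[InternalRow] = {
  child.execute().mapPartitionsInternal { iter =>
    val predicate = newPredicate(condition, child.output)
    iter.filter(predicate)
  }
}
```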
Custom strategy
● Just as we can write custom rules for logical optimization, we can also add custom strategies
● Many connectors, like MemSQL and MongoDB, add custom strategies to optimize reads, filters etc.
● Developers can add custom strategies using the sqlContext.experimental.extraStrategies field (see the sketch below)
● You can look at a simple custom strategy from MemSQL at the link below
● http://bit.ly/2bwnUxF
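A minimal registration might look like this sketch (the strategy itself is hypothetical; returning Nil defers to the built-in strategies):

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical strategy: a real connector would match its own logical
// operators here and return the physical plans for them.
object MyStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case _ => Nil // fall through to the built-in strategies
  }
}

sqlContext.experimental.extraStrategies = Seq(MyStrategy)
```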
References
[1] Anatomy of DataFrame talk: https://www.youtube.com/watch?v=iKOGBr-kOks
[2] Spark 2.0 talk: https://www.youtube.com/watch?v=GhZ-XPGyXiM
MemSQL custom strategy: https://github.com/memsql/memsql-spark-connector/tree/master/connectorLib/src/main/scala/org/apache/spark/sql/memsql