Optimizing the Catalyst Optimizer for Complex Plans Jianneng Li Software Engineer, Workday Asif Shahid Software Engineer, Workday

Post on 21-Jun-2022

Optimizing the Catalyst Optimizer for Complex Plans

Jianneng Li, Software Engineer, Workday

Asif Shahid, Software Engineer, Workday

Safe Harbor Statement

This presentation may contain forward-looking statements for which there are risks, uncertainties, and assumptions. If the risks materialize or assumptions prove incorrect, Workday’s business results and directions could differ materially from results implied by the forward-looking statements. Forward-looking statements include any statements regarding strategies or plans for future operations; any statements concerning new features, enhancements or upgrades to our existing applications or plans for future applications; and any statements of belief. Further information on risks that could affect Workday’s results is included in our filings with the Securities and Exchange Commission which are available on the Workday investor relations webpage: www.workday.com/company/investor_relations.php

Workday assumes no obligation for and does not intend to update any forward-looking statements. Any unreleased services, features, functionality or enhancements referenced in any Workday document, roadmap, blog, our website, press release or public statement that are not currently available are subject to change at Workday’s discretion and may not be delivered as planned or at all.

Customers who purchase Workday, Inc. services should make their purchase decisions upon services, features, and functions that are currently available.

Agenda

▪ Workday Prism Analytics
▪ Complex Plans
▪ Handling Complex Plans
  ▪ Common Subexpression Elimination (CSE)
  ▪ Large Case Expressions
  ▪ Constraint Propagation
▪ Closing Thoughts

Workday Prism Analytics

Spark in Workday Prism Analytics

• Customers use our self-service product to build data transformation pipelines, which are compiled to DataFrames and executed by Spark
• Finance and HR use cases
• Use cases often involve complex data pipelines

[Figure: Example Spark physical plan of our pipeline shown in Spark UI]

For more details, see session from SAIS 2019 - Lessons Learned using Apache Spark for Self-Service Data Prep in SaaS World

Complex Plans

What are Complex Plans

• Thousands of operators
• Many (self) joins, unions, and large expressions
• Takes Catalyst hours to compile and optimize
• Difficult to understand or inspect visually

Example: Data Validation

id  name  group_id
1   a     x
2   b     y
3   c     y
4   d     z

Part 1: filter rows based on criteria

    SELECT *
    FROM dataset
    WHERE id > 1 AND name != "b"

Part 2: if a row is filtered, other rows in the same group_id are also filtered

    SELECT *
    FROM
      (SELECT *
       FROM dataset
       WHERE id > 1 AND name != "b") l
    LEFT ANTI JOIN
      (SELECT group_id
       FROM dataset
       WHERE NOT (id > 1 AND name != "b")
       GROUP BY group_id) r
    ON l.group_id = r.group_id

Part 3: compute invalid rows too

    SELECT *
    FROM dataset
    WHERE NOT (id > 1 AND name != "b")
    UNION ALL
    SELECT l.id, l.name, l.group_id
    FROM
      (SELECT *
       FROM dataset
       WHERE id > 1 AND name != "b") l
    INNER JOIN
      (SELECT group_id
       FROM dataset
       WHERE NOT (id > 1 AND name != "b")
       GROUP BY group_id) r
    ON l.group_id = r.group_id

Part 4: show unique error message for each filter criterion

This requires more self unions.
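
To make the queries concrete, here is a minimal Python sketch that traces Parts 1 to 3 over the four-row dataset from the slides. It is plain Python rather than Spark, and the helper name `passes` is an assumption made for illustration only.

```python
# The four-row dataset from the slides.
dataset = [
    {"id": 1, "name": "a", "group_id": "x"},
    {"id": 2, "name": "b", "group_id": "y"},
    {"id": 3, "name": "c", "group_id": "y"},
    {"id": 4, "name": "d", "group_id": "z"},
]

def passes(row):
    # Part 1 criteria: id > 1 AND name != "b"
    return row["id"] > 1 and row["name"] != "b"

# Groups that contain at least one failing row (the GROUP BY subquery).
bad_groups = {row["group_id"] for row in dataset if not passes(row)}

# Part 2: the LEFT ANTI JOIN keeps passing rows whose group has no failing row.
valid = [r for r in dataset if passes(r) and r["group_id"] not in bad_groups]

# Part 3: failing rows UNION ALL passing rows that share a group with a failing row.
invalid = [r for r in dataset if not passes(r)] + [
    r for r in dataset if passes(r) and r["group_id"] in bad_groups
]

valid_ids = [r["id"] for r in valid]
invalid_ids = [r["id"] for r in invalid]
print(valid_ids)    # [4]
print(invalid_ids)  # [1, 2, 3]
```

Row 3 passes the criteria but shares group y with row 2, so it ends up in the invalid set. Note that `dataset` is scanned several times here, which mirrors the self joins and self unions in the SQL.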

About Complex Plans

• Complexity increases gradually over time
• Could ask customers to optimize, but much better if performance is good without optimizations

Handling Complex Plans

Common Subexpression Elimination (CSE)

• Identify shared subplans, and cache them
  • E.g. self joins, self unions, reused scans
• Performed while creating DataFrames
  • Heuristic
  • Algorithmic

Example plan before CSE:

    Union(
      Parse("Dataset A"),
      Join(
        Parse("Dataset A"),
        Parse("Dataset B")),
      Join(
        Parse("Dataset A"),
        Parse("Dataset B"))
    )

Step 1: cache the shared Parse("Dataset A") subplan (∅ marks a reuse of an existing cache entry)

    Union(
      Cache(ID=1,
        Parse("Dataset A")),
      Join(
        Cache(ID=1, ∅),
        Parse("Dataset B")),
      Join(
        Cache(ID=1, ∅),
        Parse("Dataset B"))
    )

Step 2: cache the shared Join subplan

    Union(
      Cache(ID=1,
        Parse("Dataset A")),
      Cache(ID=2,
        Join(
          Cache(ID=1, ∅),
          Parse("Dataset B"))),
      Cache(ID=2, ∅)
    )
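
The rewrite above can be reproduced with a small Python sketch. This is an illustrative two-pass approach over nested tuples, not the algorithm used in Workday's product (the slides do not describe it in detail). The names `count_subplans`, `eliminate`, and `cse` are hypothetical, and `None` stands in for ∅.

```python
from collections import Counter

# A plan node is a tuple: (operator, *arguments); child plans are nested tuples.
PLAN = ("Union",
        ("Parse", "Dataset A"),
        ("Join", ("Parse", "Dataset A"), ("Parse", "Dataset B")),
        ("Join", ("Parse", "Dataset A"), ("Parse", "Dataset B")))

def count_subplans(node, counts):
    """Count structurally equal subplans, descending into each only once."""
    counts[node] += 1
    if counts[node] == 1:
        for arg in node[1:]:
            if isinstance(arg, tuple):
                count_subplans(arg, counts)

def eliminate(node, counts, ids):
    """Wrap repeated subplans in Cache nodes; later uses become Cache(id, None)."""
    if node in ids:
        return ("Cache", ids[node], None)
    cache_id = None
    if counts[node] > 1:
        cache_id = ids[node] = len(ids) + 1
    rewritten = (node[0],) + tuple(
        eliminate(arg, counts, ids) if isinstance(arg, tuple) else arg
        for arg in node[1:]
    )
    return ("Cache", cache_id, rewritten) if cache_id is not None else rewritten

def cse(plan):
    counts = Counter()
    count_subplans(plan, counts)
    return eliminate(plan, counts, {})

optimized = cse(PLAN)
print(optimized)
```

Because the second Join is never descended into, Parse("Dataset B") is counted once and stays uncached, which matches the final plan on the slide.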

CSE Benchmark

4 Data Validations in one data pipeline (Spark 2.4, local mode, 4GB memory)

                                        Without CSE   With CSE
Number of operators in optimized plan   10K           150
Time to compile and optimize plan       10 minutes    30 seconds

Logging Complex Plans (10s of MBs in Size)

• Stream plans to log without generating them upfront
• Send only truncated plans to log aggregation service
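
One way to picture the two bullets above is a generator that yields the plan one line at a time, so the full multi-megabyte string never exists in memory. This is only a sketch in plain Python; the node layout `(name, children)` and the function names are assumptions, and `io.StringIO` stands in for a real log file or aggregation client.

```python
import io
from itertools import islice

def plan_lines(node, depth=0):
    """Yield one indented line per operator, lazily, in pre-order."""
    name, children = node
    yield "  " * depth + name
    for child in children:
        yield from plan_lines(child, depth + 1)

def stream_plan(node, sink, max_lines=None):
    """Write the plan to `sink` line by line; stop early if max_lines is set."""
    lines = plan_lines(node)
    if max_lines is not None:
        lines = islice(lines, max_lines)
    written = 0
    for line in lines:
        sink.write(line + "\n")
        written += 1
    return written

plan = ("Union", [("Scan A", []), ("Join", [("Scan A", []), ("Scan B", [])])])

full_log = io.StringIO()       # stands in for the local log
truncated_log = io.StringIO()  # stands in for the log aggregation service
stream_plan(plan, full_log)
stream_plan(plan, truncated_log, max_lines=2)
```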

Large Case Expressions

Large Case Expression

1000s of branches

    CASE
      WHEN f(a, b) = 1 THEN 1
      WHEN f(a, b) = 2 THEN 2
      ...
      WHEN f(a, b) = 1000 THEN 1000
      ELSE -1
    END

Problems with Large Case Expressions

• Function f is evaluated once for each branch
• Inlined into nested Projects (CollapseProject rule)
• OOM during code generation (SPARK-29561)

Before CollapseProject:

    Relation a, b
    Project (CASE WHEN f(a, b) = 1 THEN 10 WHEN f(a, b) = 2 THEN 20 ... END) AS c
    Project c, c AS c1

After CollapseProject:

    Relation a, b
    Project (CASE WHEN f(a, b) = 1 THEN 10 WHEN f(a, b) = 2 THEN 20 ... END) AS c,
            (CASE WHEN f(a, b) = 1 THEN 10 WHEN f(a, b) = 2 THEN 20 ... END) AS c1

Handling Large Case Expressions in Catalyst

• Identify large expressions and do not collapse them
• Identify and extract f as an alias
  • Only if f is used more than once
• Disable whole-stage codegen if too many branches

Before:

    Relation a, b
    Project (CASE WHEN f(a, b) = 1 THEN 10 WHEN f(a, b) = 2 THEN 20 ... END) AS c
    Project c, c AS c1

After:

    Relation a, b
    Project f(a, b) AS k
    Project (CASE WHEN k = 1 THEN 10 WHEN k = 2 THEN 20 ... END) AS c
    Project c, c AS c1
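
The benefit of extracting f as an alias can be illustrated by counting evaluations. The sketch below is plain Python, not Catalyst code: `f` is a hypothetical stand-in for an expensive function, and the two functions emulate a 1000-branch CASE evaluated branch by branch, with and without the alias k.

```python
counter = {"f": 0}

def f(a, b):
    """Hypothetical expensive function; records how often it runs."""
    counter["f"] += 1
    return a + b

BRANCHES = range(1, 1001)  # WHEN ... = 1 THEN 1, ..., WHEN ... = 1000 THEN 1000

def case_inlined(a, b):
    # f(a, b) is re-evaluated in every WHEN until one matches.
    for n in BRANCHES:
        if f(a, b) == n:
            return n
    return -1

def case_with_alias(a, b):
    # Equivalent of "Project f(a, b) AS k" followed by comparisons on k.
    k = f(a, b)
    for n in BRANCHES:
        if k == n:
            return n
    return -1

counter["f"] = 0
result_inlined = case_inlined(400, 600)  # matches only the last branch
calls_inlined = counter["f"]

counter["f"] = 0
result_alias = case_with_alias(400, 600)
calls_alias = counter["f"]

print(calls_inlined, calls_alias)  # 1000 1
```

Both variants return the same value; only the number of calls to f differs.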

Large Case Expression Benchmark

    SELECT CASE
             WHEN cf1 + cf2 = -1 THEN 1
             WHEN cf1 + cf2 = -2 THEN 2
             ...
           END AS cf3
    FROM (SELECT cf1, cf1 AS cf2
          FROM (SELECT CASE
                         WHEN f(a) = 1 AND g(b) = 1 THEN 1
                         WHEN f(a) = 2 AND g(b) = 1 THEN 2
                         ...
                       END AS cf1
                FROM dataset))

Spark 3.1, local mode, 4GB memory

Constraint Propagation

What are Constraints

• Filters on column values
• Can be used to
  • Generate new filters (e.g. IsNotNull)
  • Prune redundant filters
  • Push down new filters on the "other" side of a join

Example: Generate New Filter

Before:

    Relation a, b
    Filter a > 10

Constraints: a is not null, a > 10

After:

    Relation a, b
    Filter a > 10, IsNotNull(a)
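
The reason IsNotNull(a) can be added safely is that a comparison with NULL never evaluates to TRUE, so no row with a null `a` survives the filter anyway. A minimal Python sketch of that reasoning, with `None` standing in for NULL and hypothetical helper names `gt` and `sql_filter`:

```python
def gt(value, threshold):
    """SQL-style '>': comparing NULL (None) yields NULL, not False."""
    return None if value is None else value > threshold

def sql_filter(rows, predicate):
    # A SQL filter keeps a row only when the predicate is exactly TRUE.
    return [row for row in rows if predicate(row) is True]

rows = [{"a": 5}, {"a": 11}, {"a": None}, {"a": 42}]

# Filter a > 10
original = sql_filter(rows, lambda r: gt(r["a"], 10))

# Filter a > 10, IsNotNull(a)
with_not_null = sql_filter(
    rows, lambda r: r["a"] is not None and gt(r["a"], 10)
)

print(original == with_not_null)  # True
```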

Example: Prune Redundant Filter

Before:

    Relation a, b
    Filter a > 10
    Project a, b, a as a1
    Filter a1 > 10

Constraints: a is not null, a > 10

After:

    Relation a, b
    Filter a > 10, IsNotNull(a)
    Project a, b, a as a1

Example: New Filter on "Other" Side of Join

Before:

    Left side:   Relation a, b
                 Filter a > 5, a != null
                 Project a, a as a1, b
    Right side:  Relation x, y
    Join:        a1 == x, b == y

Constraints (left side): a is not null, a > 5

After:

    Left side:   Relation a, b
                 Filter a > 5, a != null
                 Project a, a as a1, b
    Right side:  Relation x, y
                 Filter x > 5, x != null
    Join:        a1 == x, b == y

Current Constraint Propagation Algorithm

• Traverses tree from bottom to top
• On Filter node, create additional IsNotNull constraints
• On Project node with alias, create all possible combinations of constraints

    Relation a, b
    Filter a > 10
      Constraints: a is not null, a > 10
    Project a, b, a as a1
      Constraints: a is not null, a > 10, a1 is not null, a1 > 10, EqualsNullSafe(a, a1)

Current Constraint Propagation Algorithm

• To prune filter
  • Check if the filter already exists in constraints
• To add a new filter to right hand side of join
  • Check if any constraint exists on join key
  • Consider only those constraints dependent on a single join key

Current Algorithm Takes High Memory

• Given a filter function F(a, b), if
  • count of attribute a and its aliases is m
  • count of attribute b and its aliases is n
• Then
  • total intermediate constraints created for 1 such filter expression
    ≈ m * n + m + n + C(m, 2) + C(n, 2)
    • m * n: constraints for alias combinations
    • m + n: IsNotNull constraints
    • C(m, 2) + C(n, 2): EqualsNullSafe constraints
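
To see how quickly this count grows, the following Python sketch enumerates the three kinds of constraints for one filter F(a, b) and compares the total with the formula. It mirrors the counting on the slide only; it is not how Catalyst represents constraints, and the function names are assumptions.

```python
from itertools import combinations, product
from math import comb

def intermediate_constraints(a_names, b_names):
    """Enumerate the constraints counted on the slide for one filter F(a, b)."""
    constraints = []
    # One copy of the filter per combination of a-name and b-name.
    constraints += [f"F({x}, {y})" for x, y in product(a_names, b_names)]
    # One IsNotNull per attribute or alias.
    constraints += [f"IsNotNull({x})" for x in a_names + b_names]
    # One EqualsNullSafe per pair of names that refer to the same attribute.
    for names in (a_names, b_names):
        constraints += [
            f"EqualsNullSafe({x}, {y})" for x, y in combinations(names, 2)
        ]
    return constraints

def formula(m, n):
    return m * n + m + n + comb(m, 2) + comb(n, 2)

a_names = ["a", "a1", "a2"]        # m = 3: attribute a plus two aliases
b_names = ["b", "b1", "b2", "b3"]  # n = 4: attribute b plus three aliases
generated = intermediate_constraints(a_names, b_names)

print(len(generated), formula(3, 4))  # 28 28
print(formula(50, 50))                # 5050
```

With 50 names on each side, a single filter already yields about five thousand intermediate constraints, and a plan typically contains many filters.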

Recall: Fix for Large Case Expressions

• We created new aliases!
• New aliases cause OOM in Catalyst, due to
  • Large number of aliases
  • Large number of operators in plan

Optimized Constraint Propagation (SPARK-33152)

• Traverses tree from bottom to top
• On Filter node, create additional null constraints
• On Project node, create Lists where
  • Each List maintains original attribute and its aliases, and constraint is stored in terms of original attribute

    Relation a, b
    Filter a > 10
      Constraints: a is not null, a > 10
    Project a, b, a as a1
      Constraints: a is not null, a > 10
      Aliases: [a, a1]

Optimized Constraint Propagation (SPARK-33152)

• To prune filter
  • Rewrite expression in terms of original attribute
    • a1 > 10 becomes a > 10
  • Check if canonical version already exists in constraints

Before:

    Relation a, b
    Filter a > 10
    Project a, b, a as a1
      Constraints: a is not null, a > 10
      Aliases: [a, a1]
    Filter a1 > 10

After:

    Relation a, b
    Filter a > 10
    Project a, b, a as a1
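
The pruning check can be sketched in a few lines of Python. Expressions are modeled as tuples `(operator, *arguments)`, and each alias list maps its members back to the first entry. This is an illustration of the idea described on the slide, not the SPARK-33152 implementation; `canonical_names`, `rewrite`, and `is_redundant` are hypothetical names.

```python
def canonical_names(alias_groups):
    """Map every name to the original attribute (first entry of its list)."""
    return {name: group[0] for group in alias_groups for name in group}

def rewrite(expr, mapping):
    """Replace attribute names in an expression tree using `mapping`."""
    if isinstance(expr, tuple):
        return (expr[0],) + tuple(rewrite(arg, mapping) for arg in expr[1:])
    if isinstance(expr, str):
        return mapping.get(expr, expr)
    return expr  # literals stay as they are

# State after the Project node in the example above.
constraints = {(">", "a", 10), ("isnotnull", "a")}
alias_groups = [["a", "a1"]]
to_original = canonical_names(alias_groups)

def is_redundant(filter_expr):
    # A filter can be pruned if its canonical form is already a constraint.
    return rewrite(filter_expr, to_original) in constraints

print(is_redundant((">", "a1", 10)))  # True: a1 > 10 becomes a > 10
print(is_redundant((">", "a1", 20)))  # False: a > 20 is not a known constraint
```

The constraint set stays the same size no matter how many aliases are added; only the alias list grows.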

Optimized Constraint Propagation (SPARK-33152)

• To add a new filter to right hand side of join
  • Rewrite expression in terms of original attributes
  • Check if any constraint exists on join key

Before:

    Left side:   Relation a, b
                 Filter a + b > 5, a != null, b != null
                 Project a, a as a1, b, b as b1
    Right side:  Relation x, y
    Join:        a1 == x, b1 == y

Constraints (left side): a is not null, b is not null, a + b > 5
Aliases: [a, a1], [b, b1]

After:

    Left side:   Relation a, b
                 Filter a + b > 5, a != null, b != null
                 Project a, a as a1, b, b as b1
    Right side:  Relation x, y
                 Filter x + y > 5, x != null, y != null
    Join:        a1 == x, b1 == y
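
A rough Python sketch of how a compound constraint such as a + b > 5 could be carried over to the other side of the join: join keys are first mapped back to original attributes, and every constraint whose references are all covered by join keys is rewritten in terms of the right-side columns. The tuple encoding and the helper names are assumptions for illustration, not Catalyst APIs.

```python
def rewrite(expr, mapping):
    """Replace attribute names in an expression tree using `mapping`."""
    if isinstance(expr, tuple):
        return (expr[0],) + tuple(rewrite(arg, mapping) for arg in expr[1:])
    if isinstance(expr, str):
        return mapping.get(expr, expr)
    return expr

def references(expr):
    """Collect the attribute names used in an expression tree."""
    if isinstance(expr, tuple):
        names = set()
        for arg in expr[1:]:
            names |= references(arg)
        return names
    return {expr} if isinstance(expr, str) else set()

# Left-side state from the example: constraints in terms of original attributes.
left_constraints = [
    ("isnotnull", "a"),
    ("isnotnull", "b"),
    (">", ("+", "a", "b"), 5),
]
alias_to_original = {"a1": "a", "b1": "b"}
join_keys = [("a1", "x"), ("b1", "y")]  # a1 == x, b1 == y

# Map each original left attribute to the right-side column it is joined with.
left_to_right = {alias_to_original.get(l, l): r for l, r in join_keys}

# Keep only constraints fully expressed over join keys, rewritten for the right side.
new_right_filters = [
    rewrite(c, left_to_right)
    for c in left_constraints
    if references(c) <= set(left_to_right)
]
print(new_right_filters)
```

The result corresponds to the new Filter x + y > 5, x != null, y != null on the right side of the join.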

Constraint Propagation Algorithms Comparison

                                    Current Algorithm                                   Improved Algorithm
Number of constraints               Combinatorial, dependent on the number of aliases   Independent of the number of aliases
Memory usage                        High                                                Low
Filter pushdown for join            Single reference filters                            Single reference and compound filters
Creation of IsNotNull constraints   Can miss IsNotNull constraints                      Detects more IsNotNull constraints

Constraint Propagation Benchmark

    SELECT cf
    FROM (SELECT cf
          FROM (SELECT CASE
                         WHEN abs(c01) < 1 THEN 1
                         WHEN abs(c01) < 2 THEN 2
                         WHEN abs(c02) < 1 THEN 3
                         WHEN abs(c02) < 2 THEN 4
                         ...
                       END AS cf
                FROM (SELECT sum(a + a) AS c01,
                             sum(a + b) AS c02
                             ...
                      FROM dataset
                      GROUP BY a))
          WHERE cf > 0)
    INNER JOIN letters ON a = cf

Spark 3.1, local mode, 4GB memory

Effect on Customer Pipeline

• Financial use case for large insurance company

• Uses nested case statements to validate and categorize data

Closing Thoughts

Tuning Tips

• Take advantage of CSE
• Reduce the number of operators
• Limit the number of aliases
• Follow SPARK-33152 to receive updates on the improved constraint propagation algorithm

Future Work

• Improve logic for Catalyst rules
  • PushDownPredicates
  • CollapseProject
• Implement rules engine in Spark
  • Algorithms for converting to lookup table
  • Rete Algorithm

Thank You
