Spark Execution Demystified

charantej thota
Feb 15, 2022

I believe asking the right questions helps us build a deeper understanding of any process or system. In my previous article I mentioned the two phases of Spark execution: converting the user program into a logical plan, and then converting that logical plan into a physical plan. My attempt in this article is to demystify that process and, along the way, answer the questions below.

This article brings perspective to the following questions:

  1. How does the Spark Analyzer convert an unresolved plan into a resolved plan?
  2. How does the Spark Catalyst optimizer arrive at the best logical plan?
  3. How does the DAG scheduler convert a logical plan into a physical plan?

As a simple demonstration of these steps, let us run a small piece of PySpark code to see what an unresolved plan looks like and how it is analyzed, optimized, and resolved into the final physical plan (also called the Spark plan).

Some key points to note:
1. Spark execution is lazy; it is triggered by an action on an RDD/DataFrame.
2. Calling explain() only prints the plans; no executor is launched to produce them.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Create a local SparkSession with a single core
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()

# Build a DataFrame of 500 numbers, filter it, and add a derived column
df = spark.range(500).toDF("number").filter("number < 600").withColumn("mod_val", F.col("number") % 100)

# Print the parsed, analyzed, optimized and physical plans
df.explain(extended=True)
------------------------------------------------------------------
Result:
== Parsed Logical Plan ==
'Project [number#2L, ('number % 100) AS mod_val#4]
+- Filter (number#2L < cast(600 as bigint))
   +- Project [id#0L AS number#2L]
      +- Range (0, 500, step=1, splits=Some(1))
== Analyzed Logical Plan ==
number: bigint, mod_val: bigint
Project [number#2L, (number#2L % cast(100 as bigint)) AS mod_val#4L]
+- Filter (number#2L < cast(600 as bigint))
   +- Project [id#0L AS number#2L]
      +- Range (0, 500, step=1, splits=Some(1))
== Optimized Logical Plan ==
Project [id#0L AS number#2L, (id#0L % 100) AS mod_val#4L]
+- Filter (id#0L < 600)
   +- Range (0, 500, step=1, splits=Some(1))
== Physical Plan ==
*(1) Project [id#0L AS number#2L, (id#0L % 100) AS mod_val#4L]
+- *(1) Filter (id#0L < 600)
   +- *(1) Range (0, 500, step=1, splits=1)
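
To see the difference between explaining a plan and actually running it, here is a small follow-up sketch: explain() above printed the plans without launching any tasks, whereas an action such as count() triggers a real job (count() is just one example of an action; collect() or a write would behave the same way).

# explain() only prints plans; nothing runs on the executor.
df.explain(extended=True)

# An action triggers the job: stages and tasks are scheduled and the
# physical plan above is actually executed.
print(df.count())   # 500, since every generated number is < 600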

How does the Spark Analyzer convert an unresolved plan into a resolved plan?

Spark Analyzer:

The Analyzer is a rule executor: it takes an input tree node (the parsed, unresolved logical plan) and transforms it by resolving, removing, and in general modifying nodes. Both the Analyzer and the Optimizer are direct implementations of the RuleExecutor abstract class. The Analyzer resolves the logical plan, while the Optimizer takes responsibility for optimizing the resolved plan. As you can see in the example above, the step after the “Parsed Logical Plan” is the Analyzed Logical Plan, which is the output of the Analyzer, immediately followed by the Optimized Logical Plan, which is the result of the Optimizer.
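
As a side note, the individual plans that explain() prints can also be pulled out of the DataFrame's underlying QueryExecution object. This goes through internal, version-dependent API (the _jdf JVM handle in PySpark), so treat the snippet below as an exploratory sketch rather than a stable interface.

# Internal/undocumented: peek at the plans produced by the Analyzer and
# the Catalyst Optimizer (method names may change between Spark versions).
qe = df._jdf.queryExecution()
print(qe.analyzed().toString())       # output of the Analyzer
print(qe.optimizedPlan().toString())  # output of the Optimizer
print(qe.executedPlan().toString())   # final physical (Spark) plan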

What is an abstract class?
Abstract classes (in Java and Scala alike) can declare and define methods, but they cannot be instantiated directly. They can be extended by other classes, which can be instantiated; a class that inherits an abstract class can use the inherited method definitions, implement the declared-but-undefined methods, or override existing ones.
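
Since the rest of this article uses PySpark, here is a minimal sketch of the same idea in Python using the abc module. The class names PlanRule and ResolveAliasRule are invented purely for illustration; they are not Spark's actual classes.

from abc import ABC, abstractmethod

class PlanRule(ABC):                 # abstract: cannot be instantiated
    @abstractmethod
    def apply(self, plan):
        ...

class ResolveAliasRule(PlanRule):    # concrete subclass
    def apply(self, plan):
        # a real rule would rewrite the plan tree here
        return plan

# PlanRule() would raise TypeError; the subclass can be instantiated
rule = ResolveAliasRule()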
How does the Spark Catalog help?
The Spark Catalog is a repository of objects (either actual or derived, such as databases, tables, views, columns, and functions). It helps the Analyzer while arriving at the logical plan by resolving the required information about tables, columns, and so on.
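
The same catalog the Analyzer consults is exposed to us through spark.catalog, and a reference it cannot resolve surfaces as an analysis error rather than an execution error. A small sketch, reusing the df from earlier and a temp view name (numbers) chosen just for this illustration:

# Register a temporary view so it lands in the catalog
df.createOrReplaceTempView("numbers")

# Inspect the catalog the Analyzer uses for resolution
print(spark.catalog.listTables())
print(spark.catalog.listColumns("numbers"))

# A reference the catalog cannot resolve fails analysis, not execution
from pyspark.sql.utils import AnalysisException
try:
    spark.sql("SELECT no_such_column FROM numbers").explain()
except AnalysisException as e:
    print("Analyzer error:", e)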

While I am not attempting to go into the implementation of each rule, I would still like to point out a few examples to understand the nature of the rules. For instance, to resolve an alias, Spark applies a resolve-aliases rule that belongs to the batch named Resolution and runs with a FixedPoint strategy. There are many other batches, strategies, and rules, which can be viewed in the Analyzer source; a conceptual sketch of how they fit together follows the definitions below.

What is a batch?
A batch is nothing but a logical name for a group of rules that behave similarly. Batch names look like Hints, Simple Sanity Check, or Resolution; all of the resolution rules, for instance, come under the batch named Resolution.
What is a strategy?
A strategy indicates the maximum number of iterations for a batch; internally it is just another abstract class with a field for that maximum. The strategy Once runs a single iteration, while FixedPoint runs repeatedly, up to a configured limit, until the plan stops changing.
What is a rule?
Resolving an alias can be a rule. A rule is applied to a plan through its apply function, which returns the transformed plan (here, with the alias resolved).
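
To make the batch/strategy/rule vocabulary concrete, here is a conceptual Python sketch of how a rule executor loops over batches. The names and structure are a simplification for illustration only; Catalyst's real RuleExecutor is written in Scala and is considerably more involved.

# Conceptual sketch only -- not Spark's actual implementation.
class Once:
    max_iterations = 1

class FixedPoint:
    def __init__(self, max_iterations):
        self.max_iterations = max_iterations

class Batch:
    def __init__(self, name, strategy, rules):
        self.name, self.strategy, self.rules = name, strategy, rules

def execute(plan, batches):
    for batch in batches:
        for _ in range(batch.strategy.max_iterations):
            new_plan = plan
            for rule in batch.rules:
                new_plan = rule.apply(new_plan)   # each rule rewrites the tree
            if new_plan == plan:                  # fixed point reached early
                break
            plan = new_plan
    return plan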

How does the Spark Catalyst optimizer arrive at the best logical plan?

Catalyst Optimizer

The Catalyst optimizer is another implementation of the RuleExecutor abstract class. This makes the Optimizer a rule executor similar to the Analyzer, but its input is the resolved plan produced by the Spark Analyzer. The major difference between the two rule executors is the set of rules they apply at their respective phases of logical plan resolution and optimization. The rules executed in the optimization phase can be found in the Optimizer source.

Examples of rules in the optimizer
Eliminate View and Replace Expressions are a couple of examples of optimizer rules. Replace Expressions is a logical rule that walks the tree nodes of the analyzed plan and reduces the expressions under a node into a simpler equivalent expression, thus trimming the overall plan. There are many other logical rules supporting different optimizations.
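
We can watch one such optimization happen from PySpark. In the sketch below (reusing the spark session from earlier), the arithmetic is built from literals only, so it is pre-computed by constant folding; the optimized plan shows a single literal where the parsed plan shows the full expression. The exact plan text varies by Spark version.

# (2 * 10) + 1 is made of literals, so the optimizer folds it to 21;
# compare the Parsed and Optimized sections of the output.
folded = spark.range(5).withColumn("c", (F.lit(2) * F.lit(10)) + F.lit(1))
folded.explain(extended=True)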

How does the DAG scheduler convert a logical plan into a physical plan?

DAG stands for Directed Acyclic Graph. The DAG scheduler primarily focuses on converting the optimized plan into jobs, stages, and tasks. A task is the smallest unit of work in this hierarchy; a stage can contain a number of tasks, and, importantly, stages can depend on each other. A job, in turn, can contain a number of stages. The DAG scheduler works with the resource manager to request compute resources for executing the tasks, and it also tracks the execution of the jobs and stages.
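
A quick way to get a feel for these boundaries is to force a shuffle and look at the lineage. In the hedged sketch below, the groupBy introduces an Exchange in the physical plan, so the resulting job typically splits into two stages (visible in the Spark UI, by default at localhost:4040, while the action runs); rdd.toDebugString() shows the same boundary in the RDD lineage.

# groupBy needs a shuffle (wide dependency), so the DAG scheduler
# splits the work into stages around the Exchange.
grouped = df.groupBy("mod_val").count()
grouped.explain()            # physical plan shows an Exchange

grouped.count()              # action: triggers the job

# RDD lineage; the indentation marks the shuffle/stage boundary
print(grouped.rdd.toDebugString().decode())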

The cost-based model arrives at its estimates using table statistics and can be enabled through a Spark configuration. It uses data size, number of rows, and column statistics for the operations involved to compute these estimates and then choose the cheaper plan. In a nutshell, the physical plan is nothing but the optimized plan rewritten as an RDD lineage, where the nodes of the DAG are RDDs and the edges are the operations/actions on them.
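
The configuration in question is spark.sql.cbo.enabled, and the statistics it relies on come from ANALYZE TABLE. A short sketch; my_table and its column names are placeholders for one of your own tables registered in the metastore (temp views cannot be analyzed this way):

# Turn on the cost-based optimizer (disabled by default)
spark.conf.set("spark.sql.cbo.enabled", "true")

# Collect the table- and column-level statistics the CBO relies on.
# "my_table", "id" and "category" are placeholders.
spark.sql("ANALYZE TABLE my_table COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE my_table COMPUTE STATISTICS FOR COLUMNS id, category")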

What is AQE?
AQE stands for Adaptive Query Execution. Based on runtime statistics, Spark can decide to switch to a better-optimized plan if it detects the opportunity while the query is running. Because this happens at runtime, the explain function cannot show that plan; the only way to see it is during execution itself, through the Spark UI. This feature ships with Spark 3.0.
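
AQE is driven by configuration. The flags below are the commonly used ones; defaults differ between versions (AQE is off by default in Spark 3.0 and on by default from 3.2), so treat the values as a sketch to adapt:

# Enable adaptive query execution
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Let AQE merge small shuffle partitions and mitigate skewed joins
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# The re-optimized plan shows up in the Spark UI while the job runs,
# not in df.explain().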

What are wide and narrow transformations?
As we understand, the key to Spark's success is working in parallel by breaking the data into multiple partitions/blocks. However, some operations we want to perform across the data can cause a shuffle of data between partitions; these are called wide transformations, for example join and repartition. If we only filter data, on the other hand, we do not need to shuffle anything between partitions; such transformations are called narrow transformations. Naturally, wide transformations demand more compute from Spark. A small example follows this note.
Note: Whether a transformation is wide or narrow, Spark still needs an action to be called to trigger the execution.
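
As a hedged illustration with the same df as before: filter and withColumn are narrow (each output partition depends on a single input partition), while repartition and groupBy are wide (their physical plan contains an Exchange, i.e. a shuffle); nothing runs until the count() at the end.

# Narrow: no data moves between partitions
narrow = df.filter(F.col("number") > 10).withColumn("twice", F.col("number") * 2)

# Wide: repartition/groupBy shuffle data across partitions (Exchange in the plan)
wide = narrow.repartition(4).groupBy("mod_val").count()
wide.explain()

# Nothing has executed yet -- this action triggers the whole chain
wide.count()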

