Most Spark explanations stop at the DAG. You learn that transformations are lazy, actions trigger execution, and somewhere in the middle there’s a directed acyclic graph. What you don’t get is what the runtime actually does with that graph — how it’s cut into stages, how tasks are scheduled, and what happens on disk during a shuffle.

That gap matters when you’re debugging a slow job or deciding how to structure a pipeline.

From logical plan to physical plan

When you call an action on a DataFrame (.count(), .collect(), .write.save()), Spark doesn’t execute the DAG directly. It first runs the Catalyst optimizer, which takes your query through several plans:

  1. Parsed plan — AST from your DataFrame/SQL expression
  2. Analyzed plan — column references resolved against the catalog
  3. Optimized logical plan — predicate pushdown, constant folding, projection pruning applied
  4. Physical plan — concrete execution strategy selected (sort-merge join vs broadcast join, scan pushdown, etc.)

You can inspect this at any stage:

df.explain(mode="extended")  # shows all four plans
df.explain(mode="cost")      # adds statistics: size estimates, plus row counts when table stats exist

The physical plan is what Spark actually runs. The query graph in the UI’s SQL tab shows the physical plan, and the stage DAGs in the Jobs and Stages tabs are drawn from the RDDs that plan compiles to. Neither is the logical plan.
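
To make phase 3 concrete, here is a toy optimizer in plain Python. It is a sketch, not Catalyst: the node classes (Lit, Col, Add, Gt, Scan, Project, Filter) and the fold/optimize functions are hypothetical stand-ins, and real Catalyst applies many rules in batches until the plan stops changing. It shows two of the rewrites named above: constant folding, and pushing a filter below a projection.

```python
from dataclasses import dataclass

# Hypothetical toy plan and expression nodes; Catalyst's real classes are far richer.
@dataclass(frozen=True)
class Lit:
    value: int

@dataclass(frozen=True)
class Col:
    name: str

@dataclass(frozen=True)
class Add:
    left: object
    right: object

@dataclass(frozen=True)
class Gt:
    left: object
    right: object

@dataclass(frozen=True)
class Scan:
    table: str

@dataclass(frozen=True)
class Project:
    columns: tuple  # this toy Project only selects columns, it never computes new ones
    child: object

@dataclass(frozen=True)
class Filter:
    condition: object
    child: object

def fold(expr):
    """Constant folding: replace Add(Lit, Lit) with a single Lit, recursively."""
    if isinstance(expr, Add):
        left, right = fold(expr.left), fold(expr.right)
        if isinstance(left, Lit) and isinstance(right, Lit):
            return Lit(left.value + right.value)
        return Add(left, right)
    if isinstance(expr, Gt):
        return Gt(fold(expr.left), fold(expr.right))
    return expr

def optimize(plan):
    """Fold constants in filter conditions and push filters below projections."""
    if isinstance(plan, Filter):
        condition = fold(plan.condition)
        child = optimize(plan.child)
        if isinstance(child, Project):
            # Safe only because this toy Project just selects existing columns.
            return Project(child.columns, optimize(Filter(condition, child.child)))
        return Filter(condition, child)
    if isinstance(plan, Project):
        return Project(plan.columns, optimize(plan.child))
    return plan

query = Filter(Gt(Col("amount"), Add(Lit(10), Lit(5))),
               Project(("id", "amount"), Scan("orders")))
print(optimize(query))
```

The printed plan has the filter sitting directly on the scan, under the projection, with 10 + 5 folded to 15, so fewer rows reach the projection.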

How stage boundaries are drawn

The physical plan is a tree of operators. Spark walks this tree and cuts it at shuffle boundaries — any point where data from multiple partitions must be co-located before the next operation can proceed.

Narrow dependencies (each input partition feeds at most one output partition) don’t require a shuffle:

  • map, filter, flatMap
  • union (it just concatenates its parents’ partitions)
  • mapPartitions

Wide dependencies (an input partition can feed many output partitions) require a shuffle:

  • groupBy, reduceByKey, aggregateByKey
  • join (unless one side is broadcast, or both sides are already partitioned the same way)
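  • repartition (always), and coalesce only when asked to shuffle (coalesce(n, shuffle=True) on an RDD); a plain coalesce merges existing partitions without a shuffle and cannot increase the partition count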
  • sortBy, orderBy
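
The difference between the two lists comes down to fan-out, and a few lines of plain Python make it visible. This is a simplified model, not Spark code: partitions are lists of keys, and hash_partition (a hypothetical helper) uses zlib.crc32 as a stable stand-in for Spark’s own hash. The RDD HashPartitioner uses the key’s JVM hashCode, and DataFrame shuffles use Murmur3.

```python
import zlib
from collections import defaultdict

def hash_partition(key, num_partitions):
    # Stable stand-in hash; not the function Spark actually uses.
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions

def narrow_deps(input_partitions):
    """map/filter: output partition i is computed from input partition i alone."""
    return {i: {i} for i in range(len(input_partitions))}

def shuffle_deps(input_partitions, num_outputs):
    """groupBy-style: which input partitions send records to each output partition."""
    deps = defaultdict(set)
    for source, keys in enumerate(input_partitions):
        for key in keys:
            deps[hash_partition(key, num_outputs)].add(source)
    return dict(deps)

partitions = [["a", "b", "c"], ["a", "d", "e"], ["b", "f", "a"]]
print(narrow_deps(partitions))      # {0: {0}, 1: {1}, 2: {2}}
print(shuffle_deps(partitions, 2))  # an output partition can depend on every input
```

Key "a" appears in all three input partitions, so whichever output partition it hashes to depends on all three inputs. That many-to-many pattern is what forces a shuffle.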

Each contiguous run of narrow dependencies becomes one stage. The boundary is where one stage writes shuffle files and the next stage reads them.

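A common pipeline like filter → groupBy → filter → count produces two stages: the first ends at the groupBy shuffle, and the second reads the shuffle output, applies the second filter, and counts. That holds in the RDD API, where count sums per-partition counts on the driver. A DataFrame .count() adds its own aggregation into a single partition, so expect one more stage there.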
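
The cutting rule can be sketched for a straight-line pipeline. In this hypothetical helper, each operation is a (name, needs_shuffle) pair, and a wide operation is split into its shuffle-write half, which closes the current stage, and its shuffle-read half, which opens the next. Real Spark walks the lineage backwards from the final RDD and handles branching lineages, but for a linear pipeline the result is the same.

```python
def cut_stages(pipeline):
    """Split a linear pipeline of (name, needs_shuffle) pairs into stages."""
    stages = [[]]
    for name, needs_shuffle in pipeline:
        if needs_shuffle:
            stages[-1].append(f"{name} (shuffle write)")  # last step of this stage
            stages.append([f"{name} (shuffle read)"])      # first step of the next stage
        else:
            stages[-1].append(name)                         # narrow op: pipelined in place
    return stages

pipeline = [("filter", False), ("groupBy", True), ("filter", False), ("count", False)]
for i, stage in enumerate(cut_stages(pipeline)):
    print(i, stage)
# 0 ['filter', 'groupBy (shuffle write)']
# 1 ['groupBy (shuffle read)', 'filter', 'count']
```

Each additional wide operation adds exactly one stage, which is why chains of repartitions and aggregations show up as long stage lists in the UI.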

What the task scheduler actually does

When a stage is ready to run (all parent stages have completed and written their shuffle output), the DAG scheduler hands it to the task scheduler. A stage becomes a set of tasks — one task per input partition.

The task scheduler:

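  1. Serializes each task: the closure (your lambda/function) plus metadata about which partition to read. JVM closures are always serialized with Java serialization, even when Kryo is configured (spark.serializer only governs data such as shuffled and cached records); PySpark pickles Python functions with cloudpickle. This is why capturing large objects in closures is expensive: the entire captured object graph goes into the stage’s serialized task binary, which is shipped to every executor that runs the stage and deserialized again by every task. A broadcast variable is the usual way to share large read-only data instead.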

  2. Assigns tasks to executors with data locality preference:

    • PROCESS_LOCAL: data is already in the executor’s JVM memory (cached RDD)
    • NODE_LOCAL: data is on the same physical node (HDFS block, local shuffle file)
    • NO_PREF: data has no locality preference, so every executor is equally good
    • RACK_LOCAL: data is on the same rack
    • ANY: data requires a network fetch, possibly from another rack

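    The scheduler waits up to spark.locality.wait (default 3s) for a slot at the preferred locality before relaxing to the next level; spark.locality.wait.process, spark.locality.wait.node, and spark.locality.wait.rack override it per level. If a stage has idle executor cores while some of its tasks haven’t launched yet, this wait is a common reason.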

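  3. Retries failed tasks: a single task may fail up to spark.task.maxFailures times (default 4, i.e. three retries) before Spark fails the stage and aborts the job. Failures spread across different tasks don’t count toward this limit. By default a retry can land on the same executor; with spark.excludeOnFailure.enabled (spark.blacklist.enabled before Spark 3.1), Spark steers retries away from executors and nodes where the task already failed.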

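  4. Speculative execution: if enabled (spark.speculation=true), Spark re-launches slow tasks on a different executor. By default a task becomes a candidate once 75% of the stage’s tasks have finished (spark.speculation.quantile) and it has been running longer than 1.5 times the median successful-task duration (spark.speculation.multiplier). The first copy to finish wins; the other is killed. This helps with stragglers caused by uneven hardware, but is dangerous if your tasks have non-idempotent side effects, since both copies may perform them.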
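
The locality wait in step 2 is a form of delay scheduling. Here is a deliberately simplified model of it, using hypothetical functions rather than Spark APIs: one wait value for every level, no NO_PREF level (which Spark also uses), and a clock that only counts time spent without launching a task. In Spark the timer also resets whenever a task launches at the current level, and only levels that the stage’s tasks can actually use are considered.

```python
LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]  # most to least local

def allowed_level(seconds_without_launch, wait=3.0):
    """Least local level the scheduler will accept after waiting this long.

    Each full `wait` interval without a launch relaxes the limit by one level,
    modeling spark.locality.wait (default 3s) applied uniformly to every level.
    """
    steps = int(seconds_without_launch // wait)
    return LEVELS[min(steps, len(LEVELS) - 1)]

def can_launch(offer_level, seconds_without_launch, wait=3.0):
    """Accept an executor slot only if it is at least as local as currently allowed."""
    limit = allowed_level(seconds_without_launch, wait)
    return LEVELS.index(offer_level) <= LEVELS.index(limit)

for t in (0.5, 3.5, 6.5, 9.5):
    print(t, allowed_level(t), can_launch("RACK_LOCAL", t))
# 0.5 PROCESS_LOCAL False
# 3.5 NODE_LOCAL False
# 6.5 RACK_LOCAL True
# 9.5 ANY True
```

A RACK_LOCAL slot sits unused for two full wait periods before the model accepts it, which is the idle-core pattern described above.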

The shuffle: what actually lands on disk

Between stages, data is exchanged through a shuffle. The shuffle has two sides:

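Map side (shuffle write): Each task in the upstream stage determines, for each output record, which downstream partition it belongs to (based on the partitioner: hash by default, range for sort operations). It buffers records in memory ordered by destination partition, spilling to local disk if the buffer fills, and finally writes to the executor’s local disk a single data file holding every partition’s block back to back, plus an index file recording where each block starts. Each map task therefore produces one data file and one index file, however many downstream partitions there are.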

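Reduce side (shuffle read): Each task in the downstream stage fetches its block from every map task’s output. If the downstream stage has 200 partitions and the upstream stage ran 400 map tasks, each reduce task reads up to 400 blocks (80,000 blocks across the stage). Fetches of blocks that live on the same remote executor are batched into fewer network requests, and blocks on the reducer’s own executor are read straight from local disk.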
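
A toy version of both sides in plain Python shows how a map task can keep all of its output in one file and still let each reducer read only its own slice: it records where each partition’s block starts in a small index. This mirrors the data-plus-index layout of Spark’s sort-based shuffle, but everything else is simplified. Lists stand in for files, partition_of (hypothetical) uses zlib.crc32 as a stable stand-in for Spark’s partitioner, and there is no spilling, compression, or network.

```python
import zlib

def partition_of(key, num_reducers):
    # Stable stand-in for a hash partitioner, not Spark's actual hash.
    return zlib.crc32(key.encode("utf-8")) % num_reducers

def shuffle_write(records, num_reducers):
    """One map task: order (key, value) records by destination partition and index them.

    Returns (data, index). `data` stands in for the single data file; reducer r's
    block is data[index[r]:index[r + 1]].
    """
    data = sorted(records, key=lambda kv: partition_of(kv[0], num_reducers))
    index = [0] * (num_reducers + 1)
    for key, _ in data:
        index[partition_of(key, num_reducers) + 1] += 1  # count records per partition
    for r in range(num_reducers):
        index[r + 1] += index[r]                         # prefix sums give start offsets
    return data, index

def shuffle_read(map_outputs, reducer):
    """One reduce task: fetch its block from every map task's output."""
    fetched = []
    for data, index in map_outputs:
        fetched.extend(data[index[reducer]:index[reducer + 1]])
    return fetched

map_inputs = [[("a", 1), ("b", 2), ("c", 3)], [("a", 4), ("d", 5)], [("b", 6)]]
outputs = [shuffle_write(records, 2) for records in map_inputs]
for r in range(2):
    print(r, shuffle_read(outputs, r))
```

Every record lands with exactly one reducer, and both records with key "a" land with the same one: the co-location guarantee a groupBy relies on.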

The shuffle data lands on executor local disk, not HDFS or S3. This has two important implications:

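  • Executor disk is a real resource constraint. A heavy shuffle can easily generate hundreds of GB of temporary files in the executors’ local directories (spark.local.dir, or the cluster manager’s equivalent). Running executors with insufficient local disk makes tasks fail with “No space left on device” errors.
  • If an executor is lost after the shuffle write but before all downstream tasks have read from it, its shuffle output is lost with it, unless an external shuffle service keeps serving those files from the surviving node. Spark must then re-run the upstream map tasks whose output was lost before the downstream stage can continue. This is why executor failures mid-job are more expensive than they look.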
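
Which map outputs have to be regenerated depends on who was serving the shuffle files. Spark can run an external shuffle service on each node that serves shuffle files independently of the executors. A simplified model follows: lost_map_outputs is a hypothetical helper, and it assumes that with the service running, files stay available as long as their node survives, while without it they are only reachable through the executor that wrote them.

```python
def lost_map_outputs(locations, lost_executor=None, lost_node=None,
                     external_shuffle_service=False):
    """Return the ids of map tasks whose shuffle output must be recomputed.

    `locations` maps map task id -> (node, executor_id) that wrote its shuffle files.
    """
    lost = []
    for map_id, (node, executor) in sorted(locations.items()):
        if node == lost_node:
            lost.append(map_id)  # files on a lost node are unreachable in every setup
        elif executor == lost_executor and not external_shuffle_service:
            lost.append(map_id)  # only the dead executor could serve these files
    return lost

locations = {0: ("node1", "exec1"), 1: ("node1", "exec2"), 2: ("node2", "exec3")}
print(lost_map_outputs(locations, lost_executor="exec1"))                 # [0]
print(lost_map_outputs(locations, lost_executor="exec1",
                       external_shuffle_service=True))                   # []
print(lost_map_outputs(locations, lost_node="node1"))                    # [0, 1]
```

Only the affected map tasks need to rerun, not the whole upstream stage, but the downstream stage still waits for them.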

Sort-based vs hash shuffle

Spark has had two shuffle implementations:

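Sort-based shuffle (the default since Spark 1.2): Map tasks write records sorted by destination partition ID (and then by key within each partition, for operations that require sorted output) into a single data file plus an index. Reduce tasks sort what they fetch only when the operation needs key ordering. It is memory-efficient: a map task that runs out of buffer space spills sorted runs to disk and merges them at the end, so it never has to hold its entire output in memory. When there is no map-side aggregation and at most spark.shuffle.sort.bypassMergeThreshold (default 200) reduce partitions, Spark skips the sort, writes one temporary file per partition, and concatenates them.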

Hash shuffle (legacy, removed in Spark 2.0): Each map task writes one file per output partition, with no sorting. Fast for small partition counts, but generates an enormous number of small files at scale (200 map tasks × 200 reduce partitions = 40,000 files), which is why it was dropped.
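
The file-count difference is simple arithmetic. These two helpers are illustrative only; the sort-based count assumes one data file and one index file per map task, and ignores temporary files such as the per-partition files Spark writes and then concatenates when it bypasses the sort for small partition counts.

```python
def hash_shuffle_files(map_tasks, reduce_partitions):
    # Legacy hash shuffle: one file per (map task, reduce partition) pair.
    return map_tasks * reduce_partitions

def sort_shuffle_files(map_tasks):
    # Sort-based shuffle: one data file plus one index file per map task.
    return 2 * map_tasks

for m, r in [(200, 200), (2000, 2000)]:
    print(m, r, hash_shuffle_files(m, r), sort_shuffle_files(m))
# 200 200 40000 400
# 2000 2000 4000000 4000
```

The hash layout grows with the product of map tasks and reduce partitions, the sort-based layout only with the number of map tasks.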

Broadcast joins bypass the shuffle entirely

When one side of a join is small enough (spark.sql.autoBroadcastJoinThreshold, default 10MB), Spark broadcasts it to every executor rather than shuffling both sides. The driver collects the small relation, serializes it, and the executors cache it in memory. The large table is scanned once without any repartitioning.
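
The mechanics can be sketched as a broadcast hash join in plain Python. broadcast_hash_join is a hypothetical helper that handles an inner join only: the small side becomes one hash table (the copy every executor would receive), and each partition of the large side probes it independently, so the large side is never repartitioned.

```python
def broadcast_hash_join(large_partitions, small_rows, key):
    """Inner join: hash the small side once, then probe it from each large partition."""
    table = {}
    for row in small_rows:                  # built once, then shared by every task
        table.setdefault(row[key], []).append(row)

    joined = []
    for partition in large_partitions:      # in Spark, one task per partition
        for row in partition:
            for match in table.get(row[key], []):
                joined.append({**row, **match})
    return joined

sales = [[{"store": 1, "amount": 10}, {"store": 2, "amount": 20}],
         [{"store": 1, "amount": 30}, {"store": 3, "amount": 5}]]
stores = [{"store": 1, "city": "Oslo"}, {"store": 2, "city": "Lima"}]
for row in broadcast_hash_join(sales, stores, "store"):
    print(row)
```

Store 3 has no match in the small table and drops out, as it would in an inner join; no row of sales ever moves between partitions.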

Broadcast joins are often the single biggest optimization available for star-schema queries against large fact tables. When Catalyst’s size estimates are wrong (stale statistics, complex subquery), you can force it:

from pyspark.sql.functions import broadcast

result = large.join(broadcast(small), "key")

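The cost: every executor must receive and hold a full copy of the broadcast table, and the driver must first collect it into its own memory. Broadcasting a 500MB table to 100 executors means roughly 50GB of network traffic just to set up the join; Spark’s torrent-style broadcast spreads the sending across executors but doesn’t reduce that total. The 10MB default is conservative for a reason.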
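
Both the size check and the traffic estimate fit in a few lines. This is a simplified sketch: the function names are hypothetical, estimated_bytes stands in for Catalyst’s size estimate, and the check follows the documented behavior of spark.sql.autoBroadcastJoinThreshold (a relation qualifies when its estimated size is at or below the threshold, and -1 disables automatic broadcasting).

```python
MB = 1024 * 1024

def would_auto_broadcast(estimated_bytes, threshold_bytes=10 * MB):
    """Simplified check: at or below the threshold qualifies; -1 disables broadcasting."""
    return threshold_bytes >= 0 and 0 <= estimated_bytes <= threshold_bytes

def broadcast_bytes_received(table_bytes, num_executors):
    # Every executor ends up holding a full copy, however the blocks are relayed.
    return table_bytes * num_executors

print(would_auto_broadcast(8 * MB))                    # True
print(would_auto_broadcast(500 * MB))                  # False
print(broadcast_bytes_received(500 * MB, 100) // MB)  # 50000, i.e. roughly 50GB
```

Because the check trusts the estimate, a stale or badly wrong size estimate is exactly the case where the explicit broadcast() hint earns its keep.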

Reading the Stage UI

The Spark UI stages tab exposes everything discussed here. A few specific things to look for:

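  • Input vs Shuffle Read size: stages that scan a data source report Input, and stages downstream of a shuffle report Shuffle Read. If shuffle read is orders of magnitude larger than the stage’s output, you’re moving a lot of data across the network only to collapse it in an expensive fan-in aggregation; filtering or pre-aggregating before the shuffle (reduceByKey instead of groupByKey, for example) can shrink it.
  • Spill (memory) and Spill (disk): when a task’s sort, aggregation, or shuffle buffers outgrow the execution memory available to it, it spills to disk. Since Spark 1.6 that memory is governed by spark.memory.fraction under unified memory management; spark.shuffle.memoryFraction only applied to the older static memory manager. Spill (memory) is the deserialized size of the spilled data, and Spill (disk) is its serialized size on disk. Spill is expensive (serialize → write → read → deserialize). High spill is a sign of skewed partitions, too few partitions, or under-provisioned executor memory.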
  • GC time — if GC time is a significant fraction of task duration, your executors are under memory pressure. The fix is usually fewer objects per task (more partitions) or larger executor heap.
  • Task duration distribution — if the max is 10x the median, you have skew. One partition is much larger than the others, and one task is doing most of the work while the rest of the stage sits idle.
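
The last check is easy to automate once you have per-task durations, for example copied from the stage page or pulled from Spark’s monitoring REST API. skew_ratio and looks_skewed are hypothetical helpers, and the 10x cutoff is the rule of thumb above, not a Spark setting.

```python
from statistics import median

def skew_ratio(task_durations):
    """Longest task divided by the median task: close to 1 means balanced work."""
    return max(task_durations) / median(task_durations)

def looks_skewed(task_durations, threshold=10.0):
    # 10x is the rule of thumb from the text, not a Spark configuration value.
    return skew_ratio(task_durations) >= threshold

balanced = [41, 39, 44, 40, 42, 38]  # seconds per task
skewed = [12, 11, 13, 12, 10, 190]
print(round(skew_ratio(balanced), 2), looks_skewed(balanced))
print(round(skew_ratio(skewed), 2), looks_skewed(skewed))
```

Using the median rather than the mean keeps one huge task from dragging the baseline up and hiding the skew it causes.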