The Databricks notebook was supposed to be simple: six small reference tables, one main table of 50,000 rows, and a handful of left joins. By all accounts, this should have been a trivial operation. Instead, it triggered out-of-memory errors, driver crashes, and a DAG so large it looked like a subway map designed by a conspiracy theorist. The engineer did everything “right”, broadcasted small tables, added caching, even tried breaking up the DAG with .count() statements. The cluster still died.
This isn’t a story about insufficient compute. It’s about how good software engineering instincts become dangerous anti-patterns in distributed data systems, and why the most “optimized” PySpark code can be worse than no optimization at all.
The Siren Song of Iterative Join Logic
The core problem, buried in the Reddit thread, reveals a fundamental architectural mismatch. The engineer wasn’t just joining six tables, they were dynamically resolving join keys based on the contents of each row in their mapping tables. For every key in a small reference table, they looped through, filtered the main DataFrame, performed a join, accumulated matches via unionByName, and carried forward the unmatched rows for the next iteration.
Here’s what that actually looks like in practice:
# Extract valid keys from mapping table
valid_keys = (
mapping_df.groupBy("key")
.count()
.select("key")
.distinct()
.rdd.flatMap(lambda row: row)
.collect()
)
# Iterative join logic that builds a monster DAG
current_df = main_df
final_matched_df = None
for key_name, key_cols in filtered_dict.items():
# Drop columns to "reset" state
for col_name in output_columns:
if col_name in current_df.columns:
current_df = current_df.drop(col_name)
# Filter mapping for this specific key
mapping_key = mapping_df.select(*key_cols, *output_columns)\
.where(col("key") == key_name)
# Perform join
joined_df = current_df.join(mapping_key, on=key_cols, how="left")
# Separate matched vs unmatched
matched_df = joined_df.filter(col("key").isNotNull())
# Accumulate matches
if final_matched_df is None:
final_matched_df = matched_df
else:
final_matched_df = final_matched_df.unionByName(matched_df)
# Continue with unmatched rows
current_df = joined_df.filter(col("key").isNull())
This pattern gets executed six times, once per small table. Each iteration appends to the execution plan, creating a DAG that grows linearly with the number of keys. When the community member asked for clarification, the engineer confirmed: “depending on row 3 it will be joined on key 1 and depending on data in row 5 it will be joined on key 2.” This is the smoking gun.
Why Your .count() Hack Makes Everything Worse
The engineer tried breaking up the DAG by inserting .count() statements, a common misconception about Spark’s execution model. As one commenter pointed out, this doesn’t just fail, it actively harms performance. A .count() materializes the DataFrame but doesn’t make it available downstream. When you subsequently use that same DataFrame in a join, Spark either recalculates it entirely (doubling your processing) or intelligently caches it anyway, rendering your .count pointless overhead.
The misunderstanding stems from treating Spark DataFrames like in-memory objects. They’re not. They’re execution plans. Calling .count() is like asking a architect to build a house just to count the rooms, then demolishing it and rebuilding it again when you actually want to live there. The right tool for persistence is .cache() or .persist(), but even that won’t save you from a fundamentally flawed algorithm.
The 50-Row Table That Wasn’t the Problem
Broadcasting the small tables had “minimal impact”, because Spark was already broadcasting them automatically. When your “small” tables have 50-100 rows, they’re well below the spark.sql.autoBroadcastJoinThreshold default of 10MB. The engineer was optimizing something that was already optimized into oblivion.
The real issue? Data modeling by iteration. The approach of “join on key1, accumulate matches, join remaining on key2, accumulate matches” is a procedural solution to a set-based problem. In distributed systems, this creates N separate shuffle operations where you need one. It forces Spark to keep track of intermediate state across iterations, ballooning the DAG and exhausting driver memory just to track the execution graph itself.
The Partitioning Problem Hiding in Plain Sight
While the Reddit thread focused on DAG complexity, the real-world scale issue emerges when this pattern meets production data volumes. The repartition vs coalesce analysis demonstrates what happens when similar anti-patterns scale to 100 million rows. Partitioning becomes an architectural decision, not a performance tweak.
The iterative union approach creates massive skew. Each unionByName stage waits for the previous to complete, preventing parallelization. At scale, this means:
– Too few partitions: Executors sit idle while a single thread handles the union chain
– Too many partitions: Scheduling overhead explodes as Spark tries to coordinate thousands of tiny tasks
– Wrong partitioning: Each join triggers a full shuffle because the DataFrame isn’t co-located
The solution isn’t more tuning, it’s eliminating the iterative pattern entirely.
Set-Based Thinking: The Actual Fix
The correct architecture treats the mapping table as what it is: a configuration dataset. Instead of iterating through keys procedurally, express the logic declaratively:
# Instead of iterative joins, use a single multi-condition join
# First, flatten your mapping rules into a proper lookup table
from pyspark.sql.functions import when, col
# Create a unified mapping with explicit join conditions
unified_mapping = mapping_df.select(
"key",
"key1", "key2", "key3", # Your join columns
"output_column"
)
# Perform a single left join with conditional logic
result_df = main_df.join(
unified_mapping,
on=[
(main_df.primary_key == unified_mapping.key1) |
(main_df.secondary_key == unified_mapping.key2) |
(main_df.tertiary_key == unified_mapping.key3)
],
how="left"
).select(
main_df["*"],
unified_mapping.output_column
)
# Handle the missing flag in one pass
final_df = result_df.withColumn(
"missing_flag",
when(unified_mapping.key.isNull(), "Y").otherwise("N")
)
This approach creates one join operation in the DAG, not N. Spark can optimize the entire shuffle as a single stage, apply broadcast automatically, and parallelize across all executors simultaneously. The execution plan goes from O(N) complexity to O(1).
The Modular Code vs. Performance Fallacy
The engineer’s side question cuts to the heart of a common identity crisis: “Is it better to write code modularly using functions or without?” After months of PySpark, they suspected modular code was the problem.
This is a false dichotomy. As the community response clarified: Spark doesn’t care how your code is organized. Functions have zero impact on the query plan. The issue isn’t modularity, it’s what you’re modularizing.
Bad practice modularized:
def iterative_join(df, mapping_df, key):
# Builds DAG fragments
return df.join(mapping_df.filter(col("key") == key), ...)
result = main_df
for k in keys:
result = iterative_join(result, mapping_df, k) # DAG grows
Good practice modularized:
def build_unified_mapping(mappings):
# Returns a single DataFrame
return union_all([m.select("key", "output") for m in mappings])
def apply_lookup(main_df, mapping_df):
# Returns a single-join plan
return main_df.join(mapping_df, ...)
mapping = build_unified_mapping(small_tables)
result = apply_lookup(main_df, mapping) # One DAG
The difference isn’t functions vs. no functions. It’s whether your abstractions respect distributed computing primitives.
When Caching Actually Matters
Caching isn’t magic fairy dust. In this scenario, caching the main table before the iterative loop would have helped, but only by masking the underlying problem. The real solution eliminates the need for caching entirely.
If you must cache, do it strategically:
# Cache only if you'll reuse the DataFrame multiple times
# In the FIXED version, caching is often unnecessary
main_df.cache() # Only if reused across separate actions
# In the BROKEN iterative version, this might help but won't fix the DAG size
current_df.cache() # Still builds massive plan, just materializes intermediates
The repartition article reinforces this: at 100M rows, caching decisions affect whether your job finishes in minutes or hours. But caching a bad algorithm just means you’ll run out of memory more predictably.
The Team Collaboration Cost
The original engineer noted their code was “slow and confusing to look at” and made them the single point of contact. This is the hidden tax of procedural data processing. When logic is encoded in loops and iteration, it’s impossible for another developer to reason about the final state without executing the entire mental simulation.
Set-based code is self-documenting:
# What does this do? Joins on multiple keys, creates flag
result = main_df.join(mapping, on=[key1_condition, key2_condition, key3_condition])
Procedural code requires a debugger:
# What does this do? Loop through keys, filter, join, union, track state...
# You have to read every line and simulate the loop
In data engineering, maintainability isn’t about comments, it’s about expressing intent through structure. A 50-line declarative join is infinitely more maintainable than a 20-line iterative solution that hides a O(N²) complexity bomb.
The Takeaway: Stop Treating Spark Like a Python Interpreter
The entire saga boils down to one principle: PySpark is not Python with extra steps. It’s a declarative DSL for distributed set operations. When you bring software engineering patterns like iteration, accumulation, and procedural logic into a distributed context, you create performance disasters that no amount of cluster tuning can fix.
The controversial truth? Your software engineering background isn’t an asset here, it’s a liability until you unlearn these instincts. The best PySpark code looks “simple” or even “naive” to a traditional developer. A single join. A straightforward filter. No loops. No cleverness.
That’s not a bug in your education. It’s a feature of distributed systems. The complexity moves from code to data modeling. Get the model right, and the code writes itself. Get it wrong, and you’ll spend weeks optimizing a fundamentally broken architecture.
Next time your “small” job fails big, don’t reach for spark.conf.set(). Reach for the ER diagram. The problem isn’t in your DAG, it’s in your schema.




