
# Common Gotchas (with Mitigations)

If you are not careful about the difference between lazy and eager analysis, there are five key gotchas to be aware of: 1) reusing temporary view names, 2) capturing external variables in UDFs, 3) delayed error detection, 4) excessive schema access on new DataFrames, and 5) DataFrame column references after a column is shadowed.

## 1. Reusing temporary view names


This approach is significantly faster when dealing with a large number of columns because it avoids creating and analyzing numerous DataFrames.

## 5. DataFrame column references after column shadowing

In Spark Connect, a DataFrame column reference such as `df["c"]` is tagged with the plan id of `df`. At analysis time the server resolves the reference by looking for the tagged ancestor in the plan and pulling the matching attribute from it. Spark Classic does not use plan ids; it resolves column references against the immediate child's output by attribute id and name.

The two resolution strategies diverge once a column has been shadowed by another operator that produces an attribute with the same name:

```python
import pyspark.sql.functions as sf

df = spark.sql("SELECT 1 AS c")
df.withColumn("c", sf.col("c").cast("string")).select(df["c"]).collect()
```

`withColumn("c", ...)` does not mutate `df`; it returns a new DataFrame whose `c` is a new attribute that hides the original. The trailing `df["c"]` still refers to the *original* `c` attribute, which is no longer in the projection list.
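You can see the immutability directly from the schemas; a minimal sketch, assuming a standard Spark session:

```python
import pyspark.sql.functions as sf

df = spark.sql("SELECT 1 AS c")
df2 = df.withColumn("c", sf.col("c").cast("string"))

print(df.dtypes)   # [('c', 'int')]    -- df itself is untouched
print(df2.dtypes)  # [('c', 'string')] -- a new attribute named c shadows the original
```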

* **Spark Classic** has always rejected this query at analysis time with `MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION`, because the original attribute is not present in the operator's child output.
* **Spark Connect** rejects it with `CANNOT_RESOLVE_DATAFRAME_COLUMN` by default, because the plan-id-tagged reference does not match any attribute in the current plan. However, when the SQL config `spark.sql.analyzer.strictDataFrameColumnResolution` (added in Spark 4.2.0, default `true`) is set to `false`, the analyzer still tries plan-id-based resolution first and falls back to name-based resolution only when that fails: the tagged `df["c"]` is then resolved by name against the projected `c` from `withColumn`, and the query succeeds. You can observe both error classes directly, as the sketch after this list shows.
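
A minimal sketch of inspecting the error class, using `getErrorClass()` from `pyspark.errors` (exact message text varies by version):

```python
import pyspark.sql.functions as sf
from pyspark.errors import AnalysisException

df = spark.sql("SELECT 1 AS c")
try:
    df.withColumn("c", sf.col("c").cast("string")).select(df["c"]).collect()
except AnalysisException as e:
    # Classic:  MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION
    # Connect:  CANNOT_RESOLVE_DATAFRAME_COLUMN
    print(e.getErrorClass())
```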

### Recommended way

If you hit either of the confusing failures above, the recommended first step is to switch to `sf.col`. Unlike `df["c"]`, which is a plan-id-tagged reference to `df`'s original column, `sf.col("c")` is an untagged name reference that resolves against the most recent projection or `withColumn`:

```python
import pyspark.sql.functions as sf

df = spark.sql("SELECT 1 AS c")
df.withColumn("c", sf.col("c").cast("string")).select(sf.col("c")).collect()
```

**Scala example:**

```scala
import org.apache.spark.sql.functions._

val df = spark.sql("SELECT 1 AS c")
df.withColumn("c", col("c").cast("string")).select(col("c")).collect()
```

If you cannot change the call sites, set `spark.sql.analyzer.strictDataFrameColumnResolution=false` to opt into the lenient name-based fallback. This is intended as an escape hatch and is not the default.
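
A minimal sketch of the escape hatch, using the config name from the text above:

```python
import pyspark.sql.functions as sf

# Opt into the lenient, name-based fallback (escape hatch, not the default).
spark.conf.set("spark.sql.analyzer.strictDataFrameColumnResolution", "false")

df = spark.sql("SELECT 1 AS c")
# The tagged df["c"] now falls back to name-based resolution and succeeds.
df.withColumn("c", sf.col("c").cast("string")).select(df["c"]).collect()
```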

# Summary

| Aspect | Spark Classic | Spark Connect |
| ------ | ------------- | ------------- |
| **Schema access** | Local | Triggers RPC, and caches the schema on first access |
| **Temporary views** | Plan embedded | Name lookup |
| **UDF serialization** | At creation | At execution |
| **DataFrame column references** | Eagerly resolved | Lazily resolved against plan id |

The key difference is that Spark Connect defers analysis and name resolution to execution time.