From 5ab2682659b5943e53d82b3c9c483c38e9ecf8d2 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Wed, 13 May 2026 09:12:26 +0000
Subject: [PATCH 1/9] [DOCS][CONNECT] Document DataFrame column resolution behavior in spark-connect-gotchas

Generated-by: Claude Code (Anthropic), claude-opus-4-7
---
 docs/spark-connect-gotchas.md | 43 ++++++++++++++++++++++++++++++++++-
 1 file changed, 42 insertions(+), 1 deletion(-)

diff --git a/docs/spark-connect-gotchas.md b/docs/spark-connect-gotchas.md
index f1973133d335b..d510677f5b890 100644
--- a/docs/spark-connect-gotchas.md
+++ b/docs/spark-connect-gotchas.md
@@ -73,7 +73,7 @@ Unlike query execution, Spark Classic and Spark Connect differ in when schema an
 
 # Common Gotchas (with Mitigations)
 
-If you are not careful about the difference between lazy vs. eager analysis, there are four key gotchas to be aware of: 1) overwriting temporary view names, 2) capturing external variables in UDFs, 3) delayed error detection, and 4) excessive schema access on new DataFrames.
+If you are not careful about the difference between lazy vs. eager analysis, there are five key gotchas to be aware of: 1) overwriting temporary view names, 2) capturing external variables in UDFs, 3) delayed error detection, 4) excessive schema access on new DataFrames, and 5) DataFrame column references after a column is shadowed.
 
 ## 1. Reusing temporary view names
 
@@ -418,6 +418,46 @@ This approach is significantly faster when dealing with a large number of column
 
+## 5. DataFrame column references after column shadowing
+
+In Spark Connect, a DataFrame column reference such as `df["col"]` is tagged with the plan id of `df`. At analysis time the server resolves the reference by looking for the tagged ancestor in the plan and pulling the matching attribute from it. Spark Classic does not use plan ids; it resolves column references against the immediate child's output by attribute id and name.
+
+The two resolution strategies diverge once a column has been shadowed by another operator that produces an attribute with the same name:
+
+```python
+import pyspark.sql.functions as F
+
+df = spark.sql("SELECT 'x' AS col")
+df.withColumn("col", F.col("col").cast("string")).select(df["col"]).collect()
+```
+
+`withColumn("col", ...)` does not mutate `df`; it returns a new DataFrame whose `col` is a new attribute that hides the original. The trailing `df["col"]` still refers to the *original* `col` attribute, which is no longer in the projection list.
+
+* **Spark Classic** has always rejected this query at analysis time with `MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION`, because the original attribute is not present in the operator's child output.
+* **Spark Connect** rejects it with `CANNOT_RESOLVE_DATAFRAME_COLUMN` by default. The plan-id-tagged reference does not match any attribute in the current plan, and strict resolution does not fall back to name-based resolution.
+
+### Mitigation
+
+Use `F.col("col")` (an untagged name reference) when you intend to refer to the column produced by the most recent projection or `withColumn`, rather than `df["col"]` (a tagged reference to `df`'s original column):
+
+```python
+import pyspark.sql.functions as F
+
+df = spark.sql("SELECT 'x' AS col")
+df.withColumn("col", F.col("col").cast("string")).select(F.col("col")).collect()
+```
+
+**Scala example:**
+
+```scala
+import org.apache.spark.sql.functions._
+
+val df = spark.sql("SELECT 'x' AS col")
+df.withColumn("col", col("col").cast("string")).select(col("col")).collect()
+```
+
+If you cannot change the call sites and want Spark Connect to accept the shadowed pattern, set the internal config `spark.sql.analyzer.strictDataFrameColumnResolution=false` to opt into a name-based fallback: when plan-id-based resolution does not find the tagged attribute, the analyzer also tries to resolve the reference by name. The lenient fallback is intended as an escape hatch and is not the default — prefer fixing the call site.
+
 # Summary
 
 | Aspect | Spark Classic | Spark Connect |
@@ -428,5 +468,6 @@ This approach is significantly faster when dealing with a large number of column
 | **Schema access** | Local | Triggers RPC, and caches the schema on first access |
 | **Temporary views** | Plan embedded | Name lookup |
 | **UDF serialization** | At creation | At execution |
+| **DataFrame column references** | Resolved against child attributes by id/name | Resolved against the tagged ancestor's plan id |
 
 The key difference is that Spark Connect defers analysis and name resolution to execution time.
From 7bacb74a4610db0d722165b86182df2a23d150e9 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Wed, 13 May 2026 11:26:50 +0000
Subject: [PATCH 2/9] Rename example column from 'col' to 'c' to avoid confusion with F.col()

Generated-by: Claude Code (Anthropic), claude-opus-4-7
---
 docs/spark-connect-gotchas.md | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/docs/spark-connect-gotchas.md b/docs/spark-connect-gotchas.md
index d510677f5b890..bb5be46e916f5 100644
--- a/docs/spark-connect-gotchas.md
+++ b/docs/spark-connect-gotchas.md
@@ -420,31 +420,31 @@ This approach is significantly faster when dealing with a large number of column
 ## 5. DataFrame column references after column shadowing
 
-In Spark Connect, a DataFrame column reference such as `df["col"]` is tagged with the plan id of `df`. At analysis time the server resolves the reference by looking for the tagged ancestor in the plan and pulling the matching attribute from it. Spark Classic does not use plan ids; it resolves column references against the immediate child's output by attribute id and name.
+In Spark Connect, a DataFrame column reference such as `df["c"]` is tagged with the plan id of `df`. At analysis time the server resolves the reference by looking for the tagged ancestor in the plan and pulling the matching attribute from it. Spark Classic does not use plan ids; it resolves column references against the immediate child's output by attribute id and name.
 
 The two resolution strategies diverge once a column has been shadowed by another operator that produces an attribute with the same name:
 
 ```python
 import pyspark.sql.functions as F
 
-df = spark.sql("SELECT 'x' AS col")
-df.withColumn("col", F.col("col").cast("string")).select(df["col"]).collect()
+df = spark.sql("SELECT 'x' AS c")
+df.withColumn("c", F.col("c").cast("string")).select(df["c"]).collect()
 ```
 
-`withColumn("col", ...)` does not mutate `df`; it returns a new DataFrame whose `col` is a new attribute that hides the original. The trailing `df["col"]` still refers to the *original* `col` attribute, which is no longer in the projection list.
+`withColumn("c", ...)` does not mutate `df`; it returns a new DataFrame whose `c` is a new attribute that hides the original. The trailing `df["c"]` still refers to the *original* `c` attribute, which is no longer in the projection list.
 
 * **Spark Classic** has always rejected this query at analysis time with `MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION`, because the original attribute is not present in the operator's child output.
 * **Spark Connect** rejects it with `CANNOT_RESOLVE_DATAFRAME_COLUMN` by default. The plan-id-tagged reference does not match any attribute in the current plan, and strict resolution does not fall back to name-based resolution.
 
 ### Mitigation
 
-Use `F.col("col")` (an untagged name reference) when you intend to refer to the column produced by the most recent projection or `withColumn`, rather than `df["col"]` (a tagged reference to `df`'s original column):
+Use `F.col("c")` (an untagged name reference) when you intend to refer to the column produced by the most recent projection or `withColumn`, rather than `df["c"]` (a tagged reference to `df`'s original column):
 
 ```python
 import pyspark.sql.functions as F
 
-df = spark.sql("SELECT 'x' AS col")
-df.withColumn("col", F.col("col").cast("string")).select(F.col("col")).collect()
+df = spark.sql("SELECT 'x' AS c")
+df.withColumn("c", F.col("c").cast("string")).select(F.col("c")).collect()
 ```
 
 **Scala example:**
 
@@ -452,8 +452,8 @@ df.withColumn("col", F.col("col").cast("string")).select(F.col("col")).collect()
 ```scala
 import org.apache.spark.sql.functions._
 
-val df = spark.sql("SELECT 'x' AS col")
-df.withColumn("col", col("col").cast("string")).select(col("col")).collect()
+val df = spark.sql("SELECT 'x' AS c")
+df.withColumn("c", col("c").cast("string")).select(col("c")).collect()
 ```
 
 If you cannot change the call sites and want Spark Connect to accept the shadowed pattern, set the internal config `spark.sql.analyzer.strictDataFrameColumnResolution=false` to opt into a name-based fallback: when plan-id-based resolution does not find the tagged attribute, the analyzer also tries to resolve the reference by name. The lenient fallback is intended as an escape hatch and is not the default — prefer fixing the call site.
From ee50079ffe3c7f20256dc7366c5a73ce00527996 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Wed, 13 May 2026 11:28:38 +0000
Subject: [PATCH 3/9] Use 'sf' alias for pyspark.sql.functions in new gotcha section

Generated-by: Claude Code (Anthropic), claude-opus-4-7
---
 docs/spark-connect-gotchas.md | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/docs/spark-connect-gotchas.md b/docs/spark-connect-gotchas.md
index bb5be46e916f5..5897614dae49d 100644
--- a/docs/spark-connect-gotchas.md
+++ b/docs/spark-connect-gotchas.md
@@ -425,10 +425,10 @@ In Spark Connect, a DataFrame column reference such as `df["c"]` is tagged with
 The two resolution strategies diverge once a column has been shadowed by another operator that produces an attribute with the same name:
 
 ```python
-import pyspark.sql.functions as F
+import pyspark.sql.functions as sf
 
 df = spark.sql("SELECT 'x' AS c")
-df.withColumn("c", F.col("c").cast("string")).select(df["c"]).collect()
+df.withColumn("c", sf.col("c").cast("string")).select(df["c"]).collect()
 ```
 
 `withColumn("c", ...)` does not mutate `df`; it returns a new DataFrame whose `c` is a new attribute that hides the original. The trailing `df["c"]` still refers to the *original* `c` attribute, which is no longer in the projection list.
@@ -438,13 +438,13 @@ df.withColumn("c", sf.col("c").cast("string")).select(df["c"]).collect()
 
 ### Mitigation
 
-Use `F.col("c")` (an untagged name reference) when you intend to refer to the column produced by the most recent projection or `withColumn`, rather than `df["c"]` (a tagged reference to `df`'s original column):
+Use `sf.col("c")` (an untagged name reference) when you intend to refer to the column produced by the most recent projection or `withColumn`, rather than `df["c"]` (a tagged reference to `df`'s original column):
 
 ```python
-import pyspark.sql.functions as F
+import pyspark.sql.functions as sf
 
 df = spark.sql("SELECT 'x' AS c")
-df.withColumn("c", F.col("c").cast("string")).select(F.col("c")).collect()
+df.withColumn("c", sf.col("c").cast("string")).select(sf.col("c")).collect()
 ```
 
 **Scala example:**

From 118ff74483ea36b10caea2183c0c38f7b59ff9fc Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Wed, 13 May 2026 11:42:41 +0000
Subject: [PATCH 4/9] Reference strictDataFrameColumnResolution config inline in section 5

Generated-by: Claude Code (Anthropic), claude-opus-4-7
---
 docs/spark-connect-gotchas.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/spark-connect-gotchas.md b/docs/spark-connect-gotchas.md
index 5897614dae49d..53d0d4b02f86a 100644
--- a/docs/spark-connect-gotchas.md
+++ b/docs/spark-connect-gotchas.md
@@ -434,11 +434,11 @@ df.withColumn("c", sf.col("c").cast("string")).select(df["c"]).collect()
 `withColumn("c", ...)` does not mutate `df`; it returns a new DataFrame whose `c` is a new attribute that hides the original. The trailing `df["c"]` still refers to the *original* `c` attribute, which is no longer in the projection list.
 
 * **Spark Classic** has always rejected this query at analysis time with `MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION`, because the original attribute is not present in the operator's child output.
-* **Spark Connect** rejects it with `CANNOT_RESOLVE_DATAFRAME_COLUMN` by default. The plan-id-tagged reference does not match any attribute in the current plan, and strict resolution does not fall back to name-based resolution.
+* **Spark Connect** rejects it with `CANNOT_RESOLVE_DATAFRAME_COLUMN` by default. The plan-id-tagged reference does not match any attribute in the current plan, and strict resolution does not fall back to name-based resolution. The strict behavior is controlled by the SQL config `spark.sql.analyzer.strictDataFrameColumnResolution` (added in Spark 4.2.0, default `true`). Setting it to `false` opts into a name-based fallback that resolves the tagged reference by name when plan-id-based resolution does not find the tagged attribute.
 
 ### Mitigation
 
-Use `sf.col("c")` (an untagged name reference) when you intend to refer to the column produced by the most recent projection or `withColumn`, rather than `df["c"]` (a tagged reference to `df`'s original column):
+Prefer fixing the call site: use `sf.col("c")` (an untagged name reference) when you intend to refer to the column produced by the most recent projection or `withColumn`, rather than `df["c"]` (a tagged reference to `df`'s original column):
 
 ```python
 import pyspark.sql.functions as sf
@@ -456,7 +456,7 @@ df.withColumn("c", sf.col("c").cast("string")).select(sf.col("c")).collect()
 ```
 
-If you cannot change the call sites and want Spark Connect to accept the shadowed pattern, set the internal config `spark.sql.analyzer.strictDataFrameColumnResolution=false` to opt into a name-based fallback: when plan-id-based resolution does not find the tagged attribute, the analyzer also tries to resolve the reference by name. The lenient fallback is intended as an escape hatch and is not the default — prefer fixing the call site.
+If you cannot change the call sites, set `spark.sql.analyzer.strictDataFrameColumnResolution=false` to opt into the lenient name-based fallback. This is intended as an escape hatch and is not the default.
 
 # Summary
 

From ce53b1014b8d9a38bdeb53e3a45c7ae436f64287 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Wed, 13 May 2026 11:46:17 +0000
Subject: [PATCH 5/9] Use integer literal in section 5 examples so cast('string') is meaningful

Generated-by: Claude Code (Anthropic), claude-opus-4-7
---
 docs/spark-connect-gotchas.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/spark-connect-gotchas.md b/docs/spark-connect-gotchas.md
index 53d0d4b02f86a..b720c41fe2928 100644
--- a/docs/spark-connect-gotchas.md
+++ b/docs/spark-connect-gotchas.md
@@ -427,7 +427,7 @@ The two resolution strategies diverge once a column has been shadowed by another
 ```python
 import pyspark.sql.functions as sf
 
-df = spark.sql("SELECT 'x' AS c")
+df = spark.sql("SELECT 1 AS c")
 df.withColumn("c", sf.col("c").cast("string")).select(df["c"]).collect()
 ```
 
@@ -443,7 +443,7 @@ Prefer fixing the call site: use `sf.col("c")` (an untagged name reference) when
 ```python
 import pyspark.sql.functions as sf
 
-df = spark.sql("SELECT 'x' AS c")
+df = spark.sql("SELECT 1 AS c")
 df.withColumn("c", sf.col("c").cast("string")).select(sf.col("c")).collect()
 ```
 
@@ -452,7 +452,7 @@ df.withColumn("c", sf.col("c").cast("string")).select(sf.col("c")).collect()
 ```scala
 import org.apache.spark.sql.functions._
 
-val df = spark.sql("SELECT 'x' AS c")
+val df = spark.sql("SELECT 1 AS c")
 df.withColumn("c", col("c").cast("string")).select(col("c")).collect()
 ```

From b2c0fb52ad19ce0cae393623a1d63777e3da431d Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Wed, 13 May 2026 11:50:39 +0000
Subject: [PATCH 6/9] Restructure strict-Connect bullet to contrast strict and lenient modes

Generated-by: Claude Code (Anthropic), claude-opus-4-7
---
 docs/spark-connect-gotchas.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/spark-connect-gotchas.md b/docs/spark-connect-gotchas.md
index b720c41fe2928..add602d29f32e 100644
--- a/docs/spark-connect-gotchas.md
+++ b/docs/spark-connect-gotchas.md
@@ -434,7 +434,7 @@ df.withColumn("c", sf.col("c").cast("string")).select(df["c"]).collect()
 `withColumn("c", ...)` does not mutate `df`; it returns a new DataFrame whose `c` is a new attribute that hides the original. The trailing `df["c"]` still refers to the *original* `c` attribute, which is no longer in the projection list.
 
 * **Spark Classic** has always rejected this query at analysis time with `MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION`, because the original attribute is not present in the operator's child output.
-* **Spark Connect** rejects it with `CANNOT_RESOLVE_DATAFRAME_COLUMN` by default. The plan-id-tagged reference does not match any attribute in the current plan, and strict resolution does not fall back to name-based resolution. The strict behavior is controlled by the SQL config `spark.sql.analyzer.strictDataFrameColumnResolution` (added in Spark 4.2.0, default `true`). Setting it to `false` opts into a name-based fallback that resolves the tagged reference by name when plan-id-based resolution does not find the tagged attribute.
+* **Spark Connect** rejects it with `CANNOT_RESOLVE_DATAFRAME_COLUMN` by default. The plan-id-tagged reference does not match any attribute in the current plan. But when the SQL config `spark.sql.analyzer.strictDataFrameColumnResolution` (added in Spark 4.2.0, default `true`) is set to `false`, the analyzer falls back to name-based resolution: the tagged `df["c"]` is resolved by name against the projected `c` from `withColumn`, and the query succeeds.
 
 ### Mitigation

From 81383f8488ddaa10f396e57b606b5124384d67d9 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Wed, 13 May 2026 11:52:15 +0000
Subject: [PATCH 7/9] Clarify lenient mode tries plan-id first, then falls back to name-based

Generated-by: Claude Code (Anthropic), claude-opus-4-7
---
 docs/spark-connect-gotchas.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/spark-connect-gotchas.md b/docs/spark-connect-gotchas.md
index add602d29f32e..91b06b9f01398 100644
--- a/docs/spark-connect-gotchas.md
+++ b/docs/spark-connect-gotchas.md
@@ -434,7 +434,7 @@ df.withColumn("c", sf.col("c").cast("string")).select(df["c"]).collect()
 `withColumn("c", ...)` does not mutate `df`; it returns a new DataFrame whose `c` is a new attribute that hides the original. The trailing `df["c"]` still refers to the *original* `c` attribute, which is no longer in the projection list.
 
 * **Spark Classic** has always rejected this query at analysis time with `MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION`, because the original attribute is not present in the operator's child output.
-* **Spark Connect** rejects it with `CANNOT_RESOLVE_DATAFRAME_COLUMN` by default. The plan-id-tagged reference does not match any attribute in the current plan. But when the SQL config `spark.sql.analyzer.strictDataFrameColumnResolution` (added in Spark 4.2.0, default `true`) is set to `false`, the analyzer falls back to name-based resolution: the tagged `df["c"]` is resolved by name against the projected `c` from `withColumn`, and the query succeeds.
+* **Spark Connect** rejects it with `CANNOT_RESOLVE_DATAFRAME_COLUMN` by default. The plan-id-tagged reference does not match any attribute in the current plan. But when the SQL config `spark.sql.analyzer.strictDataFrameColumnResolution` (added in Spark 4.2.0, default `true`) is set to `false`, the analyzer still tries plan-id-based resolution first, and only when that fails does it fall back to name-based resolution: the tagged `df["c"]` is then resolved by name against the projected `c` from `withColumn`, and the query succeeds.
 
 ### Mitigation

From 25c529342cadc5b045d1f7bbdb784bb529fd5231 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Wed, 13 May 2026 11:55:04 +0000
Subject: [PATCH 8/9] Reframe section 5 mitigation as 'Recommended way' to switch to sf.col

Generated-by: Claude Code (Anthropic), claude-opus-4-7
---
 docs/spark-connect-gotchas.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/spark-connect-gotchas.md b/docs/spark-connect-gotchas.md
index 91b06b9f01398..0bfc888bdee48 100644
--- a/docs/spark-connect-gotchas.md
+++ b/docs/spark-connect-gotchas.md
@@ -436,9 +436,9 @@ df.withColumn("c", sf.col("c").cast("string")).select(df["c"]).collect()
 * **Spark Classic** has always rejected this query at analysis time with `MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION`, because the original attribute is not present in the operator's child output.
 * **Spark Connect** rejects it with `CANNOT_RESOLVE_DATAFRAME_COLUMN` by default. The plan-id-tagged reference does not match any attribute in the current plan. But when the SQL config `spark.sql.analyzer.strictDataFrameColumnResolution` (added in Spark 4.2.0, default `true`) is set to `false`, the analyzer still tries plan-id-based resolution first, and only when that fails does it fall back to name-based resolution: the tagged `df["c"]` is then resolved by name against the projected `c` from `withColumn`, and the query succeeds.
 
-### Mitigation
+### Recommended way
 
-Prefer fixing the call site: use `sf.col("c")` (an untagged name reference) when you intend to refer to the column produced by the most recent projection or `withColumn`, rather than `df["c"]` (a tagged reference to `df`'s original column):
+If you hit any of the confusing failures mentioned above, the recommended fix is to switch to `sf.col`. `sf.col("c")` is an untagged name reference that resolves against the most recent projection or `withColumn`, whereas `df["c"]` is a tagged reference to `df`'s original column:
 
 ```python
 import pyspark.sql.functions as sf

From 837659237b19da69a8adbf6ce870f07e16907e67 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Wed, 13 May 2026 11:57:38 +0000
Subject: [PATCH 9/9] Shorten DataFrame column references row to Eager vs Lazy/plan-id

Generated-by: Claude Code (Anthropic), claude-opus-4-7
---
 docs/spark-connect-gotchas.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/spark-connect-gotchas.md b/docs/spark-connect-gotchas.md
index 0bfc888bdee48..b60b0e4feaca0 100644
--- a/docs/spark-connect-gotchas.md
+++ b/docs/spark-connect-gotchas.md
@@ -468,6 +468,6 @@ If you cannot change the call sites, set `spark.sql.analyzer.strictDataFrameColu
 | **Schema access** | Local | Triggers RPC, and caches the schema on first access |
 | **Temporary views** | Plan embedded | Name lookup |
 | **UDF serialization** | At creation | At execution |
-| **DataFrame column references** | Resolved against child attributes by id/name | Resolved against the tagged ancestor's plan id |
+| **DataFrame column references** | Eagerly resolved | Lazily resolved against plan id |
 
 The key difference is that Spark Connect defers analysis and name resolution to execution time.
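
The strict/lenient toggle that patches 4-7 document is never shown end to end in the doc's examples. The sketch below is illustrative only and not part of any patch in the series: it assumes an active Spark Connect session bound to `spark` on a build that actually has the `spark.sql.analyzer.strictDataFrameColumnResolution` config, and it needs a live Connect server to run.

```python
import pyspark.sql.functions as sf
from pyspark.errors import AnalysisException

# Shadow the original column `c`, then refer back to df's original attribute.
df = spark.sql("SELECT 1 AS c")
shadowed = df.withColumn("c", sf.col("c").cast("string"))

# Strict mode (the default): the plan-id-tagged df["c"] matches nothing
# in the shadowed plan, so analysis fails.
spark.conf.set("spark.sql.analyzer.strictDataFrameColumnResolution", "true")
try:
    shadowed.select(df["c"]).collect()
except AnalysisException:
    pass  # error class CANNOT_RESOLVE_DATAFRAME_COLUMN

# Lenient mode: plan-id resolution is tried first; when it fails, the
# reference falls back to name-based resolution against the new `c`.
spark.conf.set("spark.sql.analyzer.strictDataFrameColumnResolution", "false")
shadowed.select(df["c"]).collect()

# Config-independent fix recommended by the series: an untagged name reference.
shadowed.select(sf.col("c")).collect()
```

The final `sf.col("c")` form behaves the same under either setting, which is why the patches recommend fixing the call site before reaching for the config.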