
Add deferrable=True support to SparkSubmitOperator (issue #67168) #68277

Open
bujjibabukatta wants to merge 1 commit into apache:main from bujjibabukatta:fix/#67168

Conversation

@bujjibabukatta

Closes #67168

Summary

Adds a deferrable: bool = False parameter to SparkSubmitOperator, as
requested in #67168 (follow-up to #67118).

Two execution modes

| Mode | Behaviour |
| --- | --- |
| deferrable=False (default) | Existing sync path via ResumableJobMixin: worker slot held during polling, reconnects to the existing driver on infrastructure failure |
| deferrable=True | Submits the job, then calls self.defer() with SparkDriverTrigger: worker slot freed during polling, crash recovery handled by trigger-row persistence |

Changes

operators/spark_submit.py

  • Add deferrable: bool param (respects operators.default_deferrable config)
  • Add if self.deferrable: branch in execute() — submits job then defers to SparkDriverTrigger
  • Add execute_complete() — callback fired when trigger completes, raises AirflowException on failure
  • Add _build_master_rest_urls() — builds REST API URLs from connection extras, supports HA (comma-separated masters)
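
The branching described above can be pictured with a minimal, Airflow-free sketch. Everything in it is a stand-in: `TaskDeferred` here is a local class that only imitates the signal Airflow raises when an operator defers, and `SketchSparkOperator`, `wait_sync`, and the placeholder driver ID are hypothetical names, not code from this PR.

```python
class TaskDeferred(Exception):
    """Local stand-in for the signal Airflow raises when an operator defers."""

    def __init__(self, trigger_kwargs: dict, method_name: str):
        super().__init__(method_name)
        self.trigger_kwargs = trigger_kwargs
        self.method_name = method_name


class SketchSparkOperator:
    """Hypothetical operator showing only the sync/deferrable split."""

    def __init__(self, deferrable: bool = False):
        self.deferrable = deferrable

    def submit_job(self) -> str:
        # Placeholder: a real operator would submit to Spark and get an ID back.
        return "driver-0001"

    def wait_sync(self, driver_id: str) -> str:
        # Placeholder for the blocking path that holds the worker slot.
        return driver_id

    def execute(self) -> str:
        driver_id = self.submit_job()
        if self.deferrable:
            # Hand polling over to the trigger and name the resume callback.
            raise TaskDeferred({"driver_id": driver_id}, method_name="execute_complete")
        return self.wait_sync(driver_id)

    def execute_complete(self, event: dict) -> str:
        # Resumed on a worker once the trigger has emitted its event.
        if event["driver_state"] != "FINISHED":
            raise RuntimeError(f"Driver {event['driver_id']} ended as {event['driver_state']}")
        return event["driver_id"]
```

In this sketch the job is submitted before the task defers, so the trigger only has to poll an already-known driver ID.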

triggers/spark_submit.py (new)

  • SparkDriverTrigger — async trigger that polls Spark standalone REST API via aiohttp
  • Supports HA: tries each master URL in order on every poll
  • Treats RELAUNCHING/UNKNOWN as still-alive (master in recovery)
  • serialize() implemented so Airflow can re-create the trigger after a crash
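
A rough sketch of the polling behaviour listed above, written without aiohttp so it runs on its own. The `fetch` hook, the function names, the kwargs in `serialize_trigger`, and the `/v1/submissions/status/<driver_id>` path with a `driverState` field are assumptions made for illustration; the actual trigger in this PR may differ on all of them.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Driver states after which no further change is expected.
TERMINAL_STATES = {"FINISHED", "FAILED", "KILLED", "ERROR"}

# Hypothetical fetch hook: takes a URL and returns the decoded JSON body.
FetchFn = Callable[[str], Awaitable[dict]]


async def poll_once(master_urls: list, driver_id: str, fetch: FetchFn) -> Optional[str]:
    """Try each master in order; return the first driver state obtained, else None."""
    for base_url in master_urls:
        try:
            body = await fetch(f"{base_url}/v1/submissions/status/{driver_id}")
        except OSError:
            continue  # this master is unreachable, try the next one
        state = body.get("driverState")
        if state is not None:
            return state
    return None


async def wait_for_driver(master_urls, driver_id, fetch, poll_interval=0.0) -> dict:
    """Poll until the driver reaches a terminal state, then build the event payload."""
    while True:
        state = await poll_once(master_urls, driver_id, fetch)
        if state in TERMINAL_STATES:
            return {"driver_id": driver_id, "driver_state": state}
        # RUNNING, SUBMITTED, RELAUNCHING, UNKNOWN, or no answer: keep polling.
        await asyncio.sleep(poll_interval)


def serialize_trigger(master_urls, driver_id, poll_interval):
    """Classpath plus kwargs, the shape Airflow uses to re-create a trigger."""
    classpath = "airflow.providers.apache.spark.triggers.spark_submit.SparkDriverTrigger"
    kwargs = {"master_urls": master_urls, "driver_id": driver_id, "poll_interval": poll_interval}
    return classpath, kwargs


async def _demo() -> dict:
    states = iter(["RELAUNCHING", "RUNNING", "FINISHED"])

    async def fake_fetch(url: str) -> dict:
        if url.startswith("http://master-a"):
            raise ConnectionError("master-a is down")  # forces failover to master-b
        return {"driverState": next(states)}

    return await wait_for_driver(
        ["http://master-a:6066", "http://master-b:6066"], "driver-0001", fake_fetch
    )


event = asyncio.run(_demo())
```

The demo makes the first master unreachable on every poll, so each poll falls through to the second master, and the loop keeps going through RELAUNCHING and RUNNING until FINISHED is reported.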

provider.yaml

  • Register triggers/spark_submit module so Airflow triggerer process discovers SparkDriverTrigger

pyproject.toml

  • Add aiohttp>=3.9.0 dependency (required by SparkDriverTrigger)

Tests

  • tests/triggers/test_spark_submit.py (new) — 11 tests covering SparkDriverTrigger serialize, run loop, HA failover, REST polling
  • tests/operators/test_spark_submit.py — new TestSparkSubmitOperatorDeferrable class with 9 tests

Relationship to #67118

Both modes share spark_job_id in task_state. A user can switch from deferrable=False to deferrable=True without any state migration.

@SameerMesiah97 SameerMesiah97 left a comment

I only managed to get through part of the diff in my first pass, but I will revisit it later. The overall approach is fine, but there are some issues that I have pointed out in the comments.

I would review the entire diff to make sure there are no rough edges. Of course, CI is failing so I would fix that as well.

Honestly, I think this should be converted to draft before requesting any reviews from maintainers as the current state will result in endless back and forth during reviews. Once you feel it is ready, I would request review again.

polls the Spark REST API. On crash the trigger is re-created from
its serialised state (no reconnect needed). On user-clear, execute()
runs again and a fresh job is submitted.
If ``False`` (default), the sync ``ResumableJobMixin`` path is used.

@SameerMesiah97 SameerMesiah97 Jun 10, 2026

This docstring entry is a bit too verbose. I don't think there is a need to go deep into the mechanics of deferrable mode here. I understand you want to explain the implications of an adjacent development i.e. ResumableJobMixin but I believe that might be better suited for a comment. I think you should use this instead:

:param deferrable: Run operator in deferrable mode.

if self.deferrable:
    driver_id = self.submit_job(context)
    master_urls = self._build_master_rest_urls()
    from airflow.providers.apache.spark.triggers.spark_submit import SparkDriverTrigger

This import belongs at the top of the file.

Called by Airflow when the trigger fires after deferrable=True execution.
Raises AirflowException if the driver did not finish successfully.
"""
from airflow.providers.common.compat.sdk import AirflowException

Same here. There has also been a consensus for a while to move away from AirflowException in favour of native Python errors (e.g. ValueError, RuntimeError) or SDK exceptions if available. I would remove AirflowException.

"""
from airflow.providers.common.compat.sdk import AirflowException
driver_state = event.get("driver_state", "UNKNOWN")
driver_id = event.get("driver_id", "unknown")

So I can see 2 issues here:

  1. The fallback for driver_state when the event payload contains no "driver_state" key defaults to UNKNOWN, which is a valid Spark driver state. Is it really the same as the state not being retrievable?
  2. There is an overlap between the fallback value for driver_id and UNKNOWN, which may confuse users reading the logs.

I think it would be better to validate both like this:

status = event.get("status")
driver_id = event.get("driver_id")

if status is None:
    raise RuntimeError(f"Malformed trigger event: {event}")

if driver_id is None:
    raise RuntimeError(f"Malformed trigger event: {event}")
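
As a rough illustration of keeping "the event is malformed" separate from "the driver did not succeed", the check could look like the sketch below. It assumes the event carries the `driver_state` and `driver_id` keys from the snippet under review, and that FINISHED is the only success state; `check_trigger_event` is a hypothetical helper name, not part of the PR.

```python
def check_trigger_event(event: dict) -> str:
    """Return the driver ID on success; raise RuntimeError otherwise."""
    driver_state = event.get("driver_state")
    driver_id = event.get("driver_id")

    # A missing key means the payload is broken, which is not the same as
    # Spark reporting the driver as UNKNOWN.
    if driver_state is None or driver_id is None:
        raise RuntimeError(f"Malformed trigger event: {event}")

    # Assumption: FINISHED is the only state treated as success.
    if driver_state != "FINISHED":
        raise RuntimeError(f"Spark driver {driver_id} ended in state {driver_state}")

    return driver_id
```

With this shape, a log line mentioning UNKNOWN can only come from Spark itself, never from a default value.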

"""
if self._hook is None:
    self._hook = self._get_hook()
scheme = self._hook._connection.get("rest_scheme", "http")

I am not sure why you are accessing the private attribute _connection to get the scheme. Isn't there a public API you can use?

scheme = self._hook._connection.get("rest_scheme", "http")
rest_port = self._hook._connection.get("rest_port", 6066)
master_hosts = self._hook._connection["master"].replace("spark://", "").split(",")
return [f"{scheme}://{m.strip().split(':')[0]}:{rest_port}" for m in master_hosts]

I think the list comprehension is doing way too much heavy lifting. An explicit loop like this would be better:

urls = []
for host in master_hosts:
    hostname = host.strip().split(":")[0]
    urls.append(f"{scheme}://{hostname}:{rest_port}")

return urls

You can see that it is much easier to understand the string parsing logic.
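
For reference, the loop above can be wrapped into a standalone function and exercised without a hook. This is only a sketch: `build_master_rest_urls` and its parameters are hypothetical, with the defaults copied from the snippet under review (`http` and port 6066).

```python
def build_master_rest_urls(master: str, scheme: str = "http", rest_port: int = 6066) -> list:
    """Turn 'spark://host1:7077,host2:7077' into one REST base URL per master."""
    master_hosts = master.replace("spark://", "").split(",")

    urls = []
    for host in master_hosts:
        # Drop the RPC port and keep only the hostname.
        hostname = host.strip().split(":")[0]
        urls.append(f"{scheme}://{hostname}:{rest_port}")

    return urls


ha_urls = build_master_rest_urls("spark://master-a:7077, master-b:7077")
single_url = build_master_rest_urls("spark://master-a:7077", scheme="https", rest_port=6067)
```

One caveat in this parsing approach: splitting on ":" assumes hostnames or IPv4 addresses, so an IPv6 literal would be cut at its first colon.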

Development

Successfully merging this pull request may close these issues.

Consider adding deferrable support to SparkSubmitOperator

2 participants