
fix: Multi-Node Jobs ended_at time not being set #184

Open
yuechao-qin wants to merge 1 commit into master from ycq/fix-multinode-ended-at

Conversation

Collaborator

yuechao-qin commented Mar 21, 2026

Background

Closes https://github.com/Shopify/oasis-frontend/issues/542

Kubernetes Job conditions use "Complete" for successful jobs and "Failed" for failed jobs. The LaunchedKubernetesJob.ended_at property was checking for "Succeeded" instead of "Complete", which meant ended_at was always None for successful K8s Jobs. This caused the API to return no end time for completed executions.

Separately, when the orchestrator hit an internal error processing a running execution (SYSTEM_ERROR), it set the status but never recorded ended_at, making it impossible to tell when the failure occurred.

Fix

  • ended_at now correctly resolves for successful K8s Jobs by matching the "Complete" condition type
  • SYSTEM_ERROR executions now record ended_at so the API shows when the error occurred
  • All terminal state fields (status, ended_at, exit_code, started_at) are set through a single helper to prevent future inconsistencies

Changes

cloud_pipelines_backend/launchers/kubernetes_launchers.py

  • Added JobConditionType and ConditionStatus enums with K8s API doc references, replacing magic strings throughout the file
  • Fixed ended_at to check ("Complete", "Failed") instead of ("Succeeded", "Failed")
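
A minimal sketch of the corrected check, assuming each condition is a plain dict shaped like a Kubernetes JobCondition (`type`, `status`, `lastTransitionTime`). The enum names follow this PR's description, but the members other than `FAILURE_TARGET` and the helper `resolve_ended_at` are illustrative assumptions, not the actual code in kubernetes_launchers.py.

```python
import datetime
import enum
from typing import Optional


class JobConditionType(str, enum.Enum):
    # Condition types a Kubernetes Job reports in status.conditions.
    COMPLETE = "Complete"
    FAILED = "Failed"
    SUSPENDED = "Suspended"
    FAILURE_TARGET = "FailureTarget"


class ConditionStatus(str, enum.Enum):
    TRUE = "True"
    FALSE = "False"
    UNKNOWN = "Unknown"


# Only these two condition types mark a finished Job.
_TERMINAL_TYPES = (JobConditionType.COMPLETE, JobConditionType.FAILED)


def resolve_ended_at(conditions: list[dict]) -> Optional[datetime.datetime]:
    """Hypothetical helper: transition time of the first terminal condition that is True."""
    for condition in conditions:
        if (
            condition.get("type") in _TERMINAL_TYPES
            and condition.get("status") == ConditionStatus.TRUE
        ):
            return condition.get("lastTransitionTime")
    return None
```

Because the enums subclass `str`, their members compare equal to the raw strings returned by the API, so a condition with type "Complete" matches while "Succeeded" (which a Job never reports as a condition type) does not.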

cloud_pipelines_backend/orchestrator_sql.py

  • Added _set_terminal_state() helper that sets all terminal fields on a ContainerExecution in one place
  • Refactored SUCCEEDED, FAILED, CANCELLED, and SYSTEM_ERROR branches to use the helper
  • SYSTEM_ERROR branch now sets ended_at (was previously missing)

Test

uv run pytest tests/test_kubernetes_launchers.py tests/test_orchestrator_terminal_state.py


@yuechao-qin yuechao-qin marked this pull request as ready for review March 21, 2026 03:47
@yuechao-qin yuechao-qin requested a review from Ark-kun as a code owner March 21, 2026 03:47
_MULTI_NODE_NODE_INDEX_ENV_VAR_NAME = "_TANGLE_MULTI_NODE_NODE_INDEX"


class JobConditionType(str, enum.Enum):
Contributor

Let's make this private.

FAILURE_TARGET = "FailureTarget"


class ConditionStatus(str, enum.Enum):
Contributor

Let's make this private.

return datetime.datetime.now(tz=datetime.timezone.utc)


def _set_terminal_state(
Contributor

Ark-kun commented Mar 25, 2026

I'm not sure this function adds any value. It might also add confusion, as it's not obvious from the call site what exactly it is doing. Let's revert it.

Collaborator Author

I messed up on this function. I meant to make ended_at required, not optional, so that it's clear that any terminal state transition requires both 1) a status change and 2) an end time.

Do you think this function is helpful then?

Otherwise, do you have any suggestions on how to make it clear to developers 1) where the terminal transitions for a job are and 2) what changes are required when transitioning to a terminal state?

Contributor

Ark-kun commented Mar 25, 2026

Nit: This looks like a fix combined with a refactoring. This increases the diff and makes it harder to see what has changed and what hasn't. The fix changes are hidden among refactoring changes.


with (
mock.patch(
"cloud_pipelines_backend.orchestrator_sql._get_current_time",
Contributor

What happens if we make this a real object, not a string? This would be more robust if something is renamed.

info.is_dir = False
info.total_size = 10
info.hashes = {"md5": "abc123"}
storage.make_uri.return_value.get_reader.return_value.get_info.return_value = (
Contributor

This looks brittle. It might be easier to create a mock storage provider (or a mocked GCS storage provider) and give it to the launcher/orchestrator.
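
A small hand-written fake along these lines might look like the sketch below. The method chain (`make_uri`, `get_reader`, `get_info`) and the info fields (`is_dir`, `total_size`, `hashes`) come from the mocked call in the test above; every class name and the single string argument to `make_uri` are assumptions made for illustration, not the real storage provider interface.

```python
import dataclasses


@dataclasses.dataclass
class FakeDataInfo:
    # Mirrors the attributes the test sets on its mock info object.
    total_size: int
    is_dir: bool
    hashes: dict[str, str]


class FakeReader:
    def __init__(self, info: FakeDataInfo) -> None:
        self._info = info

    def get_info(self) -> FakeDataInfo:
        return self._info


class FakeUri:
    def __init__(self, info: FakeDataInfo) -> None:
        self._info = info

    def get_reader(self) -> FakeReader:
        return FakeReader(self._info)


class FakeStorageProvider:
    """In-memory stand-in that returns pre-registered info for each URI string."""

    def __init__(self, infos: dict[str, FakeDataInfo]) -> None:
        self._infos = infos

    def make_uri(self, uri: str) -> FakeUri:
        # Unknown URIs raise KeyError, which surfaces unexpected lookups in tests.
        return FakeUri(self._infos[uri])
```

A test could then build `FakeStorageProvider({"gs://bucket/output": FakeDataInfo(total_size=10, is_dir=False, hashes={"md5": "abc123"})})` and hand it to the code under test, replacing the chained `return_value` setup.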

return datetime.datetime(2026, 3, 20, hour, 0, tzinfo=datetime.timezone.utc)


def _make_execution_node(
Contributor

_make_mock_execution_node?

return lc


def _make_mock_session() -> mock.MagicMock:
Contributor

It might be easier to work with a real in-memory SQLite DB session than to mock it. That would also allow testing more end-to-end with less mocking.
