Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions cloud_pipelines_backend/launchers/kubernetes_launchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import copy
import datetime
import enum
import json
import logging
import os
Expand Down Expand Up @@ -67,6 +68,34 @@
_MULTI_NODE_NODE_INDEX_ENV_VAR_NAME = "_TANGLE_MULTI_NODE_NODE_INDEX"


class _JobConditionType(str, enum.Enum):
"""Kubernetes Job condition types.

A Job is considered finished when it is in a terminal condition,
either "Complete" or "Failed".

Reference: https://kubernetes.io/docs/reference/kubernetes-api/workload-resources/job-v1/
See: `A job is considered finished when it is in a terminal condition, either "Complete" or "Failed".`
"""

COMPLETE = "Complete"
FAILED = "Failed"
SUSPENDED = "Suspended"
FAILURE_TARGET = "FailureTarget"


class _ConditionStatus(str, enum.Enum):
"""Kubernetes condition status values.

Reference: https://kubernetes.io/docs/reference/kubernetes-api/workload-resources/job-v1/
See: `Status of the condition, one of True, False, Unknown.`
"""

TRUE = "True"
FALSE = "False"
UNKNOWN = "Unknown"


_T = typing.TypeVar("_T")

_CONTAINER_FILE_NAME = "data"
Expand Down Expand Up @@ -1265,11 +1294,13 @@ def status(self) -> interfaces.ContainerStatus:
if not job_status:
return interfaces.ContainerStatus.PENDING
has_succeeded_condition = any(
condition.type == "Complete" and condition.status == "True"
condition.type == _JobConditionType.COMPLETE
and condition.status == _ConditionStatus.TRUE
for condition in job_status.conditions or []
)
has_failed_condition = any(
condition.type == "Failed" and condition.status == "True"
condition.type == _JobConditionType.FAILED
and condition.status == _ConditionStatus.TRUE
for condition in job_status.conditions or []
)
if has_failed_condition:
Expand Down Expand Up @@ -1338,13 +1369,19 @@ def started_at(self) -> datetime.datetime | None:

@property
def ended_at(self) -> datetime.datetime | None:
"""Return the time when the Job entered a terminal condition.

A Job is considered finished when it has a "Complete" or "Failed"
condition with status "True".
"""
job_status = self._debug_job.status
if not job_status:
return None
ended_condition_times = [
condition.last_transition_time
for condition in job_status.conditions or []
if condition.type in ("Succeeded", "Failed") and condition.status == "True"
if condition.type in (_JobConditionType.COMPLETE, _JobConditionType.FAILED)
and condition.status == _ConditionStatus.TRUE
]
if not ended_condition_times:
return None
Expand Down
57 changes: 45 additions & 12 deletions cloud_pipelines_backend/orchestrator_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,10 @@ def internal_process_running_executions_queue(self, session: orm.Session):
except Exception as ex:
_logger.exception("Error processing running container execution")
session.rollback()
running_container_execution.status = (
bts.ContainerExecutionStatus.SYSTEM_ERROR
_record_terminal_state(
container_execution=running_container_execution,
status=bts.ContainerExecutionStatus.SYSTEM_ERROR,
ended_at=_get_current_time(),
)
# Doing an intermediate commit here because it's most important to mark the problematic execution as SYSTEM_ERROR.
session.commit()
Expand Down Expand Up @@ -684,9 +686,12 @@ def internal_process_one_running_execution(
# Requesting container termination.
# Termination might not happen immediately (e.g. Kubernetes has grace period).
launched_container.terminate()
container_execution.ended_at = _get_current_time()
# We need to mark the execution as CANCELLED otherwise orchestrator will continue polling it.
container_execution.status = bts.ContainerExecutionStatus.CANCELLED
_record_terminal_state(
container_execution=container_execution,
status=bts.ContainerExecutionStatus.CANCELLED,
ended_at=_get_current_time(),
)
terminated = True

# Mark the execution nodes as cancelled only after the launched container is successfully terminated (if needed)
Expand Down Expand Up @@ -746,10 +751,13 @@ def internal_process_one_running_execution(
bts.ContainerExecutionStatus.RUNNING
)
elif new_status == launcher_interfaces.ContainerStatus.SUCCEEDED:
container_execution.status = bts.ContainerExecutionStatus.SUCCEEDED
container_execution.exit_code = reloaded_launched_container.exit_code
container_execution.started_at = reloaded_launched_container.started_at
container_execution.ended_at = reloaded_launched_container.ended_at
_record_terminal_state(
container_execution=container_execution,
status=bts.ContainerExecutionStatus.SUCCEEDED,
exit_code=reloaded_launched_container.exit_code,
started_at=reloaded_launched_container.started_at,
ended_at=reloaded_launched_container.ended_at,
)

# Don't fail the execution if log upload fails.
# Logs are important, but not so important that we should fail a successfully completed container execution.
Expand Down Expand Up @@ -881,10 +889,13 @@ def _maybe_preload_value(
bts.ContainerExecutionStatus.QUEUED
)
elif new_status == launcher_interfaces.ContainerStatus.FAILED:
container_execution.status = bts.ContainerExecutionStatus.FAILED
container_execution.exit_code = reloaded_launched_container.exit_code
container_execution.started_at = reloaded_launched_container.started_at
container_execution.ended_at = reloaded_launched_container.ended_at
_record_terminal_state(
container_execution=container_execution,
status=bts.ContainerExecutionStatus.FAILED,
exit_code=reloaded_launched_container.exit_code,
started_at=reloaded_launched_container.started_at,
ended_at=reloaded_launched_container.ended_at,
)
launcher_error = reloaded_launched_container.launcher_error_message
if launcher_error:
orchestration_error_message = f"Launcher error: {launcher_error}"
Expand Down Expand Up @@ -1010,6 +1021,28 @@ def _get_current_time() -> datetime.datetime:
return datetime.datetime.now(tz=datetime.timezone.utc)


def _record_terminal_state(
*,
container_execution: bts.ContainerExecution,
status: bts.ContainerExecutionStatus,
ended_at: datetime.datetime,
exit_code: int | None = None,
started_at: datetime.datetime | None = None,
) -> None:
"""Record terminal state fields on a container execution.

A terminal state must minimally include a status change and an end time.
exit_code and started_at are optional — they depend on whether the
launcher was able to report them before the execution ended.
"""
container_execution.status = status
container_execution.ended_at = ended_at
if exit_code is not None:
container_execution.exit_code = exit_code
if started_at is not None:
container_execution.started_at = started_at


def _generate_random_id() -> str:
import os
import time
Expand Down
159 changes: 159 additions & 0 deletions tests/test_kubernetes_launchers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
"""Tests for LaunchedKubernetesJob.ended_at — the property fixed to use
"Complete" instead of the incorrect "Succeeded" K8s Job condition type.
"""

from __future__ import annotations

import datetime
from typing import Any
from unittest import mock

from cloud_pipelines_backend.launchers import kubernetes_launchers as kl


def _utc(
*,
year: int = 2026,
month: int = 3,
day: int = 20,
hour: int = 12,
minute: int = 0,
) -> datetime.datetime:
return datetime.datetime(
year, month, day, hour, minute, tzinfo=datetime.timezone.utc
)


def _make_condition(
*,
type: str,
status: str = "True",
last_transition_time: datetime.datetime | None = None,
) -> mock.Mock:
c = mock.Mock()
c.type = type
c.status = status
c.last_transition_time = last_transition_time or _utc()
return c


def _make_job(
*,
conditions: list[Any] | None = None,
active: int | None = None,
succeeded: int | None = None,
failed: int | None = None,
start_time: datetime.datetime | None = None,
completions: int | None = 1,
) -> mock.Mock:
job = mock.Mock()
job.status = mock.Mock()
job.status.conditions = conditions
job.status.active = active
job.status.succeeded = succeeded
job.status.failed = failed
job.status.start_time = start_time
job.spec = mock.Mock()
job.spec.completions = completions
return job


def _make_launched_job(
*,
job: mock.Mock | None = None,
) -> kl.LaunchedKubernetesJob:
if job is None:
job = _make_job()
return kl.LaunchedKubernetesJob(
job_name="test-job",
namespace="default",
output_uris={},
log_uri="gs://bucket/log",
debug_job=job,
)


class TestEndedAt:
"""Tests for LaunchedKubernetesJob.ended_at.

This property reads job.status.conditions and returns the
last_transition_time of the first terminal condition (Complete or Failed)
with status=True.

Code under test: kubernetes_launchers.py LaunchedKubernetesJob.ended_at
"""

def test_returns_none_when_no_status(self) -> None:
job = mock.Mock()
job.status = None
launched = _make_launched_job(job=job)
assert launched.ended_at is None

def test_returns_none_when_no_conditions(self) -> None:
launched = _make_launched_job(job=_make_job(conditions=None))
assert launched.ended_at is None

def test_returns_none_when_empty_conditions(self) -> None:
launched = _make_launched_job(job=_make_job(conditions=[]))
assert launched.ended_at is None

def test_returns_none_when_only_suspended_condition(self) -> None:
"""A Suspended=True condition is not terminal — ended_at stays None."""
condition = _make_condition(type="Suspended", status="True")
launched = _make_launched_job(job=_make_job(conditions=[condition]))
assert launched.ended_at is None

def test_returns_time_for_complete_condition(self) -> None:
"""Job finished successfully: condition type=Complete, status=True."""
t = _utc(hour=14)
condition = _make_condition(
type="Complete", status="True", last_transition_time=t
)
launched = _make_launched_job(job=_make_job(conditions=[condition]))
assert launched.ended_at == t

def test_returns_time_for_failed_condition(self) -> None:
"""Job failed: condition type=Failed, status=True."""
t = _utc(hour=15)
condition = _make_condition(
type="Failed", status="True", last_transition_time=t
)
launched = _make_launched_job(job=_make_job(conditions=[condition]))
assert launched.ended_at == t

def test_ignores_complete_condition_with_status_false(self) -> None:
condition = _make_condition(type="Complete", status="False")
launched = _make_launched_job(job=_make_job(conditions=[condition]))
assert launched.ended_at is None

def test_ignores_failed_condition_with_status_unknown(self) -> None:
condition = _make_condition(type="Failed", status="Unknown")
launched = _make_launched_job(job=_make_job(conditions=[condition]))
assert launched.ended_at is None

def test_does_not_match_succeeded_string(self) -> None:
"""Regression: 'Succeeded' is not a valid K8s Job condition type.
The old code had condition.type in ("Succeeded", "Failed") which
caused ended_at to always be None for successful jobs.
"""
condition = _make_condition(type="Succeeded", status="True")
launched = _make_launched_job(job=_make_job(conditions=[condition]))
assert launched.ended_at is None

def test_picks_terminal_condition_ignoring_suspended(self) -> None:
"""Real scenario: a job was suspended then resumed and completed.
Conditions list has Suspended=True followed by Complete=True.
ended_at should come from the Complete condition.
"""
t_suspended = _utc(hour=10)
t_complete = _utc(hour=14)
conditions = [
_make_condition(
type="Suspended", status="True", last_transition_time=t_suspended
),
_make_condition(
type="Complete", status="True", last_transition_time=t_complete
),
]
launched = _make_launched_job(job=_make_job(conditions=conditions))
assert launched.ended_at == t_complete
Loading
Loading