Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ To install Skypilot with optional features, use one of the following commands:

You can also manually install Skypilot from https://skypilot.readthedocs.io/en/latest/getting-started/installation.html

If using DGX Cloud Lepton, use the following command to install the Lepton CLI:
If using DGX Cloud Lepton, install NeMo Run with the Lepton extra:

```bash
pip install leptonai
pip install "nemo_run[lepton]"
```

To authenticate with the DGX Cloud Lepton cluster, navigate to the **Settings > Tokens** page in the DGX Cloud Lepton UI, copy the ``lep login`` command shown on the page, and run it in the terminal.
Expand Down
10 changes: 9 additions & 1 deletion nemo_run/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from nemo_run.core.execution.docker import DockerExecutor
from nemo_run.core.execution.kubeflow import KubeflowExecutor
from nemo_run.core.execution.launcher import FaultTolerance, SlurmRay, SlurmTemplate, Torchrun
from nemo_run.core.execution.lepton import LeptonExecutor
from nemo_run.core.execution.local import LocalExecutor
from nemo_run.core.execution.skypilot import SkypilotExecutor
from nemo_run.core.execution.slurm import SlurmExecutor
Expand All @@ -40,6 +39,15 @@
from nemo_run.run.experiment import Experiment
from nemo_run.run.plugin import ExperimentPlugin as Plugin


def __getattr__(name: str):
    """Resolve ``LeptonExecutor`` lazily on attribute access (module-level ``__getattr__``).

    Args:
        name: Attribute requested on this module that was not found by normal lookup.

    Returns:
        The ``LeptonExecutor`` class when ``name`` is ``"LeptonExecutor"``.

    Raises:
        AttributeError: For any other attribute name.
    """
    if name != "LeptonExecutor":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

    # Deferred so that importing this package does not import the Lepton
    # executor module until the attribute is actually requested.
    from nemo_run.core.execution.lepton import LeptonExecutor

    return LeptonExecutor


__all__ = [
"autoconvert",
"cli",
Expand Down
10 changes: 9 additions & 1 deletion nemo_run/core/execution/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@
# limitations under the License.

from nemo_run.core.execution.dgxcloud import DGXCloudExecutor
from nemo_run.core.execution.lepton import LeptonExecutor
from nemo_run.core.execution.local import LocalExecutor
from nemo_run.core.execution.kubeflow import KubeflowExecutor
from nemo_run.core.execution.skypilot import SkypilotExecutor
from nemo_run.core.execution.slurm import SlurmExecutor


def __getattr__(name: str):
    """Provide ``LeptonExecutor`` on demand instead of importing it with the package.

    Args:
        name: Attribute requested on this module that was not found by normal lookup.

    Returns:
        The ``LeptonExecutor`` class when ``name`` is ``"LeptonExecutor"``.

    Raises:
        AttributeError: For any other attribute name.
    """
    if name != "LeptonExecutor":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

    # Imported here rather than at module top so the Lepton executor module
    # is only loaded when this attribute is actually accessed.
    from nemo_run.core.execution.lepton import LeptonExecutor

    return LeptonExecutor


__all__ = [
"LocalExecutor",
"SlurmExecutor",
Expand Down
84 changes: 67 additions & 17 deletions nemo_run/core/execution/lepton.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import base64
import logging
import os
Expand All @@ -22,36 +24,80 @@
import time
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, List, Optional, Set, Type

from invoke.context import Context
from leptonai.api.v2.client import APIClient
from leptonai.api.v1.types.affinity import LeptonResourceAffinity
from leptonai.api.v1.types.common import Metadata, LeptonVisibility
from leptonai.api.v1.types.dedicated_node_group import DedicatedNodeGroup
from leptonai.api.v1.types.deployment import (
EnvVar,
EnvValue,
LeptonContainer,
Mount,
)
from leptonai.api.v1.types.job import (
LeptonJob,
LeptonJobState,
LeptonJobUserSpec,
ReservationConfig,
)
from leptonai.api.v1.types.replica import Replica

from nemo_run.config import get_nemorun_home
from nemo_run.core.execution.base import Executor, ExecutorMacros
from nemo_run.core.packaging.base import Packager
from nemo_run.core.packaging.git import GitArchivePackager

# Optional-dependency guard: this module must remain importable when ``leptonai``
# is not installed. The outcome of the attempt is recorded in the two flags below
# and consulted later by ``_require_leptonai``.
_LEPTON_AVAILABLE = False
_LEPTON_IMPORT_ERROR: ImportError | None = None

try:
    from leptonai.api.v1.types.affinity import LeptonResourceAffinity
    from leptonai.api.v1.types.common import LeptonVisibility, Metadata
    from leptonai.api.v1.types.dedicated_node_group import DedicatedNodeGroup
    from leptonai.api.v1.types.deployment import EnvVar, EnvValue, LeptonContainer, Mount
    from leptonai.api.v1.types.job import (
        LeptonJob,
        LeptonJobState,
        LeptonJobUserSpec,
        ReservationConfig,
    )
    from leptonai.api.v1.types.replica import Replica
    from leptonai.api.v2.client import APIClient
except ImportError as import_failure:
    # Keep the original failure so it can be chained onto the user-facing error.
    _LEPTON_IMPORT_ERROR = import_failure

    class LeptonJobState(Enum):
        """Local stand-in used when ``leptonai`` cannot be imported.

        NOTE(review): member names presumably mirror the real ``leptonai``
        enum — confirm against the installed package version.
        """

        Starting = "Starting"
        Running = "Running"
        Failed = "Failed"
        Completed = "Completed"
        Deleting = "Deleting"
        Restarting = "Restarting"
        Archived = "Archived"
        Stopped = "Stopped"
        Stopping = "Stopping"
        Unknown = "Unknown"

    # Placeholders so the names exist at module level even without ``leptonai``.
    APIClient = DedicatedNodeGroup = None
    EnvValue = EnvVar = None
    LeptonContainer = LeptonJob = LeptonJobUserSpec = None
    LeptonResourceAffinity = LeptonVisibility = None
    Metadata = Mount = Replica = ReservationConfig = None
else:
    # Every import above succeeded.
    _LEPTON_AVAILABLE = True

logger = logging.getLogger(__name__)


def _require_leptonai() -> None:
    """Ensure the optional ``leptonai`` dependency was imported successfully.

    Returns silently when ``_LEPTON_AVAILABLE`` is true.

    Raises:
        ImportError: When ``leptonai`` could not be imported; the error is
            chained to the exception stored in ``_LEPTON_IMPORT_ERROR``.
    """
    if _LEPTON_AVAILABLE:
        return

    message = (
        "leptonai package is required for LeptonExecutor. "
        'Install it with: pip install "nemo_run[lepton]"'
    )
    # Chain the stored import failure so the root cause shows in the traceback.
    raise ImportError(message) from _LEPTON_IMPORT_ERROR


@dataclass(kw_only=True)
class LeptonExecutor(Executor):
"""
Expand Down Expand Up @@ -84,6 +130,9 @@ class LeptonExecutor(Executor):
head_resource_shape: Optional[str] = "" # Only used for LeptonRayCluster
ray_version: Optional[str] = None # Only used for LeptonRayCluster

def __post_init__(self) -> None:
    """Fail fast at construction time when the optional ``leptonai`` package is unavailable.

    Raises:
        ImportError: Propagated from ``_require_leptonai`` when ``leptonai`` is missing.
    """
    # NOTE(review): no ``super().__post_init__()`` call here — confirm the ``Executor``
    # base class defines no post-init hook that needs to run.
    _require_leptonai()

def stop_job(self, job_id: str):
"""
Send a stop signal to the requested job
Expand Down Expand Up @@ -376,6 +425,7 @@ def cancel(self, job_id: str):

@classmethod
def logs(cls: Type["LeptonExecutor"], app_id: str, fallback_path: Optional[str]):
_require_leptonai()
client = APIClient()

# Get the first replica from the job which contains the job logs
Expand Down
17 changes: 14 additions & 3 deletions nemo_run/run/ray/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,21 @@
from typing import Optional, Type

from nemo_run.core.execution.base import Executor
from nemo_run.core.execution.lepton import LeptonExecutor
from nemo_run.core.execution.slurm import SlurmExecutor
from nemo_run.core.frontend.console.api import configure_logging
from nemo_run.run.ray.lepton import LeptonRayCluster
from nemo_run.run.ray.slurm import SlurmRayCluster

# Import guard for Lepton dependencies
try:
from nemo_run.core.execution.lepton import LeptonExecutor
from nemo_run.run.ray.lepton import LeptonRayCluster

_LEPTON_RAY_AVAILABLE = True
except ImportError:
LeptonExecutor = None
LeptonRayCluster = None
_LEPTON_RAY_AVAILABLE = False

# Import guard for Kubernetes dependencies
try:
from nemo_run.core.execution.kuberay import KubeRayExecutor
Expand All @@ -45,9 +54,11 @@ def __post_init__(self):
configure_logging(level=self.log_level)
backend_map: dict[Type[Executor], Type] = {
SlurmExecutor: SlurmRayCluster,
LeptonExecutor: LeptonRayCluster,
}

if _LEPTON_RAY_AVAILABLE and LeptonExecutor is not None and LeptonRayCluster is not None:
backend_map[LeptonExecutor] = LeptonRayCluster

if _KUBERAY_AVAILABLE and KubeRayExecutor is not None and KubeRayCluster is not None:
backend_map[KubeRayExecutor] = KubeRayCluster

Expand Down
19 changes: 15 additions & 4 deletions nemo_run/run/ray/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,21 @@
from typing import Any, Optional, Type

from nemo_run.core.execution.base import Executor
from nemo_run.core.execution.lepton import LeptonExecutor
from nemo_run.core.execution.slurm import SlurmExecutor
from nemo_run.core.frontend.console.api import configure_logging
from nemo_run.run.ray.lepton import LeptonRayJob
from nemo_run.run.ray.slurm import SlurmRayJob

# Import guard for Lepton dependencies
try:
from nemo_run.core.execution.lepton import LeptonExecutor
from nemo_run.run.ray.lepton import LeptonRayJob

_LEPTON_RAY_AVAILABLE = True
except ImportError:
LeptonExecutor = None
LeptonRayJob = None
_LEPTON_RAY_AVAILABLE = False

# Import guard for Kubernetes dependencies
try:
from nemo_run.core.execution.kuberay import KubeRayExecutor
Expand All @@ -49,10 +58,12 @@ class RayJob:
def __post_init__(self) -> None: # noqa: D401 – simple implementation
configure_logging(level=self.log_level)
backend_map: dict[Type[Executor], Type[Any]] = {
LeptonExecutor: LeptonRayJob,
SlurmExecutor: SlurmRayJob,
}

if _LEPTON_RAY_AVAILABLE and LeptonExecutor is not None and LeptonRayJob is not None:
backend_map[LeptonExecutor] = LeptonRayJob

if _KUBERAY_AVAILABLE and KubeRayExecutor is not None and KubeRayJob is not None:
backend_map[KubeRayExecutor] = KubeRayJob

Expand All @@ -62,7 +73,7 @@ def __post_init__(self) -> None: # noqa: D401 – simple implementation
backend_cls = backend_map[self.executor.__class__]
self.backend = backend_cls(name=self.name, executor=self.executor)

if isinstance(self.executor, LeptonExecutor):
if LeptonExecutor is not None and isinstance(self.executor, LeptonExecutor):
self.backend.cluster_name = self.cluster_name
self.backend.cluster_ready_timeout = self.cluster_ready_timeout

Expand Down
73 changes: 56 additions & 17 deletions nemo_run/run/ray/lepton.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,57 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import asyncio
import json
import logging
import sys
import time
import urllib3
import warnings
from dataclasses import dataclass
from ray.job_submission import JobSubmissionClient
from rich.pretty import pretty_repr
from typing import Any, Optional, TypeAlias

from leptonai.api.v1.types.affinity import LeptonResourceAffinity
from leptonai.api.v1.types.dedicated_node_group import DedicatedNodeGroup
from leptonai.api.v1.types.deployment import EnvVar, EnvValue

from nemo_run.core.execution.lepton import LeptonExecutor
import urllib3
from rich.pretty import pretty_repr

from leptonai.api.v2.client import APIClient
from leptonai.api.v1.types.raycluster import (
LeptonRayCluster as LeptonRayClusterSpec,
LeptonRayClusterUserSpec,
Metadata,
RayHeadGroupSpec,
RayWorkerGroupSpec,
)
from leptonai.cli.raycluster import DEFAULT_RAY_IMAGE
from nemo_run.core.execution.lepton import LeptonExecutor, _require_leptonai

_RAY_IMPORT_ERROR: ImportError | None = None
_RAY_AVAILABLE = False
try:
from ray.job_submission import JobSubmissionClient

_RAY_AVAILABLE = True
except ImportError as e:
_RAY_IMPORT_ERROR = e
JobSubmissionClient = None

try:
from leptonai.api.v1.types.affinity import LeptonResourceAffinity
from leptonai.api.v1.types.dedicated_node_group import DedicatedNodeGroup
from leptonai.api.v1.types.deployment import EnvVar, EnvValue
from leptonai.api.v1.types.raycluster import (
LeptonRayCluster as LeptonRayClusterSpec,
LeptonRayClusterUserSpec,
Metadata,
RayHeadGroupSpec,
RayWorkerGroupSpec,
)
from leptonai.api.v2.client import APIClient
from leptonai.cli.raycluster import DEFAULT_RAY_IMAGE
except ImportError:
APIClient = None
DEFAULT_RAY_IMAGE = None
DedicatedNodeGroup = None
EnvValue = None
EnvVar = None
LeptonRayClusterSpec = None
LeptonRayClusterUserSpec = None
LeptonResourceAffinity = None
Metadata = None
RayHeadGroupSpec = None
RayWorkerGroupSpec = None

noquote: TypeAlias = str

Expand All @@ -49,6 +73,19 @@
RAY_NOT_READY_STATE = "Not Ready"


def _require_ray() -> None:
    """Ensure the optional ``ray`` dependency was imported successfully.

    Returns silently when ``_RAY_AVAILABLE`` is true.

    Raises:
        ImportError: When ``ray`` could not be imported; the error is chained
            to the exception stored in ``_RAY_IMPORT_ERROR``.
    """
    if _RAY_AVAILABLE:
        return

    message = (
        "ray is required for Lepton Ray helpers. "
        'Install it with: pip install "nemo_run[lepton]"'
    )
    # Chain the stored import failure so the root cause shows in the traceback.
    raise ImportError(message) from _RAY_IMPORT_ERROR


def _require_lepton_ray() -> None:
    """Check both optional dependencies used by the Lepton Ray helpers.

    Raises:
        ImportError: From ``_require_leptonai`` or ``_require_ray``, whichever
            fails first; ``leptonai`` is checked before ``ray``.
    """
    _require_leptonai()
    _require_ray()


@dataclass(kw_only=True)
class LeptonRayCluster:
EXECUTOR_CLS = LeptonExecutor
Expand All @@ -57,6 +94,7 @@ class LeptonRayCluster:
executor: LeptonExecutor

def __post_init__(self):
    """Check the Lepton and Ray dependencies, then start with an empty ``cluster_map``."""
    # Raises ImportError before any instance state is set up if a dependency is missing.
    _require_lepton_ray()
    # String-to-string mapping; starts empty.
    self.cluster_map: dict[str, str] = {}

def _node_group_id(self, client: APIClient) -> DedicatedNodeGroup:
Expand Down Expand Up @@ -374,6 +412,7 @@ class LeptonRayJob:
# Internals
# ---------------------------------------------------------------------
def __post_init__(self):
    """Check the Lepton and Ray dependencies and initialise ``submission_id`` to ``None``."""
    # Raises ImportError before any instance state is set up if a dependency is missing.
    _require_lepton_ray()
    self.submission_id = None

def _get_last_submission_id(self) -> Optional[int]:
Expand Down
Loading