diff --git a/cluster_api/core.py b/cluster_api/core.py
index 9590c1e..d9eaba1 100644
--- a/cluster_api/core.py
+++ b/cluster_api/core.py
@@ -380,6 +380,43 @@ async def _call(
 
     # --- Properties ---
 
+    def track(
+        self,
+        job_id: str,
+        status: JobStatus = JobStatus.PENDING,
+    ) -> JobRecord:
+        """Begin tracking a job by ID without re-submitting it.
+
+        Use this to seed the executor from a persistent store (e.g. a
+        database) so subsequent :meth:`poll` and :meth:`cancel` calls can
+        act on jobs that were submitted by a previous process.
+
+        The created :class:`JobRecord` has minimal fields populated; the
+        next :meth:`poll` will fill in ``exec_host``, ``exit_code``,
+        ``start_time``, etc. from the scheduler. ``name`` and ``command``
+        are not recoverable from the scheduler in general — set them on
+        the returned record if you need them.
+
+        Replaces any existing record with the same ``job_id``.
+
+        Args:
+            job_id: Scheduler-assigned job ID to track.
+            status: Initial status. Use the last status known to your
+                persistent store so :meth:`poll` doesn't transiently
+                report the job as PENDING.
+
+        Returns:
+            The newly tracked :class:`JobRecord`.
+        """
+        record = JobRecord(
+            job_id=job_id,
+            name="",
+            command="",
+            status=status,
+        )
+        self._jobs[job_id] = record
+        return record
+
     def remove_job(self, job_id: str) -> None:
         """Remove a job from tracking."""
         self._jobs.pop(job_id, None)
diff --git a/pixi.lock b/pixi.lock
index 8a9579b..b8e4b55 100644
--- a/pixi.lock
+++ b/pixi.lock
@@ -845,8 +845,8 @@ packages:
   timestamp: 1764896838868
 - pypi: ./
   name: py-cluster-api
-  version: 0.5.0
-  sha256: 6a42ec3a63e266d02359e62e4e0fa4d54476083e606b2625beb97828b5de5e60
+  version: 0.6.0
+  sha256: fe978178d2a54f14773c59d45530950f53cfb1ba925c07d080d4f37c0fc0203e
   requires_dist:
   - pyyaml
   - pytest ; extra == 'test'
diff --git a/pyproject.toml b/pyproject.toml
index f95831d..5bd432a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "py-cluster-api"
-version = "0.5.0"
+version = "0.6.0"
 description = "Generic Python library for running jobs on HPC clusters"
 readme = "README.md"
 license = { file = "LICENSE" }
diff --git a/tests/test_core.py b/tests/test_core.py
index 8996803..f981967 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -215,3 +215,32 @@ async def test_cancel_all_done(self, default_config, work_dir):
         await executor.cancel_all(done=True)
 
         assert job.status == JobStatus.DONE
+
+
+class TestTrack:
+
+    def test_track_default_status(self, default_config):
+        executor = LocalExecutor(default_config)
+        record = executor.track("12345")
+        assert record.job_id == "12345"
+        assert record.status == JobStatus.PENDING
+        assert executor.jobs["12345"] is record
+
+    def test_track_with_status(self, default_config):
+        executor = LocalExecutor(default_config)
+        record = executor.track("99", status=JobStatus.RUNNING)
+        assert record.status == JobStatus.RUNNING
+
+    def test_track_replaces_existing_record(self, default_config):
+        executor = LocalExecutor(default_config)
+        first = executor.track("7", status=JobStatus.PENDING)
+        second = executor.track("7", status=JobStatus.RUNNING)
+        assert first is not second
+        assert executor.jobs["7"] is second
+        assert executor.jobs["7"].status == JobStatus.RUNNING
+
+    def test_remove_job_after_track(self, default_config):
+        executor = LocalExecutor(default_config)
+        executor.track("42")
+        executor.remove_job("42")
+        assert "42" not in executor.jobs