Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions cluster_api/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,43 @@ async def _call(

# --- Properties ---

def track(
self,
job_id: str,
status: JobStatus = JobStatus.PENDING,
) -> JobRecord:
"""Begin tracking a job by ID without re-submitting it.

Use this to seed the executor from a persistent store (e.g. a
database) so subsequent :meth:`poll` and :meth:`cancel` calls can
act on jobs that were submitted by a previous process.

The created :class:`JobRecord` has minimal fields populated; the
next :meth:`poll` will fill in ``exec_host``, ``exit_code``,
``start_time``, etc. from the scheduler. ``name`` and ``command``
are not recoverable from the scheduler in general — set them on
the returned record if you need them.

Replaces any existing record with the same ``job_id``.

Args:
job_id: Scheduler-assigned job ID to track.
status: Initial status. Use the last status known to your
persistent store so :meth:`poll` doesn't transiently
report the job as PENDING.

Returns:
The newly tracked :class:`JobRecord`.
"""
record = JobRecord(
job_id=job_id,
name="",
command="",
status=status,
)
self._jobs[job_id] = record
return record

def remove_job(self, job_id: str) -> None:
"""Remove a job from tracking."""
self._jobs.pop(job_id, None)
Expand Down
4 changes: 2 additions & 2 deletions pixi.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "py-cluster-api"
version = "0.5.0"
version = "0.6.0"
description = "Generic Python library for running jobs on HPC clusters"
readme = "README.md"
license = { file = "LICENSE" }
Expand Down
29 changes: 29 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,32 @@ async def test_cancel_all_done(self, default_config, work_dir):

await executor.cancel_all(done=True)
assert job.status == JobStatus.DONE


class TestTrack:

def test_track_default_status(self, default_config):
executor = LocalExecutor(default_config)
record = executor.track("12345")
assert record.job_id == "12345"
assert record.status == JobStatus.PENDING
assert executor.jobs["12345"] is record

def test_track_with_status(self, default_config):
executor = LocalExecutor(default_config)
record = executor.track("99", status=JobStatus.RUNNING)
assert record.status == JobStatus.RUNNING

def test_track_replaces_existing_record(self, default_config):
executor = LocalExecutor(default_config)
first = executor.track("7", status=JobStatus.PENDING)
second = executor.track("7", status=JobStatus.RUNNING)
assert first is not second
assert executor.jobs["7"] is second
assert executor.jobs["7"].status == JobStatus.RUNNING

def test_remove_job_after_track(self, default_config):
executor = LocalExecutor(default_config)
executor.track("42")
executor.remove_job("42")
assert "42" not in executor.jobs