Skip to content

Commit da69ecb

Browse files
authored
Reduce pipeline processing latency via fetch hints and skip flag (#3922)
1 parent 9360c07 commit da69ecb

2 files changed

Lines changed: 22 additions & 1 deletion

File tree

src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,12 @@ async def process(self, item: JobRunningPipelineItem):
321321
job_model=context.job_model,
322322
result=result,
323323
)
324+
new_status = result.job_update_map.get("status")
325+
if new_status == JobStatus.PULLING:
326+
self._pipeline_hinter.hint_fetch(JobModel.__name__)
327+
# Hint run pipeline for fast run transition to RUNNING status.
328+
if new_status == JobStatus.RUNNING and context.job_model.run.status != RunStatus.RUNNING:
329+
self._pipeline_hinter.hint_fetch(RunModel.__name__)
324330

325331

326332
@dataclass
@@ -609,7 +615,9 @@ async def _refetch_locked_job_model(
609615
)
610616
.options(joinedload(JobModel.instance).joinedload(InstanceModel.project))
611617
.options(joinedload(JobModel.probes).load_only(ProbeModel.success_streak))
612-
.options(joinedload(JobModel.run).load_only(RunModel.id, RunModel.run_spec))
618+
.options(
619+
joinedload(JobModel.run).load_only(RunModel.id, RunModel.run_spec, RunModel.status)
620+
)
613621
.execution_options(populate_existing=True)
614622
)
615623
return res.unique().scalar_one_or_none()
@@ -981,6 +989,18 @@ async def _apply_process_result(
981989
if result.new_probe_models:
982990
session.add_all(result.new_probe_models)
983991

992+
# Set RunModel.skip_min_processing_interval for fast run transition to RUNNING status.
993+
# Cross-pipeline write is ok: worst case skip_min_processing_interval is overridden.
994+
if (
995+
result.job_update_map.get("status") == JobStatus.RUNNING
996+
and job_model.run.status != RunStatus.RUNNING
997+
):
998+
await session.execute(
999+
update(RunModel)
1000+
.where(RunModel.id == job_model.run_id)
1001+
.values(skip_min_processing_interval=True)
1002+
)
1003+
9841004
_emit_result_events(session=session, job_model=job_model, result=result)
9851005

9861006

src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@ async def process(self, item: JobSubmittedPipelineItem):
345345
context=context,
346346
assignment=assignment,
347347
)
348+
self._pipeline_hinter.hint_fetch(JobModel.__name__)
348349

349350

350351
@dataclass

0 commit comments

Comments
 (0)