Skip to content

Commit 96dfb6c

Browse files
committed
feat: API pipeline run get has execution summary
1 parent 489c797 commit 96dfb6c

2 files changed

Lines changed: 90 additions & 8 deletions

File tree

cloud_pipelines_backend/api_server_sql.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,19 @@ def create(
130130
session.refresh(pipeline_run)
131131
return PipelineRunResponse.from_db(pipeline_run)
132132

133-
def get(self, session: orm.Session, id: bts.IdType) -> PipelineRunResponse:
133+
def get(
134+
self,
135+
session: orm.Session,
136+
id: bts.IdType,
137+
include_execution_stats: bool = False,
138+
) -> PipelineRunResponse:
134139
pipeline_run = session.get(bts.PipelineRun, id)
135140
if not pipeline_run:
136141
raise ItemNotFoundError(f"Pipeline run {id} not found.")
137-
return PipelineRunResponse.from_db(pipeline_run)
142+
response = PipelineRunResponse.from_db(pipeline_run)
143+
if include_execution_stats:
144+
self._populate_execution_stats(session=session, response=response)
145+
return response
138146

139147
def terminate(
140148
self,
@@ -258,12 +266,7 @@ def create_pipeline_run_response(
258266
pipeline_name = component_spec.name
259267
response.pipeline_name = pipeline_name
260268
if include_execution_stats:
261-
stats, summary = self._get_execution_stats_and_summary(
262-
session=session,
263-
root_execution_id=pipeline_run.root_execution_id,
264-
)
265-
response.execution_status_stats = stats
266-
response.execution_summary = summary
269+
self._populate_execution_stats(session=session, response=response)
267270
return response
268271

269272
return ListPipelineJobsResponse(
@@ -274,6 +277,18 @@ def create_pipeline_run_response(
274277
next_page_token=next_page_token,
275278
)
276279

280+
def _populate_execution_stats(
281+
self,
282+
session: orm.Session,
283+
response: PipelineRunResponse,
284+
) -> None:
285+
stats, summary = self._get_execution_stats_and_summary(
286+
session=session,
287+
root_execution_id=response.root_execution_id,
288+
)
289+
response.execution_status_stats = stats
290+
response.execution_summary = summary
291+
277292
def _get_execution_stats_and_summary(
278293
self,
279294
session: orm.Session,

tests/test_api_server_sql.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import pytest
12
from sqlalchemy import orm
23

34
from cloud_pipelines_backend import backend_types_sql as bts
@@ -6,6 +7,7 @@
67
ExecutionStatusSummary,
78
PipelineRunsApiService_Sql,
89
)
10+
from cloud_pipelines_backend.errors import ItemNotFoundError
911

1012

1113
def _initialize_db_and_get_session_factory():
@@ -162,3 +164,68 @@ def test_list_with_execution_stats(self):
162164
assert summary.total_nodes == 2
163165
assert summary.ended_nodes == 1
164166
assert summary.has_ended is False
167+
168+
169+
class TestPipelineRunServiceGet:
170+
def test_get_not_found(self):
171+
session_factory = _initialize_db_and_get_session_factory()
172+
service = PipelineRunsApiService_Sql()
173+
with session_factory() as session:
174+
with pytest.raises(ItemNotFoundError):
175+
service.get(session=session, id="nonexistent-id")
176+
177+
def test_get_returns_pipeline_run(self):
178+
session_factory = _initialize_db_and_get_session_factory()
179+
service = PipelineRunsApiService_Sql()
180+
with session_factory() as session:
181+
root = _create_execution_node(session)
182+
root_id = root.id
183+
run = _create_pipeline_run(session, root, created_by="user1")
184+
run_id = run.id
185+
session.commit()
186+
187+
with session_factory() as session:
188+
result = service.get(session=session, id=run_id)
189+
assert result.id == run_id
190+
assert result.root_execution_id == root_id
191+
assert result.created_by == "user1"
192+
assert result.execution_status_stats is None
193+
assert result.execution_summary is None
194+
195+
def test_get_with_execution_stats(self):
196+
session_factory = _initialize_db_and_get_session_factory()
197+
service = PipelineRunsApiService_Sql()
198+
with session_factory() as session:
199+
root = _create_execution_node(session)
200+
root_id = root.id
201+
child1 = _create_execution_node(
202+
session,
203+
parent=root,
204+
status=bts.ContainerExecutionStatus.SUCCEEDED,
205+
)
206+
child2 = _create_execution_node(
207+
session,
208+
parent=root,
209+
status=bts.ContainerExecutionStatus.RUNNING,
210+
)
211+
_link_ancestor(session, child1, root)
212+
_link_ancestor(session, child2, root)
213+
run = _create_pipeline_run(session, root)
214+
run_id = run.id
215+
session.commit()
216+
217+
with session_factory() as session:
218+
result = service.get(
219+
session=session, id=run_id, include_execution_stats=True
220+
)
221+
assert result.id == run_id
222+
assert result.root_execution_id == root_id
223+
stats = result.execution_status_stats
224+
assert stats is not None
225+
assert stats["SUCCEEDED"] == 1
226+
assert stats["RUNNING"] == 1
227+
summary = result.execution_summary
228+
assert summary is not None
229+
assert summary.total_nodes == 2
230+
assert summary.ended_nodes == 1
231+
assert summary.has_ended is False

0 commit comments

Comments
 (0)