Skip to content

Commit 6727474

Browse files
committed
test: Add tests for GetGraphExecution API
1 parent 034f57b commit 6727474

2 files changed

Lines changed: 308 additions & 1 deletion

File tree

Lines changed: 307 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,307 @@
1+
import pytest
2+
from sqlalchemy import orm
3+
4+
from cloud_pipelines_backend import backend_types_sql as bts
5+
from cloud_pipelines_backend import database_ops
6+
from cloud_pipelines_backend.api_server_sql import (
7+
ExecutionNodesApiService_Sql,
8+
GetGraphExecutionStateResponse,
9+
)
10+
11+
12+
def _initialize_db_and_get_session_factory():
13+
db_engine = database_ops.create_db_engine_and_migrate_db(database_uri="sqlite://")
14+
return lambda: orm.Session(bind=db_engine)
15+
16+
17+
def _make_execution_node(
18+
task_spec: dict | None = None,
19+
task_id_in_parent: str | None = None,
20+
container_execution_status: bts.ContainerExecutionStatus | None = None,
21+
) -> bts.ExecutionNode:
22+
return bts.ExecutionNode(
23+
task_spec=task_spec or {},
24+
task_id_in_parent_execution=task_id_in_parent,
25+
container_execution_status=container_execution_status,
26+
)
27+
28+
29+
def _add_child_node(
30+
session: orm.Session,
31+
parent_id: str,
32+
task_spec: dict | None = None,
33+
task_id_in_parent: str | None = None,
34+
container_execution_status: bts.ContainerExecutionStatus | None = None,
35+
) -> bts.ExecutionNode:
36+
"""Create a child execution node, add to session, and set parent_execution_id.
37+
38+
parent_execution_id is init=False in the MappedAsDataclass model,
39+
so it must be set after the object is added and flushed.
40+
"""
41+
node = _make_execution_node(
42+
task_spec=task_spec,
43+
task_id_in_parent=task_id_in_parent,
44+
container_execution_status=container_execution_status,
45+
)
46+
session.add(node)
47+
session.flush()
48+
node.parent_execution_id = parent_id
49+
session.flush()
50+
return node
51+
52+
53+
class TestGetGraphExecutionState:
54+
"""Tests for ExecutionNodesApiService_Sql.get_graph_execution_state"""
55+
56+
def setup_method(self):
57+
self.session_factory = _initialize_db_and_get_session_factory()
58+
self.service = ExecutionNodesApiService_Sql()
59+
60+
def test_no_children_returns_empty_stats(self):
61+
"""A graph node with no children returns empty stats."""
62+
with self.session_factory() as session:
63+
parent = _make_execution_node(task_spec={"name": "parent_graph"})
64+
session.add(parent)
65+
session.flush()
66+
67+
result = self.service.get_graph_execution_state(session, parent.id)
68+
69+
assert isinstance(result, GetGraphExecutionStateResponse)
70+
assert result.child_execution_status_stats == {}
71+
72+
def test_children_with_no_status_are_excluded(self):
73+
"""Children whose container_execution_status is None are not counted."""
74+
with self.session_factory() as session:
75+
parent = _make_execution_node(task_spec={"name": "graph"})
76+
session.add(parent)
77+
session.flush()
78+
79+
_add_child_node(
80+
session,
81+
parent.id,
82+
task_spec={"name": "task_pending"},
83+
task_id_in_parent="pending_task",
84+
container_execution_status=None,
85+
)
86+
87+
result = self.service.get_graph_execution_state(session, parent.id)
88+
89+
assert result.child_execution_status_stats == {}
90+
91+
def test_direct_container_children(self):
92+
"""Children that are direct container nodes (no descendants via ancestor links)."""
93+
with self.session_factory() as session:
94+
parent = _make_execution_node(task_spec={"name": "graph"})
95+
session.add(parent)
96+
session.flush()
97+
98+
child1 = _add_child_node(
99+
session,
100+
parent.id,
101+
task_spec={"name": "task1"},
102+
task_id_in_parent="task1",
103+
container_execution_status=bts.ContainerExecutionStatus.SUCCEEDED,
104+
)
105+
child2 = _add_child_node(
106+
session,
107+
parent.id,
108+
task_spec={"name": "task2"},
109+
task_id_in_parent="task2",
110+
container_execution_status=bts.ContainerExecutionStatus.RUNNING,
111+
)
112+
113+
result = self.service.get_graph_execution_state(session, parent.id)
114+
115+
stats = result.child_execution_status_stats
116+
assert child1.id in stats
117+
assert stats[child1.id] == {"SUCCEEDED": 1}
118+
assert child2.id in stats
119+
assert stats[child2.id] == {"RUNNING": 1}
120+
121+
def test_three_level_mixed_stats(self):
122+
"""3-level deep graph with direct tasks and nested sub-graphs.
123+
124+
Tree structure:
125+
root (graph)
126+
├── task_1 (SUCCEEDED)
127+
├── task_2 (FAILED)
128+
├── sub_graph_a (graph)
129+
│ ├── task_sg_a_1 (CANCELLED)
130+
│ ├── task_sg_a_2 (SKIPPED)
131+
│ ├── task_sg_a_3 (RUNNING)
132+
│ └── sub_graph_a_b (graph)
133+
│ ├── task_sg_a_b_1 (INVALID)
134+
│ ├── task_sg_a_b_2 (SYSTEM_ERROR)
135+
│ └── task_sg_a_b_3 (SUCCEEDED)
136+
└── task_3 (QUEUED)
137+
138+
Ended statuses used across the 3 levels:
139+
Level 1 (root children): SUCCEEDED, FAILED
140+
Level 2 (sg_a children): CANCELLED, SKIPPED
141+
Level 3 (sg_a_b children): INVALID, SYSTEM_ERROR, SUCCEEDED
142+
"""
143+
with self.session_factory() as session:
144+
# -- Level 0: root --
145+
root = _make_execution_node(task_spec={"name": "root"})
146+
session.add(root)
147+
session.flush()
148+
149+
# -- Level 1: direct children of root --
150+
task_1 = _add_child_node(
151+
session,
152+
root.id,
153+
task_spec={"name": "task_1"},
154+
task_id_in_parent="task_1",
155+
container_execution_status=bts.ContainerExecutionStatus.SUCCEEDED,
156+
)
157+
task_2 = _add_child_node(
158+
session,
159+
root.id,
160+
task_spec={"name": "task_2"},
161+
task_id_in_parent="task_2",
162+
container_execution_status=bts.ContainerExecutionStatus.FAILED,
163+
)
164+
sub_graph_a = _add_child_node(
165+
session,
166+
root.id,
167+
task_spec={"name": "sub_graph_a"},
168+
task_id_in_parent="sub_graph_a",
169+
container_execution_status=None,
170+
)
171+
task_3 = _add_child_node(
172+
session,
173+
root.id,
174+
task_spec={"name": "task_3"},
175+
task_id_in_parent="task_3",
176+
container_execution_status=bts.ContainerExecutionStatus.QUEUED,
177+
)
178+
179+
# -- Level 2: children of sub_graph_a --
180+
task_sg_a_1 = _add_child_node(
181+
session,
182+
sub_graph_a.id,
183+
task_spec={"name": "task_sg_a_1"},
184+
task_id_in_parent="task_sg_a_1",
185+
container_execution_status=bts.ContainerExecutionStatus.CANCELLED,
186+
)
187+
task_sg_a_2 = _add_child_node(
188+
session,
189+
sub_graph_a.id,
190+
task_spec={"name": "task_sg_a_2"},
191+
task_id_in_parent="task_sg_a_2",
192+
container_execution_status=bts.ContainerExecutionStatus.SKIPPED,
193+
)
194+
task_sg_a_3 = _add_child_node(
195+
session,
196+
sub_graph_a.id,
197+
task_spec={"name": "task_sg_a_3"},
198+
task_id_in_parent="task_sg_a_3",
199+
container_execution_status=bts.ContainerExecutionStatus.RUNNING,
200+
)
201+
sub_graph_a_b = _add_child_node(
202+
session,
203+
sub_graph_a.id,
204+
task_spec={"name": "sub_graph_a_b"},
205+
task_id_in_parent="sub_graph_a_b",
206+
container_execution_status=None,
207+
)
208+
209+
# -- Level 3: children of sub_graph_a_b --
210+
task_sg_a_b_1 = _add_child_node(
211+
session,
212+
sub_graph_a_b.id,
213+
task_spec={"name": "task_sg_a_b_1"},
214+
task_id_in_parent="task_sg_a_b_1",
215+
container_execution_status=bts.ContainerExecutionStatus.INVALID,
216+
)
217+
task_sg_a_b_2 = _add_child_node(
218+
session,
219+
sub_graph_a_b.id,
220+
task_spec={"name": "task_sg_a_b_2"},
221+
task_id_in_parent="task_sg_a_b_2",
222+
container_execution_status=bts.ContainerExecutionStatus.SYSTEM_ERROR,
223+
)
224+
task_sg_a_b_3 = _add_child_node(
225+
session,
226+
sub_graph_a_b.id,
227+
task_spec={"name": "task_sg_a_b_3"},
228+
task_id_in_parent="task_sg_a_b_3",
229+
container_execution_status=bts.ContainerExecutionStatus.SUCCEEDED,
230+
)
231+
232+
# -- Ancestor links (closure table) --
233+
# sub_graph_a is ancestor of: all sg_a children + all sg_a_b children
234+
for node in [
235+
task_sg_a_1,
236+
task_sg_a_2,
237+
task_sg_a_3,
238+
sub_graph_a_b,
239+
task_sg_a_b_1,
240+
task_sg_a_b_2,
241+
task_sg_a_b_3,
242+
]:
243+
session.add(
244+
bts.ExecutionToAncestorExecutionLink(
245+
ancestor_execution=sub_graph_a,
246+
execution=node,
247+
)
248+
)
249+
# sub_graph_a_b is ancestor of: all sg_a_b children
250+
for node in [task_sg_a_b_1, task_sg_a_b_2, task_sg_a_b_3]:
251+
session.add(
252+
bts.ExecutionToAncestorExecutionLink(
253+
ancestor_execution=sub_graph_a_b,
254+
execution=node,
255+
)
256+
)
257+
# root is ancestor of: all nodes below root
258+
for node in [
259+
task_1,
260+
task_2,
261+
sub_graph_a,
262+
task_3,
263+
task_sg_a_1,
264+
task_sg_a_2,
265+
task_sg_a_3,
266+
sub_graph_a_b,
267+
task_sg_a_b_1,
268+
task_sg_a_b_2,
269+
task_sg_a_b_3,
270+
]:
271+
session.add(
272+
bts.ExecutionToAncestorExecutionLink(
273+
ancestor_execution=root,
274+
execution=node,
275+
)
276+
)
277+
session.flush()
278+
279+
result = self.service.get_graph_execution_state(session, root.id)
280+
281+
stats = result.child_execution_status_stats
282+
283+
# -- Direct container children of root (Query 2) --
284+
assert stats[task_1.id] == {"SUCCEEDED": 1}
285+
assert stats[task_2.id] == {"FAILED": 1}
286+
assert stats[task_3.id] == {"QUEUED": 1}
287+
288+
# -- sub_graph_a: aggregates ALL descendants via ancestor links (Query 1) --
289+
# Level 2: CANCELLED(1), SKIPPED(1), RUNNING(1)
290+
# Level 3: INVALID(1), SYSTEM_ERROR(1), SUCCEEDED(1)
291+
# sub_graph_a_b has NULL status so it is excluded from counts
292+
assert stats[sub_graph_a.id] == {
293+
"CANCELLED": 1,
294+
"SKIPPED": 1,
295+
"RUNNING": 1,
296+
"INVALID": 1,
297+
"SYSTEM_ERROR": 1,
298+
"SUCCEEDED": 1,
299+
}
300+
301+
# sub_graph_a_b is NOT a direct child of root, so it does not
302+
# appear as a key in the stats
303+
assert sub_graph_a_b.id not in stats
304+
305+
306+
if __name__ == "__main__":
307+
pytest.main()

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)