Skip to content

Commit 5169db1

Browse files
committed
feat: Search pipeline name in pipeline run API
1 parent 16c033f commit 5169db1

6 files changed

Lines changed: 747 additions & 97 deletions

File tree

cloud_pipelines_backend/api_server_sql.py

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from . import backend_types_sql as bts
1111
from . import component_structures as structures
12+
from . import database_ops
1213
from . import errors
1314
from . import filter_query_sql
1415

@@ -109,19 +110,15 @@ def create(
109110
},
110111
)
111112
session.add(pipeline_run)
112-
# Mirror created_by into the annotations table so it's searchable
113-
# via filter_query like any other annotation.
114-
if created_by is not None:
115-
# Flush to populate pipeline_run.id (server-generated) before inserting the annotation FK.
116-
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
117-
session.flush()
118-
session.add(
119-
bts.PipelineRunAnnotation(
120-
pipeline_run_id=pipeline_run.id,
121-
key=filter_query_sql.SystemKey.CREATED_BY,
122-
value=created_by,
123-
)
124-
)
113+
# Flush to populate pipeline_run.id (server-generated) before inserting annotation FKs.
114+
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
115+
session.flush()
116+
_mirror_system_annotations(
117+
session=session,
118+
pipeline_run_id=pipeline_run.id,
119+
created_by=created_by,
120+
pipeline_name=pipeline_name,
121+
)
125122
session.commit()
126123

127124
session.refresh(pipeline_run)
@@ -240,12 +237,9 @@ def _create_pipeline_run_response(
240237
bts.ExecutionNode, pipeline_run.root_execution_id
241238
)
242239
if execution_node:
243-
task_spec = structures.TaskSpec.from_json_dict(
244-
execution_node.task_spec
240+
pipeline_name = database_ops.get_pipeline_name_from_task_spec(
241+
task_spec_dict=execution_node.task_spec
245242
)
246-
component_spec = task_spec.component_ref.spec
247-
if component_spec:
248-
pipeline_name = component_spec.name
249243
response.pipeline_name = pipeline_name
250244
if include_execution_stats:
251245
execution_status_stats = self._calculate_execution_status_stats(
@@ -1151,6 +1145,32 @@ def list_secrets(
11511145
]
11521146

11531147

1148+
def _mirror_system_annotations(
    *,
    session: orm.Session,
    pipeline_run_id: bts.IdType,
    created_by: str | None,
    pipeline_name: str | None,
) -> None:
    """Mirror pipeline run fields as system annotations for filter_query search.

    Args:
        session: Active ORM session; annotations are added but not committed.
        pipeline_run_id: FK of the pipeline run the annotations belong to.
        created_by: Creator identity; skipped when falsy (None or empty).
        pipeline_name: Pipeline name; skipped when falsy (None or empty).
    """
    # Order matters only for determinism of INSERTs; CREATED_BY first,
    # matching the historical write order.
    mirrored_fields = (
        (filter_query_sql.SystemKey.CREATED_BY, created_by),
        (filter_query_sql.SystemKey.NAME, pipeline_name),
    )
    for annotation_key, annotation_value in mirrored_fields:
        # Falsy values (None / "") are not mirrored — there is nothing
        # useful to search for.
        if annotation_value:
            session.add(
                bts.PipelineRunAnnotation(
                    pipeline_run_id=pipeline_run_id,
                    key=annotation_key,
                    value=annotation_value,
                )
            )
1172+
1173+
11541174
def _recursively_create_all_executions_and_artifacts_root(
11551175
session: orm.Session,
11561176
root_task_spec: structures.TaskSpec,

cloud_pipelines_backend/database_ops.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
1+
import logging
2+
from typing import Any
3+
14
import sqlalchemy
25
from sqlalchemy import orm
36

47
from . import backend_types_sql as bts
8+
from . import component_structures as structures
59
from . import filter_query_sql
610

11+
logger = logging.getLogger(__name__)
12+
713

814
def create_db_engine_and_migrate_db(
915
database_uri: str,
@@ -87,6 +93,7 @@ def migrate_db(db_engine: sqlalchemy.Engine):
8793
break
8894

8995
backfill_created_by_annotations(db_engine=db_engine)
96+
backfill_pipeline_name_annotations(db_engine=db_engine)
9097

9198

9299
def is_annotation_key_already_backfilled(
@@ -107,6 +114,27 @@ def is_annotation_key_already_backfilled(
107114
).scalar()
108115

109116

117+
def get_pipeline_name_from_task_spec(
    *,
    task_spec_dict: dict[str, Any],
) -> str | None:
    """Extract pipeline name from a task_spec dict via component_ref.spec.name.

    Traversal path:
        task_spec_dict -> TaskSpec -> component_ref -> spec -> name

    Args:
        task_spec_dict: JSON-like dict as stored on ExecutionNode.task_spec.

    Returns:
        The component spec name, or None if any step in the chain is
        missing, the name is empty, or parsing fails.
    """
    try:
        task_spec = structures.TaskSpec.from_json_dict(task_spec_dict)
    except Exception:
        # Malformed/legacy task specs are expected (e.g. during backfill).
        # Keep the None contract, but do not discard the failure silently —
        # record it at debug level for diagnosability.
        logger.debug(
            "Failed to parse task_spec while extracting pipeline name.",
            exc_info=True,
        )
        return None
    spec = task_spec.component_ref.spec
    if spec is None:
        return None
    # Normalize empty-string names to None so callers see a single
    # "no name" value.
    return spec.name or None
136+
137+
110138
def backfill_created_by_annotations(*, db_engine: sqlalchemy.Engine):
111139
"""Copy pipeline_run.created_by into pipeline_run_annotation so
112140
annotation-based search works for created_by.
@@ -134,3 +162,142 @@ def backfill_created_by_annotations(*, db_engine: sqlalchemy.Engine):
134162
)
135163
session.execute(stmt)
136164
session.commit()
165+
166+
167+
def _backfill_pipeline_names_from_extra_data(*, db_engine: sqlalchemy.Engine):
    """Phase 1: bulk SQL backfill of name annotations from extra_data.

    Equivalent SQL:

        INSERT INTO pipeline_run_annotation (pipeline_run_id, key, value)
        SELECT id, <name key>, json_extract(extra_data, '$.pipeline_name')
        FROM pipeline_run
        WHERE json_extract(...) IS NOT NULL AND json_extract(...) != ''

    The JSON path extraction is NULL-safe on the SQL side: it yields SQL
    NULL when extra_data is NULL or the key is absent, so no Python-level
    error handling is required here.
    """
    name_expr = bts.PipelineRun.extra_data["pipeline_name"].as_string()
    source_select = sqlalchemy.select(
        bts.PipelineRun.id,
        sqlalchemy.literal(filter_query_sql.SystemKey.NAME),
        name_expr,
    ).where(
        # Skip runs whose extra_data has no usable name (NULL or empty).
        name_expr.isnot(None),
        name_expr != "",
    )
    insert_stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select(
        ["pipeline_run_id", "key", "value"],
        source_select,
    )
    with orm.Session(db_engine) as session:
        session.execute(insert_stmt)
        session.commit()
193+
194+
195+
def _backfill_pipeline_names_from_component_spec(*, db_engine: sqlalchemy.Engine):
    """Phase 2: per-run Python fallback for runs Phase 1 did not cover.

    Finds the "delta" — runs that still have no name annotation — with a
    LEFT JOIN anti-join:

        SELECT pr.id, pr.root_execution_id
        FROM pipeline_run pr
        LEFT JOIN pipeline_run_annotation ann
            ON ann.pipeline_run_id = pr.id
            AND ann.key = 'system/pipeline_run.name'
        WHERE ann.pipeline_run_id IS NULL

    Unmatched pipeline_run rows come back with NULL annotation columns,
    so the WHERE clause keeps exactly the runs lacking a name annotation.

    For each delta run, the root execution node's task_spec is loaded and
    the name is resolved via:
        task_spec_dict -> TaskSpec -> component_ref -> spec -> name
    Runs with no execution node or no resolvable name are logged and
    skipped.
    """
    name_key = filter_query_sql.SystemKey.NAME
    annotation = bts.PipelineRunAnnotation
    with orm.Session(db_engine) as session:
        anti_join_condition = sqlalchemy.and_(
            annotation.pipeline_run_id == bts.PipelineRun.id,
            annotation.key == name_key,
        )
        missing_name_query = (
            sqlalchemy.select(
                bts.PipelineRun.id,
                bts.PipelineRun.root_execution_id,
            )
            .outerjoin(annotation, anti_join_condition)
            .where(annotation.pipeline_run_id.is_(None))
        )
        delta_rows = session.execute(missing_name_query).all()

        for run_id, root_execution_id in delta_rows:
            execution_node = session.get(bts.ExecutionNode, root_execution_id)
            if execution_node is None:
                logger.warning(
                    f"Backfill pipeline run name: run {run_id} has no "
                    f"execution node (root_execution_id={root_execution_id}), "
                    "skipping. TODO: consider inserting 'UNKNOWN'?"
                )
                continue
            name = get_pipeline_name_from_task_spec(
                task_spec_dict=execution_node.task_spec
            )
            if not name:
                logger.warning(
                    f"Backfill pipeline run name: run {run_id} has no "
                    "resolvable pipeline name from task_spec "
                    f"(root_execution_id={root_execution_id}), "
                    "skipping. TODO: consider inserting 'UNKNOWN'?"
                )
                continue
            session.add(
                annotation(pipeline_run_id=run_id, key=name_key, value=name)
            )
        session.commit()
281+
282+
283+
def backfill_pipeline_name_annotations(*, db_engine: sqlalchemy.Engine):
    """Backfill pipeline_run_annotation rows holding pipeline names.

    A no-op when any name annotation already exists: that means the
    write path is populating names, so the backfill has already run or
    is no longer needed.

    Otherwise runs two phases:

    1. ``_backfill_pipeline_names_from_extra_data`` — one bulk SQL
       INSERT ... SELECT from ``extra_data['pipeline_name']``.
    2. ``_backfill_pipeline_names_from_component_spec`` — per-run Python
       fallback for runs phase 1 missed (extra_data NULL or missing the
       key), resolving names via ``component_ref.spec.name``.
    """
    already_backfilled = is_annotation_key_already_backfilled(
        db_engine=db_engine, key=filter_query_sql.SystemKey.NAME
    )
    if already_backfilled:
        return

    _backfill_pipeline_names_from_extra_data(db_engine=db_engine)
    _backfill_pipeline_names_from_component_spec(db_engine=db_engine)

cloud_pipelines_backend/filter_query_sql.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
from . import filter_query_models
1111

1212
SYSTEM_KEY_PREFIX: Final[str] = "system/"
13+
_PIPELINE_RUN_KEY_PREFIX: Final[str] = f"{SYSTEM_KEY_PREFIX}pipeline_run."
1314

1415

1516
class SystemKey(enum.StrEnum):
16-
CREATED_BY = f"{SYSTEM_KEY_PREFIX}pipeline_run.created_by"
17+
CREATED_BY = f"{_PIPELINE_RUN_KEY_PREFIX}created_by"
18+
NAME = f"{_PIPELINE_RUN_KEY_PREFIX}name"
1719

1820

1921
SYSTEM_KEY_SUPPORTED_PREDICATES: dict[SystemKey, set[type]] = {
@@ -22,6 +24,12 @@ class SystemKey(enum.StrEnum):
2224
filter_query_models.ValueEqualsPredicate,
2325
filter_query_models.ValueInPredicate,
2426
},
27+
SystemKey.NAME: {
28+
filter_query_models.KeyExistsPredicate,
29+
filter_query_models.ValueEqualsPredicate,
30+
filter_query_models.ValueContainsPredicate,
31+
filter_query_models.ValueInPredicate,
32+
},
2533
}
2634

2735
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)