Skip to content

Commit f8b3f75

Browse files
committed
feat: Search pipeline name in pipeline run API
1 parent f8f6e26 commit f8b3f75

6 files changed

Lines changed: 335 additions & 28 deletions

File tree

cloud_pipelines_backend/api_server_sql.py

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -108,19 +108,15 @@ def create(
108108
},
109109
)
110110
session.add(pipeline_run)
111-
# Mirror created_by into the annotations table so it's searchable
112-
# via filter_query like any other annotation.
113-
if created_by is not None:
114-
# Flush to populate pipeline_run.id (server-generated) before inserting the annotation FK.
115-
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
116-
session.flush()
117-
session.add(
118-
bts.PipelineRunAnnotation(
119-
pipeline_run_id=pipeline_run.id,
120-
key=filter_query_sql.SystemKey.CREATED_BY,
121-
value=created_by,
122-
)
123-
)
111+
# Flush to populate pipeline_run.id (server-generated) before inserting annotation FKs.
112+
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
113+
session.flush()
114+
_mirror_system_annotations(
115+
session=session,
116+
pipeline_run_id=pipeline_run.id,
117+
created_by=created_by,
118+
pipeline_name=pipeline_name,
119+
)
124120
session.commit()
125121

126122
session.refresh(pipeline_run)
@@ -1150,6 +1146,32 @@ def list_secrets(
11501146
]
11511147

11521148

1149+
def _mirror_system_annotations(
1150+
*,
1151+
session: orm.Session,
1152+
pipeline_run_id: bts.IdType,
1153+
created_by: str | None,
1154+
pipeline_name: str | None,
1155+
) -> None:
1156+
"""Mirror pipeline run fields as system annotations for filter_query search."""
1157+
if created_by:
1158+
session.add(
1159+
bts.PipelineRunAnnotation(
1160+
pipeline_run_id=pipeline_run_id,
1161+
key=filter_query_sql.SystemKey.CREATED_BY,
1162+
value=created_by,
1163+
)
1164+
)
1165+
if pipeline_name:
1166+
session.add(
1167+
bts.PipelineRunAnnotation(
1168+
pipeline_run_id=pipeline_run_id,
1169+
key=filter_query_sql.SystemKey.NAME,
1170+
value=pipeline_name,
1171+
)
1172+
)
1173+
1174+
11531175
def _recursively_create_all_executions_and_artifacts_root(
11541176
session: orm.Session,
11551177
root_task_spec: structures.TaskSpec,

cloud_pipelines_backend/database_ops.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,38 @@ def migrate_db(db_engine: sqlalchemy.Engine):
8787
break
8888

8989
backfill_created_by_annotations(db_engine)
90+
backfill_pipeline_name_annotations(db_engine)
91+
92+
93+
def backfill_pipeline_name_annotations(db_engine: sqlalchemy.Engine):
94+
"""Copy pipeline_run.extra_data['pipeline_name'] into pipeline_run_annotation
95+
so annotation-based search works for pipeline names.
96+
97+
Idempotent -- skips rows that already have the annotation.
98+
"""
99+
with orm.Session(db_engine) as session:
100+
pipeline_name_expr = bts.PipelineRun.extra_data["pipeline_name"].as_string()
101+
stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select(
102+
["pipeline_run_id", "key", "value"],
103+
sqlalchemy.select(
104+
bts.PipelineRun.id,
105+
sqlalchemy.literal(filter_query_sql.SystemKey.NAME),
106+
pipeline_name_expr,
107+
).where(
108+
pipeline_name_expr.isnot(None),
109+
pipeline_name_expr != "",
110+
# NOT EXISTS makes the backfill idempotent
111+
~sqlalchemy.exists(
112+
sqlalchemy.select(bts.PipelineRunAnnotation.pipeline_run_id).where(
113+
bts.PipelineRunAnnotation.pipeline_run_id == bts.PipelineRun.id,
114+
bts.PipelineRunAnnotation.key
115+
== filter_query_sql.SystemKey.NAME,
116+
)
117+
),
118+
),
119+
)
120+
session.execute(stmt)
121+
session.commit()
90122

91123

92124
def backfill_created_by_annotations(db_engine: sqlalchemy.Engine):
@@ -104,6 +136,7 @@ def backfill_created_by_annotations(db_engine: sqlalchemy.Engine):
104136
bts.PipelineRun.created_by,
105137
).where(
106138
bts.PipelineRun.created_by.isnot(None),
139+
bts.PipelineRun.created_by != "",
107140
# NOT EXISTS makes the backfill idempotent
108141
~sqlalchemy.exists(
109142
sqlalchemy.select(bts.PipelineRunAnnotation.pipeline_run_id).where(

cloud_pipelines_backend/filter_query_sql.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
from . import filter_query_models
1212

1313
SYSTEM_KEY_PREFIX: Final[str] = "system/"
14+
_PIPELINE_RUN_KEY_PREFIX: Final[str] = f"{SYSTEM_KEY_PREFIX}pipeline_run."
1415

1516

1617
class SystemKey(enum.StrEnum):
17-
CREATED_BY = f"{SYSTEM_KEY_PREFIX}pipeline_run.created_by"
18+
CREATED_BY = f"{_PIPELINE_RUN_KEY_PREFIX}created_by"
19+
NAME = f"{_PIPELINE_RUN_KEY_PREFIX}name"
1820

1921

2022
SYSTEM_KEY_SUPPORTED_PREDICATES: dict[SystemKey, set[type]] = {
@@ -23,6 +25,12 @@ class SystemKey(enum.StrEnum):
2325
filter_query_models.ValueEqualsPredicate,
2426
filter_query_models.ValueInPredicate,
2527
},
28+
SystemKey.NAME: {
29+
filter_query_models.KeyExistsPredicate,
30+
filter_query_models.ValueEqualsPredicate,
31+
filter_query_models.ValueContainsPredicate,
32+
filter_query_models.ValueInPredicate,
33+
},
2634
}
2735

2836
# ---------------------------------------------------------------------------

tests/test_api_server_sql.py

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -350,25 +350,86 @@ def test_create_without_created_by(self, session_factory, service):
350350
result = _create_run(session_factory, service, root_task=_make_task_spec())
351351
assert result.created_by is None
352352

353-
def test_create_writes_created_by_annotation(self, session_factory, service):
353+
def test_create_mirrors_name_and_created_by(self, session_factory, service):
354354
run = _create_run(
355355
session_factory,
356356
service,
357-
root_task=_make_task_spec(),
358-
created_by="alice@example.com",
357+
root_task=_make_task_spec("my-pipeline"),
358+
created_by="alice",
359359
)
360360
with session_factory() as session:
361361
annotations = service.list_annotations(session=session, id=run.id)
362-
assert annotations[filter_query_sql.SystemKey.CREATED_BY] == "alice@example.com"
362+
assert annotations[filter_query_sql.SystemKey.NAME] == "my-pipeline"
363+
assert annotations[filter_query_sql.SystemKey.CREATED_BY] == "alice"
363364

364-
def test_create_without_created_by_no_annotation(self, session_factory, service):
365-
run = _create_run(session_factory, service, root_task=_make_task_spec())
365+
def test_create_mirrors_name_only(self, session_factory, service):
366+
run = _create_run(
367+
session_factory,
368+
service,
369+
root_task=_make_task_spec("solo-pipeline"),
370+
)
366371
with session_factory() as session:
367372
annotations = service.list_annotations(session=session, id=run.id)
373+
assert annotations[filter_query_sql.SystemKey.NAME] == "solo-pipeline"
374+
assert filter_query_sql.SystemKey.CREATED_BY not in annotations
375+
376+
def test_create_mirrors_created_by_only(self, session_factory, service):
377+
task_spec = _make_task_spec("placeholder")
378+
task_spec.component_ref.spec.name = None
379+
run = _create_run(
380+
session_factory, service, root_task=task_spec, created_by="alice"
381+
)
382+
with session_factory() as session:
383+
annotations = service.list_annotations(session=session, id=run.id)
384+
assert annotations[filter_query_sql.SystemKey.CREATED_BY] == "alice"
385+
assert filter_query_sql.SystemKey.NAME not in annotations
386+
387+
def test_create_skips_mirror_when_empty_values(self, session_factory, service):
388+
run = _create_run(
389+
session_factory,
390+
service,
391+
root_task=_make_task_spec(""),
392+
created_by="",
393+
)
394+
with session_factory() as session:
395+
annotations = service.list_annotations(session=session, id=run.id)
396+
assert filter_query_sql.SystemKey.NAME not in annotations
397+
assert filter_query_sql.SystemKey.CREATED_BY not in annotations
398+
399+
def test_create_skips_mirror_when_both_absent(self, session_factory, service):
400+
task_spec = _make_task_spec("placeholder")
401+
task_spec.component_ref.spec.name = None
402+
run = _create_run(session_factory, service, root_task=task_spec)
403+
with session_factory() as session:
404+
annotations = service.list_annotations(session=session, id=run.id)
405+
assert filter_query_sql.SystemKey.NAME not in annotations
368406
assert filter_query_sql.SystemKey.CREATED_BY not in annotations
369407

370408

371409
class TestPipelineRunAnnotationCrud:
410+
def test_system_annotations_coexist_with_user_annotations(
411+
self, session_factory, service
412+
):
413+
run = _create_run(
414+
session_factory,
415+
service,
416+
root_task=_make_task_spec("my-pipeline"),
417+
created_by="alice",
418+
)
419+
with session_factory() as session:
420+
service.set_annotation(
421+
session=session,
422+
id=run.id,
423+
key="team",
424+
value="ml-ops",
425+
user_name="alice",
426+
)
427+
with session_factory() as session:
428+
annotations = service.list_annotations(session=session, id=run.id)
429+
assert annotations["team"] == "ml-ops"
430+
assert annotations[filter_query_sql.SystemKey.NAME] == "my-pipeline"
431+
assert annotations[filter_query_sql.SystemKey.CREATED_BY] == "alice"
432+
372433
def test_set_annotation(self, session_factory, service):
373434
run = _create_run(
374435
session_factory,
@@ -441,11 +502,11 @@ def test_delete_annotation(self, session_factory, service):
441502
annotations = service.list_annotations(session=session, id=run.id)
442503
assert "team" not in annotations
443504

444-
def test_list_annotations_empty(self, session_factory, service):
505+
def test_list_annotations_only_system(self, session_factory, service):
445506
run = _create_run(session_factory, service, root_task=_make_task_spec())
446507
with session_factory() as session:
447508
annotations = service.list_annotations(session=session, id=run.id)
448-
assert annotations == {}
509+
assert annotations == {filter_query_sql.SystemKey.NAME: "test-pipeline"}
449510

450511
def test_set_annotation_rejects_system_key(self, session_factory, service):
451512
run = _create_run(

0 commit comments

Comments
 (0)