Skip to content

Commit 9e58770

Browse files
committed
feat: Search created by user in pipeline run API
1 parent e29a08e commit 9e58770

6 files changed

Lines changed: 719 additions & 9 deletions

File tree

cloud_pipelines_backend/api_server_sql.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ class ListPipelineJobsResponse:
6868
class PipelineRunsApiService_Sql:
6969
_PIPELINE_NAME_EXTRA_DATA_KEY = "pipeline_name"
7070
_DEFAULT_PAGE_SIZE: Final[int] = 10
71+
_SYSTEM_KEY_RESERVED_MSG = (
72+
"Annotation keys starting with "
73+
f"{filter_query_sql.SYSTEM_KEY_PREFIX!r} are reserved for system use."
74+
)
7175

7276
def create(
7377
self,
@@ -105,6 +109,19 @@ def create(
105109
},
106110
)
107111
session.add(pipeline_run)
112+
# Mirror created_by into the annotations table so it's searchable
113+
# via filter_query like any other annotation.
114+
if created_by is not None:
115+
# Flush to populate pipeline_run.id (server-generated) before inserting the annotation FK.
116+
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
117+
session.flush()
118+
session.add(
119+
bts.PipelineRunAnnotation(
120+
pipeline_run_id=pipeline_run.id,
121+
key=filter_query_sql.SystemKey.CREATED_BY,
122+
value=created_by,
123+
)
124+
)
108125
session.commit()
109126

110127
session.refresh(pipeline_run)
@@ -295,6 +312,8 @@ def set_annotation(
295312
user_name: str | None = None,
296313
skip_user_check: bool = False,
297314
):
315+
if key.startswith(filter_query_sql.SYSTEM_KEY_PREFIX):
316+
raise errors.ApiValidationError(self._SYSTEM_KEY_RESERVED_MSG)
298317
pipeline_run = session.get(bts.PipelineRun, id)
299318
if not pipeline_run:
300319
raise errors.ItemNotFoundError(f"Pipeline run {id} not found.")
@@ -317,6 +336,8 @@ def delete_annotation(
317336
user_name: str | None = None,
318337
skip_user_check: bool = False,
319338
):
339+
if key.startswith(filter_query_sql.SYSTEM_KEY_PREFIX):
340+
raise errors.ApiValidationError(self._SYSTEM_KEY_RESERVED_MSG)
320341
pipeline_run = session.get(bts.PipelineRun, id)
321342
if not pipeline_run:
322343
raise errors.ItemNotFoundError(f"Pipeline run {id} not found.")

cloud_pipelines_backend/database_ops.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import sqlalchemy
2+
from sqlalchemy import orm
23

34
from . import backend_types_sql as bts
5+
from . import filter_query_sql
46

57

68
def create_db_engine_and_migrate_db(
@@ -83,3 +85,52 @@ def migrate_db(db_engine: sqlalchemy.Engine):
8385
if index.name == bts.PipelineRunAnnotation._IX_ANNOTATION_RUN_ID_KEY_VALUE:
8486
index.create(db_engine, checkfirst=True)
8587
break
88+
89+
backfill_created_by_annotations(db_engine=db_engine)
90+
91+
92+
def is_annotation_key_already_backfilled(
93+
*,
94+
db_engine: sqlalchemy.Engine,
95+
key: str,
96+
) -> bool:
97+
"""Return True if at least one annotation with the given key exists."""
98+
with orm.Session(db_engine) as session:
99+
return session.query(
100+
sqlalchemy.exists(
101+
sqlalchemy.select(sqlalchemy.literal(1))
102+
.select_from(bts.PipelineRunAnnotation)
103+
.where(
104+
bts.PipelineRunAnnotation.key == key,
105+
)
106+
)
107+
).scalar()
108+
109+
110+
def backfill_created_by_annotations(*, db_engine: sqlalchemy.Engine):
111+
"""Copy pipeline_run.created_by into pipeline_run_annotation so
112+
annotation-based search works for created_by.
113+
114+
Skips entirely if any created_by annotation key already exists (i.e. the
115+
write-path is populating them, so the backfill has already run or is
116+
no longer needed).
117+
"""
118+
if is_annotation_key_already_backfilled(
119+
db_engine=db_engine, key=filter_query_sql.SystemKey.CREATED_BY
120+
):
121+
return
122+
123+
with orm.Session(db_engine) as session:
124+
stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select(
125+
["pipeline_run_id", "key", "value"],
126+
sqlalchemy.select(
127+
bts.PipelineRun.id,
128+
sqlalchemy.literal(filter_query_sql.SystemKey.CREATED_BY),
129+
bts.PipelineRun.created_by,
130+
).where(
131+
bts.PipelineRun.created_by.isnot(None),
132+
bts.PipelineRun.created_by != "",
133+
),
134+
)
135+
session.execute(stmt)
136+
session.commit()

cloud_pipelines_backend/filter_query_sql.py

Lines changed: 135 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import base64
22
import json
3+
import enum
34
from typing import Any, Final
45

56
import sqlalchemy as sql
@@ -8,6 +9,21 @@
89
from . import errors
910
from . import filter_query_models
1011

12+
SYSTEM_KEY_PREFIX: Final[str] = "system/"
13+
14+
15+
class SystemKey(enum.StrEnum):
16+
CREATED_BY = f"{SYSTEM_KEY_PREFIX}pipeline_run.created_by"
17+
18+
19+
SYSTEM_KEY_SUPPORTED_PREDICATES: dict[SystemKey, set[type]] = {
20+
SystemKey.CREATED_BY: {
21+
filter_query_models.KeyExistsPredicate,
22+
filter_query_models.ValueEqualsPredicate,
23+
filter_query_models.ValueInPredicate,
24+
},
25+
}
26+
1127
# ---------------------------------------------------------------------------
1228
# Page-token helpers
1329
# ---------------------------------------------------------------------------
@@ -44,6 +60,93 @@ def _resolve_filter_value(
4460
return filter, filter_query, offset
4561

4662

63+
# ---------------------------------------------------------------------------
64+
# SystemKey validation and resolution
65+
# ---------------------------------------------------------------------------
66+
67+
68+
def _get_predicate_key(*, predicate: filter_query_models.Predicate) -> str | None:
69+
"""Extract the annotation key from a leaf predicate, or None for logical operators."""
70+
match predicate:
71+
case filter_query_models.KeyExistsPredicate():
72+
return predicate.key_exists.key
73+
case filter_query_models.ValueEqualsPredicate():
74+
return predicate.value_equals.key
75+
case filter_query_models.ValueContainsPredicate():
76+
return predicate.value_contains.key
77+
case filter_query_models.ValueInPredicate():
78+
return predicate.value_in.key
79+
case _:
80+
return None
81+
82+
83+
def _check_predicate_allowed(*, predicate: filter_query_models.Predicate) -> None:
84+
"""Raise if a system key is used with an unsupported predicate type."""
85+
key = _get_predicate_key(predicate=predicate)
86+
if key is None:
87+
return
88+
89+
try:
90+
system_key = SystemKey(key)
91+
except ValueError:
92+
return
93+
94+
supported = SYSTEM_KEY_SUPPORTED_PREDICATES.get(system_key, set())
95+
if type(predicate) not in supported:
96+
raise errors.ApiValidationError(
97+
f"Predicate {type(predicate).__name__} is not supported "
98+
f"for system key {system_key!r}. "
99+
f"Supported: {[t.__name__ for t in supported]}"
100+
)
101+
102+
103+
def _resolve_system_key_value(
104+
*,
105+
key: str,
106+
value: str,
107+
current_user: str | None,
108+
) -> str:
109+
"""Resolve special placeholder values for system keys."""
110+
if key == SystemKey.CREATED_BY and value == "me":
111+
return current_user if current_user is not None else ""
112+
return value
113+
114+
115+
def _maybe_resolve_system_values(
116+
*,
117+
predicate: filter_query_models.ValueEqualsPredicate,
118+
current_user: str | None,
119+
) -> filter_query_models.ValueEqualsPredicate:
120+
"""Resolve special values in a ValueEqualsPredicate."""
121+
key = predicate.value_equals.key
122+
value = predicate.value_equals.value
123+
resolved = _resolve_system_key_value(
124+
key=key,
125+
value=value,
126+
current_user=current_user,
127+
)
128+
if resolved != value:
129+
return filter_query_models.ValueEqualsPredicate(
130+
value_equals=filter_query_models.ValueEquals(key=key, value=resolved)
131+
)
132+
return predicate
133+
134+
135+
def _validate_and_resolve_predicate(
136+
*,
137+
predicate: filter_query_models.Predicate,
138+
current_user: str | None,
139+
) -> filter_query_models.Predicate:
140+
"""Validate system key support, then resolve special values."""
141+
_check_predicate_allowed(predicate=predicate)
142+
if isinstance(predicate, filter_query_models.ValueEqualsPredicate):
143+
return _maybe_resolve_system_values(
144+
predicate=predicate,
145+
current_user=current_user,
146+
)
147+
return predicate
148+
149+
47150
# ---------------------------------------------------------------------------
48151
# Public API
49152
# ---------------------------------------------------------------------------
@@ -79,7 +182,12 @@ def build_list_filters(
79182

80183
if filter_query_value:
81184
parsed = filter_query_models.FilterQuery.model_validate_json(filter_query_value)
82-
where_clauses.append(filter_query_to_where_clause(filter_query=parsed))
185+
where_clauses.append(
186+
filter_query_to_where_clause(
187+
filter_query=parsed,
188+
current_user=current_user,
189+
)
190+
)
83191

84192
next_page_token = _encode_page_token(
85193
page_token_dict={
@@ -95,10 +203,13 @@ def build_list_filters(
95203
def filter_query_to_where_clause(
96204
*,
97205
filter_query: filter_query_models.FilterQuery,
206+
current_user: str | None = None,
98207
) -> sql.ColumnElement:
99208
predicates = filter_query.and_ or filter_query.or_
100209
is_and = filter_query.and_ is not None
101-
clauses = [_predicate_to_clause(predicate=p) for p in predicates]
210+
clauses = [
211+
_predicate_to_clause(predicate=p, current_user=current_user) for p in predicates
212+
]
102213
return sql.and_(*clauses) if is_and else sql.or_(*clauses)
103214

104215

@@ -163,17 +274,35 @@ def _build_filter_where_clauses(
163274

164275
def _predicate_to_clause(
165276
*,
166-
predicate,
277+
predicate: filter_query_models.Predicate,
278+
current_user: str | None = None,
167279
) -> sql.ColumnElement:
280+
predicate = _validate_and_resolve_predicate(
281+
predicate=predicate,
282+
current_user=current_user,
283+
)
284+
168285
match predicate:
169286
case filter_query_models.AndPredicate():
170287
return sql.and_(
171-
*[_predicate_to_clause(predicate=p) for p in predicate.and_]
288+
*[
289+
_predicate_to_clause(predicate=p, current_user=current_user)
290+
for p in predicate.and_
291+
]
172292
)
173293
case filter_query_models.OrPredicate():
174-
return sql.or_(*[_predicate_to_clause(predicate=p) for p in predicate.or_])
294+
return sql.or_(
295+
*[
296+
_predicate_to_clause(predicate=p, current_user=current_user)
297+
for p in predicate.or_
298+
]
299+
)
175300
case filter_query_models.NotPredicate():
176-
return sql.not_(_predicate_to_clause(predicate=predicate.not_))
301+
return sql.not_(
302+
_predicate_to_clause(
303+
predicate=predicate.not_, current_user=current_user
304+
)
305+
)
177306
case filter_query_models.KeyExistsPredicate():
178307
return _key_exists_to_clause(predicate=predicate)
179308
case filter_query_models.ValueEqualsPredicate():

0 commit comments

Comments
 (0)