Skip to content

Commit 527caa1

Browse files
committed
feat: Search created by user in pipeline run API
1 parent d6f04df commit 527caa1

7 files changed

Lines changed: 677 additions & 9 deletions

File tree

cloud_pipelines_backend/api_server_sql.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ def _get_current_time() -> datetime.datetime:
3333

3434

3535
_DEFAULT_PAGE_SIZE: Final[int] = 10
36+
_SYSTEM_KEY_RESERVED_MSG = f"Annotation keys starting with {filter_query_sql.SYSTEM_KEY_PREFIX!r} are reserved for system use."
3637

3738

3839
# ==== PipelineJobService
@@ -107,6 +108,19 @@ def create(
107108
},
108109
)
109110
session.add(pipeline_run)
111+
# Mirror created_by into the annotations table so it's searchable
112+
# via filter_query like any other annotation.
113+
if created_by is not None:
114+
# Flush to populate pipeline_run.id (server-generated) before inserting the annotation FK.
115+
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
116+
session.flush()
117+
session.add(
118+
bts.PipelineRunAnnotation(
119+
pipeline_run_id=pipeline_run.id,
120+
key=filter_query_sql.SystemKey.CREATED_BY,
121+
value=created_by,
122+
)
123+
)
110124
session.commit()
111125

112126
session.refresh(pipeline_run)
@@ -297,6 +311,8 @@ def set_annotation(
297311
user_name: str | None = None,
298312
skip_user_check: bool = False,
299313
):
314+
if key.startswith(filter_query_sql.SYSTEM_KEY_PREFIX):
315+
raise errors.InvalidAnnotationKeyError(_SYSTEM_KEY_RESERVED_MSG)
300316
pipeline_run = session.get(bts.PipelineRun, id)
301317
if not pipeline_run:
302318
raise errors.ItemNotFoundError(f"Pipeline run {id} not found.")
@@ -319,6 +335,8 @@ def delete_annotation(
319335
user_name: str | None = None,
320336
skip_user_check: bool = False,
321337
):
338+
if key.startswith(filter_query_sql.SYSTEM_KEY_PREFIX):
339+
raise errors.InvalidAnnotationKeyError(_SYSTEM_KEY_RESERVED_MSG)
322340
pipeline_run = session.get(bts.PipelineRun, id)
323341
if not pipeline_run:
324342
raise errors.ItemNotFoundError(f"Pipeline run {id} not found.")

cloud_pipelines_backend/database_ops.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import sqlalchemy
2+
from sqlalchemy import orm
23

34
from . import backend_types_sql as bts
5+
from . import filter_query_sql
46

57

68
def create_db_engine_and_migrate_db(
@@ -83,3 +85,59 @@ def migrate_db(db_engine: sqlalchemy.Engine):
8385
if index.name == bts.IX_ANNOTATION_RUN_ID_KEY_VALUE:
8486
index.create(db_engine, checkfirst=True)
8587
break
88+
89+
backfill_created_by_annotations(db_engine=db_engine)
90+
91+
92+
def is_annotation_key_already_backfilled(
93+
*,
94+
db_engine: sqlalchemy.Engine,
95+
key: str,
96+
) -> bool:
97+
"""Return True if at least one annotation with the given key exists."""
98+
with orm.Session(db_engine) as session:
99+
return session.query(
100+
sqlalchemy.exists(
101+
sqlalchemy.select(sqlalchemy.literal(1))
102+
.select_from(bts.PipelineRunAnnotation)
103+
.where(
104+
bts.PipelineRunAnnotation.key == key,
105+
)
106+
)
107+
).scalar()
108+
109+
110+
def backfill_created_by_annotations(*, db_engine: sqlalchemy.Engine):
111+
"""Copy pipeline_run.created_by into pipeline_run_annotation so
112+
annotation-based search works for created_by.
113+
114+
Skips entirely if any created_by annotation key already exists (i.e. the
115+
write-path is populating them, so the backfill has already run or is
116+
no longer needed).
117+
"""
118+
if is_annotation_key_already_backfilled(
119+
db_engine=db_engine, key=filter_query_sql.SystemKey.CREATED_BY
120+
):
121+
return
122+
123+
with orm.Session(db_engine) as session:
124+
stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select(
125+
["pipeline_run_id", "key", "value"],
126+
sqlalchemy.select(
127+
bts.PipelineRun.id,
128+
sqlalchemy.literal(filter_query_sql.SystemKey.CREATED_BY),
129+
bts.PipelineRun.created_by,
130+
).where(
131+
bts.PipelineRun.created_by.isnot(None),
132+
# NOT EXISTS makes the backfill idempotent
133+
~sqlalchemy.exists(
134+
sqlalchemy.select(bts.PipelineRunAnnotation.pipeline_run_id).where(
135+
bts.PipelineRunAnnotation.pipeline_run_id == bts.PipelineRun.id,
136+
bts.PipelineRunAnnotation.key
137+
== filter_query_sql.SystemKey.CREATED_BY,
138+
)
139+
),
140+
),
141+
)
142+
session.execute(stmt)
143+
session.commit()

cloud_pipelines_backend/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,7 @@ class ApiValidationError(Exception):
1818

1919
class MutuallyExclusiveFilterError(ApiValidationError):
2020
pass
21+
22+
23+
class InvalidAnnotationKeyError(ApiValidationError):
24+
pass

cloud_pipelines_backend/filter_query_sql.py

Lines changed: 136 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,30 @@
11
import base64
22
import dataclasses
33
import json
4+
import enum
5+
from typing import Final
46

57
import sqlalchemy as sql
68

79
from . import backend_types_sql as bts
810
from . import errors
911
from . import filter_query_models
1012

13+
SYSTEM_KEY_PREFIX: Final[str] = "system/"
14+
15+
16+
class SystemKey(enum.StrEnum):
17+
CREATED_BY = f"{SYSTEM_KEY_PREFIX}pipeline_run.created_by"
18+
19+
20+
SYSTEM_KEY_SUPPORTED_PREDICATES: dict[SystemKey, set[type]] = {
21+
SystemKey.CREATED_BY: {
22+
filter_query_models.KeyExistsPredicate,
23+
filter_query_models.ValueEqualsPredicate,
24+
filter_query_models.ValueInPredicate,
25+
},
26+
}
27+
1128
# ---------------------------------------------------------------------------
1229
# PageToken
1330
# ---------------------------------------------------------------------------
@@ -31,6 +48,93 @@ def decode(cls, token: str | None) -> "PageToken":
3148
return cls(**json.loads(base64.b64decode(token)))
3249

3350

51+
# ---------------------------------------------------------------------------
52+
# SystemKey validation and resolution
53+
# ---------------------------------------------------------------------------
54+
55+
56+
def _get_predicate_key(*, predicate: filter_query_models.Predicate) -> str | None:
57+
"""Extract the annotation key from a leaf predicate, or None for logical operators."""
58+
match predicate:
59+
case filter_query_models.KeyExistsPredicate():
60+
return predicate.key_exists.key
61+
case filter_query_models.ValueEqualsPredicate():
62+
return predicate.value_equals.key
63+
case filter_query_models.ValueContainsPredicate():
64+
return predicate.value_contains.key
65+
case filter_query_models.ValueInPredicate():
66+
return predicate.value_in.key
67+
case _:
68+
return None
69+
70+
71+
def _check_predicate_allowed(*, predicate: filter_query_models.Predicate) -> None:
72+
"""Raise if a system key is used with an unsupported predicate type."""
73+
key = _get_predicate_key(predicate=predicate)
74+
if key is None:
75+
return
76+
77+
try:
78+
system_key = SystemKey(key)
79+
except ValueError:
80+
return
81+
82+
supported = SYSTEM_KEY_SUPPORTED_PREDICATES.get(system_key, set())
83+
if type(predicate) not in supported:
84+
raise errors.InvalidAnnotationKeyError(
85+
f"Predicate {type(predicate).__name__} is not supported "
86+
f"for system key {system_key!r}. "
87+
f"Supported: {[t.__name__ for t in supported]}"
88+
)
89+
90+
91+
def _resolve_system_key_value(
92+
*,
93+
key: str,
94+
value: str,
95+
current_user: str | None,
96+
) -> str:
97+
"""Resolve special placeholder values for system keys."""
98+
if key == SystemKey.CREATED_BY and value == "me":
99+
return current_user if current_user is not None else ""
100+
return value
101+
102+
103+
def _maybe_resolve_system_values(
104+
*,
105+
predicate: filter_query_models.ValueEqualsPredicate,
106+
current_user: str | None,
107+
) -> filter_query_models.ValueEqualsPredicate:
108+
"""Resolve special values in a ValueEqualsPredicate."""
109+
key = predicate.value_equals.key
110+
value = predicate.value_equals.value
111+
resolved = _resolve_system_key_value(
112+
key=key,
113+
value=value,
114+
current_user=current_user,
115+
)
116+
if resolved != value:
117+
return filter_query_models.ValueEqualsPredicate(
118+
value_equals=filter_query_models.ValueEquals(key=key, value=resolved)
119+
)
120+
return predicate
121+
122+
123+
def _validate_and_resolve_predicate(
124+
*,
125+
predicate: filter_query_models.Predicate,
126+
current_user: str | None,
127+
) -> filter_query_models.Predicate:
128+
"""Validate system key support, then resolve special values."""
129+
_check_predicate_allowed(predicate=predicate)
130+
if isinstance(predicate, filter_query_models.ValueEqualsPredicate):
131+
return _maybe_resolve_system_values(
132+
predicate=predicate,
133+
current_user=current_user,
134+
)
135+
return predicate
136+
137+
34138
# ---------------------------------------------------------------------------
35139
# Public API
36140
# ---------------------------------------------------------------------------
@@ -67,7 +171,12 @@ def build_list_filters(
67171

68172
if filter_query_value:
69173
parsed = filter_query_models.FilterQuery.model_validate_json(filter_query_value)
70-
where_clauses.append(filter_query_to_where_clause(filter_query=parsed))
174+
where_clauses.append(
175+
filter_query_to_where_clause(
176+
filter_query=parsed,
177+
current_user=current_user,
178+
)
179+
)
71180

72181
next_page_token = PageToken(
73182
offset=offset + page_size,
@@ -81,10 +190,13 @@ def build_list_filters(
81190
def filter_query_to_where_clause(
82191
*,
83192
filter_query: filter_query_models.FilterQuery,
193+
current_user: str | None = None,
84194
) -> sql.ColumnElement:
85195
predicates = filter_query.and_ or filter_query.or_
86196
is_and = filter_query.and_ is not None
87-
clauses = [_predicate_to_clause(predicate=p) for p in predicates]
197+
clauses = [
198+
_predicate_to_clause(predicate=p, current_user=current_user) for p in predicates
199+
]
88200
return sql.and_(*clauses) if is_and else sql.or_(*clauses)
89201

90202

@@ -149,17 +261,35 @@ def _build_filter_where_clauses(
149261

150262
def _predicate_to_clause(
151263
*,
152-
predicate,
264+
predicate: filter_query_models.Predicate,
265+
current_user: str | None = None,
153266
) -> sql.ColumnElement:
267+
predicate = _validate_and_resolve_predicate(
268+
predicate=predicate,
269+
current_user=current_user,
270+
)
271+
154272
match predicate:
155273
case filter_query_models.AndPredicate():
156274
return sql.and_(
157-
*[_predicate_to_clause(predicate=p) for p in predicate.and_]
275+
*[
276+
_predicate_to_clause(predicate=p, current_user=current_user)
277+
for p in predicate.and_
278+
]
158279
)
159280
case filter_query_models.OrPredicate():
160-
return sql.or_(*[_predicate_to_clause(predicate=p) for p in predicate.or_])
281+
return sql.or_(
282+
*[
283+
_predicate_to_clause(predicate=p, current_user=current_user)
284+
for p in predicate.or_
285+
]
286+
)
161287
case filter_query_models.NotPredicate():
162-
return sql.not_(_predicate_to_clause(predicate=predicate.not_))
288+
return sql.not_(
289+
_predicate_to_clause(
290+
predicate=predicate.not_, current_user=current_user
291+
)
292+
)
163293
case filter_query_models.KeyExistsPredicate():
164294
return _key_exists_to_clause(predicate=predicate)
165295
case filter_query_models.ValueEqualsPredicate():

0 commit comments

Comments
 (0)