Skip to content

Commit 38dc5e7

Browse files
authored
feat: Search created by user in pipeline run API (#108)
### TL;DR Implemented search `created_by` user in Pipeline Runs. ### What changed? #### Functionality - API `GET /api/pipeline_runs/` - Search `created_by` user in `filter_query`. Example query (not URL encoded for example): ```json /api?filter_query={"and": [{"value_equals": {"key": "system/pipeline_run.created_by", "value": "alice@example.com"}}]} ``` - Annotation value `me` is supported for `created_by` search. - API `POST /api/pipeline_runs/` - Copies the `created_by` data to the annotations table for future searching. Example pipeline run annotations table after API is called: | pipeline_run_id | key | value | |---|---|---| | `abc123` | `system/pipeline_run.created_by` | `alice@example.com` | - API `[PUT|DELETE] /api/pipeline_runs/{id}/annotations/{key}` - Prevent (create, update, delete) of annotations with `system/` prefix. Reserved for system annotation usage. #### Other - **Database migration**: Added backfill logic to populate existing pipeline runs with `created_by` annotations. Example what a pipeline run's created by user would look like in the annotations table: | pipeline_run_id | key | value | |-----------------|-----|-------| | 42 | system/pipeline_run.created_by | alice@example.com | ### How to test? ``` uv run pytest tests/test_api_server_sql.py tests/test_filter_query_sql.py tests/test_database_ops.py ``` 1. Create pipeline runs with different `created_by` values 2. Use filter queries like `{"and": [{"value_equals": {"key": "system/pipeline_run.created_by", "value": "alice"}}]}` to search by creator 3. Test the `"me"` placeholder: `{"and": [{"value_equals": {"key": "system/pipeline_run.created_by", "value": "me"}}]}` 4. Verify that attempts to set/delete system annotations return 422 errors 5. Test that unsupported predicates on system keys (like `value_contains`) are rejected ### Why make this change? - This enables users to search for pipeline runs by creator using the new filter query system. - Following safety guards and synchronization will allow old and new data (`create_by`) to be searchable with the new filter. - Preventing (create, delete, update) system prefix in annotations - Saving `created_by` to annotations table when starting a Pipeline Run
1 parent 538bdc0 commit 38dc5e7

7 files changed

Lines changed: 968 additions & 13 deletions

File tree

cloud_pipelines_backend/api_server_sql.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,14 @@ class ListPipelineJobsResponse:
6868
class PipelineRunsApiService_Sql:
6969
_PIPELINE_NAME_EXTRA_DATA_KEY = "pipeline_name"
7070
_DEFAULT_PAGE_SIZE: Final[int] = 10
71+
_SYSTEM_KEY_RESERVED_MSG = (
72+
"Annotation keys starting with "
73+
f"{filter_query_sql.SYSTEM_KEY_PREFIX!r} are reserved for system use."
74+
)
75+
76+
def _fail_if_changing_system_annotation(self, *, key: str) -> None:
77+
if key.startswith(filter_query_sql.SYSTEM_KEY_PREFIX):
78+
raise errors.ApiValidationError(self._SYSTEM_KEY_RESERVED_MSG)
7179

7280
def create(
7381
self,
@@ -105,6 +113,19 @@ def create(
105113
},
106114
)
107115
session.add(pipeline_run)
116+
# Mirror created_by into the annotations table so it's searchable
117+
# via filter_query like any other annotation.
118+
if created_by is not None:
119+
# Flush to populate pipeline_run.id (server-generated) before inserting the annotation FK.
120+
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
121+
session.flush()
122+
session.add(
123+
bts.PipelineRunAnnotation(
124+
pipeline_run_id=pipeline_run.id,
125+
key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
126+
value=created_by,
127+
)
128+
)
108129
session.commit()
109130

110131
session.refresh(pipeline_run)
@@ -295,6 +316,7 @@ def set_annotation(
295316
user_name: str | None = None,
296317
skip_user_check: bool = False,
297318
):
319+
self._fail_if_changing_system_annotation(key=key)
298320
pipeline_run = session.get(bts.PipelineRun, id)
299321
if not pipeline_run:
300322
raise errors.ItemNotFoundError(f"Pipeline run {id} not found.")
@@ -317,6 +339,7 @@ def delete_annotation(
317339
user_name: str | None = None,
318340
skip_user_check: bool = False,
319341
):
342+
self._fail_if_changing_system_annotation(key=key)
320343
pipeline_run = session.get(bts.PipelineRun, id)
321344
if not pipeline_run:
322345
raise errors.ItemNotFoundError(f"Pipeline run {id} not found.")

cloud_pipelines_backend/database_ops.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import sqlalchemy
2+
from sqlalchemy import orm
23

34
from . import backend_types_sql as bts
5+
from . import filter_query_sql
46

57

68
def create_db_engine_and_migrate_db(
@@ -83,3 +85,60 @@ def migrate_db(db_engine: sqlalchemy.Engine):
8385
if index.name == bts.PipelineRunAnnotation._IX_ANNOTATION_RUN_ID_KEY_VALUE:
8486
index.create(db_engine, checkfirst=True)
8587
break
88+
89+
_backfill_pipeline_run_created_by_annotations(db_engine=db_engine)
90+
91+
92+
def _is_pipeline_run_annotation_key_already_backfilled(
93+
*,
94+
session: orm.Session,
95+
key: str,
96+
) -> bool:
97+
"""Return True if at least one annotation with the given key exists."""
98+
return session.query(
99+
sqlalchemy.exists(
100+
sqlalchemy.select(sqlalchemy.literal(1))
101+
.select_from(bts.PipelineRunAnnotation)
102+
.where(
103+
bts.PipelineRunAnnotation.key == key,
104+
)
105+
)
106+
).scalar()
107+
108+
109+
def _backfill_pipeline_run_created_by_annotations(
110+
*,
111+
db_engine: sqlalchemy.Engine,
112+
) -> None:
113+
"""Copy pipeline_run.created_by into pipeline_run_annotation so
114+
annotation-based search works for created_by.
115+
116+
The check and insert run in a single session/transaction to avoid
117+
TOCTOU races between concurrent startup processes.
118+
119+
Skips entirely if any created_by annotation key already exists (i.e. the
120+
write-path is populating them, so the backfill has already run or is
121+
no longer needed).
122+
"""
123+
with orm.Session(db_engine) as session:
124+
if _is_pipeline_run_annotation_key_already_backfilled(
125+
session=session,
126+
key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
127+
):
128+
return
129+
130+
stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select(
131+
["pipeline_run_id", "key", "value"],
132+
sqlalchemy.select(
133+
bts.PipelineRun.id,
134+
sqlalchemy.literal(
135+
filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY
136+
),
137+
bts.PipelineRun.created_by,
138+
).where(
139+
bts.PipelineRun.created_by.isnot(None),
140+
bts.PipelineRun.created_by != "",
141+
),
142+
)
143+
session.execute(stmt)
144+
session.commit()

cloud_pipelines_backend/filter_query_models.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import abc
34
from typing import Annotated
45

56
import pydantic
@@ -58,21 +59,45 @@ def _at_least_one_time_bound(self) -> TimeRange:
5859
# --- Predicate wrapper models (one field each) ---
5960

6061

61-
class KeyExistsPredicate(_BaseModel):
62+
class KeyPredicateBase(_BaseModel):
63+
"""Base for predicates that target an annotation key."""
64+
65+
@property
66+
@abc.abstractmethod
67+
def key(self) -> str: ...
68+
69+
70+
class KeyExistsPredicate(KeyPredicateBase):
6271
key_exists: KeyExists
6372

73+
@property
74+
def key(self) -> str:
75+
return self.key_exists.key
76+
6477

65-
class ValueContainsPredicate(_BaseModel):
78+
class ValueContainsPredicate(KeyPredicateBase):
6679
value_contains: ValueContains
6780

81+
@property
82+
def key(self) -> str:
83+
return self.value_contains.key
6884

69-
class ValueInPredicate(_BaseModel):
85+
86+
class ValueInPredicate(KeyPredicateBase):
7087
value_in: ValueIn
7188

89+
@property
90+
def key(self) -> str:
91+
return self.value_in.key
92+
7293

73-
class ValueEqualsPredicate(_BaseModel):
94+
class ValueEqualsPredicate(KeyPredicateBase):
7495
value_equals: ValueEquals
7596

97+
@property
98+
def key(self) -> str:
99+
return self.value_equals.key
100+
76101

77102
class TimeRangePredicate(_BaseModel):
78103
time_range: TimeRange

cloud_pipelines_backend/filter_query_sql.py

Lines changed: 120 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import base64
22
import json
3+
import enum
34
from typing import Any, Final
45

56
import sqlalchemy as sql
@@ -8,6 +9,21 @@
89
from . import errors
910
from . import filter_query_models
1011

12+
SYSTEM_KEY_PREFIX: Final[str] = "system/"
13+
14+
15+
class PipelineRunAnnotationSystemKey(enum.StrEnum):
16+
CREATED_BY = f"{SYSTEM_KEY_PREFIX}pipeline_run.created_by"
17+
18+
19+
SYSTEM_KEY_SUPPORTED_PREDICATES: dict[PipelineRunAnnotationSystemKey, set[type]] = {
20+
PipelineRunAnnotationSystemKey.CREATED_BY: {
21+
filter_query_models.KeyExistsPredicate,
22+
filter_query_models.ValueEqualsPredicate,
23+
filter_query_models.ValueInPredicate,
24+
},
25+
}
26+
1127
# ---------------------------------------------------------------------------
1228
# Page-token helpers
1329
# ---------------------------------------------------------------------------
@@ -44,6 +60,78 @@ def _resolve_filter_value(
4460
return filter, filter_query, offset
4561

4662

63+
# ---------------------------------------------------------------------------
64+
# PipelineRunAnnotationSystemKey validation and resolution
65+
# ---------------------------------------------------------------------------
66+
67+
68+
def _check_predicate_allowed(*, predicate: filter_query_models.Predicate) -> None:
69+
"""Raise if a system key is used with an unsupported predicate type."""
70+
if not isinstance(predicate, filter_query_models.KeyPredicateBase):
71+
return
72+
key = predicate.key
73+
74+
try:
75+
system_key = PipelineRunAnnotationSystemKey(key)
76+
except ValueError:
77+
return
78+
79+
supported = SYSTEM_KEY_SUPPORTED_PREDICATES.get(system_key, set())
80+
if type(predicate) not in supported:
81+
raise errors.ApiValidationError(
82+
f"Predicate {type(predicate).__name__} is not supported "
83+
f"for system key {system_key!r}. "
84+
f"Supported: {[t.__name__ for t in supported]}"
85+
)
86+
87+
88+
def _resolve_system_key_value(
89+
*,
90+
key: str,
91+
value: str,
92+
current_user: str | None,
93+
) -> str:
94+
"""Resolve special placeholder values for system keys."""
95+
if key == PipelineRunAnnotationSystemKey.CREATED_BY and value == "me":
96+
return current_user if current_user is not None else ""
97+
return value
98+
99+
100+
def _maybe_resolve_system_values(
101+
*,
102+
predicate: filter_query_models.ValueEqualsPredicate,
103+
current_user: str | None,
104+
) -> filter_query_models.ValueEqualsPredicate:
105+
"""Resolve special values in a ValueEqualsPredicate."""
106+
key = predicate.value_equals.key
107+
value = predicate.value_equals.value
108+
resolved = _resolve_system_key_value(
109+
key=key,
110+
value=value,
111+
current_user=current_user,
112+
)
113+
if resolved != value:
114+
return filter_query_models.ValueEqualsPredicate(
115+
value_equals=filter_query_models.ValueEquals(key=key, value=resolved)
116+
)
117+
return predicate
118+
119+
120+
def _validate_and_resolve_predicate(
121+
*,
122+
predicate: filter_query_models.Predicate,
123+
current_user: str | None,
124+
) -> filter_query_models.Predicate:
125+
"""Validate system key support, then resolve special values."""
126+
_check_predicate_allowed(predicate=predicate)
127+
if isinstance(predicate, filter_query_models.ValueEqualsPredicate):
128+
return _maybe_resolve_system_values(
129+
predicate=predicate,
130+
current_user=current_user,
131+
)
132+
return predicate
133+
134+
47135
# ---------------------------------------------------------------------------
48136
# Public API
49137
# ---------------------------------------------------------------------------
@@ -79,7 +167,12 @@ def build_list_filters(
79167

80168
if filter_query_value:
81169
parsed = filter_query_models.FilterQuery.model_validate_json(filter_query_value)
82-
where_clauses.append(filter_query_to_where_clause(filter_query=parsed))
170+
where_clauses.append(
171+
filter_query_to_where_clause(
172+
filter_query=parsed,
173+
current_user=current_user,
174+
)
175+
)
83176

84177
next_page_token = _encode_page_token(
85178
page_token_dict={
@@ -95,10 +188,13 @@ def build_list_filters(
95188
def filter_query_to_where_clause(
96189
*,
97190
filter_query: filter_query_models.FilterQuery,
191+
current_user: str | None = None,
98192
) -> sql.ColumnElement:
99193
predicates = filter_query.and_ or filter_query.or_
100194
is_and = filter_query.and_ is not None
101-
clauses = [_predicate_to_clause(predicate=p) for p in predicates]
195+
clauses = [
196+
_predicate_to_clause(predicate=p, current_user=current_user) for p in predicates
197+
]
102198
return sql.and_(*clauses) if is_and else sql.or_(*clauses)
103199

104200

@@ -163,17 +259,35 @@ def _build_filter_where_clauses(
163259

164260
def _predicate_to_clause(
165261
*,
166-
predicate,
262+
predicate: filter_query_models.Predicate,
263+
current_user: str | None = None,
167264
) -> sql.ColumnElement:
265+
predicate = _validate_and_resolve_predicate(
266+
predicate=predicate,
267+
current_user=current_user,
268+
)
269+
168270
match predicate:
169271
case filter_query_models.AndPredicate():
170272
return sql.and_(
171-
*[_predicate_to_clause(predicate=p) for p in predicate.and_]
273+
*[
274+
_predicate_to_clause(predicate=p, current_user=current_user)
275+
for p in predicate.and_
276+
]
172277
)
173278
case filter_query_models.OrPredicate():
174-
return sql.or_(*[_predicate_to_clause(predicate=p) for p in predicate.or_])
279+
return sql.or_(
280+
*[
281+
_predicate_to_clause(predicate=p, current_user=current_user)
282+
for p in predicate.or_
283+
]
284+
)
175285
case filter_query_models.NotPredicate():
176-
return sql.not_(_predicate_to_clause(predicate=predicate.not_))
286+
return sql.not_(
287+
_predicate_to_clause(
288+
predicate=predicate.not_, current_user=current_user
289+
)
290+
)
177291
case filter_query_models.KeyExistsPredicate():
178292
return _key_exists_to_clause(predicate=predicate)
179293
case filter_query_models.ValueEqualsPredicate():

0 commit comments

Comments
 (0)