Skip to content

Commit fa0c050

Browse files
authored
feat: Create Pydantic models for Search Pipeline Run API (#106)
### TL;DR Added a new `filter_query` parameter to the pipeline runs API with Pydantic validation models, while maintaining backward compatibility with the existing `filter` parameter. ### What's new? JSON definition for `filter_query`: ```json { // logical (any predicate) "<and|or>": [ // leaf {"key_exists": {"key": "<KEY>"}}, // leaf {"value_contains": {"key": "<KEY>", "value_substring": "<VALUE>"}}, // leaf {"value_in": {"key": "<KEY>", "values": ["<V1>", "<V2>", "..."]}}, // leaf {"value_equals": {"key": "<KEY>", "value": "<VALUE>"}}, // leaf {"time_range": {"key": "system/pipeline_run.date.created_at", "start_time": "<START_DATE>", "end_time": "<END_DATE>"}}, // logical (leaf only) {"not": {"<LEAF_PREDICATE>": {}}}, // logical (any predicate) {"<and|or>": [...]}, ... ] } ``` Example JSON for `filter_query`: ```json { "and": [ {"key_exists": {"key": "team"}}, {"value_equals": {"key": "env", "value": "prod"}}, {"value_in": {"key": "region", "values": ["us-east", "us-west", "eu-west"]}}, {"not": {"key_exists": {"key": "deprecated"}}}, {"or": [{"value_contains": {"key": "name", "value_substring": "nightly"}}]} ] } ``` ### What changed? API `GET /api/pipeline_runs/` #### Functional - New `filter_query` if used in API will return an HTTP 501 (unimplemented error) - Only `filter` or `filter_query` (mutual exclusive) can be in the API, else it returns a HTTP 422. #### Other - Added a new `filter_query` parameter to `ListPipelineRunsParams` that accepts structured JSON queries - Created comprehensive Pydantic models in `filter_query_models.py` for validating filter queries, including: - Leaf predicates: `key_exists`, `value_equals`, `value_contains`, `value_in`, `time_range` - Logical operators: `and`, `or`, `not` - Recursive nesting support for complex queries - Implemented JSON validation for `filter_query` with proper error handling - Enforced strict validation rules including non-empty strings and timezone-aware datetimes ### How to test? ```bash uv run pytest tests/test_api_server_sql.py tests/test_filter_query_models.py ``` - Test the new `filter_query` parameter with valid JSON structures like `{"and": [{"key_exists": {"key": "team"}}]}` - Verify that using both `filter` and `filter_query` simultaneously returns a 422 error - Confirm that invalid JSON in `filter_query` triggers proper validation errors - Check that valid `filter_query` usage returns a 501 "not yet implemented" response - Test edge cases like empty strings, missing timezone info, and nested logical operators ### Why make this change? This PR introduces the Pydantic models and API wiring that upstream PRs depend on to implement the Search Pipeline Run API.
1 parent 098f799 commit fa0c050

6 files changed

Lines changed: 437 additions & 1 deletion

File tree

cloud_pipelines_backend/api_router.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,24 @@ def handle_item_already_exists_error(
125125
content={"message": str(exc)},
126126
)
127127

128+
@app.exception_handler(errors.ApiValidationError)
129+
def handle_api_validation_error(
130+
request: fastapi.Request, exc: errors.ApiValidationError
131+
):
132+
return fastapi.responses.JSONResponse(
133+
status_code=422,
134+
content={"detail": str(exc)},
135+
)
136+
137+
@app.exception_handler(NotImplementedError)
138+
def handle_not_implemented_error(
139+
request: fastapi.Request, exc: NotImplementedError
140+
):
141+
return fastapi.responses.JSONResponse(
142+
status_code=501,
143+
content={"detail": str(exc)},
144+
)
145+
128146
get_user_details_dependency = fastapi.Depends(user_details_getter)
129147

130148
def get_user_name(

cloud_pipelines_backend/api_server_sql.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from . import backend_types_sql as bts
1313
from . import component_structures as structures
1414
from . import errors
15+
from . import filter_query_models
1516

1617
if typing.TYPE_CHECKING:
1718
from cloud_pipelines.orchestration.storage_providers import (
@@ -167,10 +168,20 @@ def list(
167168
session: orm.Session,
168169
page_token: str | None = None,
169170
filter: str | None = None,
171+
filter_query: str | None = None,
170172
current_user: str | None = None,
171173
include_pipeline_names: bool = False,
172174
include_execution_stats: bool = False,
173175
) -> ListPipelineJobsResponse:
176+
if filter and filter_query:
177+
raise errors.ApiValidationError(
178+
"Cannot use both 'filter' and 'filter_query'. Use one or the other."
179+
)
180+
181+
if filter_query:
182+
filter_query_models.FilterQuery.model_validate_json(filter_query)
183+
raise NotImplementedError("filter_query is not yet implemented.")
184+
174185
filter_value, offset = _resolve_filter_value(
175186
filter=filter,
176187
page_token=page_token,

cloud_pipelines_backend/errors.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,9 @@ class ItemAlreadyExistsError(Exception):
88

99
class PermissionError(Exception):
1010
pass
11+
12+
13+
class ApiValidationError(Exception):
14+
"""Base for all filter/annotation validation errors -> 422."""
15+
16+
pass
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
from __future__ import annotations
2+
3+
from typing import Annotated
4+
5+
import pydantic
6+
7+
NonEmptyStr = Annotated[str, pydantic.StringConstraints(min_length=1)]
8+
9+
10+
class _BaseModel(pydantic.BaseModel):
11+
model_config = {"extra": "forbid"}
12+
13+
14+
# --- Leaf argument models ---
15+
16+
17+
class KeyExists(_BaseModel):
18+
key: NonEmptyStr
19+
20+
21+
class ValueContains(_BaseModel):
22+
key: NonEmptyStr
23+
value_substring: NonEmptyStr
24+
25+
26+
class ValueIn(_BaseModel):
27+
key: NonEmptyStr
28+
values: list[NonEmptyStr] = pydantic.Field(min_length=1)
29+
30+
31+
class ValueEquals(_BaseModel):
32+
key: NonEmptyStr
33+
value: str
34+
35+
36+
class TimeRange(_BaseModel):
37+
"""At least one of start_time or end_time is required.
38+
39+
Valid combinations: start+end (range), start-only (after), end-only (before).
40+
AwareDatetime requires timezone info (e.g. "2024-01-01T00:00:00Z").
41+
Naive datetimes like "2024-01-01T00:00:00" are rejected, preventing
42+
ambiguous timestamps that could silently resolve to the wrong timezone.
43+
"""
44+
45+
key: NonEmptyStr
46+
start_time: pydantic.AwareDatetime | None = None
47+
end_time: pydantic.AwareDatetime | None = None
48+
49+
@pydantic.model_validator(mode="after")
50+
def _at_least_one_time_bound(self) -> TimeRange:
51+
if self.start_time is None and self.end_time is None:
52+
raise ValueError(
53+
"TimeRange requires at least one of 'start_time' or 'end_time'."
54+
)
55+
return self
56+
57+
58+
# --- Predicate wrapper models (one field each) ---
59+
60+
61+
class KeyExistsPredicate(_BaseModel):
62+
key_exists: KeyExists
63+
64+
65+
class ValueContainsPredicate(_BaseModel):
66+
value_contains: ValueContains
67+
68+
69+
class ValueInPredicate(_BaseModel):
70+
value_in: ValueIn
71+
72+
73+
class ValueEqualsPredicate(_BaseModel):
74+
value_equals: ValueEquals
75+
76+
77+
class TimeRangePredicate(_BaseModel):
78+
time_range: TimeRange
79+
80+
81+
LeafPredicate = (
82+
KeyExistsPredicate
83+
| ValueContainsPredicate
84+
| ValueInPredicate
85+
| ValueEqualsPredicate
86+
| TimeRangePredicate
87+
)
88+
89+
90+
class NotPredicate(_BaseModel):
91+
not_: LeafPredicate = pydantic.Field(alias="not")
92+
93+
94+
class AndPredicate(_BaseModel):
95+
and_: list["Predicate"] = pydantic.Field(alias="and", min_length=1)
96+
97+
98+
class OrPredicate(_BaseModel):
99+
or_: list["Predicate"] = pydantic.Field(alias="or", min_length=1)
100+
101+
102+
Predicate = (
103+
KeyExistsPredicate
104+
| ValueContainsPredicate
105+
| ValueInPredicate
106+
| ValueEqualsPredicate
107+
| TimeRangePredicate
108+
| NotPredicate
109+
| AndPredicate
110+
| OrPredicate
111+
)
112+
113+
# Resolve forward reference to "Predicate" in recursive and/or models
114+
AndPredicate.model_rebuild()
115+
OrPredicate.model_rebuild()
116+
117+
118+
class FilterQuery(_BaseModel):
119+
"""Root: must be exactly one of {"and": [...]} or {"or": [...]}."""
120+
121+
and_: list[Predicate] | None = pydantic.Field(None, alias="and", min_length=1)
122+
or_: list[Predicate] | None = pydantic.Field(None, alias="or", min_length=1)
123+
124+
@pydantic.model_validator(mode="after")
125+
def _exactly_one_root_operator(self) -> FilterQuery:
126+
has_and = self.and_ is not None
127+
has_or = self.or_ is not None
128+
if has_and == has_or:
129+
raise ValueError("FilterQuery root must have exactly one of 'and' or 'or'.")
130+
return self

tests/test_api_server_sql.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import pytest
22
from sqlalchemy import orm
33

4+
from cloud_pipelines_backend import api_server_sql
45
from cloud_pipelines_backend import backend_types_sql as bts
56
from cloud_pipelines_backend import component_structures as structures
6-
from cloud_pipelines_backend import api_server_sql
77
from cloud_pipelines_backend import database_ops
8+
from cloud_pipelines_backend import errors
89

910

1011
class TestExecutionStatusSummary:
@@ -537,3 +538,34 @@ def test_text_search_raises(self):
537538
filter_value="some_text_without_colon",
538539
current_user=None,
539540
)
541+
542+
543+
class TestFilterQueryApiWiring:
544+
def test_filter_query_returns_not_implemented(self, session_factory, service):
545+
valid_json = '{"and": [{"key_exists": {"key": "team"}}]}'
546+
with session_factory() as session:
547+
with pytest.raises(NotImplementedError, match="not yet implemented"):
548+
service.list(
549+
session=session,
550+
filter_query=valid_json,
551+
)
552+
553+
def test_filter_query_validates_before_501(self, session_factory, service):
554+
from pydantic import ValidationError
555+
556+
invalid_json = '{"bad_key": "not_valid"}'
557+
with session_factory() as session:
558+
with pytest.raises(ValidationError):
559+
service.list(
560+
session=session,
561+
filter_query=invalid_json,
562+
)
563+
564+
def test_mutual_exclusivity_rejected(self, session_factory, service):
565+
with session_factory() as session:
566+
with pytest.raises(errors.ApiValidationError, match="Cannot use both"):
567+
service.list(
568+
session=session,
569+
filter="created_by:alice",
570+
filter_query='{"and": [{"key_exists": {"key": "team"}}]}',
571+
)

0 commit comments

Comments
 (0)