Skip to content

Commit 98e7d41

Browse files
committed
feat: Search annotations in pipeline run API
1 parent f7dc9bc commit 98e7d41

7 files changed

Lines changed: 954 additions & 207 deletions

File tree

cloud_pipelines_backend/api_server_sql.py

Lines changed: 10 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
import base64
21
import dataclasses
32
import datetime
4-
import json
53
import logging
64
import typing
75
from typing import Any, Final, Optional
@@ -12,7 +10,7 @@
1210
from . import backend_types_sql as bts
1311
from . import component_structures as structures
1412
from . import errors
15-
from . import filter_query_models
13+
from . import filter_query_sql
1614

1715
if typing.TYPE_CHECKING:
1816
from cloud_pipelines.orchestration.storage_providers import (
@@ -34,8 +32,6 @@ def _get_current_time() -> datetime.datetime:
3432
return datetime.datetime.now(tz=datetime.timezone.utc)
3533

3634

37-
_PAGE_TOKEN_OFFSET_KEY: Final[str] = "offset"
38-
_PAGE_TOKEN_FILTER_KEY: Final[str] = "filter"
3935
_DEFAULT_PAGE_SIZE: Final[int] = 10
4036

4137

@@ -175,22 +171,12 @@ def list(
175171
include_pipeline_names: bool = False,
176172
include_execution_stats: bool = False,
177173
) -> ListPipelineJobsResponse:
178-
if filter and filter_query:
179-
raise errors.MutuallyExclusiveFilterError(
180-
"Cannot use both 'filter' and 'filter_query'. Use one or the other."
181-
)
182-
183-
if filter_query:
184-
filter_query_models.FilterQuery.model_validate_json(filter_query)
185-
raise NotImplementedError("filter_query is not yet implemented.")
186-
187-
filter_value, offset = _resolve_filter_value(
188-
filter=filter,
189-
page_token=page_token,
190-
)
191-
where_clauses, next_page_filter_value = _build_filter_where_clauses(
192-
filter_value=filter_value,
174+
where_clauses, offset, next_token = filter_query_sql.build_list_filters(
175+
filter_value=filter,
176+
filter_query_value=filter_query,
177+
page_token_value=page_token,
193178
current_user=current_user,
179+
page_size=_DEFAULT_PAGE_SIZE,
194180
)
195181

196182
pipeline_runs = list(
@@ -202,14 +188,10 @@ def list(
202188
.limit(_DEFAULT_PAGE_SIZE)
203189
).all()
204190
)
205-
next_page_offset = offset + _DEFAULT_PAGE_SIZE
206-
next_page_token_dict = {
207-
_PAGE_TOKEN_OFFSET_KEY: next_page_offset,
208-
_PAGE_TOKEN_FILTER_KEY: next_page_filter_value,
209-
}
210-
next_page_token = _encode_page_token(next_page_token_dict)
211-
if len(pipeline_runs) < _DEFAULT_PAGE_SIZE:
212-
next_page_token = None
191+
192+
next_page_token = (
193+
next_token.encode() if len(pipeline_runs) >= _DEFAULT_PAGE_SIZE else None
194+
)
213195

214196
return ListPipelineJobsResponse(
215197
pipeline_runs=[
@@ -350,82 +332,6 @@ def delete_annotation(
350332
session.commit()
351333

352334

353-
def _resolve_filter_value(
354-
*,
355-
filter: str | None,
356-
page_token: str | None,
357-
) -> tuple[str | None, int]:
358-
"""Decode page_token and return the effective (filter_value, offset).
359-
360-
If a page_token is present, its stored filter takes precedence over the
361-
raw filter parameter (the token carries the resolved filter forward across pages).
362-
"""
363-
page_token_dict = _decode_page_token(page_token)
364-
offset = page_token_dict.get(_PAGE_TOKEN_OFFSET_KEY, 0)
365-
if page_token:
366-
filter = page_token_dict.get(_PAGE_TOKEN_FILTER_KEY, None)
367-
return filter, offset
368-
369-
370-
def _build_filter_where_clauses(
371-
*,
372-
filter_value: str | None,
373-
current_user: str | None,
374-
) -> tuple[list[sql.ColumnElement], str | None]:
375-
"""Parse a filter string into SQLAlchemy WHERE clauses.
376-
377-
Returns (where_clauses, next_page_filter_value). The second value is the
378-
filter string with shorthand values resolved (e.g. "created_by:me" becomes
379-
"created_by:alice@example.com") so it can be embedded in the next page token.
380-
"""
381-
where_clauses: list[sql.ColumnElement] = []
382-
parsed_filter = _parse_filter(filter_value) if filter_value else {}
383-
for key, value in parsed_filter.items():
384-
if key == "_text":
385-
raise NotImplementedError("Text search is not implemented yet.")
386-
elif key == "created_by":
387-
if value == "me":
388-
if current_user is None:
389-
current_user = ""
390-
value = current_user
391-
# TODO: Maybe make this a bit more robust.
392-
# We need to change the filter since it goes into the next_page_token.
393-
filter_value = filter_value.replace(
394-
"created_by:me", f"created_by:{current_user}"
395-
)
396-
if value:
397-
where_clauses.append(bts.PipelineRun.created_by == value)
398-
else:
399-
where_clauses.append(bts.PipelineRun.created_by == None)
400-
else:
401-
raise NotImplementedError(f"Unsupported filter {filter_value}.")
402-
return where_clauses, filter_value
403-
404-
405-
def _decode_page_token(page_token: str) -> dict[str, Any]:
406-
return json.loads(base64.b64decode(page_token)) if page_token else {}
407-
408-
409-
def _encode_page_token(page_token_dict: dict[str, Any]) -> str:
410-
return (base64.b64encode(json.dumps(page_token_dict).encode("utf8"))).decode(
411-
"utf-8"
412-
)
413-
414-
415-
def _parse_filter(filter: str) -> dict[str, str]:
416-
# TODO: Improve
417-
parts = filter.strip().split()
418-
parsed_filter = {}
419-
for part in parts:
420-
key, sep, value = part.partition(":")
421-
if sep:
422-
parsed_filter[key] = value
423-
else:
424-
parsed_filter.setdefault("_text", "")
425-
parsed_filter["_text"] += part
426-
return parsed_filter
427-
428-
429335
# ========== ExecutionNodeApiService_Sql
430336

431337

cloud_pipelines_backend/backend_types_sql.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,21 @@
22
import datetime
33
import enum
44
import typing
5-
from typing import Any
5+
from typing import Any, Final
66

77
import sqlalchemy as sql
88
from sqlalchemy import orm
99
from sqlalchemy.ext import mutable
1010

1111
IdType: typing.TypeAlias = str
1212

13+
IX_EXECUTION_NODE_CACHE_KEY: Final[str] = (
14+
"ix_execution_node_container_execution_cache_key"
15+
)
16+
IX_ANNOTATION_RUN_ID_KEY_VALUE: Final[str] = (
17+
"ix_pipeline_run_annotation_run_id_key_value"
18+
)
19+
1320

1421
class ContainerExecutionStatus(str, enum.Enum):
1522
INVALID = "INVALID" # Compatibility with Vertex AI CustomJob
@@ -64,7 +71,7 @@ def generate_unique_id() -> str:
6471

6572
# # Needed to put a union type into DB
6673
# class SqlIOTypeStruct(_BaseModel):
67-
# type: structures.TypeSpecType
74+
# type: structures.TypeSpecType
6875
# No. We'll represent TypeSpecType as name:str + properties:dict
6976
# Supported cases:
7077
# * type: "name"
@@ -500,6 +507,15 @@ class PipelineRunAnnotation(_TableBase):
500507
key: orm.Mapped[str] = orm.mapped_column(default=None, primary_key=True)
501508
value: orm.Mapped[str | None] = orm.mapped_column(default=None)
502509

510+
__table_args__ = (
511+
sql.Index(
512+
IX_ANNOTATION_RUN_ID_KEY_VALUE,
513+
"pipeline_run_id",
514+
"key",
515+
"value",
516+
),
517+
)
518+
503519

504520
class Secret(_TableBase):
505521
__tablename__ = "secret"

cloud_pipelines_backend/component_structures.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import pydantic.alias_generators
5555
from pydantic.dataclasses import dataclass as pydantic_dataclasses
5656

57-
5857
# PrimitiveTypes = Union[str, int, float, bool]
5958
PrimitiveTypes = str
6059

cloud_pipelines_backend/database_ops.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,11 @@ def migrate_db(db_engine: sqlalchemy.Engine):
7575
# Or we need to avoid calling the Index constructor.
7676

7777
for index in bts.ExecutionNode.__table__.indexes:
78-
if index.name == "ix_execution_node_container_execution_cache_key":
78+
if index.name == bts.IX_EXECUTION_NODE_CACHE_KEY:
7979
index.create(db_engine, checkfirst=True)
80+
break
81+
82+
for index in bts.PipelineRunAnnotation.__table__.indexes:
83+
if index.name == bts.IX_ANNOTATION_RUN_ID_KEY_VALUE:
84+
index.create(db_engine, checkfirst=True)
85+
break

0 commit comments

Comments (0)