Skip to content

Commit 119dadb

Browse files
committed
feat: Search pipeline run API uses pagination cursor
1 parent 986735a commit 119dadb

6 files changed

Lines changed: 314 additions & 173 deletions

File tree

cloud_pipelines_backend/api_server_sql.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -187,26 +187,27 @@ def list(
187187
include_pipeline_names: bool = False,
188188
include_execution_stats: bool = False,
189189
) -> ListPipelineJobsResponse:
190-
where_clauses, offset, next_token = filter_query_sql.build_list_filters(
190+
where_clauses = filter_query_sql.build_list_filters(
191191
filter_value=filter,
192192
filter_query_value=filter_query,
193-
page_token_value=page_token,
193+
cursor_value=page_token,
194194
current_user=current_user,
195-
page_size=self._DEFAULT_PAGE_SIZE,
196195
)
197196

198197
pipeline_runs = list(
199198
session.scalars(
200199
sql.select(bts.PipelineRun)
201200
.where(*where_clauses)
202-
.order_by(bts.PipelineRun.created_at.desc())
203-
.offset(offset)
201+
.order_by(
202+
bts.PipelineRun.created_at.desc(),
203+
bts.PipelineRun.id.desc(),
204+
)
204205
.limit(self._DEFAULT_PAGE_SIZE)
205206
).all()
206207
)
207208

208-
next_page_token = (
209-
next_token if len(pipeline_runs) >= self._DEFAULT_PAGE_SIZE else None
209+
next_page_token = filter_query_sql.maybe_next_page_token(
210+
rows=pipeline_runs, page_size=self._DEFAULT_PAGE_SIZE
210211
)
211212

212213
return ListPipelineJobsResponse(

cloud_pipelines_backend/backend_types_sql.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ class _TableBase(orm.MappedAsDataclass, orm.DeclarativeBase, kw_only=True):
128128

129129
class PipelineRun(_TableBase):
130130
__tablename__ = "pipeline_run"
131+
_IX_PR_CREATED_AT_DESC_ID_DESC: Final[str] = (
132+
"ix_pipeline_run_created_at_desc_id_desc"
133+
)
131134
id: orm.Mapped[IdType] = orm.mapped_column(
132135
primary_key=True, init=False, insert_default=generate_unique_id
133136
)
@@ -160,6 +163,11 @@ class PipelineRun(_TableBase):
160163
created_by,
161164
created_at.desc(),
162165
),
166+
sql.Index(
167+
_IX_PR_CREATED_AT_DESC_ID_DESC,
168+
created_at.desc(),
169+
id.desc(),
170+
),
163171
)
164172

165173

cloud_pipelines_backend/database_ops.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ def migrate_db(db_engine: sqlalchemy.Engine):
8989
index.create(db_engine, checkfirst=True)
9090
break
9191

92+
for index in bts.PipelineRun.__table__.indexes:
93+
if index.name == bts.PipelineRun._IX_PR_CREATED_AT_DESC_ID_DESC:
94+
index.create(db_engine, checkfirst=True)
95+
break
96+
9297
_backfill_pipeline_run_created_by_annotations(db_engine=db_engine)
9398
_backfill_pipeline_run_name_annotations(db_engine=db_engine)
9499

cloud_pipelines_backend/filter_query_sql.py

Lines changed: 57 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import base64
21
import datetime
32
import json
43
import enum
@@ -38,39 +37,58 @@ class PipelineRunAnnotationSystemKey(enum.StrEnum):
3837
}
3938

4039
# ---------------------------------------------------------------------------
41-
# Page-token helpers
40+
# Cursor encode / decode
4241
# ---------------------------------------------------------------------------
4342

44-
_PAGE_TOKEN_OFFSET_KEY: Final[str] = "offset"
45-
_PAGE_TOKEN_FILTER_KEY: Final[str] = "filter"
46-
_PAGE_TOKEN_FILTER_QUERY_KEY: Final[str] = "filter_query"
43+
CURSOR_SEPARATOR: Final[str] = "~"
4744

4845

49-
def _encode_page_token(*, page_token_dict: dict[str, Any]) -> str:
50-
return base64.b64encode(json.dumps(page_token_dict).encode("utf-8")).decode("utf-8")
46+
def encode_cursor(created_at: datetime.datetime, run_id: str) -> str:
47+
"""Encode the last row's position as a tilde-separated cursor string.
5148
49+
The created_at from PipelineRun is naive UTC (no UtcDateTime decorator on
50+
this column). We stamp it as UTC here so the cursor string is
51+
timezone-explicit for readability and correctness.
52+
decode_cursor() normalizes back to naive UTC for DB comparison.
53+
"""
54+
if created_at.tzinfo is None:
55+
created_at = created_at.replace(tzinfo=datetime.timezone.utc)
56+
return f"{created_at.isoformat()}{CURSOR_SEPARATOR}{run_id}"
5257

53-
def _decode_page_token(*, page_token: str | None) -> dict[str, Any]:
54-
return json.loads(base64.b64decode(page_token)) if page_token else {}
5558

59+
def decode_cursor(cursor: str | None) -> tuple[datetime.datetime, str] | None:
60+
"""Parse a tilde-separated cursor string into (created_at, run_id).
5661
57-
def _resolve_filter_value(
58-
*,
59-
filter: str | None,
60-
filter_query: str | None,
61-
page_token: str | None,
62-
) -> tuple[str | None, str | None, int]:
63-
"""Decode page_token and return the effective (filter_value, filter_query_value, offset).
64-
65-
If a page_token is present, its stored values take precedence over the
66-
raw parameters (the token carries resolved values forward across pages).
62+
Returns None for empty/missing cursors. Raises ApiValidationError
63+
for unrecognized formats (e.g. legacy base64 tokens).
6764
"""
68-
page_token_dict = _decode_page_token(page_token=page_token)
69-
offset = page_token_dict.get(_PAGE_TOKEN_OFFSET_KEY, 0)
70-
if page_token:
71-
filter = page_token_dict.get(_PAGE_TOKEN_FILTER_KEY)
72-
filter_query = page_token_dict.get(_PAGE_TOKEN_FILTER_QUERY_KEY)
73-
return filter, filter_query, offset
65+
if not cursor:
66+
return None
67+
if CURSOR_SEPARATOR not in cursor:
68+
raise errors.ApiValidationError(
69+
f"Unrecognized page_token format. "
70+
f"Expected 'created_at~id' cursor. token={cursor[:20]}... (truncated)"
71+
)
72+
# maxsplit=1: split on first ~ only, so run_id can safely contain ~
73+
created_at_str, run_id = cursor.split(CURSOR_SEPARATOR, 1)
74+
created_at = datetime.datetime.fromisoformat(created_at_str)
75+
# Normalize to naive UTC to match DB storage format (PipelineRun.created_at
76+
# is plain DateTime, not UtcDateTime -- stores/returns naive datetimes).
77+
if created_at.tzinfo is not None:
78+
created_at = created_at.astimezone(datetime.timezone.utc).replace(tzinfo=None)
79+
return created_at, run_id
80+
81+
82+
def maybe_next_page_token(
83+
*,
84+
rows: list[bts.PipelineRun],
85+
page_size: int,
86+
) -> str | None:
87+
"""Return a cursor token for the next page, or None if this is the last page."""
88+
if len(rows) < page_size:
89+
return None
90+
last = rows[page_size - 1]
91+
return encode_cursor(last.created_at, last.id)
7492

7593

7694
# ---------------------------------------------------------------------------
@@ -154,25 +172,15 @@ def build_list_filters(
154172
*,
155173
filter_value: str | None,
156174
filter_query_value: str | None,
157-
page_token_value: str | None,
175+
cursor_value: str | None,
158176
current_user: str | None,
159-
page_size: int,
160-
) -> tuple[list[sql.ColumnElement], int, str]:
161-
"""Resolve pagination token, legacy filter, and filter_query into WHERE clauses.
162-
163-
Returns (where_clauses, offset, next_page_token_encoded).
164-
"""
177+
) -> list[sql.ColumnElement]:
178+
"""Build WHERE clauses from filters and cursor."""
165179
if filter_value and filter_query_value:
166180
raise errors.ApiValidationError(
167181
"Cannot use both 'filter' and 'filter_query'. Use one or the other."
168182
)
169183

170-
filter_value, filter_query_value, offset = _resolve_filter_value(
171-
filter=filter_value,
172-
filter_query=filter_query_value,
173-
page_token=page_token_value,
174-
)
175-
176184
if filter_value:
177185
filter_query_value = _convert_legacy_filter_to_filter_query(
178186
filter_value=filter_value,
@@ -188,14 +196,18 @@ def build_list_filters(
188196
)
189197
)
190198

191-
next_page_token = _encode_page_token(
192-
page_token_dict={
193-
_PAGE_TOKEN_OFFSET_KEY: offset + page_size,
194-
_PAGE_TOKEN_FILTER_QUERY_KEY: filter_query_value,
195-
}
196-
)
199+
cursor = decode_cursor(cursor_value)
200+
if cursor:
201+
cursor_created_at, cursor_id = cursor
202+
where_clauses.append(
203+
sql.tuple_(bts.PipelineRun.created_at, bts.PipelineRun.id)
204+
< sql.tuple_(
205+
sql.literal(cursor_created_at),
206+
sql.literal(cursor_id),
207+
)
208+
)
197209

198-
return where_clauses, offset, next_page_token
210+
return where_clauses
199211

200212

201213
def filter_query_to_where_clause(

tests/test_api_server_sql.py

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ def test_list_pagination(self, session_factory, service):
179179
)
180180
assert len(page1.pipeline_runs) == 10
181181
assert page1.next_page_token is not None
182+
assert "~" in page1.next_page_token
182183

183184
with session_factory() as session:
184185
page2 = service.list(
@@ -188,6 +189,70 @@ def test_list_pagination(self, session_factory, service):
188189
assert len(page2.pipeline_runs) == 2
189190
assert page2.next_page_token is None
190191

192+
def test_list_cursor_pagination_order(self, session_factory, service):
193+
for i in range(5):
194+
_create_run(
195+
session_factory,
196+
service,
197+
root_task=_make_task_spec(f"pipeline-{i}"),
198+
)
199+
200+
with session_factory() as session:
201+
result = service.list(session=session)
202+
203+
dates = [r.created_at for r in result.pipeline_runs]
204+
assert dates == sorted(dates, reverse=True)
205+
206+
def test_list_cursor_pagination_no_overlap(self, session_factory, service):
207+
for i in range(12):
208+
_create_run(
209+
session_factory,
210+
service,
211+
root_task=_make_task_spec(f"pipeline-{i}"),
212+
)
213+
214+
with session_factory() as session:
215+
page1 = service.list(session=session)
216+
with session_factory() as session:
217+
page2 = service.list(session=session, page_token=page1.next_page_token)
218+
page1_ids = {r.id for r in page1.pipeline_runs}
219+
page2_ids = {r.id for r in page2.pipeline_runs}
220+
assert page1_ids.isdisjoint(page2_ids)
221+
222+
def test_list_cursor_pagination_stable_under_inserts(
223+
self, session_factory, service
224+
):
225+
for i in range(12):
226+
_create_run(
227+
session_factory,
228+
service,
229+
root_task=_make_task_spec(f"pipeline-{i}"),
230+
)
231+
232+
with session_factory() as session:
233+
page1 = service.list(session=session)
234+
page1_ids = {r.id for r in page1.pipeline_runs}
235+
236+
_create_run(
237+
session_factory,
238+
service,
239+
root_task=_make_task_spec("pipeline-new"),
240+
)
241+
242+
with session_factory() as session:
243+
page2 = service.list(session=session, page_token=page1.next_page_token)
244+
page2_ids = {r.id for r in page2.pipeline_runs}
245+
assert page1_ids.isdisjoint(page2_ids)
246+
assert len(page2.pipeline_runs) == 2
247+
248+
def test_list_invalid_page_token_raises(self, session_factory, service):
249+
"""page_token without ~ raises ApiValidationError (422)."""
250+
with session_factory() as session:
251+
with pytest.raises(
252+
errors.ApiValidationError, match="Unrecognized page_token"
253+
):
254+
service.list(session=session, page_token="not-a-cursor")
255+
191256
def test_list_filter_unsupported(self, session_factory, service):
192257
with session_factory() as session:
193258
with pytest.raises(NotImplementedError, match="Unsupported filter"):
@@ -1278,7 +1343,7 @@ def test_list_filter_query_time_range_offset_timezone(
12781343
returned_ids = {r.id for r in result.pipeline_runs}
12791344
assert returned_ids == {run_b.id, run_c.id}
12801345

1281-
def test_pagination_preserves_filter_query(self, session_factory, service):
1346+
def test_pagination_with_filter_query(self, session_factory, service):
12821347
for _ in range(12):
12831348
run = _create_run(
12841349
session_factory,
@@ -1302,14 +1367,13 @@ def test_pagination_preserves_filter_query(self, session_factory, service):
13021367
)
13031368
assert len(page1.pipeline_runs) == 10
13041369
assert page1.next_page_token is not None
1305-
1306-
decoded = filter_query_sql._decode_page_token(page_token=page1.next_page_token)
1307-
assert decoded["filter_query"] == fq
1370+
assert "~" in page1.next_page_token
13081371

13091372
with session_factory() as session:
13101373
page2 = service.list(
13111374
session=session,
13121375
page_token=page1.next_page_token,
1376+
filter_query=fq,
13131377
)
13141378
assert len(page2.pipeline_runs) == 2
13151379
assert page2.next_page_token is None

0 commit comments

Comments
 (0)