1- import base64
21import dataclasses
32import datetime
4- import json
53import logging
64import typing
75from typing import Any , Final , Optional
1210from . import backend_types_sql as bts
1311from . import component_structures as structures
1412from . import errors
15- from . import filter_query_models
13+ from . import filter_query_sql
1614
1715if typing .TYPE_CHECKING :
1816 from cloud_pipelines .orchestration .storage_providers import (
@@ -34,8 +32,6 @@ def _get_current_time() -> datetime.datetime:
3432 return datetime .datetime .now (tz = datetime .timezone .utc )
3533
3634
37- _PAGE_TOKEN_OFFSET_KEY : Final [str ] = "offset"
38- _PAGE_TOKEN_FILTER_KEY : Final [str ] = "filter"
3935_DEFAULT_PAGE_SIZE : Final [int ] = 10
4036
4137
@@ -175,22 +171,12 @@ def list(
175171 include_pipeline_names : bool = False ,
176172 include_execution_stats : bool = False ,
177173 ) -> ListPipelineJobsResponse :
178- if filter and filter_query :
179- raise errors .MutuallyExclusiveFilterError (
180- "Cannot use both 'filter' and 'filter_query'. Use one or the other."
181- )
182-
183- if filter_query :
184- filter_query_models .FilterQuery .model_validate_json (filter_query )
185- raise NotImplementedError ("filter_query is not yet implemented." )
186-
187- filter_value , offset = _resolve_filter_value (
188- filter = filter ,
189- page_token = page_token ,
190- )
191- where_clauses , next_page_filter_value = _build_filter_where_clauses (
192- filter_value = filter_value ,
174+ where_clauses , offset , next_token = filter_query_sql .build_list_filters (
175+ filter_value = filter ,
176+ filter_query_value = filter_query ,
177+ page_token_value = page_token ,
193178 current_user = current_user ,
179+ page_size = _DEFAULT_PAGE_SIZE ,
194180 )
195181
196182 pipeline_runs = list (
@@ -202,14 +188,10 @@ def list(
202188 .limit (_DEFAULT_PAGE_SIZE )
203189 ).all ()
204190 )
205- next_page_offset = offset + _DEFAULT_PAGE_SIZE
206- next_page_token_dict = {
207- _PAGE_TOKEN_OFFSET_KEY : next_page_offset ,
208- _PAGE_TOKEN_FILTER_KEY : next_page_filter_value ,
209- }
210- next_page_token = _encode_page_token (next_page_token_dict )
211- if len (pipeline_runs ) < _DEFAULT_PAGE_SIZE :
212- next_page_token = None
191+
192+ next_page_token = (
193+ next_token .encode () if len (pipeline_runs ) >= _DEFAULT_PAGE_SIZE else None
194+ )
213195
214196 return ListPipelineJobsResponse (
215197 pipeline_runs = [
@@ -350,82 +332,6 @@ def delete_annotation(
350332 session .commit ()
351333
352334
353- def _resolve_filter_value (
354- * ,
355- filter : str | None ,
356- page_token : str | None ,
357- ) -> tuple [str | None , int ]:
358- """Decode page_token and return the effective (filter_value, offset).
359-
360- If a page_token is present, its stored filter takes precedence over the
361- raw filter parameter (the token carries the resolved filter forward across pages).
362- """
363- page_token_dict = _decode_page_token (page_token )
364- offset = page_token_dict .get (_PAGE_TOKEN_OFFSET_KEY , 0 )
365- if page_token :
366- filter = page_token_dict .get (_PAGE_TOKEN_FILTER_KEY , None )
367- return filter , offset
368-
369-
370- def _build_filter_where_clauses (
371- * ,
372- filter_value : str | None ,
373- current_user : str | None ,
374- ) -> tuple [list [sql .ColumnElement ], str | None ]:
375- """Parse a filter string into SQLAlchemy WHERE clauses.
376-
377- Returns (where_clauses, next_page_filter_value). The second value is the
378- filter string with shorthand values resolved (e.g. "created_by:me" becomes
379- "created_by:alice@example.com") so it can be embedded in the next page token.
380- """
381- where_clauses : list [sql .ColumnElement ] = []
382- parsed_filter = _parse_filter (filter_value ) if filter_value else {}
383- for key , value in parsed_filter .items ():
384- if key == "_text" :
385- raise NotImplementedError ("Text search is not implemented yet." )
386- elif key == "created_by" :
387- if value == "me" :
388- if current_user is None :
389- current_user = ""
390- value = current_user
391- # TODO: Maybe make this a bit more robust.
392- # We need to change the filter since it goes into the next_page_token.
393- filter_value = filter_value .replace (
394- "created_by:me" , f"created_by:{ current_user } "
395- )
396- if value :
397- where_clauses .append (bts .PipelineRun .created_by == value )
398- else :
399- where_clauses .append (bts .PipelineRun .created_by == None )
400- else :
401- raise NotImplementedError (f"Unsupported filter { filter_value } ." )
402- return where_clauses , filter_value
403-
404-
405- def _decode_page_token (page_token : str ) -> dict [str , Any ]:
406- return json .loads (base64 .b64decode (page_token )) if page_token else {}
407-
408-
409- def _encode_page_token (page_token_dict : dict [str , Any ]) -> str :
410- return (base64 .b64encode (json .dumps (page_token_dict ).encode ("utf8" ))).decode (
411- "utf-8"
412- )
413-
414-
415- def _parse_filter (filter : str ) -> dict [str , str ]:
416- # TODO: Improve
417- parts = filter .strip ().split ()
418- parsed_filter = {}
419- for part in parts :
420- key , sep , value = part .partition (":" )
421- if sep :
422- parsed_filter [key ] = value
423- else :
424- parsed_filter .setdefault ("_text" , "" )
425- parsed_filter ["_text" ] += part
426- return parsed_filter
427-
428-
429335# ========== ExecutionNodeApiService_Sql
430336
431337
0 commit comments