Skip to content

Commit 4ad6962

Browse files
Move public search logic into separate file
Also added custom cursor logic for tracking accurate page size of licenses.
1 parent c666f62 commit 4ad6962

5 files changed

Lines changed: 666 additions & 422 deletions

File tree

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
import json
2+
from base64 import b64decode, b64encode
3+
4+
from aws_lambda_powertools.utilities.typing import LambdaContext
5+
from cc_common.config import config, logger
6+
from cc_common.data_model.schema.provider.api import (
7+
PublicLicenseSearchResponseSchema,
8+
QueryProvidersRequestSchema,
9+
)
10+
from cc_common.exceptions import CCInvalidRequestException
11+
from cc_common.utils import api_handler
12+
from marshmallow import ValidationError
13+
from opensearch_client import OpenSearchClient
14+
15+
# Default and maximum page sizes for search results
16+
MAX_PROVIDER_PAGE_SIZE = 100
17+
18+
19+
# Instantiate the OpenSearch client outside the handler to cache the connection between invocations
20+
# Set timeout to 20 seconds to give API gateway time to respond with response
21+
opensearch_client = OpenSearchClient(timeout=25)
22+
23+
@api_handler
24+
def public_search_api_handler(event: dict, context: LambdaContext): # noqa: ARG001 unused-argument
25+
"""
26+
Public query providers endpoint (no auth).
27+
Translates structured query (licenseNumber, familyName, givenName, jurisdiction) into OpenSearch
28+
nested query and returns license-level results with existing pagination schema.
29+
"""
30+
http_method = event.get('httpMethod')
31+
resource_path = event.get('resource')
32+
if (http_method, resource_path) != ('POST', '/v1/public/compacts/{compact}/providers/query'):
33+
raise CCInvalidRequestException(f'Unsupported method or resource: {http_method} {resource_path}')
34+
35+
return _public_query_licenses(event, context)
36+
37+
38+
def _public_query_licenses(event: dict, context: LambdaContext): # noqa: ARG001 unused-argument
39+
compact = event['pathParameters']['compact']
40+
body = _parse_and_validate_public_query_body(event)
41+
query_obj = body.get('query', {})
42+
pagination = body.get('pagination') or {}
43+
page_size = pagination.get('pageSize') or config.default_page_size
44+
45+
cursor = _decode_public_cursor(pagination.get('lastKey'))
46+
search_body = _build_public_license_search_body(compact=compact, body=body, cursor=cursor)
47+
index_name = f'compact_{compact}_providers'
48+
49+
logger.info('Executing public license search', compact=compact, index_name=index_name)
50+
response = opensearch_client.search(index_name=index_name, body=search_body)
51+
52+
hits = response.get('hits', {}).get('hits', [])
53+
license_schema = PublicLicenseSearchResponseSchema()
54+
providers = []
55+
last_sort = None
56+
prev_sort = None
57+
resume_provider_id = cursor.get('resume_provider_id') if cursor else None
58+
resume_offset = (cursor.get('license_offset') or 0) if cursor else 0
59+
next_cursor_resume_provider_id = None
60+
next_cursor_resume_provider_sort = None
61+
next_cursor_license_offset = None
62+
next_cursor_search_after = None
63+
64+
for hit in hits:
65+
source = hit.get('_source', {})
66+
provider_id = source.get('providerId')
67+
if source.get('compact') != compact:
68+
logger.warning(
69+
'Provider compact does not match path, skipping',
70+
provider_id=provider_id,
71+
path_compact=compact,
72+
)
73+
continue
74+
inner_hits = hit.get('inner_hits', {}).get('licenses', {}).get('hits', {}).get('hits', [])
75+
skip = resume_offset if (resume_provider_id == provider_id) else 0
76+
if resume_provider_id == provider_id:
77+
resume_provider_id = None
78+
resume_offset = 0
79+
consumed_from_this_provider = 0
80+
for inner in inner_hits:
81+
if skip > 0:
82+
skip -= 1
83+
continue
84+
if len(providers) >= page_size:
85+
next_cursor_resume_provider_id = provider_id
86+
next_cursor_resume_provider_sort = hit.get('sort')
87+
next_cursor_license_offset = consumed_from_this_provider
88+
last_sort = hit.get('sort')
89+
next_cursor_search_after = prev_sort
90+
break
91+
license_source = inner.get('_source', {}).copy()
92+
license_source['providerId'] = provider_id
93+
license_source['compact'] = compact
94+
try:
95+
sanitized = license_schema.load(license_source)
96+
sanitized.pop('jurisdiction', None)
97+
providers.append(sanitized)
98+
consumed_from_this_provider += 1
99+
except ValidationError as e:
100+
logger.error(
101+
'Failed to sanitize license record',
102+
provider_id=provider_id,
103+
errors=e.messages,
104+
)
105+
if next_cursor_resume_provider_id is not None:
106+
break
107+
prev_sort = last_sort
108+
last_sort = hit.get('sort')
109+
110+
last_key = None
111+
if len(providers) >= page_size and last_sort is not None:
112+
if next_cursor_resume_provider_id is not None:
113+
last_key = _encode_public_cursor(
114+
search_after=next_cursor_search_after,
115+
resume_provider_id=next_cursor_resume_provider_id,
116+
resume_provider_sort=next_cursor_resume_provider_sort,
117+
license_offset=next_cursor_license_offset,
118+
)
119+
else:
120+
last_key = _encode_public_cursor(search_after=last_sort)
121+
122+
return {
123+
'providers': providers,
124+
'pagination': {
125+
'pageSize': page_size,
126+
'lastKey': last_key,
127+
'prevLastKey': pagination.get('lastKey'),
128+
},
129+
'query': query_obj,
130+
}
131+
132+
133+
def _parse_and_validate_public_query_body(event: dict) -> dict:
134+
try:
135+
schema = QueryProvidersRequestSchema()
136+
raw_body = event.get('body') or '{}'
137+
body = schema.loads(raw_body)
138+
except ValidationError as e:
139+
logger.warning('Invalid public query request body', errors=e.messages)
140+
raise CCInvalidRequestException(f'Invalid request: {e.messages}') from e
141+
142+
query = body.get('query', {})
143+
if query.get('givenName') and not query.get('familyName'):
144+
raise CCInvalidRequestException('familyName is required if givenName is provided')
145+
146+
if not any((query.get('licenseNumber'), query.get('jurisdiction'), query.get('familyName'))):
147+
raise CCInvalidRequestException(
148+
'At least one of licenseNumber, jurisdiction, or familyName must be provided'
149+
)
150+
151+
return body
152+
153+
154+
def _decode_public_cursor(last_key: str | None) -> dict | None:
155+
"""
156+
Decode and validate the public cursor.
157+
Returns dict with search_after (optional), and optionally resume_provider_id, resume_provider_sort, license_offset.
158+
Raises CCInvalidRequestException if lastKey is present but invalid.
159+
"""
160+
if not last_key:
161+
return None
162+
try:
163+
decoded = json.loads(b64decode(last_key).decode('utf-8'))
164+
except Exception as e:
165+
raise CCInvalidRequestException('Invalid lastKey') from e
166+
if not isinstance(decoded, dict):
167+
raise CCInvalidRequestException('Invalid lastKey')
168+
has_resume = 'resume_provider_id' in decoded and 'license_offset' in decoded
169+
if not has_resume and not decoded.get('search_after'):
170+
raise CCInvalidRequestException('Invalid lastKey')
171+
if has_resume and 'resume_provider_sort' not in decoded:
172+
raise CCInvalidRequestException('Invalid lastKey')
173+
return decoded
174+
175+
176+
def _encode_public_cursor(
177+
search_after: list | None,
178+
resume_provider_id: str | None = None,
179+
resume_provider_sort: list | None = None,
180+
license_offset: int | None = None,
181+
) -> str:
182+
payload = {}
183+
if search_after is not None:
184+
payload['search_after'] = search_after
185+
if resume_provider_id is not None and resume_provider_sort is not None and license_offset is not None:
186+
payload['resume_provider_id'] = resume_provider_id
187+
payload['resume_provider_sort'] = resume_provider_sort
188+
payload['license_offset'] = license_offset
189+
return b64encode(json.dumps(payload).encode('utf-8')).decode('utf-8')
190+
191+
192+
def _build_public_license_search_body(*, compact: str, body: dict, cursor: dict | None = None) -> dict:
193+
query_obj = body.get('query', {})
194+
pagination = body.get('pagination') or {}
195+
page_size = pagination.get('pageSize') or config.default_page_size
196+
197+
search_after = cursor.get('search_after') if cursor else None
198+
199+
nested_must = []
200+
if query_obj.get('licenseNumber'):
201+
nested_must.append({'term': {'licenses.licenseNumber': query_obj['licenseNumber']}})
202+
if query_obj.get('jurisdiction'):
203+
nested_must.append({'term': {'licenses.jurisdiction': query_obj['jurisdiction'].lower()}})
204+
if query_obj.get('familyName'):
205+
nested_must.append({'match': {'licenses.familyName': query_obj['familyName']}})
206+
if query_obj.get('givenName'):
207+
nested_must.append({'match': {'licenses.givenName': query_obj['givenName']}})
208+
209+
nested_query = {'nested': {'path': 'licenses', 'query': {'bool': {'must': nested_must}}}}
210+
if nested_must:
211+
nested_query['nested']['inner_hits'] = {
212+
'name': 'licenses',
213+
'size': MAX_PROVIDER_PAGE_SIZE,
214+
'sort': [
215+
{'licenses.jurisdiction.keyword': 'asc'},
216+
{'licenses.licenseType.keyword': 'asc'},
217+
{'licenses.licenseNumber.keyword': 'asc'},
218+
],
219+
}
220+
221+
must = [
222+
{'term': {'compact': compact}},
223+
nested_query,
224+
]
225+
226+
search_body = {
227+
'query': {'bool': {'must': must}},
228+
'size': page_size,
229+
'sort': [
230+
{'familyName.keyword': 'asc'},
231+
{'givenName.keyword': 'asc'},
232+
{'providerId': 'asc'},
233+
],
234+
}
235+
if search_after is not None:
236+
search_body['search_after'] = search_after
237+
238+
return search_body

0 commit comments

Comments
 (0)