feat: add generic search client traits and adapters #994
Conversation
Introduce a family of search client traits with blanket implementations and adapter utilities to reduce boilerplate across backends.

New traits
----------

- ItemsClient: single-page item search (search required); defaults: item, items
- StreamItemsClient: streaming items across pages (search_stream required); defaults: collect_items, item_count, items_stream
- CollectionsClient: fetch all collections (collections required); default: collection point-lookup
- PagedCollectionsClient: cursor-paginated collections (collections_page required) for future backends that support a paginated /collections
- StreamCollectionsClient: streaming collections (collections_stream required); default: collect_collections
- ArrowItemsClient (geoarrow feature): Arrow record batch output (search_to_arrow required); default: items_to_arrow
- TransactionClient: write operations

Blanket implementations
-----------------------

- CollectionsClient + Clone + Sync -> StreamCollectionsClient (eager fetch)
- ArrowItemsClient + Sync -> ItemsClient + StreamItemsClient (geoarrow feature)

Adapter utilities
-----------------

- PagedItemsStream<T>: wraps an ItemsClient to provide StreamItemsClient via token/skip pagination using ItemCollection::next
- stream_pages_generic: free function driving the pagination loop
- stream_pages_collections_generic: collections equivalent for PagedCollectionsClient backends
- RecordBatchReaderAdapter<I> (geoarrow feature): bridges any Iterator<Item = Result<RecordBatch, E>> to arrow_array::RecordBatchReader

Documentation
-------------

Add docs/search-clients.md covering the trait family, blanket impls, the adapter conversion chart, pagination mechanics, and performance notes.
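The eager `CollectionsClient -> StreamCollectionsClient` blanket can be sketched in a simplified, std-only form. All names below are illustrative stand-ins (a plain iterator takes the place of an async stream):

```rust
// Simplified stand-in traits: fetch-all vs. streaming access.
trait Collections {
    fn collections(&self) -> Vec<String>;
}

trait StreamCollections {
    fn collections_iter(&self) -> Box<dyn Iterator<Item = String>>;
}

// Blanket impl: any fetch-all client gets the streaming API for free
// by eagerly fetching, then yielding from the owned Vec.
impl<T: Collections> StreamCollections for T {
    fn collections_iter(&self) -> Box<dyn Iterator<Item = String>> {
        Box::new(self.collections().into_iter())
    }
}

struct Memory;

impl Collections for Memory {
    fn collections(&self) -> Vec<String> {
        vec!["a".to_string(), "b".to_string()]
    }
}

fn main() {
    let names: Vec<String> = Memory.collections_iter().collect();
    assert_eq!(names, vec!["a".to_string(), "b".to_string()]);
}
```

This is the same shape the real blanket takes, with `Clone + Sync` bounds and a `Stream` in place of the iterator.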
```rust
#[cfg(feature = "geoarrow")]
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum AdapterError<E: std::error::Error + Send> {
    /// Error from the wrapped client.
    #[error(transparent)]
    Client(E),

    /// Error from STAC core (e.g. Arrow decode failure).
    #[error(transparent)]
    Stac(#[from] crate::Error),
}
```
I've tried to only have one error enum per crate, so this should probably go up into crates/core/src/error.rs.
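Folding the adapter variants into the crate-level enum could look roughly like this. A minimal std-only sketch: the real enum uses `thiserror` and has many more variants, and boxing the wrapped error in `Client` is just one possible shape:

```rust
use std::fmt;

// Sketch: the crate-level Error gains a Client variant, so no separate
// AdapterError enum is needed. Variant names here are assumptions.
#[derive(Debug)]
enum Error {
    /// Error from a wrapped search client.
    Client(Box<dyn std::error::Error + Send>),
    /// Stand-in for the existing core variants (Arrow decode, etc.).
    Other(String),
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::Client(e) => write!(f, "client error: {e}"),
            Error::Other(msg) => write!(f, "{msg}"),
        }
    }
}

impl std::error::Error for Error {}

fn main() {
    let io = std::io::Error::new(std::io::ErrorKind::Other, "boom");
    let err = Error::Client(Box::new(io));
    assert!(err.to_string().contains("boom"));
}
```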
```rust
// ---------------------------------------------------------------------------
// PagedItemsStream
```
Robot-code leftover? If we can nuke any of these // out-of-function comments in this PR, I'd be grateful 🙇🏼
Suggested change:

```diff
- // ---------------------------------------------------------------------------
- // PagedItemsStream
```
```rust
/// is `Some`, its entries are merged into `additional_fields` of the next
/// search request (overwriting any previous pagination values). The stream ends
/// when the page is empty or `next` is `None`.
pub fn stream_pages_generic<T>(
```
I don't love the _generic suffix, that's implied by the <T>? Same for the other function(s) in this file.
Suggested change:

```diff
- pub fn stream_pages_generic<T>(
+ pub fn stream_pages<T>(
```
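For reference, the token-merging loop that the doc comment above describes can be sketched std-only. The `Page` and `search` names below are hypothetical stand-ins for `ItemCollection` and the client call:

```rust
use std::collections::HashMap;

// Hypothetical page type mirroring ItemCollection::next.
struct Page {
    items: Vec<i32>,
    next: Option<HashMap<String, String>>,
}

// Simulated backend: two pages, then no `next` link.
fn search(params: &HashMap<String, String>) -> Page {
    match params.get("token").map(String::as_str) {
        None => Page {
            items: vec![1, 2],
            next: Some(HashMap::from([("token".to_string(), "p2".to_string())])),
        },
        Some("p2") => Page { items: vec![3], next: None },
        _ => Page { items: vec![], next: None },
    }
}

// Drive the loop: merge `next` into the request parameters (overwriting
// previous pagination values) until the page is empty or `next` is None.
fn collect_all() -> Vec<i32> {
    let mut params = HashMap::new();
    let mut out = Vec::new();
    loop {
        let page = search(&params);
        if page.items.is_empty() {
            break;
        }
        out.extend(page.items);
        match page.next {
            Some(next) => params.extend(next),
            None => break,
        }
    }
    out
}

fn main() {
    assert_eq!(collect_all(), vec![1, 2, 3]);
}
```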
```rust
impl<I> std::fmt::Debug for RecordBatchReaderAdapter<I> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RecordBatchReaderAdapter")
            .field("schema", &self.schema)
            .finish_non_exhaustive()
    }
}
```
```rust
    Output = Result<impl Stream<Item = Result<Item, Self::Error>> + Send, Self::Error>,
> + Send {
    // Collect synchronously while we hold `&self`.
    let result: Result<Vec<Item>, AdapterError<T::Error>> = self
```
Is there any way to not force us to load all items into memory?
```rust
let stream = self.search_stream(search).await?;
futures::pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
    items.push(result?);
}
```
I thought you could collect streams? But maybe I'm wrong...
```toml
async-stream.workspace = true
futures.workspace = true
```
I don't love pulling these in by default. We probably want to put them behind an async feature?
```toml
futures.workspace = true
futures-core.workspace = true
```
Same as core, let's put these behind an async feature.
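A possible shape for the gate (a sketch; the feature and dependency names are assumptions, not the crates' actual manifests):

```toml
[dependencies]
async-stream = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
futures-core = { workspace = true, optional = true }

[features]
async = ["dep:async-stream", "dep:futures", "dep:futures-core"]
```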
Our docs/ are generally light, I prefer to use the docs.rs docs. These should probably live in the stac::api module-level documentation?
```gitignore
package-lock.json
crates/wasm/tests/__screenshots__
.yarn/
planfor*.md
```
Description
Introduce a family of search client traits with blanket implementations and adapter utilities, replacing ad-hoc per-backend boilerplate with a consistent, extensible design.
New traits (`stac::api`)

- `ItemsClient`: `search`
- `StreamItemsClient`: `search_stream`
- `CollectionsClient`: `collections`
- `PagedCollectionsClient`: `collections_page`
- `StreamCollectionsClient`: `collections_stream`
- `ArrowItemsClient` (`geoarrow`): `search_to_arrow`
- `TransactionClient`: `add_item`, `add_collection`

Blanket implementations
- `CollectionsClient + Clone + Sync` → `StreamCollectionsClient`: eagerly fetches all collections and yields them as a stream; no wrapper struct needed
- `ArrowItemsClient + Sync` → `ItemsClient` + `StreamItemsClient` (`geoarrow` feature): collects record batches synchronously and returns owned items

Adapter utilities
- `PagedItemsStream<T>`: wraps any `ItemsClient` to provide `StreamItemsClient` via token/skip pagination (`ItemCollection::next`)
- `stream_pages_generic`: free function driving the items pagination loop; used by `PagedItemsStream` and all server backends
- `stream_pages_collections_generic`: collections equivalent for `PagedCollectionsClient` backends (ready for future paginated `/collections` support)
- `RecordBatchReaderAdapter<I>` (`geoarrow`): bridges `Iterator<Item = Result<RecordBatch, E>>` to `arrow_array::RecordBatchReader`

Backend implementations
All three server backends (memory, duckdb, pgstac) implement the full trait family.
- `stac-duckdb` provides `HrefClient` (`ArrowItemsClient`) and `SyncHrefClient` (`ItemsClient` + `CollectionsClient` + `StreamItemsClient` via `Mutex`).
- `stac-io`'s `Client` implements `StreamItemsClient` with HATEOAS link-following rather than token pagination.

Design notes
- An `ItemsClient + Clone` → `StreamItemsClient` blanket cannot be added because it would overlap with the `ArrowItemsClient` blanket under Rust's coherence rules. Server backends use `stream_pages_generic` directly in their explicit `StreamItemsClient` impls.
- `PagedCollectionsClient` has no blanket `StreamCollectionsClient` for the same reason (it would overlap with the `CollectionsClient` blanket). Paginated backends call `stream_pages_collections_generic` in their own impl.

Related issues
Checklist
- [ ] `prek run --all-files`