Conversation
| impl Default for ItemComparator { | ||
| /// Creates a new `ItemComparator` with the default sort order. | ||
| /// | ||
| /// The default sort order is `datetime` descending, followed by `id` ascending. | ||
| fn default() -> Self { | ||
| Self { | ||
| sort_fields: vec![ | ||
| SortField { | ||
| field: "datetime".to_string(), | ||
| direction: Direction::Desc, | ||
| }, | ||
| SortField { | ||
| field: "id".to_string(), | ||
| direction: Direction::Asc, | ||
| }, | ||
| ], | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
The existance of this default implementation implies (I think) that we could impl PartialOrd for Item?
| /// Sorts multiple streams of items into a single sorted stream. | ||
| /// | ||
| /// # Examples | ||
| /// | ||
| /// ``` | ||
| /// use stac::{Item, sort::sort_streams}; | ||
| /// use serde_json::json; | ||
| /// use futures::stream::{self, StreamExt}; | ||
| /// | ||
| /// # tokio_test::block_on(async { | ||
| /// let stream1 = stream::iter(vec![Item::new("a"), Item::new("c")]); | ||
| /// let stream2 = stream::iter(vec![Item::new("b"), Item::new("d")]); | ||
| /// let config = json!({ | ||
| /// "sortby": [ | ||
| /// { "field": "id", "direction": "asc" } | ||
| /// ] | ||
| /// }); | ||
| /// let mut sorted = sort_streams(vec![stream1, stream2], config).unwrap(); | ||
| /// assert_eq!(sorted.next().await.unwrap().id, "a"); | ||
| /// assert_eq!(sorted.next().await.unwrap().id, "b"); | ||
| /// assert_eq!(sorted.next().await.unwrap().id, "c"); | ||
| /// assert_eq!(sorted.next().await.unwrap().id, "d"); | ||
| /// # }); | ||
| /// ``` | ||
| pub fn sort_streams<S, I>( | ||
| streams: I, | ||
| config: Value, | ||
| ) -> Result<impl Stream<Item = S::Item>, serde_json::Error> | ||
| where | ||
| S: Stream<Item = Item> + Unpin, | ||
| I: IntoIterator<Item = S>, | ||
| { | ||
| let comparator = ItemComparator::new(config)?; | ||
| Ok(kmerge_by(streams, move |a, b| { | ||
| comparator.compare(a, b).reverse() | ||
| })) | ||
| } |
There was a problem hiding this comment.
What's the use-case for the stream merging?
There was a problem hiding this comment.
Just realized I totally abandoned this... the use case for stream merging is to use to create something like federated search. This would allow you to async run searches to multiple catalogs returning iterators that will automatically page when necessary and then merge those results into a sorted stream combining the results from the multiple catalogs.
There was a problem hiding this comment.
What do you mean by "sorted stream"? Do you mean that we look at the an attribute of the next item of each steam, see which one is biggest (or smallest) and yield that next?
Seems interesting but kind of niche. Maybe better implemented in Python first?
crates/core/src/sort.rs
Outdated
| @@ -0,0 +1,541 @@ | |||
| use crate::Item; | |||
| use futures::Stream; | |||
There was a problem hiding this comment.
I don't think we should be bringing in any async stuff into this crate, unless we really need to. What's the use-case for this?
There was a problem hiding this comment.
I stuck the stream stuff behind a streams feature - i think for the federation use case, async would be really key
Description
Creates an ItemComparator that implements compare.Compare that can be built using a json sortby config.
This can be used to compare items (returning Ordering) or to sort an array of items.
There is also a function that can be used to merge and interleave multiple streams of sorted items with this config (with an eye towards federated search or similar processing of multiple streams of items).
Checklist
Delete any checklist items that do not apply (e.g. if your change is minor, it may not require documentation updates).
cargo fmt)cargo test