
Add ability to compare/sort/merge items using a sortby config #856

Draft
bitner wants to merge 9 commits into main from item_sort

Conversation

@bitner
Collaborator

@bitner bitner commented Nov 20, 2025

Description

Creates an `ItemComparator` that implements `compare::Compare` and can be built from a JSON `sortby` config.

It can be used to compare two items (returning an `Ordering`) or to sort an array of items.
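To make the compare/sort behaviour concrete, here is a minimal, dependency-free sketch of a field-by-field comparator. It is not the PR's code: the PR parses a JSON `sortby` config via serde and works on `stac::Item`, whereas this sketch uses a hypothetical simplified `Item` (an `id` plus string-valued `properties`), builds the sort fields by hand, and compares values as strings. That string comparison is only chronologically correct for RFC 3339 timestamps written in one uniform UTC format.

```rust
use std::cmp::Ordering;
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for `stac::Item`.
#[derive(Debug, Clone)]
pub struct Item {
    pub id: String,
    pub properties: HashMap<String, String>,
}

/// Hypothetical helper that builds an item with a `datetime` property.
pub fn item(id: &str, datetime: &str) -> Item {
    let mut properties = HashMap::new();
    properties.insert("datetime".to_string(), datetime.to_string());
    Item { id: id.to_string(), properties }
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Direction {
    Asc,
    Desc,
}

/// One entry of a `sortby` list: which field to compare and in which direction.
pub struct SortField {
    pub field: String,
    pub direction: Direction,
}

pub struct ItemComparator {
    pub sort_fields: Vec<SortField>,
}

impl ItemComparator {
    /// Looks up a sort key: `id` is a top-level field, everything else is
    /// read from `properties`. Missing values become `None`.
    fn key<'a>(item: &'a Item, field: &str) -> Option<&'a str> {
        if field == "id" {
            Some(item.id.as_str())
        } else {
            item.properties.get(field).map(String::as_str)
        }
    }

    /// Compares field by field; the first field that differs decides.
    pub fn compare(&self, a: &Item, b: &Item) -> Ordering {
        for sf in &self.sort_fields {
            // `Option` orders `None` before `Some`, so missing values sort
            // first when ascending and last when descending.
            let ord = Self::key(a, &sf.field).cmp(&Self::key(b, &sf.field));
            let ord = match sf.direction {
                Direction::Asc => ord,
                Direction::Desc => ord.reverse(),
            };
            if ord != Ordering::Equal {
                return ord;
            }
        }
        Ordering::Equal
    }
}
```

Sorting is then a one-liner, e.g. `items.sort_by(|a, b| comparator.compare(a, b))`. With `datetime` descending and `id` ascending, items sharing a timestamp fall back to id order.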

There is also a function that merges and interleaves multiple streams of already-sorted items using the same config (with an eye towards federated search or similar processing of multiple item streams).
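The merge idea can be illustrated with a synchronous, iterator-based k-way merge. This is a sketch under assumptions, not the PR's implementation: the PR operates on async `futures::Stream`s and calls a `kmerge_by` whose definition is not shown here, while `kmerge_by_cmp` below is a hypothetical name. It keeps one head item per input in a `BinaryHeap`, so each yielded item costs O(log k) for k inputs, and it requires every input to already be sorted under the same comparator.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::rc::Rc;

/// Shared runtime comparator (an assumption of this sketch).
type Cmp<T> = Rc<dyn Fn(&T, &T) -> Ordering>;

/// The current head of one input, tagged with the index of that input.
struct Entry<T> {
    item: T,
    source: usize,
    compare: Cmp<T>,
}

impl<T> Ord for Entry<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // `BinaryHeap` is a max-heap, so the comparison is reversed to pop the
        // smallest item first; ties go to the lower source index.
        (self.compare)(&other.item, &self.item).then_with(|| other.source.cmp(&self.source))
    }
}
impl<T> PartialOrd for Entry<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl<T> PartialEq for Entry<T> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}
impl<T> Eq for Entry<T> {}

/// Lazily merges inputs that are each already sorted under `compare`.
struct KMerge<I: Iterator> {
    sources: Vec<I>,
    heap: BinaryHeap<Entry<I::Item>>,
    compare: Cmp<I::Item>,
}

/// Hypothetical constructor; not part of the PR's API.
fn kmerge_by_cmp<I, F>(mut sources: Vec<I>, compare: F) -> KMerge<I>
where
    I: Iterator,
    F: Fn(&I::Item, &I::Item) -> Ordering + 'static,
{
    let compare: Cmp<I::Item> = Rc::new(compare);
    let mut heap = BinaryHeap::new();
    // Prime the heap with the first item of every non-empty input.
    for (source, input) in sources.iter_mut().enumerate() {
        if let Some(item) = input.next() {
            heap.push(Entry { item, source, compare: Rc::clone(&compare) });
        }
    }
    KMerge { sources, heap, compare }
}

impl<I: Iterator> Iterator for KMerge<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        let Entry { item, source, .. } = self.heap.pop()?;
        // Refill from the input we just took from, keeping one head per input.
        if let Some(next) = self.sources[source].next() {
            self.heap.push(Entry { item: next, source, compare: Rc::clone(&self.compare) });
        }
        Some(item)
    }
}
```

Because the merge is lazy, it only pulls the next item from an input when that input's head has been yielded, which is the property that makes this shape attractive for paginated remote sources.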

Checklist

Delete any checklist items that do not apply (e.g. if your change is minor, it may not require documentation updates).

  • Unit tests
  • Documentation, including doctests
  • Git history is linear
  • Commit messages are descriptive
  • (optional) Git commit messages follow conventional commits
  • Code is formatted (cargo fmt)
  • cargo test
  • Changes are added to the CHANGELOG

@bitner bitner requested a review from gadomski as a code owner November 20, 2025 17:38
@gadomski gadomski self-requested a review November 21, 2025 14:32
Comment on lines +114 to +132
```rust
impl Default for ItemComparator {
    /// Creates a new `ItemComparator` with the default sort order.
    ///
    /// The default sort order is `datetime` descending, followed by `id` ascending.
    fn default() -> Self {
        Self {
            sort_fields: vec![
                SortField {
                    field: "datetime".to_string(),
                    direction: Direction::Desc,
                },
                SortField {
                    field: "id".to_string(),
                    direction: Direction::Asc,
                },
            ],
        }
    }
}
```
Member


The existence of this default implementation implies (I think) that we could `impl PartialOrd for Item`?
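A sketch of what that suggestion could look like, using a hypothetical simplified `Item` (the real `stac::Item` has many more fields, and its actual trait impls are not shown here). One design wrinkle: `PartialOrd` must agree with `PartialEq`, so if `PartialEq` compares every field, two items that tie on `datetime` and `id` but differ elsewhere cannot be reported as `Equal`. The sketch returns `None` (incomparable) in that case; a consequence is that `sort_by(|a, b| a.partial_cmp(b).unwrap())` could panic, which is one argument for keeping the ordering in an external comparator such as `ItemComparator`.

```rust
use std::cmp::Ordering;

/// Hypothetical, simplified item.
#[derive(Debug, Clone, PartialEq)]
pub struct Item {
    pub id: String,
    /// RFC 3339 timestamp as a string; assumed to use one uniform UTC format
    /// so that string order matches chronological order.
    pub datetime: Option<String>,
    /// An extra field that the ordering deliberately ignores.
    pub title: Option<String>,
}

impl PartialOrd for Item {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Default sort order: `datetime` descending (note the swapped operands;
        // items without a datetime sort last), then `id` ascending.
        let ord = other
            .datetime
            .cmp(&self.datetime)
            .then_with(|| self.id.cmp(&other.id));
        // `PartialOrd` must agree with the derived `PartialEq`: if the sort
        // keys tie but the items differ elsewhere, they are incomparable.
        match ord {
            Ordering::Equal if self != other => None,
            ord => Some(ord),
        }
    }
}
```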

Comment on lines +231 to +267
```rust
/// Sorts multiple streams of items into a single sorted stream.
///
/// # Examples
///
/// ```
/// use stac::{Item, sort::sort_streams};
/// use serde_json::json;
/// use futures::stream::{self, StreamExt};
///
/// # tokio_test::block_on(async {
/// let stream1 = stream::iter(vec![Item::new("a"), Item::new("c")]);
/// let stream2 = stream::iter(vec![Item::new("b"), Item::new("d")]);
/// let config = json!({
///     "sortby": [
///         { "field": "id", "direction": "asc" }
///     ]
/// });
/// let mut sorted = sort_streams(vec![stream1, stream2], config).unwrap();
/// assert_eq!(sorted.next().await.unwrap().id, "a");
/// assert_eq!(sorted.next().await.unwrap().id, "b");
/// assert_eq!(sorted.next().await.unwrap().id, "c");
/// assert_eq!(sorted.next().await.unwrap().id, "d");
/// # });
/// ```
pub fn sort_streams<S, I>(
    streams: I,
    config: Value,
) -> Result<impl Stream<Item = S::Item>, serde_json::Error>
where
    S: Stream<Item = Item> + Unpin,
    I: IntoIterator<Item = S>,
{
    let comparator = ItemComparator::new(config)?;
    Ok(kmerge_by(streams, move |a, b| {
        comparator.compare(a, b).reverse()
    }))
}
```
Member


What's the use-case for the stream merging?

Collaborator Author


Just realized I totally abandoned this... The use case for stream merging is building something like federated search. It would let you run searches against multiple catalogs asynchronously, each returning an iterator that pages automatically when necessary, and then merge those results into a single sorted stream that combines the results from all the catalogs.
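The "iterators that will automatically page" part can be sketched as a lazy iterator that fetches the next page only when its buffer runs dry. Everything here is hypothetical: the `fetch` closure stands in for a real search request, the page token is assumed to be a `String`, and the sketch is synchronous rather than async. It does not reflect any particular STAC client API.

```rust
/// Hypothetical auto-paging iterator. `fetch` takes the token for the page to
/// request (`None` = first page) and returns that page plus the next token
/// (`None` = no more pages).
pub struct Paged<T, F>
where
    F: FnMut(Option<String>) -> (Vec<T>, Option<String>),
{
    fetch: F,
    buffer: std::vec::IntoIter<T>,
    next_token: Option<String>,
    done: bool,
}

pub fn paged<T, F>(fetch: F) -> Paged<T, F>
where
    F: FnMut(Option<String>) -> (Vec<T>, Option<String>),
{
    Paged { fetch, buffer: Vec::new().into_iter(), next_token: None, done: false }
}

impl<T, F> Iterator for Paged<T, F>
where
    F: FnMut(Option<String>) -> (Vec<T>, Option<String>),
{
    type Item = T;

    fn next(&mut self) -> Option<T> {
        loop {
            // Serve buffered items first.
            if let Some(item) = self.buffer.next() {
                return Some(item);
            }
            if self.done {
                return None;
            }
            // Buffer exhausted: request the next page lazily. The loop also
            // skips over empty pages that still carry a next token.
            let (page, token) = (self.fetch)(self.next_token.take());
            self.done = token.is_none();
            self.next_token = token;
            self.buffer = page.into_iter();
        }
    }
}
```

Each catalog would yield one such iterator (or, in the async version, one stream), and a sorted merge over them produces the combined result.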

Member


What do you mean by "sorted stream"? Do you mean that we look at an attribute of the next item of each stream, see which one is biggest (or smallest), and yield that next?

Seems interesting but kind of niche. Maybe better implemented in Python first?
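The reading in the question above matches a classic merge of pre-sorted inputs. A minimal sketch that does literally that (peek at every input's next item, yield the smallest, repeat) is shown below; `merge_by_peeking` is a hypothetical name, it collects into a `Vec` for brevity, and it is not the PR's implementation. The output is only sorted if every input is already sorted under the same comparator. Each step scans all k heads, so it costs O(k) per item, which is fine for a handful of catalogs; a binary heap brings that down to O(log k).

```rust
use std::cmp::Ordering;
use std::iter::Peekable;

/// Hypothetical helper: merges inputs that are each already sorted under
/// `compare` by repeatedly taking the smallest of their next items.
fn merge_by_peeking<I, F>(sources: Vec<I>, mut compare: F) -> Vec<I::Item>
where
    I: Iterator,
    F: FnMut(&I::Item, &I::Item) -> Ordering,
{
    let mut heads: Vec<Peekable<I>> = sources.into_iter().map(|s| s.peekable()).collect();
    let mut merged = Vec::new();
    loop {
        // Index of the input whose next item is smallest. `min_by` keeps the
        // first of several equal minima, so ties go to the earliest input.
        let best = heads
            .iter_mut()
            .enumerate()
            .filter_map(|(i, head)| head.peek().map(|item| (i, item)))
            .min_by(|(_, a), (_, b)| compare(*a, *b))
            .map(|(i, _)| i);
        match best {
            Some(i) => merged.push(heads[i].next().expect("peeked item exists")),
            // Every input is exhausted.
            None => return merged,
        }
    }
}
```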

@@ -0,0 +1,541 @@
```rust
use crate::Item;
use futures::Stream;
```
Member


I don't think we should bring any async stuff into this crate unless we really need to. What's the use case for this?

Collaborator Author


I stuck the stream stuff behind a `streams` feature. I think async would be really key for the federation use case.
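For readers unfamiliar with the pattern, gating async code behind a Cargo feature usually looks something like the sketch below. The module layout and `compare_ids` are hypothetical and the Cargo.toml lines are shown as comments; only the feature name `streams` comes from the comment above. Items under `#[cfg(feature = "streams")]` are stripped before name resolution, so a default build never needs `futures` at all.

```rust
// Hypothetical Cargo.toml entries for an optional `streams` feature:
//
//   [features]
//   streams = ["dep:futures"]
//
//   [dependencies]
//   futures = { version = "0.3", optional = true }

pub mod sort {
    use std::cmp::Ordering;

    /// Always compiled: synchronous comparison needs no async dependencies.
    pub fn compare_ids(a: &str, b: &str) -> Ordering {
        a.cmp(b)
    }

    /// Only compiled with `--features streams`; without the feature this
    /// module (and its `futures` import) is removed entirely.
    #[cfg(feature = "streams")]
    pub mod streams {
        #[allow(unused_imports)]
        use futures::Stream;
        // A stream-merging function such as `sort_streams` would live here.
    }
}
```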

@gadomski gadomski marked this pull request as draft December 31, 2025 14:20

Labels

None yet

Projects

None yet


2 participants