From 8c70be05c17f505d4bee94c1e9e97e7695a8092a Mon Sep 17 00:00:00 2001 From: George Talbot Date: Thu, 9 Apr 2026 17:15:25 -0400 Subject: [PATCH 1/4] feat: compute deterministic timeseries_id column at ingest Add a timeseries_id column (Int64) to the metrics Arrow batch, computed as a SipHash-2-4 of the series identity columns (metric_name, metric_type, and all tags excluding temporal/value columns). The hash uses fixed keys for cross-process determinism. The column is already declared in the metrics default sort schema (between host and timestamp_secs), so the parquet writer now automatically sorts by it and places it in the correct physical position. Co-Authored-By: Claude Opus 4.6 (1M context) --- quickwit/Cargo.lock | 1 + .../src/otlp/arrow_metrics.rs | 40 ++-- quickwit/quickwit-parquet-engine/Cargo.toml | 1 + quickwit/quickwit-parquet-engine/src/lib.rs | 1 + .../src/schema/fields.rs | 6 + .../src/timeseries_id.rs | 197 ++++++++++++++++++ 6 files changed, 234 insertions(+), 12 deletions(-) create mode 100644 quickwit/quickwit-parquet-engine/src/timeseries_id.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 6bb00a5944c..1cae2665a92 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7730,6 +7730,7 @@ dependencies = [ "sea-query", "serde", "serde_json", + "siphasher", "tempfile", "thiserror 2.0.18", "tokio", diff --git a/quickwit/quickwit-opentelemetry/src/otlp/arrow_metrics.rs b/quickwit/quickwit-opentelemetry/src/otlp/arrow_metrics.rs index 88ba0701d76..aeee696f1f8 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/arrow_metrics.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/arrow_metrics.rs @@ -24,11 +24,13 @@ use std::io::Cursor; use std::sync::Arc; use arrow::array::{ - ArrayRef, Float64Builder, RecordBatch, StringDictionaryBuilder, UInt8Builder, UInt64Builder, + ArrayRef, Float64Builder, Int64Builder, RecordBatch, StringDictionaryBuilder, UInt8Builder, + UInt64Builder, }; use arrow::datatypes::{DataType, Field, Int32Type, Schema as ArrowSchema}; use arrow::ipc::reader::StreamReader; use arrow::ipc::writer::StreamWriter; +use quickwit_parquet_engine::timeseries_id::compute_timeseries_id; use quickwit_proto::bytes::Bytes; use quickwit_proto::ingest::{DocBatchV2, DocFormat}; use quickwit_proto::types::DocUid; @@ -73,8 +75,9 @@ impl ArrowMetricsBatchBuilder { } let sorted_tag_keys: Vec<&str> = tag_keys.into_iter().collect(); - // Build the Arrow schema dynamically - let mut fields = Vec::with_capacity(4 + sorted_tag_keys.len()); + // Build the Arrow schema dynamically. + // 5 fixed columns: metric_name, metric_type, timestamp_secs, value, timeseries_id + let mut fields = Vec::with_capacity(5 + sorted_tag_keys.len()); fields.push(Field::new( "metric_name", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), @@ -83,6 +86,7 @@ impl ArrowMetricsBatchBuilder { fields.push(Field::new("metric_type", DataType::UInt8, false)); fields.push(Field::new("timestamp_secs", DataType::UInt64, false)); fields.push(Field::new("value", DataType::Float64, false)); + fields.push(Field::new("timeseries_id", DataType::Int64, false)); for &tag_key in &sorted_tag_keys { fields.push(Field::new( @@ -100,6 +104,7 @@ impl ArrowMetricsBatchBuilder { let mut metric_type_builder = UInt8Builder::with_capacity(num_rows); let mut timestamp_secs_builder = UInt64Builder::with_capacity(num_rows); let mut value_builder = Float64Builder::with_capacity(num_rows); + let mut timeseries_id_builder = Int64Builder::with_capacity(num_rows); let mut tag_builders: Vec> = sorted_tag_keys .iter() @@ -122,6 +127,11 @@ impl ArrowMetricsBatchBuilder { metric_type_builder.append_value(dp.metric_type as u8); timestamp_secs_builder.append_value(dp.timestamp_secs); value_builder.append_value(dp.value); + timeseries_id_builder.append_value(compute_timeseries_id( + &dp.metric_name, + dp.metric_type as u8, + &dp.tags, + )); // Only touch builders for tags this data point has. for (tag_key, tag_val) in &dp.tags { @@ -145,11 +155,12 @@ impl ArrowMetricsBatchBuilder { } } - let mut arrays: Vec = Vec::with_capacity(4 + sorted_tag_keys.len()); + let mut arrays: Vec = Vec::with_capacity(5 + sorted_tag_keys.len()); arrays.push(Arc::new(metric_name_builder.finish())); arrays.push(Arc::new(metric_type_builder.finish())); arrays.push(Arc::new(timestamp_secs_builder.finish())); arrays.push(Arc::new(value_builder.finish())); + arrays.push(Arc::new(timeseries_id_builder.finish())); for tag_builder in &mut tag_builders { arrays.push(Arc::new(tag_builder.finish())); @@ -265,6 +276,10 @@ pub fn ipc_to_json_values( serde_json::Value::Number(val.into()) } } + DataType::Int64 => { + let arr = column.as_primitive::(); + serde_json::Value::Number(arr.value(row_idx).into()) + } DataType::UInt64 => { let arr = column.as_primitive::(); serde_json::Value::Number(arr.value(row_idx).into()) @@ -387,8 +402,8 @@ mod tests { let batch = builder.finish(); assert_eq!(batch.num_rows(), 1); - // 4 fixed columns + 9 tag columns - assert_eq!(batch.num_columns(), 13); + // 5 fixed columns + 9 tag columns + assert_eq!(batch.num_columns(), 14); } #[test] @@ -471,8 +486,8 @@ mod tests { let batch = builder.finish(); assert_eq!(batch.num_rows(), 1); - // 4 fixed columns + 1 tag column (service_name) - assert_eq!(batch.num_columns(), 5); + // 5 fixed columns + 1 tag column (service_name) + assert_eq!(batch.num_columns(), 6); } #[test] @@ -507,8 +522,8 @@ mod tests { let batch = builder.finish(); assert_eq!(batch.num_rows(), 2); - // 4 fixed + 3 tag columns (env, host, region) - sorted alphabetically - assert_eq!(batch.num_columns(), 7); + // 5 fixed + 3 tag columns (env, host, region) - sorted alphabetically + assert_eq!(batch.num_columns(), 8); let schema = batch.schema(); let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect(); @@ -519,6 +534,7 @@ mod tests { "metric_type", "timestamp_secs", "value", + "timeseries_id", "env", "host", "region", @@ -569,8 +585,8 @@ mod tests { let batch = builder.finish(); assert_eq!(batch.num_rows(), 3); - // 4 fixed + 2 tags (a, b) - assert_eq!(batch.num_columns(), 6); + // 5 fixed + 2 tags (a, b) + assert_eq!(batch.num_columns(), 7); let schema = batch.schema(); let a_idx = schema.index_of("a").unwrap(); diff --git a/quickwit/quickwit-parquet-engine/Cargo.toml b/quickwit/quickwit-parquet-engine/Cargo.toml index ca1e8d72150..d52dcf01c1a 100644 --- a/quickwit/quickwit-parquet-engine/Cargo.toml +++ b/quickwit/quickwit-parquet-engine/Cargo.toml @@ -20,6 +20,7 @@ prost = { workspace = true } quickwit-common = { workspace = true } quickwit-dst = { workspace = true } quickwit-proto = { workspace = true } +siphasher = { workspace = true } sea-query = { workspace = true, optional = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/quickwit/quickwit-parquet-engine/src/lib.rs b/quickwit/quickwit-parquet-engine/src/lib.rs index d34c67c665d..180079e69a3 100644 --- a/quickwit/quickwit-parquet-engine/src/lib.rs +++ b/quickwit/quickwit-parquet-engine/src/lib.rs @@ -28,6 +28,7 @@ pub mod sort_fields; pub mod split; pub mod storage; pub mod table_config; +pub mod timeseries_id; #[cfg(any(test, feature = "testsuite"))] pub mod test_helpers; diff --git a/quickwit/quickwit-parquet-engine/src/schema/fields.rs b/quickwit/quickwit-parquet-engine/src/schema/fields.rs index b5d149a5b51..bcb0f17597a 100644 --- a/quickwit/quickwit-parquet-engine/src/schema/fields.rs +++ b/quickwit/quickwit-parquet-engine/src/schema/fields.rs @@ -29,6 +29,7 @@ pub const SORT_ORDER: &[&str] = &[ "datacenter", "region", "host", + "timeseries_id", "timestamp_secs", ]; @@ -41,6 +42,7 @@ pub enum ParquetField { TimestampSecs, StartTimestampSecs, Value, + TimeseriesId, TagService, TagEnv, TagDatacenter, @@ -61,6 +63,7 @@ impl ParquetField { Self::TimestampSecs => "timestamp_secs", Self::StartTimestampSecs => "start_timestamp_secs", Self::Value => "value", + Self::TimeseriesId => "timeseries_id", Self::TagService => "tag_service", Self::TagEnv => "tag_env", Self::TagDatacenter => "tag_datacenter", @@ -110,6 +113,8 @@ impl ParquetField { Self::TimestampSecs | Self::StartTimestampSecs => DataType::UInt64, // Metric value Self::Value => DataType::Float64, + // Deterministic hash of timeseries identity columns + Self::TimeseriesId => DataType::Int64, // Plain string for metric unit Self::MetricUnit => DataType::Utf8, // VARIANT type for semi-structured attributes @@ -145,6 +150,7 @@ impl ParquetField { Self::TimestampSecs, Self::StartTimestampSecs, Self::Value, + Self::TimeseriesId, Self::TagService, Self::TagEnv, Self::TagDatacenter, diff --git a/quickwit/quickwit-parquet-engine/src/timeseries_id.rs b/quickwit/quickwit-parquet-engine/src/timeseries_id.rs new file mode 100644 index 00000000000..83c337a5ec3 --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/timeseries_id.rs @@ -0,0 +1,197 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Deterministic timeseries ID computation for the Parquet pipeline. +//! +//! The timeseries ID is a hash of all columns that identify a unique timeseries, +//! excluding temporal data (timestamps) and metric values. Two data points +//! from the same timeseries always produce the same ID, regardless of when +//! they were recorded or their payload. + +use std::collections::HashMap; +use std::hash::{Hash, Hasher}; + +use siphasher::sip::SipHasher; + +/// Tag columns excluded from the timeseries ID hash. +/// +/// Temporal and value columns vary per data point within a timeseries and +/// must not contribute to the hash. +/// +/// - `timestamp_secs` and `value` are fixed fields on `MetricDataPoint` +/// (not in the tags HashMap) and are excluded by construction — they are +/// never passed to the hash function. +/// - `start_timestamp_secs` is the OTLP delta-window start time, stored +/// as a tag. It varies per data point so must be excluded here. +/// - `timestamp` is the generic well-known timestamp name from the sort +/// schema system. Excluded defensively in case it appears as a +/// user-provided attribute. +/// - DDSketch value columns (`count`, `sum`, `min`, `max`, `flags`, +/// `keys`, `counts`) are per-data-point sketch components. +pub const EXCLUDED_TAGS: &[&str] = &[ + "count", + "counts", + "flags", + "keys", + "max", + "min", + "start_timestamp_secs", + "sum", + "timestamp", + "timestamp_secs", + "value", +]; + +/// Compute a deterministic timeseries ID from metric identity columns. +/// +/// The hash includes: +/// - `metric_name` +/// - `metric_type` (Gauge=0, Sum=1, etc.) +/// - All tag key-value pairs except temporal columns (see [`EXCLUDED_TAGS`]) +/// +/// Tags are sorted by key before hashing so the result is independent of +/// HashMap iteration order. +/// +/// Uses SipHash-2-4 with fixed keys for deterministic, well-distributed output. +/// Returns an `i64` for the `timeseries_id` sort schema column. +pub fn compute_timeseries_id( + metric_name: &str, + metric_type: u8, + tags: &HashMap, +) -> i64 { + let mut hasher = SipHasher::new_with_keys(0, 0); + + metric_name.hash(&mut hasher); + metric_type.hash(&mut hasher); + + // Collect and sort tags for deterministic ordering. + let mut sorted_tags: Vec<(&str, &str)> = tags + .iter() + .filter(|(k, _)| !EXCLUDED_TAGS.contains(&k.as_str())) + .map(|(k, v)| (k.as_str(), v.as_str())) + .collect(); + sorted_tags.sort_unstable_by_key(|(k, _)| *k); + + for (key, value) in sorted_tags { + key.hash(&mut hasher); + value.hash(&mut hasher); + } + + hasher.finish() as i64 +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_deterministic_same_input() { + let mut tags = HashMap::new(); + tags.insert("service".to_string(), "api".to_string()); + tags.insert("env".to_string(), "prod".to_string()); + tags.insert("host".to_string(), "node-1".to_string()); + + let id1 = compute_timeseries_id("cpu.usage", 0, &tags); + let id2 = compute_timeseries_id("cpu.usage", 0, &tags); + assert_eq!(id1, id2); + } + + #[test] + fn test_different_metric_name_different_id() { + let mut tags = HashMap::new(); + tags.insert("service".to_string(), "api".to_string()); + + let id1 = compute_timeseries_id("cpu.usage", 0, &tags); + let id2 = compute_timeseries_id("mem.usage", 0, &tags); + assert_ne!(id1, id2); + } + + #[test] + fn test_different_metric_type_different_id() { + let mut tags = HashMap::new(); + tags.insert("service".to_string(), "api".to_string()); + + let id_gauge = compute_timeseries_id("requests", 0, &tags); + let id_sum = compute_timeseries_id("requests", 1, &tags); + assert_ne!(id_gauge, id_sum); + } + + #[test] + fn test_different_tags_different_id() { + let mut tags1 = HashMap::new(); + tags1.insert("service".to_string(), "api".to_string()); + + let mut tags2 = HashMap::new(); + tags2.insert("service".to_string(), "web".to_string()); + + let id1 = compute_timeseries_id("cpu.usage", 0, &tags1); + let id2 = compute_timeseries_id("cpu.usage", 0, &tags2); + assert_ne!(id1, id2); + } + + #[test] + fn test_excludes_start_timestamp_secs() { + let mut tags1 = HashMap::new(); + tags1.insert("service".to_string(), "api".to_string()); + tags1.insert("start_timestamp_secs".to_string(), "1000".to_string()); + + let mut tags2 = HashMap::new(); + tags2.insert("service".to_string(), "api".to_string()); + tags2.insert("start_timestamp_secs".to_string(), "2000".to_string()); + + let id1 = compute_timeseries_id("cpu.usage", 0, &tags1); + let id2 = compute_timeseries_id("cpu.usage", 0, &tags2); + assert_eq!(id1, id2, "start_timestamp_secs should not affect the hash"); + } + + #[test] + fn test_empty_tags() { + let tags = HashMap::new(); + let id = compute_timeseries_id("cpu.usage", 0, &tags); + // Just verify it doesn't panic and returns a value. + let _ = id; + } + + #[test] + fn test_tag_insertion_order_independent() { + // Build two HashMaps with the same entries but different insertion order. + let mut tags1 = HashMap::new(); + tags1.insert("z_tag".to_string(), "z_val".to_string()); + tags1.insert("a_tag".to_string(), "a_val".to_string()); + tags1.insert("m_tag".to_string(), "m_val".to_string()); + + let mut tags2 = HashMap::new(); + tags2.insert("a_tag".to_string(), "a_val".to_string()); + tags2.insert("m_tag".to_string(), "m_val".to_string()); + tags2.insert("z_tag".to_string(), "z_val".to_string()); + + let id1 = compute_timeseries_id("cpu.usage", 0, &tags1); + let id2 = compute_timeseries_id("cpu.usage", 0, &tags2); + assert_eq!(id1, id2, "tag insertion order must not affect the hash"); + } + + #[test] + fn test_tag_key_value_not_interchangeable() { + // Ensure ("a", "b") and ("b", "a") produce different hashes. + let mut tags1 = HashMap::new(); + tags1.insert("key".to_string(), "value".to_string()); + + let mut tags2 = HashMap::new(); + tags2.insert("value".to_string(), "key".to_string()); + + let id1 = compute_timeseries_id("m", 0, &tags1); + let id2 = compute_timeseries_id("m", 0, &tags2); + assert_ne!(id1, id2); + } +} From 71d63a0039154b9408632e516d002e6b858e8297 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Thu, 9 Apr 2026 17:22:57 -0400 Subject: [PATCH 2/4] test: harden timeseries_id with stability pins and proptests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The timeseries_id hash is persisted to Parquet files — any change silently corrupts compaction and queries. Add: - 3 pinned stability tests with hardcoded expected hash values - 3 proptest properties (order independence, excluded tag immunity, extra-tag discrimination) each running 256 random cases - Boundary ambiguity test ({"ab":"c"} vs {"a":"bc"}) - Same-series-different-timestamp invariant test - All-excluded-tags coverage (every EXCLUDED_TAGS entry verified) - Edge cases: empty strings, unicode, 100-tag cardinality - Module-level doc explaining the stability contract Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/timeseries_id.rs | 345 ++++++++++++++++-- 1 file changed, 322 insertions(+), 23 deletions(-) diff --git a/quickwit/quickwit-parquet-engine/src/timeseries_id.rs b/quickwit/quickwit-parquet-engine/src/timeseries_id.rs index 83c337a5ec3..6511ac2e9bd 100644 --- a/quickwit/quickwit-parquet-engine/src/timeseries_id.rs +++ b/quickwit/quickwit-parquet-engine/src/timeseries_id.rs @@ -18,6 +18,19 @@ //! excluding temporal data (timestamps) and metric values. Two data points //! from the same timeseries always produce the same ID, regardless of when //! they were recorded or their payload. +//! +//! # Stability contract +//! +//! The hash value for a given set of inputs **must never change** across builds, +//! Rust versions, or process restarts. It is persisted in Parquet files and used +//! as a sort/grouping key. Any change would silently corrupt compaction and +//! query results. +//! +//! The implementation uses SipHash-2-4 with fixed keys `(0, 0)` from the +//! `siphasher` crate, and feeds bytes via Rust's `Hash` trait. The `Hash` +//! implementation for `str` writes `bytes ++ [0xFF]` and for `u8` writes +//! `[byte]`; this has been stable since Rust 1.0. A pinned stability test +//! (`test_hash_stability_pinned`) will catch any regression. use std::collections::HashMap; use std::hash::{Hash, Hasher}; @@ -55,16 +68,21 @@ pub const EXCLUDED_TAGS: &[&str] = &[ /// Compute a deterministic timeseries ID from metric identity columns. /// -/// The hash includes: -/// - `metric_name` -/// - `metric_type` (Gauge=0, Sum=1, etc.) -/// - All tag key-value pairs except temporal columns (see [`EXCLUDED_TAGS`]) +/// # Hash inputs (in order) +/// +/// 1. `metric_name` (string, via `Hash` trait — writes bytes + 0xFF) +/// 2. `metric_type` (u8, via `Hash` trait — writes single byte) +/// 3. For each tag in **lexicographic key order**, excluding [`EXCLUDED_TAGS`]: +/// - tag key (string) +/// - tag value (string) /// -/// Tags are sorted by key before hashing so the result is independent of -/// HashMap iteration order. +/// The tag sort ensures the result is independent of HashMap iteration order. /// -/// Uses SipHash-2-4 with fixed keys for deterministic, well-distributed output. -/// Returns an `i64` for the `timeseries_id` sort schema column. +/// # Algorithm +/// +/// SipHash-2-4 with fixed keys `(0, 0)` via the `siphasher` crate. +/// Returns the 64-bit hash truncated to `i64` for the `timeseries_id` +/// sort schema column. pub fn compute_timeseries_id( metric_name: &str, metric_type: u8, @@ -95,6 +113,53 @@ pub fn compute_timeseries_id( mod tests { use super::*; + // --------------------------------------------------------------- + // Stability: pinned expected values + // + // If any of these fail, the on-disk hash contract is broken. + // DO NOT update the expected values — fix the implementation. + // --------------------------------------------------------------- + + #[test] + fn test_hash_stability_pinned_no_tags() { + let tags = HashMap::new(); + let id = compute_timeseries_id("cpu.usage", 0, &tags); + assert_eq!( + id, 8585688161913568022, + "pinned hash for (cpu.usage, Gauge, no tags) must not change" + ); + } + + #[test] + fn test_hash_stability_pinned_with_tags() { + let mut tags = HashMap::new(); + tags.insert("env".to_string(), "prod".to_string()); + tags.insert("host".to_string(), "node-1".to_string()); + tags.insert("service".to_string(), "api".to_string()); + + let id = compute_timeseries_id("cpu.usage", 0, &tags); + assert_eq!( + id, -1249054409005369755, + "pinned hash for (cpu.usage, Gauge, env=prod, host=node-1, service=api) must not change" + ); + } + + #[test] + fn test_hash_stability_pinned_sum_type() { + let mut tags = HashMap::new(); + tags.insert("service".to_string(), "web".to_string()); + + let id = compute_timeseries_id("http.requests", 1, &tags); + assert_eq!( + id, -2615727422124831097, + "pinned hash for (http.requests, Sum, service=web) must not change" + ); + } + + // --------------------------------------------------------------- + // Core invariant: same series identity → same hash + // --------------------------------------------------------------- + #[test] fn test_deterministic_same_input() { let mut tags = HashMap::new(); @@ -107,6 +172,36 @@ mod tests { assert_eq!(id1, id2); } + #[test] + fn test_same_series_different_timestamps_same_id() { + // Two data points from the same series that differ only in + // temporal/value columns (which are excluded) must hash equal. + let mut tags1 = HashMap::new(); + tags1.insert("service".to_string(), "api".to_string()); + tags1.insert("host".to_string(), "node-1".to_string()); + tags1.insert("start_timestamp_secs".to_string(), "1000".to_string()); + tags1.insert("timestamp_secs".to_string(), "1010".to_string()); + tags1.insert("value".to_string(), "42.5".to_string()); + + let mut tags2 = HashMap::new(); + tags2.insert("service".to_string(), "api".to_string()); + tags2.insert("host".to_string(), "node-1".to_string()); + tags2.insert("start_timestamp_secs".to_string(), "2000".to_string()); + tags2.insert("timestamp_secs".to_string(), "2010".to_string()); + tags2.insert("value".to_string(), "99.9".to_string()); + + let id1 = compute_timeseries_id("cpu.usage", 0, &tags1); + let id2 = compute_timeseries_id("cpu.usage", 0, &tags2); + assert_eq!( + id1, id2, + "same series with different timestamps/values must produce the same hash" + ); + } + + // --------------------------------------------------------------- + // Discrimination: different identity → different hash + // --------------------------------------------------------------- + #[test] fn test_different_metric_name_different_id() { let mut tags = HashMap::new(); @@ -128,7 +223,7 @@ mod tests { } #[test] - fn test_different_tags_different_id() { + fn test_different_tag_value_different_id() { let mut tags1 = HashMap::new(); tags1.insert("service".to_string(), "api".to_string()); @@ -141,31 +236,57 @@ mod tests { } #[test] - fn test_excludes_start_timestamp_secs() { + fn test_extra_tag_different_id() { let mut tags1 = HashMap::new(); tags1.insert("service".to_string(), "api".to_string()); - tags1.insert("start_timestamp_secs".to_string(), "1000".to_string()); let mut tags2 = HashMap::new(); tags2.insert("service".to_string(), "api".to_string()); - tags2.insert("start_timestamp_secs".to_string(), "2000".to_string()); + tags2.insert("env".to_string(), "prod".to_string()); let id1 = compute_timeseries_id("cpu.usage", 0, &tags1); let id2 = compute_timeseries_id("cpu.usage", 0, &tags2); - assert_eq!(id1, id2, "start_timestamp_secs should not affect the hash"); + assert_ne!(id1, id2, "adding a tag must change the hash"); } #[test] - fn test_empty_tags() { - let tags = HashMap::new(); - let id = compute_timeseries_id("cpu.usage", 0, &tags); - // Just verify it doesn't panic and returns a value. - let _ = id; + fn test_tag_key_value_not_interchangeable() { + // ("key", "value") vs ("value", "key") — different tag names. + let mut tags1 = HashMap::new(); + tags1.insert("key".to_string(), "value".to_string()); + + let mut tags2 = HashMap::new(); + tags2.insert("value".to_string(), "key".to_string()); + + let id1 = compute_timeseries_id("m", 0, &tags1); + let id2 = compute_timeseries_id("m", 0, &tags2); + assert_ne!(id1, id2); + } + + #[test] + fn test_boundary_ambiguity_key_value_split() { + // {"ab": "c"} vs {"a": "bc"} must differ. + // This works because str::hash writes bytes + 0xFF separator. + let mut tags1 = HashMap::new(); + tags1.insert("ab".to_string(), "c".to_string()); + + let mut tags2 = HashMap::new(); + tags2.insert("a".to_string(), "bc".to_string()); + + let id1 = compute_timeseries_id("m", 0, &tags1); + let id2 = compute_timeseries_id("m", 0, &tags2); + assert_ne!( + id1, id2, + "different key/value splits must produce different hashes" + ); } + // --------------------------------------------------------------- + // Order independence + // --------------------------------------------------------------- + #[test] fn test_tag_insertion_order_independent() { - // Build two HashMaps with the same entries but different insertion order. let mut tags1 = HashMap::new(); tags1.insert("z_tag".to_string(), "z_val".to_string()); tags1.insert("a_tag".to_string(), "a_val".to_string()); @@ -181,17 +302,195 @@ mod tests { assert_eq!(id1, id2, "tag insertion order must not affect the hash"); } + // --------------------------------------------------------------- + // Excluded columns: every entry in EXCLUDED_TAGS must be ignored + // --------------------------------------------------------------- + #[test] - fn test_tag_key_value_not_interchangeable() { - // Ensure ("a", "b") and ("b", "a") produce different hashes. + fn test_all_excluded_tags_are_ignored() { + let mut base_tags = HashMap::new(); + base_tags.insert("service".to_string(), "api".to_string()); + base_tags.insert("env".to_string(), "prod".to_string()); + + let base_id = compute_timeseries_id("cpu.usage", 0, &base_tags); + + for &excluded in EXCLUDED_TAGS { + let mut tags_with_excluded = base_tags.clone(); + tags_with_excluded.insert(excluded.to_string(), "some_value".to_string()); + + let id = compute_timeseries_id("cpu.usage", 0, &tags_with_excluded); + assert_eq!( + id, base_id, + "excluded tag '{}' must not affect the hash", + excluded + ); + } + } + + #[test] + fn test_excluded_tags_with_varying_values() { let mut tags1 = HashMap::new(); - tags1.insert("key".to_string(), "value".to_string()); + tags1.insert("service".to_string(), "api".to_string()); let mut tags2 = HashMap::new(); - tags2.insert("value".to_string(), "key".to_string()); + tags2.insert("service".to_string(), "api".to_string()); + + // Add every excluded tag with different values to each. + for (i, &excluded) in EXCLUDED_TAGS.iter().enumerate() { + tags1.insert(excluded.to_string(), format!("val_a_{}", i)); + tags2.insert(excluded.to_string(), format!("val_b_{}", i)); + } + + let id1 = compute_timeseries_id("cpu.usage", 0, &tags1); + let id2 = compute_timeseries_id("cpu.usage", 0, &tags2); + assert_eq!( + id1, id2, + "excluded tags with different values must not affect the hash" + ); + } + + // --------------------------------------------------------------- + // Edge cases + // --------------------------------------------------------------- + + #[test] + fn test_empty_tags() { + let tags = HashMap::new(); + // Must not panic; value is tested by the pinned stability test. + let _ = compute_timeseries_id("cpu.usage", 0, &tags); + } + + #[test] + fn test_empty_string_tag_key_and_value() { + let mut tags1 = HashMap::new(); + tags1.insert("".to_string(), "".to_string()); + + let mut tags2 = HashMap::new(); + tags2.insert("a".to_string(), "".to_string()); + + let id1 = compute_timeseries_id("m", 0, &tags1); + let id2 = compute_timeseries_id("m", 0, &tags2); + assert_ne!(id1, id2, "empty key must differ from non-empty key"); + } + + #[test] + fn test_unicode_tags() { + let mut tags1 = HashMap::new(); + tags1.insert("region".to_string(), "\u{1F30D}".to_string()); // globe emoji + + let mut tags2 = HashMap::new(); + tags2.insert("region".to_string(), "\u{1F30E}".to_string()); // different globe let id1 = compute_timeseries_id("m", 0, &tags1); let id2 = compute_timeseries_id("m", 0, &tags2); + assert_ne!(id1, id2, "different unicode values must produce different hashes"); + } + + #[test] + fn test_many_tags() { + let mut tags = HashMap::new(); + for i in 0..100 { + tags.insert(format!("tag_{:03}", i), format!("val_{}", i)); + } + let id1 = compute_timeseries_id("m", 0, &tags); + let id2 = compute_timeseries_id("m", 0, &tags); + assert_eq!(id1, id2, "hash must be deterministic with many tags"); + } + + #[test] + fn test_empty_metric_name() { + let tags = HashMap::new(); + let id1 = compute_timeseries_id("", 0, &tags); + let id2 = compute_timeseries_id("x", 0, &tags); assert_ne!(id1, id2); } } + +/// Property-based tests for order independence and collision resistance. +#[cfg(test)] +mod proptests { + use super::*; + + use proptest::prelude::*; + + /// Generate a HashMap with 0..max_tags entries. + fn arb_tags(max_tags: usize) -> impl Strategy> { + proptest::collection::hash_map("[a-z]{1,8}", "[a-z0-9]{0,16}", 0..max_tags) + } + + proptest! { + /// Core property: for any tag set, the hash must be the same + /// regardless of which order the entries are iterated. + /// + /// We verify this by computing twice (HashMap iteration order is + /// not guaranteed to be the same across builds, but the sort inside + /// compute_timeseries_id must canonicalize it). We also rebuild + /// the map in reverse-sorted key order to force a different + /// internal layout. + #[test] + fn prop_order_independent( + metric_name in "[a-z.]{1,20}", + metric_type in 0u8..5, + tags in arb_tags(15), + ) { + let id_original = compute_timeseries_id(&metric_name, metric_type, &tags); + + // Rebuild the HashMap with keys inserted in reverse-sorted order. + let mut keys: Vec<&String> = tags.keys().collect(); + keys.sort(); + keys.reverse(); + let mut reversed = HashMap::with_capacity(tags.len()); + for key in keys { + reversed.insert(key.clone(), tags[key].clone()); + } + + let id_reversed = compute_timeseries_id(&metric_name, metric_type, &reversed); + prop_assert_eq!(id_original, id_reversed, + "hash must be identical regardless of tag insertion order"); + } + + /// Excluded tags must never affect the hash. + #[test] + fn prop_excluded_tags_ignored( + metric_name in "[a-z.]{1,20}", + metric_type in 0u8..5, + base_tags in arb_tags(10), + excluded_value in "[a-z0-9]{1,16}", + ) { + let base_id = compute_timeseries_id(&metric_name, metric_type, &base_tags); + + for &excluded_key in EXCLUDED_TAGS { + let mut augmented = base_tags.clone(); + augmented.insert(excluded_key.to_string(), excluded_value.clone()); + + let augmented_id = compute_timeseries_id(&metric_name, metric_type, &augmented); + prop_assert_eq!(base_id, augmented_id, + "excluded tag '{}' must not change the hash", excluded_key); + } + } + + /// Adding a non-excluded tag must (almost certainly) change the hash. + /// We use a fresh key name that won't collide with excluded tags. + #[test] + fn prop_extra_tag_changes_hash( + metric_name in "[a-z.]{1,20}", + metric_type in 0u8..5, + base_tags in arb_tags(5), + extra_key in "xtag_[a-z]{1,8}", + extra_value in "[a-z0-9]{1,8}", + ) { + // Skip if the extra key already exists in base_tags. + prop_assume!(!base_tags.contains_key(&extra_key)); + + let base_id = compute_timeseries_id(&metric_name, metric_type, &base_tags); + + let mut augmented = base_tags.clone(); + augmented.insert(extra_key, extra_value); + let augmented_id = compute_timeseries_id(&metric_name, metric_type, &augmented); + + prop_assert_ne!(base_id, augmented_id, + "adding a non-excluded tag should change the hash (collision is theoretically \ + possible but astronomically unlikely for 64-bit hash)"); + } + } +} From 5582dcced63e4cfbe326d852e48290c3b494c56f Mon Sep 17 00:00:00 2001 From: George Talbot Date: Thu, 9 Apr 2026 18:15:50 -0400 Subject: [PATCH 3/4] style: rustfmt timeseries_id.rs Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/timeseries_id.rs | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/quickwit/quickwit-parquet-engine/src/timeseries_id.rs b/quickwit/quickwit-parquet-engine/src/timeseries_id.rs index 6511ac2e9bd..4c44a350d14 100644 --- a/quickwit/quickwit-parquet-engine/src/timeseries_id.rs +++ b/quickwit/quickwit-parquet-engine/src/timeseries_id.rs @@ -42,16 +42,14 @@ use siphasher::sip::SipHasher; /// Temporal and value columns vary per data point within a timeseries and /// must not contribute to the hash. /// -/// - `timestamp_secs` and `value` are fixed fields on `MetricDataPoint` -/// (not in the tags HashMap) and are excluded by construction — they are -/// never passed to the hash function. -/// - `start_timestamp_secs` is the OTLP delta-window start time, stored -/// as a tag. It varies per data point so must be excluded here. -/// - `timestamp` is the generic well-known timestamp name from the sort -/// schema system. Excluded defensively in case it appears as a -/// user-provided attribute. -/// - DDSketch value columns (`count`, `sum`, `min`, `max`, `flags`, -/// `keys`, `counts`) are per-data-point sketch components. +/// - `timestamp_secs` and `value` are fixed fields on `MetricDataPoint` (not in the tags HashMap) +/// and are excluded by construction — they are never passed to the hash function. +/// - `start_timestamp_secs` is the OTLP delta-window start time, stored as a tag. It varies per +/// data point so must be excluded here. +/// - `timestamp` is the generic well-known timestamp name from the sort schema system. Excluded +/// defensively in case it appears as a user-provided attribute. +/// - DDSketch value columns (`count`, `sum`, `min`, `max`, `flags`, `keys`, `counts`) are +/// per-data-point sketch components. pub const EXCLUDED_TAGS: &[&str] = &[ "count", "counts", @@ -140,7 +138,8 @@ mod tests { let id = compute_timeseries_id("cpu.usage", 0, &tags); assert_eq!( id, -1249054409005369755, - "pinned hash for (cpu.usage, Gauge, env=prod, host=node-1, service=api) must not change" + "pinned hash for (cpu.usage, Gauge, env=prod, host=node-1, service=api) must not \ + change" ); } @@ -383,7 +382,10 @@ mod tests { let id1 = compute_timeseries_id("m", 0, &tags1); let id2 = compute_timeseries_id("m", 0, &tags2); - assert_ne!(id1, id2, "different unicode values must produce different hashes"); + assert_ne!( + id1, id2, + "different unicode values must produce different hashes" + ); } #[test] @@ -409,10 +411,10 @@ mod tests { /// Property-based tests for order independence and collision resistance. #[cfg(test)] mod proptests { - use super::*; - use proptest::prelude::*; + use super::*; + /// Generate a HashMap with 0..max_tags entries. fn arb_tags(max_tags: usize) -> impl Strategy> { proptest::collection::hash_map("[a-z]{1,8}", "[a-z0-9]{0,16}", 0..max_tags) From 60d859c0c38de34af14d2506e67f1bb53016a67d Mon Sep 17 00:00:00 2001 From: George Talbot Date: Fri, 10 Apr 2026 06:05:10 -0400 Subject: [PATCH 4/4] fix: add FOR UPDATE row locking to delete_metrics_splits Mirror the CTE + FOR UPDATE pattern from delete_splits to prevent stale-state races. Without row locking, a concurrent mark_metrics_splits_for_deletion can commit between the state read and the DELETE, causing spurious FailedPrecondition errors and retry churn. The new query locks the target rows before reading their state, reports not-deletable (Staged/Published) and not-found splits separately, and only deletes when all requested splits are in MarkedForDeletion state. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/metastore/postgres/metastore.rs | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index adc3bc1fc19..36f648e810f 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -2625,6 +2625,7 @@ impl MetastoreService for PostgresqlMetastore { if request.split_ids.is_empty() { return Ok(EmptyResponse {}); } + let index_uid: IndexUid = request.index_uid().clone(); info!( index_uid = %request.index_uid(), @@ -2632,16 +2633,23 @@ impl MetastoreService for PostgresqlMetastore { "deleting metrics splits" ); - // Only delete splits that are marked for deletion - // Match the non-metrics delete_splits pattern: distinguish - // "not found" (warn + succeed) from "not deletable" (FailedPrecondition). + // Match the non-metrics delete_splits pattern: CTE with FOR UPDATE + // to lock rows before reading state, avoiding stale-state races under + // concurrent mark_metrics_splits_for_deletion. Distinguishes "not found" + // (warn + succeed) from "not deletable" (FailedPrecondition). const DELETE_SPLITS_QUERY: &str = r#" WITH input_splits AS ( SELECT input_splits.split_id, metrics_splits.split_state FROM UNNEST($2::text[]) AS input_splits(split_id) - LEFT JOIN metrics_splits - ON metrics_splits.index_uid = $1 - AND metrics_splits.split_id = input_splits.split_id + LEFT JOIN ( + SELECT split_id, split_state + FROM metrics_splits + WHERE + index_uid = $1 + AND split_id = ANY($2) + FOR UPDATE + ) AS metrics_splits + USING (split_id) ), deleted AS ( DELETE FROM metrics_splits @@ -2680,7 +2688,7 @@ impl MetastoreService for PostgresqlMetastore { .bind(&request.split_ids) .fetch_one(&self.connection_pool) .await - .map_err(|sqlx_error| convert_sqlx_err(&request.index_uid().index_id, sqlx_error))?; + .map_err(|sqlx_error| convert_sqlx_err(&index_uid.index_id, sqlx_error))?; if !not_deletable_ids.is_empty() { let message = format!(