Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

119 changes: 76 additions & 43 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2625,55 +2625,88 @@ impl MetastoreService for PostgresqlMetastore {
if request.split_ids.is_empty() {
return Ok(EmptyResponse {});
}
let index_uid: IndexUid = request.index_uid().clone();
let split_ids = request.split_ids;

info!(
index_uid = %request.index_uid(),
split_ids = ?request.split_ids,
"deleting metrics splits"
);

// Only delete splits that are marked for deletion
const DELETE_SPLITS_QUERY: &str = r#"
DELETE FROM metrics_splits
WHERE
index_uid = $1
AND split_id = ANY($2)
AND split_state = 'MarkedForDeletion'
RETURNING split_id
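// Mirrors the CTE + FOR UPDATE pattern from delete_splits to avoid
// stale-state races under concurrent mark_metrics_splits_for_deletion.
// The FOR UPDATE in the subquery locks the matching rows before their
// state is read, so the deletability check runs against the latest
// committed split_state.
// The delete is all-or-nothing: if any requested split is still Staged or
// Published, no row is deleted and those split IDs are returned so the
// caller can fail the request. Requested IDs with no matching row are
// returned separately and are only logged.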
const DELETE_METRICS_SPLITS_QUERY: &str = r#"
WITH input_splits AS (
SELECT input_splits.split_id, metrics_splits.split_state
FROM UNNEST($2) AS input_splits(split_id)
LEFT JOIN (
SELECT split_id, split_state
FROM metrics_splits
WHERE
index_uid = $1
AND split_id = ANY($2)
FOR UPDATE
) AS metrics_splits
USING (split_id)
),
deleted_splits AS (
DELETE FROM metrics_splits
USING input_splits
WHERE
metrics_splits.index_uid = $1
AND metrics_splits.split_id = input_splits.split_id
AND NOT EXISTS (
SELECT 1
FROM input_splits
WHERE
split_state IN ('Staged', 'Published')
)
)
SELECT
COUNT(split_state),
COUNT(1) FILTER (WHERE split_state = 'MarkedForDeletion'),
COALESCE(ARRAY_AGG(split_id) FILTER (WHERE split_state IN ('Staged', 'Published')), ARRAY[]::TEXT[]),
COALESCE(ARRAY_AGG(split_id) FILTER (WHERE split_state IS NULL), ARRAY[]::TEXT[])
FROM input_splits
"#;

let deleted_split_ids: Vec<String> = sqlx::query_scalar(DELETE_SPLITS_QUERY)
.bind(request.index_uid())
.bind(&request.split_ids)
.fetch_all(&self.connection_pool)
let (num_found_splits, num_deleted_splits, not_deletable_split_ids, not_found_split_ids): (
i64,
i64,
Vec<String>,
Vec<String>,
) = sqlx::query_as(DELETE_METRICS_SPLITS_QUERY)
.bind(&index_uid)
.bind(split_ids)
.fetch_one(&self.connection_pool)
.await
.map_err(|sqlx_error| convert_sqlx_err(&request.index_uid().index_id, sqlx_error))?;

// Log if some splits were not deleted (either non-existent or not
// in MarkedForDeletion state). Delete is idempotent — we don't error
// for missing splits.
if deleted_split_ids.len() != request.split_ids.len() {
let not_deleted: Vec<String> = request
.split_ids
.iter()
.filter(|id| !deleted_split_ids.contains(id))
.cloned()
.collect();
.map_err(|sqlx_error| convert_sqlx_err(&index_uid.index_id, sqlx_error))?;

if !not_deleted.is_empty() {
warn!(
index_uid = %request.index_uid(),
not_deleted = ?not_deleted,
"some metrics splits were not deleted (non-existent or not marked for deletion)"
);
}
if num_found_splits == 0
&& index_opt_for_uid(&self.connection_pool, index_uid.clone(), false)
.await?
.is_none()
{
return Err(MetastoreError::NotFound(EntityKind::Index {
index_id: index_uid.index_id,
}));
}
if !not_deletable_split_ids.is_empty() {
let message = format!(
"metrics splits `{}` are not deletable",
not_deletable_split_ids.join(", ")
);
let entity = EntityKind::Splits {
split_ids: not_deletable_split_ids,
};
return Err(MetastoreError::FailedPrecondition { entity, message });
}
info!(%index_uid, "deleted {} metrics splits from index", num_deleted_splits);

info!(
index_uid = %request.index_uid(),
deleted_count = deleted_split_ids.len(),
"deleted metrics splits successfully"
);
if !not_found_split_ids.is_empty() {
warn!(
%index_uid,
split_ids=?PrettySample::new(&not_found_split_ids, 5),
"{} metrics splits were not found and could not be deleted",
not_found_split_ids.len()
);
}
Ok(EmptyResponse {})
}
}
Expand Down
40 changes: 28 additions & 12 deletions quickwit/quickwit-opentelemetry/src/otlp/arrow_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ use std::io::Cursor;
use std::sync::Arc;

use arrow::array::{
ArrayRef, Float64Builder, RecordBatch, StringDictionaryBuilder, UInt8Builder, UInt64Builder,
ArrayRef, Float64Builder, Int64Builder, RecordBatch, StringDictionaryBuilder, UInt8Builder,
UInt64Builder,
};
use arrow::datatypes::{DataType, Field, Int32Type, Schema as ArrowSchema};
use arrow::ipc::reader::StreamReader;
use arrow::ipc::writer::StreamWriter;
use quickwit_parquet_engine::timeseries_id::compute_timeseries_id;
use quickwit_proto::bytes::Bytes;
use quickwit_proto::ingest::{DocBatchV2, DocFormat};
use quickwit_proto::types::DocUid;
Expand Down Expand Up @@ -73,8 +75,9 @@ impl ArrowMetricsBatchBuilder {
}
let sorted_tag_keys: Vec<&str> = tag_keys.into_iter().collect();

// Build the Arrow schema dynamically
let mut fields = Vec::with_capacity(4 + sorted_tag_keys.len());
// Build the Arrow schema dynamically.
// 5 fixed columns: metric_name, metric_type, timestamp_secs, value, timeseries_id
let mut fields = Vec::with_capacity(5 + sorted_tag_keys.len());
fields.push(Field::new(
"metric_name",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
Expand All @@ -83,6 +86,7 @@ impl ArrowMetricsBatchBuilder {
fields.push(Field::new("metric_type", DataType::UInt8, false));
fields.push(Field::new("timestamp_secs", DataType::UInt64, false));
fields.push(Field::new("value", DataType::Float64, false));
fields.push(Field::new("timeseries_id", DataType::Int64, false));

for &tag_key in &sorted_tag_keys {
fields.push(Field::new(
Expand All @@ -100,6 +104,7 @@ impl ArrowMetricsBatchBuilder {
let mut metric_type_builder = UInt8Builder::with_capacity(num_rows);
let mut timestamp_secs_builder = UInt64Builder::with_capacity(num_rows);
let mut value_builder = Float64Builder::with_capacity(num_rows);
let mut timeseries_id_builder = Int64Builder::with_capacity(num_rows);

let mut tag_builders: Vec<StringDictionaryBuilder<Int32Type>> = sorted_tag_keys
.iter()
Expand All @@ -122,6 +127,11 @@ impl ArrowMetricsBatchBuilder {
metric_type_builder.append_value(dp.metric_type as u8);
timestamp_secs_builder.append_value(dp.timestamp_secs);
value_builder.append_value(dp.value);
timeseries_id_builder.append_value(compute_timeseries_id(
&dp.metric_name,
dp.metric_type as u8,
&dp.tags,
));

// Only touch builders for tags this data point has.
for (tag_key, tag_val) in &dp.tags {
Expand All @@ -145,11 +155,12 @@ impl ArrowMetricsBatchBuilder {
}
}

let mut arrays: Vec<ArrayRef> = Vec::with_capacity(4 + sorted_tag_keys.len());
let mut arrays: Vec<ArrayRef> = Vec::with_capacity(5 + sorted_tag_keys.len());
arrays.push(Arc::new(metric_name_builder.finish()));
arrays.push(Arc::new(metric_type_builder.finish()));
arrays.push(Arc::new(timestamp_secs_builder.finish()));
arrays.push(Arc::new(value_builder.finish()));
arrays.push(Arc::new(timeseries_id_builder.finish()));

for tag_builder in &mut tag_builders {
arrays.push(Arc::new(tag_builder.finish()));
Expand Down Expand Up @@ -265,6 +276,10 @@ pub fn ipc_to_json_values(
serde_json::Value::Number(val.into())
}
}
DataType::Int64 => {
let arr = column.as_primitive::<arrow::datatypes::Int64Type>();
serde_json::Value::Number(arr.value(row_idx).into())
}
DataType::UInt64 => {
let arr = column.as_primitive::<arrow::datatypes::UInt64Type>();
serde_json::Value::Number(arr.value(row_idx).into())
Expand Down Expand Up @@ -387,8 +402,8 @@ mod tests {

let batch = builder.finish();
assert_eq!(batch.num_rows(), 1);
// 4 fixed columns + 9 tag columns
assert_eq!(batch.num_columns(), 13);
// 5 fixed columns + 9 tag columns
assert_eq!(batch.num_columns(), 14);
}

#[test]
Expand Down Expand Up @@ -471,8 +486,8 @@ mod tests {
let batch = builder.finish();

assert_eq!(batch.num_rows(), 1);
// 4 fixed columns + 1 tag column (service_name)
assert_eq!(batch.num_columns(), 5);
// 5 fixed columns + 1 tag column (service_name)
assert_eq!(batch.num_columns(), 6);
}

#[test]
Expand Down Expand Up @@ -507,8 +522,8 @@ mod tests {

let batch = builder.finish();
assert_eq!(batch.num_rows(), 2);
// 4 fixed + 3 tag columns (env, host, region) - sorted alphabetically
assert_eq!(batch.num_columns(), 7);
// 5 fixed + 3 tag columns (env, host, region) - sorted alphabetically
assert_eq!(batch.num_columns(), 8);

let schema = batch.schema();
let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
Expand All @@ -519,6 +534,7 @@ mod tests {
"metric_type",
"timestamp_secs",
"value",
"timeseries_id",
"env",
"host",
"region",
Expand Down Expand Up @@ -569,8 +585,8 @@ mod tests {

let batch = builder.finish();
assert_eq!(batch.num_rows(), 3);
// 4 fixed + 2 tags (a, b)
assert_eq!(batch.num_columns(), 6);
// 5 fixed + 2 tags (a, b)
assert_eq!(batch.num_columns(), 7);

let schema = batch.schema();
let a_idx = schema.index_of("a").unwrap();
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-parquet-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ prost = { workspace = true }
quickwit-common = { workspace = true }
quickwit-dst = { workspace = true }
quickwit-proto = { workspace = true }
siphasher = { workspace = true }
sea-query = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-parquet-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub mod sort_fields;
pub mod split;
pub mod storage;
pub mod table_config;
pub mod timeseries_id;

#[cfg(any(test, feature = "testsuite"))]
pub mod test_helpers;
6 changes: 6 additions & 0 deletions quickwit/quickwit-parquet-engine/src/schema/fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub const SORT_ORDER: &[&str] = &[
"datacenter",
"region",
"host",
"timeseries_id",
"timestamp_secs",
];

Expand All @@ -41,6 +42,7 @@ pub enum ParquetField {
TimestampSecs,
StartTimestampSecs,
Value,
TimeseriesId,
TagService,
TagEnv,
TagDatacenter,
Expand All @@ -61,6 +63,7 @@ impl ParquetField {
Self::TimestampSecs => "timestamp_secs",
Self::StartTimestampSecs => "start_timestamp_secs",
Self::Value => "value",
Self::TimeseriesId => "timeseries_id",
Self::TagService => "tag_service",
Self::TagEnv => "tag_env",
Self::TagDatacenter => "tag_datacenter",
Expand Down Expand Up @@ -110,6 +113,8 @@ impl ParquetField {
Self::TimestampSecs | Self::StartTimestampSecs => DataType::UInt64,
// Metric value
Self::Value => DataType::Float64,
// Deterministic hash of timeseries identity columns
Self::TimeseriesId => DataType::Int64,
// Plain string for metric unit
Self::MetricUnit => DataType::Utf8,
// VARIANT type for semi-structured attributes
Expand Down Expand Up @@ -145,6 +150,7 @@ impl ParquetField {
Self::TimestampSecs,
Self::StartTimestampSecs,
Self::Value,
Self::TimeseriesId,
Self::TagService,
Self::TagEnv,
Self::TagDatacenter,
Expand Down
Loading
Loading