diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs index 8daba8b775854..08dade2315538 100644 --- a/src/persist-client/src/internal/metrics.rs +++ b/src/persist-client/src/internal/metrics.rs @@ -1853,6 +1853,9 @@ pub struct UsageAuditMetrics { pub step_state: Counter, /// Time spent doing math pub step_math: Counter, + /// Count of shards skipped during a referenced-usage computation because + /// computing their usage panicked (e.g. an undecodable rollup). + pub referenced_shard_failures: IntCounter, } impl UsageAuditMetrics { @@ -1890,6 +1893,10 @@ impl UsageAuditMetrics { step_blob_metadata: step_timings.with_label_values(&["blob_metadata"]), step_state: step_timings.with_label_values(&["state"]), step_math: step_timings.with_label_values(&["math"]), + referenced_shard_failures: registry.register(metric!( + name: "mz_persist_usage_referenced_shard_failures", + help: "count of shards skipped during referenced-usage computation due to a panic", + )), } } } diff --git a/src/persist-client/src/usage.rs b/src/persist-client/src/usage.rs index 2f1d87d0db46f..661b320e493eb 100644 --- a/src/persist-client/src/usage.rs +++ b/src/persist-client/src/usage.rs @@ -9,12 +9,15 @@ //! Introspection of storage utilization by persist +use std::any::Any; use std::collections::BTreeMap; +use std::panic::AssertUnwindSafe; use std::sync::Arc; use std::time::Instant; use futures::stream::{FuturesUnordered, StreamExt}; use mz_ore::cast::CastFrom; +use mz_ore::future::OreFutureExt; use mz_persist::location::Blob; use tokio::sync::Semaphore; use tracing::{error, info}; @@ -25,6 +28,18 @@ use crate::internal::state::HollowBlobRef; use crate::internal::state_versions::StateVersions; use crate::{Metrics, PersistClient, ShardId, retry_external}; +/// Best-effort extraction of the message from a caught panic payload, which is +/// almost always a `&str` or `String`. 
+fn panic_message(payload: &(dyn Any + Send)) -> &str { + if let Some(s) = payload.downcast_ref::<&'static str>() { + s + } else if let Some(s) = payload.downcast_ref::<String>() { + s.as_str() + } else { + "" + } +} + /// A breakdown of the size of various contributions to a shard's blob /// usage that is actively referenced by any live state in Consensus. #[derive(Clone, Debug)] @@ -265,7 +280,27 @@ impl StorageUsageClient { .acquire() .await .expect("acquiring permit from open semaphore"); - let shard_usage = self.shard_usage_referenced(shard_id).await; + // Computing a shard's usage requires decoding its rollups, + // which `expect`s the encoded state to be valid. This is a + // best-effort billing scan that runs periodically across every + // shard, so an undecodable (e.g. corrupt) rollup in any one + // shard must not abort the whole process. Catch the panic, + // skip the shard, and keep going. + let shard_usage = AssertUnwindSafe(self.shard_usage_referenced(shard_id)) + .ore_catch_unwind() + .await + .unwrap_or_else(|payload| { + self.metrics.audit.referenced_shard_failures.inc(); + error!( + "skipping shard {shard_id} in referenced storage usage \ + computation due to a panic: {}", + panic_message(&*payload), + ); + ShardUsageReferenced { + batches_bytes: 0, + rollup_bytes: 0, + } + }); (shard_id, shard_usage) }; by_shard_futures.push(shard_usage_fut); @@ -980,6 +1015,65 @@ mod tests { assert_eq!(shard_usage_referenced.batches_bytes, batches_size); } + // A corrupt (undecodable) rollup in one shard must not abort the whole + // process during the periodic, best-effort `shards_usage_referenced` scan. + // Instead, the bad shard is skipped and the rest are still reported. 
+ #[mz_persist_proc::test(tokio::test)] + #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented + async fn usage_referenced_undecodable_rollup(dyncfgs: ConfigUpdates) { + mz_ore::test::init_logging(); + + let data = [ + (("1".to_owned(), "one".to_owned()), 1, 1), + (("2".to_owned(), "two".to_owned()), 2, 1), + ]; + + let shard_id = ShardId::new(); + let client = new_test_client(&dyncfgs).await; + let (mut write, _read) = client + .expect_open::<String, String, u64, i64>(shard_id) + .await; + let mut b1 = write.expect_batch(&data[..], 0, 3).await; + write + .expect_compare_and_append_batch(&mut [&mut b1], 0, 3) + .await; + + let usage = StorageUsageClient::open(client); + + // Overwrite every rollup blob for this shard with bytes that can't be + // decoded as a `ProtoRollup`, simulating the corruption seen in the + // wild. `usage` and the test live in the same module, so we can reach + // into its `blob`. + let mut rollup_keys = Vec::new(); + usage + .blob + .list_keys_and_metadata(&BlobKeyPrefix::Shard(&shard_id).to_string(), &mut |meta| { + if let Ok((_, PartialBlobKey::Rollup(_, _))) = BlobKey::parse_ids(meta.key) { + rollup_keys.push(meta.key.to_owned()); + } + }) + .await + .expect("list keys"); + assert!(!rollup_keys.is_empty(), "shard should have a rollup"); + for key in &rollup_keys { + usage + .blob + .set(key, Bytes::from_static(&[0xff; 32])) + .await + .expect("overwrite rollup"); + } + + let before = usage.metrics.audit.referenced_shard_failures.get(); + // This used to panic (and, via the abort-on-panic handler, take down + // the process). It should now skip the shard and return cleanly. + let usages = usage.shards_usage_referenced([shard_id]).await; + assert_eq!(usages.by_shard.get(&shard_id).map(|u| u.size_bytes()), Some(0)); + assert_eq!( + usage.metrics.audit.referenced_shard_failures.get(), + before + 1 + ); + } + struct TestCase { current_state_batches_bytes: u64, current_state_bytes: u64,