Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/persist-client/src/internal/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,9 @@ pub struct UsageAuditMetrics {
pub step_state: Counter,
/// Time spent doing math
pub step_math: Counter,
/// Count of shards skipped during a referenced-usage computation because
/// computing their usage panicked (e.g. an undecodable rollup).
pub referenced_shard_failures: IntCounter,
}

impl UsageAuditMetrics {
Expand Down Expand Up @@ -1890,6 +1893,10 @@ impl UsageAuditMetrics {
step_blob_metadata: step_timings.with_label_values(&["blob_metadata"]),
step_state: step_timings.with_label_values(&["state"]),
step_math: step_timings.with_label_values(&["math"]),
referenced_shard_failures: registry.register(metric!(
name: "mz_persist_usage_referenced_shard_failures",
help: "count of shards skipped during referenced-usage computation due to a panic",
)),
}
}
}
Expand Down
96 changes: 95 additions & 1 deletion src/persist-client/src/usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@

//! Introspection of storage utilization by persist

use std::any::Any;
use std::collections::BTreeMap;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::time::Instant;

use futures::stream::{FuturesUnordered, StreamExt};
use mz_ore::cast::CastFrom;
use mz_ore::future::OreFutureExt;
use mz_persist::location::Blob;
use tokio::sync::Semaphore;
use tracing::{error, info};
Expand All @@ -25,6 +28,18 @@ use crate::internal::state::HollowBlobRef;
use crate::internal::state_versions::StateVersions;
use crate::{Metrics, PersistClient, ShardId, retry_external};

/// Returns a human-readable message for a caught panic payload.
///
/// Panic payloads are type-erased. This recognizes the two string-like types
/// handled here, `String` and `&'static str`, and falls back to a fixed
/// placeholder for any other payload type.
fn panic_message(payload: &(dyn Any + Send)) -> &str {
    match payload.downcast_ref::<String>() {
        Some(owned) => owned.as_str(),
        None => match payload.downcast_ref::<&'static str>() {
            // `literal` is a `&&'static str`; deref once to hand back the str.
            Some(literal) => *literal,
            None => "<non-string panic payload>",
        },
    }
}

/// A breakdown of the size of various contributions to a shard's blob
/// usage that is actively referenced by any live state in Consensus.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -265,7 +280,27 @@ impl StorageUsageClient {
.acquire()
.await
.expect("acquiring permit from open semaphore");
let shard_usage = self.shard_usage_referenced(shard_id).await;
// Computing a shard's usage requires decoding its rollups,
// which `expect`s the encoded state to be valid. This is a
// best-effort billing scan that runs periodically across every
// shard, so an undecodable (e.g. corrupt) rollup in any one
// shard must not abort the whole process. Catch the panic,
// skip the shard, and keep going.
let shard_usage = AssertUnwindSafe(self.shard_usage_referenced(shard_id))
.ore_catch_unwind()
.await
.unwrap_or_else(|payload| {
self.metrics.audit.referenced_shard_failures.inc();
error!(
"skipping shard {shard_id} in referenced storage usage \
computation due to a panic: {}",
panic_message(&*payload),
);
ShardUsageReferenced {
batches_bytes: 0,
rollup_bytes: 0,
}
});
(shard_id, shard_usage)
};
by_shard_futures.push(shard_usage_fut);
Expand Down Expand Up @@ -980,6 +1015,65 @@ mod tests {
assert_eq!(shard_usage_referenced.batches_bytes, batches_size);
}

// Regression test: the periodic, best-effort `shards_usage_referenced` scan
// must survive a shard whose rollup cannot be decoded. The expectation is that
// the broken shard is skipped (reported as zero bytes) and the failure counter
// is bumped, rather than the panic propagating.
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
async fn usage_referenced_undecodable_rollup(dyncfgs: ConfigUpdates) {
    mz_ore::test::init_logging();

    let updates = [
        (("1".to_owned(), "one".to_owned()), 1, 1),
        (("2".to_owned(), "two".to_owned()), 2, 1),
    ];

    // Create a shard and append a single batch to it.
    let shard_id = ShardId::new();
    let client = new_test_client(&dyncfgs).await;
    let (mut write, _read) = client
        .expect_open::<String, String, u64, i64>(shard_id)
        .await;
    let mut batch = write.expect_batch(&updates[..], 0, 3).await;
    write
        .expect_compare_and_append_batch(&mut [&mut batch], 0, 3)
        .await;

    let usage = StorageUsageClient::open(client);

    // Find every rollup blob belonging to this shard. The test shares a
    // module with `usage`, which is why its `blob` field is reachable here.
    let shard_prefix = BlobKeyPrefix::Shard(&shard_id).to_string();
    let mut rollup_keys = Vec::new();
    usage
        .blob
        .list_keys_and_metadata(&shard_prefix, &mut |meta| {
            if matches!(
                BlobKey::parse_ids(meta.key),
                Ok((_, PartialBlobKey::Rollup(_, _)))
            ) {
                rollup_keys.push(meta.key.to_owned());
            }
        })
        .await
        .expect("list keys");
    assert!(!rollup_keys.is_empty(), "shard should have a rollup");

    // Clobber each rollup with bytes that can't be decoded as a
    // `ProtoRollup`, mimicking the corruption seen in the wild.
    for rollup_key in &rollup_keys {
        usage
            .blob
            .set(rollup_key, Bytes::from_static(&[0xff; 32]))
            .await
            .expect("overwrite rollup");
    }

    let before = usage.metrics.audit.referenced_shard_failures.get();

    // Previously this call panicked (and, via the abort-on-panic handler,
    // took down the process). Now it should skip the shard and return.
    let usages = usage.shards_usage_referenced([shard_id]).await;

    let reported_bytes = usages.by_shard.get(&shard_id).map(|u| u.size_bytes());
    assert_eq!(reported_bytes, Some(0));

    let after = usage.metrics.audit.referenced_shard_failures.get();
    assert_eq!(after, before + 1);
}

struct TestCase {
current_state_batches_bytes: u64,
current_state_bytes: u64,
Expand Down
Loading