From a8457cb155b6837a50a2541dd400aeb3831de302 Mon Sep 17 00:00:00 2001 From: Ruslan Pislari Date: Mon, 11 May 2026 13:08:00 +0300 Subject: [PATCH 1/2] feat: add support for epoch-based deadline extensions for host I/O - Introduced `epoch_pause_ms` to track elapsed host I/O time and extend execution deadlines. - Enhanced `StoreBuilder` with `epoch_pause_ms` and `max_external_duration_ms` configuration. - Updated HTTP backends to deposit I/O time into shared counters and refund guest execution ticks. - Integrated `epoch_deadline_callback` for dynamic deadline adjustments. --- crates/http-backend/src/lib.rs | 103 +++++++++++------- crates/http-service/src/executor/http.rs | 10 +- crates/http-service/src/executor/wasi_http.rs | 11 +- crates/runtime/src/lib.rs | 16 ++- crates/runtime/src/store.rs | 55 +++++++++- 5 files changed, 146 insertions(+), 49 deletions(-) diff --git a/crates/http-backend/src/lib.rs b/crates/http-backend/src/lib.rs index f14ed10..a2f5d1d 100644 --- a/crates/http-backend/src/lib.rs +++ b/crates/http-backend/src/lib.rs @@ -6,8 +6,9 @@ use std::future::Future; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::pin::Pin; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use std::task::{Context, Poll}; -use std::time::Duration; +use std::time::{Duration, Instant}; use anyhow::{Error, Result, anyhow}; use http::{HeaderMap, HeaderName, Uri, header, uri::Scheme}; @@ -64,6 +65,10 @@ pub struct Backend { pub strategy: BackendStrategy, ext_http_stats: Option>, hostname: Option, + /// Counter shared with the wasmtime Store: each outbound HTTP call + /// deposits its elapsed wall-clock time (ms) here so the epoch + /// deadline callback can refund those ticks to the guest. + epoch_pause_ms: Arc, } pub struct Builder { @@ -112,6 +117,7 @@ impl Builder { strategy: self.strategy, ext_http_stats: None, hostname: self.hostname.clone(), + epoch_pause_ms: Arc::new(AtomicU64::new(0)), } } } @@ -140,6 +146,12 @@ impl Backend { self.ext_http_stats.replace(stats); } + /// Share the epoch-pause counter with the Store so that outbound HTTP + /// time can be excluded from the guest's execution-time budget. + pub fn set_epoch_pause_ms(&mut self, counter: Arc) { + self.epoch_pause_ms = counter; + } + pub fn propagate_header_names(&self) -> HeaderNameList { self.propagate_header_names.clone() } @@ -321,48 +333,57 @@ where .as_ref() .map(|s| ExtStatsTimer::new(s.clone())); - let res = self.client.request(request).await.map_err(|error| { - warn!(cause=?error, "sending request to backend"); - HttpError::RequestError - })?; - - let status = res.status().as_u16(); - let (parts, body) = res.into_parts(); - let headers = if !parts.headers.is_empty() { - Some( - parts - .headers - .iter() - .filter_map(|(name, value)| match value.to_str() { - Ok(value) => Some((name.to_string(), value.to_string())), - Err(error) => { - warn!(cause=?error, "invalid value: {:?}", value); - None - } - }) - .collect::>(), - ) - } else { - None - }; - - let body_bytes = body - .collect() - .await - .map_err(|error| { - warn!(cause=?error, "receiving body from backend"); + // Time the network I/O so we can refund the equivalent epoch ticks + // to the guest. Both the request send and the body read count. + let started = Instant::now(); + let result = async { + let res = self.client.request(request).await.map_err(|error| { + warn!(cause=?error, "sending request to backend"); HttpError::RequestError - })? - .to_bytes(); - let body = Some(body_bytes.to_vec()); - - trace!(?status, ?headers, len = body_bytes.len(), "reply"); + })?; + + let status = res.status().as_u16(); + let (parts, body) = res.into_parts(); + let headers = if !parts.headers.is_empty() { + Some( + parts + .headers + .iter() + .filter_map(|(name, value)| match value.to_str() { + Ok(value) => Some((name.to_string(), value.to_string())), + Err(error) => { + warn!(cause=?error, "invalid value: {:?}", value); + None + } + }) + .collect::>(), + ) + } else { + None + }; - Ok(Response { - status, - headers, - body, - }) + let body_bytes = body + .collect() + .await + .map_err(|error| { + warn!(cause=?error, "receiving body from backend"); + HttpError::RequestError + })? + .to_bytes(); + let body = Some(body_bytes.to_vec()); + + trace!(?status, ?headers, len = body_bytes.len(), "reply"); + + Ok::<_, HttpError>(Response { + status, + headers, + body, + }) + } + .await; + self.epoch_pause_ms + .fetch_add(started.elapsed().as_millis() as u64, Ordering::Relaxed); + result } } diff --git a/crates/http-service/src/executor/http.rs b/crates/http-service/src/executor/http.rs index de21605..26388dd 100644 --- a/crates/http-service/src/executor/http.rs +++ b/crates/http-service/src/executor/http.rs @@ -11,6 +11,7 @@ use reactor::gcore::fastedge; use runtime::util::stats::{StatsTimer, StatsVisitor}; use runtime::{InstancePre, store::StoreBuilder}; use std::sync::Arc; +use std::sync::atomic::AtomicU64; use std::time::Duration; use wasmtime_wasi_http::body::HyperOutgoingBody; @@ -73,7 +74,13 @@ where let properties = executor::get_properties(&parts.headers); - let store_builder = self.store_builder.with_properties(properties); + // Shared counter so external HTTP time in `http_backend.send_request` + // refunds epoch ticks via the Store's `epoch_deadline_callback`. + let epoch_pause_ms = Arc::new(AtomicU64::new(0)); + let store_builder = self + .store_builder + .with_properties(properties) + .epoch_pause_ms(epoch_pause_ms.clone()); let mut http_backend = self.backend; http_backend @@ -81,6 +88,7 @@ where .context("propagate headers")?; http_backend.set_ext_http_stats(stats.clone()); + http_backend.set_epoch_pause_ms(epoch_pause_ms); let propagate_header_names = http_backend.propagate_header_names(); let backend_uri = http_backend.uri(); diff --git a/crates/http-service/src/executor/wasi_http.rs b/crates/http-service/src/executor/wasi_http.rs index 342d04d..9ca73cb 100644 --- a/crates/http-service/src/executor/wasi_http.rs +++ b/crates/http-service/src/executor/wasi_http.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::sync::atomic::AtomicU64; use std::time::Duration; use crate::executor; @@ -88,8 +89,16 @@ where let body = body.boxed(); let properties = executor::get_properties(&parts.headers); - let store_builder = self.store_builder.with_properties(properties); + // Shared counter so wasi-http outbound calls (handled by + // `WasiHttpView::send_request` in the runtime) refund epoch ticks + // via the Store's `epoch_deadline_callback`. + let epoch_pause_ms = Arc::new(AtomicU64::new(0)); + let store_builder = self + .store_builder + .with_properties(properties) + .epoch_pause_ms(epoch_pause_ms.clone()); let mut http_backend = self.backend; + http_backend.set_epoch_pause_ms(epoch_pause_ms); http_backend .propagate_headers(parts.headers.clone()) diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index 66444b1..cc1c973 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -2,6 +2,8 @@ use crate::app::KvStoreOption; use crate::store::HasStats; use http_backend::stats::ExtStatsTimer; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Instant; use std::{fmt::Debug, ops::Deref}; use utils::{Dictionary, Utils}; use wasmtime_wasi::ResourceTable; @@ -97,6 +99,11 @@ pub struct Data { pub dictionary: Dictionary, pub utils: Utils, pub cache: cache::CacheImpl, + /// Milliseconds of host I/O that should not count against the epoch + /// deadline. The `epoch_deadline_callback` installed on the Store reads + /// and clears this counter, converting milliseconds into extra ticks to + /// extend the deadline. + pub epoch_pause_ms: Arc, } pub trait BackendRequest { @@ -143,13 +150,16 @@ impl WasiHttpView for Data { let request = Request::from_parts(head, body); // start external request stats timer let stats = self.inner.get_stats(); + let epoch_pause_ms = self.epoch_pause_ms.clone(); let handle = wasmtime_wasi::runtime::spawn(async move { let _stats_timer = ExtStatsTimer::new(stats); // keep timer alive until request is done - Ok( + let started = Instant::now(); + let resp = default_send_request_handler(request, OutgoingRequestConfig { use_tls, ..config }) - .await, - ) + .await; + epoch_pause_ms.fetch_add(started.elapsed().as_millis() as u64, Ordering::Relaxed); + Ok(resp) }); Ok(HostFutureIncomingResponse::pending(handle)) } diff --git a/crates/runtime/src/store.rs b/crates/runtime/src/store.rs index 6f4acaf..a692cf7 100644 --- a/crates/runtime/src/store.rs +++ b/crates/runtime/src/store.rs @@ -6,6 +6,7 @@ use crate::{DEFAULT_EPOCH_TICK_INTERVAL, Data, Wasi, WasiVersion}; use anyhow::Result; use secret::SecretStore; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use std::{ collections::HashMap, fmt::{Debug, Formatter}, @@ -13,11 +14,18 @@ use std::{ }; use tracing::{debug, instrument, trace}; use utils::{Dictionary, Utils}; +use wasmtime::UpdateDeadline; use wasmtime::component::ResourceTable; use wasmtime_wasi::WasiCtxBuilder; use wasmtime_wasi_http::WasiHttpCtx; use wasmtime_wasi_nn::wit::WasiNnCtx; +/// Default extra wall-clock budget (in ms) added to the tokio timeout that +/// wraps wasm execution. Acts as a soft outer bound that must comfortably +/// exceed the epoch budget once host I/O credit (e.g. external HTTP) extends +/// the epoch deadline. +pub const DEFAULT_MAX_EXTERNAL_DURATION_MS: u64 = 30_000; + /// A `Store` holds the runtime state of a app instance. /// /// `Store` lives only for the lifetime of a single app invocation. @@ -92,6 +100,8 @@ pub struct StoreBuilder { key_value_store: key_value_store::Builder, dictionary: Dictionary, cache_backend: Option>, + epoch_pause_ms: Option>, + max_external_duration_ms: u64, } impl StoreBuilder { @@ -110,6 +120,8 @@ impl StoreBuilder { key_value_store: key_value_store::Builder::default(), dictionary: Default::default(), cache_backend: None, + epoch_pause_ms: None, + max_external_duration_ms: DEFAULT_MAX_EXTERNAL_DURATION_MS, } } @@ -193,6 +205,28 @@ impl StoreBuilder { } } + /// Provide the shared counter that host functions use to "pause" the + /// epoch deadline. Each ms accumulated here grants the guest extra + /// ticks when the epoch deadline next fires. If unset, the store + /// falls back to a private counter (no host call can deposit credit). + pub fn epoch_pause_ms(self, counter: Arc) -> Self { + Self { + epoch_pause_ms: Some(counter), + ..self + } + } + + /// Wall-clock slack (in ms) added to the tokio timeout that wraps + /// wasm execution. Must comfortably exceed the worst-case total time + /// spent in epoch-paused host calls so the tokio bound does not fire + /// before the epoch one. + pub fn max_external_duration_ms(self, ms: u64) -> Self { + Self { + max_external_duration_ms: ms, + ..self + } + } + pub fn make_wasi_nn(&self) -> Result { // initialize application specific graph let backends: Vec<&str> = self @@ -271,6 +305,9 @@ impl StoreBuilder { self.cache_backend .unwrap_or_else(|| Arc::new(cache::NoCacheBackend)), ); + let epoch_pause_ms = self + .epoch_pause_ms + .unwrap_or_else(|| Arc::new(AtomicU64::new(0))); let mut inner = wasmtime::Store::new( &self.engine, @@ -279,7 +316,8 @@ impl StoreBuilder { wasi, wasi_nn, store_limits: self.store_limits, - timeout: (self.max_duration + 1) * DEFAULT_EPOCH_TICK_INTERVAL, + timeout: (self.max_duration + 1) * DEFAULT_EPOCH_TICK_INTERVAL + + self.max_external_duration_ms, table, logger, http: WasiHttpCtx::new(), @@ -288,12 +326,23 @@ impl StoreBuilder { dictionary: self.dictionary, utils, cache: cache_impl, + epoch_pause_ms: epoch_pause_ms.clone(), }, ); inner.limiter(|state| &mut state.store_limits); // allow max number of epoch ticks (1 tick = 10 ms) - inner.set_epoch_deadline(self.max_duration); // allow max number of epoch ticks (1 tick = 10 ms) - inner.epoch_deadline_trap(); + inner.set_epoch_deadline(self.max_duration); + // When the deadline fires, consume any host-call credit accumulated by + // `epoch_pause_ms` to extend it; if there is no credit, trap as before. + inner.epoch_deadline_callback(move |_ctx| { + let credit_ms = epoch_pause_ms.swap(0, Ordering::Relaxed); + if credit_ms == 0 { + Err(anyhow::Error::new(wasmtime::Trap::Interrupt)) + } else { + let credit_ticks = (credit_ms / DEFAULT_EPOCH_TICK_INTERVAL).max(1); + Ok(UpdateDeadline::Continue(credit_ticks)) + } + }); Ok(Store { inner }) } } From 05a01f716272972b2c0cdb4dd83d588dea667226 Mon Sep 17 00:00:00 2001 From: Ruslan Pislari Date: Tue, 12 May 2026 15:58:36 +0300 Subject: [PATCH 2/2] feat: enhance epoch deadline handling with credit refunding logic --- Cargo.lock | 1 + crates/runtime/Cargo.toml | 1 + crates/runtime/src/store.rs | 212 ++++++++++++++++++++++++++++++++++-- 3 files changed, 207 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1f23de9..611f5f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2950,6 +2950,7 @@ dependencies = [ "wasmtime-wasi-http", "wasmtime-wasi-io 36.0.7 (git+https://github.com/G-Core/wasmtime.git?branch=release-36.0.0)", "wasmtime-wasi-nn", + "wat", "wit-component 0.246.2", ] diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 1c18832..79a3307 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -43,6 +43,7 @@ lazy_static = { version = "1.5.0", optional = true } [dev-dependencies] claims = "0.8" +wat = "1" [lints] workspace = true diff --git a/crates/runtime/src/store.rs b/crates/runtime/src/store.rs index a692cf7..0b2e5a5 100644 --- a/crates/runtime/src/store.rs +++ b/crates/runtime/src/store.rs @@ -316,8 +316,15 @@ impl StoreBuilder { wasi, wasi_nn, store_limits: self.store_limits, - timeout: (self.max_duration + 1) * DEFAULT_EPOCH_TICK_INTERVAL - + self.max_external_duration_ms, + // Saturating arithmetic: configuration values are `u64` and could, in + // pathological cases, overflow when combined. Wrapping would silently + // produce an unexpectedly tiny timeout in release builds, so cap at + // `u64::MAX` instead. + timeout: self + .max_duration + .saturating_add(1) + .saturating_mul(DEFAULT_EPOCH_TICK_INTERVAL) + .saturating_add(self.max_external_duration_ms), table, logger, http: WasiHttpCtx::new(), @@ -336,17 +343,34 @@ impl StoreBuilder { // `epoch_pause_ms` to extend it; if there is no credit, trap as before. inner.epoch_deadline_callback(move |_ctx| { let credit_ms = epoch_pause_ms.swap(0, Ordering::Relaxed); - if credit_ms == 0 { - Err(anyhow::Error::new(wasmtime::Trap::Interrupt)) - } else { - let credit_ticks = (credit_ms / DEFAULT_EPOCH_TICK_INTERVAL).max(1); - Ok(UpdateDeadline::Continue(credit_ticks)) + match epoch_credit_ticks(credit_ms) { + None => Err(anyhow::Error::new(wasmtime::Trap::Interrupt)), + Some(ticks) => Ok(UpdateDeadline::Continue(ticks)), } }); Ok(Store { inner }) } } +/// Compute how many epoch ticks to refund given accumulated host-call +/// credit (in ms). +/// +/// Returns `None` when no credit has been deposited — the epoch deadline +/// should fire as an interrupt. Otherwise returns the number of additional +/// ticks to grant, rounded **up** so any sub-tick remainder still counts +/// as a full tick. Without ceil-div, e.g. 15 ms of credit would only +/// refund 1 tick (10 ms) and the next epoch deadline could still trap +/// prematurely. +fn epoch_credit_ticks(credit_ms: u64) -> Option { + if credit_ms == 0 { + None + } else { + // Ceil-div: `div_ceil` only returns 0 when the dividend is 0, + // which is short-circuited above, so this never returns `Some(0)`. + Some(credit_ms.div_ceil(DEFAULT_EPOCH_TICK_INTERVAL)) + } +} + impl Debug for StoreBuilder { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("StoreBuilder") @@ -357,3 +381,177 @@ impl Debug for StoreBuilder { .finish() } } + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::AtomicBool; + use std::thread; + use std::time::Duration; + use wasmtime::{Config, Engine, Instance, Module, Store as WtStore, Trap}; + + // ── unit tests: epoch_credit_ticks helper ───────────────────────────── + + #[test] + fn epoch_credit_ticks_zero_means_trap() { + // No host-call credit accumulated → callback must signal a trap. + assert_eq!(epoch_credit_ticks(0), None); + } + + #[test] + fn epoch_credit_ticks_rounds_up() { + // 1 tick == DEFAULT_EPOCH_TICK_INTERVAL ms (10 ms). + // Any sub-tick remainder must round UP so the guest isn't under-refunded. + assert_eq!(epoch_credit_ticks(1), Some(1)); + assert_eq!(epoch_credit_ticks(9), Some(1)); + assert_eq!(epoch_credit_ticks(10), Some(1)); + assert_eq!(epoch_credit_ticks(11), Some(2)); // <-- regression case + assert_eq!(epoch_credit_ticks(15), Some(2)); // <-- regression case + assert_eq!(epoch_credit_ticks(20), Some(2)); + assert_eq!(epoch_credit_ticks(25), Some(3)); + assert_eq!(epoch_credit_ticks(100), Some(10)); + } + + #[test] + fn epoch_credit_ticks_handles_u64_max() { + // Must not panic or overflow on extreme inputs. + assert_eq!( + epoch_credit_ticks(u64::MAX), + Some(u64::MAX.div_ceil(DEFAULT_EPOCH_TICK_INTERVAL)), + ); + } + + // ── integration test: end-to-end deadline extension ─────────────────── + // + // These tests verify the same wiring used in production + // (`set_epoch_deadline` + `epoch_deadline_callback` consuming + // `epoch_pause_ms`) on a real wasmtime engine running a busy-loop guest. + // + // Without depositing credit, the deadline must fire → `Trap::Interrupt`. + // With credit deposited (simulating a host HTTP call that paused the + // epoch), the deadline must be extended and the guest must complete. + + /// A guest with a bounded busy loop. Counts down a mutable global so + /// the loop body contains a backward branch (which is what triggers + /// wasmtime's epoch deadline check). + const SPIN_WAT: &str = r#" + (module + (global $i (mut i32) (i32.const 5000000)) + (func (export "spin") + (loop $l + (global.set $i (i32.sub (global.get $i) (i32.const 1))) + (br_if $l (i32.gt_s (global.get $i) (i32.const 0)))))) + "#; + + fn make_engine() -> Engine { + let mut cfg = Config::new(); + cfg.epoch_interruption(true); + Engine::new(&cfg).unwrap() + } + + /// Install the same epoch-deadline callback used in production + /// (see `StoreBuilder::build_with_wasi`). + fn install_epoch_callback(store: &mut WtStore, epoch_pause_ms: Arc) { + store.epoch_deadline_callback(move |_ctx| { + let credit_ms = epoch_pause_ms.swap(0, Ordering::Relaxed); + match epoch_credit_ticks(credit_ms) { + None => Err(anyhow::Error::new(Trap::Interrupt)), + Some(ticks) => Ok(UpdateDeadline::Continue(ticks)), + } + }); + } + + #[test] + fn busy_loop_without_credit_traps_with_interrupt() { + let engine = make_engine(); + let module = Module::new(&engine, SPIN_WAT).unwrap(); + let epoch_pause_ms = Arc::new(AtomicU64::new(0)); + + let mut store = WtStore::new(&engine, ()); + store.set_epoch_deadline(1); // very short budget: 1 tick + install_epoch_callback(&mut store, epoch_pause_ms.clone()); + + // Background ticker: bump the engine epoch until the test signals stop. + let stop = Arc::new(AtomicBool::new(false)); + let ticker = { + let engine = engine.clone(); + let stop = stop.clone(); + thread::spawn(move || { + while !stop.load(Ordering::Relaxed) { + engine.increment_epoch(); + thread::sleep(Duration::from_millis(1)); + } + }) + }; + + let instance = Instance::new(&mut store, &module, &[]).unwrap(); + let spin = instance + .get_typed_func::<(), ()>(&mut store, "spin") + .unwrap(); + + let result = spin.call(&mut store, ()); + + stop.store(true, Ordering::Relaxed); + ticker.join().unwrap(); + + // The guest's busy loop must be cut short by the epoch deadline + // because `epoch_pause_ms` was never incremented. + let err = result.expect_err("guest must trap when no host-call credit is deposited"); + let trap = err.root_cause().downcast_ref::().copied(); + assert_eq!( + trap, + Some(Trap::Interrupt), + "expected Trap::Interrupt, got: {err:?}" + ); + // No leftover credit should remain in the pause counter — the + // callback must consume it on each fire. + assert_eq!(epoch_pause_ms.load(Ordering::Relaxed), 0); + } + + #[test] + fn busy_loop_completes_when_credit_is_deposited() { + let engine = make_engine(); + let module = Module::new(&engine, SPIN_WAT).unwrap(); + let epoch_pause_ms = Arc::new(AtomicU64::new(0)); + + let mut store = WtStore::new(&engine, ()); + store.set_epoch_deadline(1); // very short budget: 1 tick + install_epoch_callback(&mut store, epoch_pause_ms.clone()); + + // Background ticker bumps the epoch AND deposits generous credit so + // every fired deadline is extended via `UpdateDeadline::Continue`. + // This is the production scenario where an outbound HTTP request + // ran for some time and the host-side timer deposited that elapsed + // time into `epoch_pause_ms` (see `Data::with_paused_epoch`). + let stop = Arc::new(AtomicBool::new(false)); + let ticker = { + let engine = engine.clone(); + let stop = stop.clone(); + let credit = epoch_pause_ms.clone(); + thread::spawn(move || { + while !stop.load(Ordering::Relaxed) { + // Deposit *before* incrementing the epoch so the + // callback observes non-zero credit when it fires. + credit.fetch_add(1_000_000, Ordering::Relaxed); + engine.increment_epoch(); + thread::sleep(Duration::from_millis(1)); + } + }) + }; + + let instance = Instance::new(&mut store, &module, &[]).unwrap(); + let spin = instance + .get_typed_func::<(), ()>(&mut store, "spin") + .unwrap(); + + let result = spin.call(&mut store, ()); + + stop.store(true, Ordering::Relaxed); + ticker.join().unwrap(); + + // With epoch credit being continuously deposited, the deadline + // must always be extended — execution should reach the natural + // end of the bounded loop without trapping. + result.expect("guest must complete when epoch credit is deposited"); + } +}