Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

103 changes: 62 additions & 41 deletions crates/http-backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::task::{Context, Poll};
use std::time::Duration;
use std::time::{Duration, Instant};

use anyhow::{Error, Result, anyhow};
use http::{HeaderMap, HeaderName, Uri, header, uri::Scheme};
Expand Down Expand Up @@ -64,6 +65,10 @@ pub struct Backend<C> {
pub strategy: BackendStrategy,
ext_http_stats: Option<Arc<dyn ExtRequestStats>>,
hostname: Option<SmolStr>,
/// Counter shared with the wasmtime Store: each outbound HTTP call
/// deposits its elapsed wall-clock time (ms) here so the epoch
/// deadline callback can refund those ticks to the guest.
epoch_pause_ms: Arc<AtomicU64>,
}

pub struct Builder {
Expand Down Expand Up @@ -112,6 +117,7 @@ impl Builder {
strategy: self.strategy,
ext_http_stats: None,
hostname: self.hostname.clone(),
epoch_pause_ms: Arc::new(AtomicU64::new(0)),
}
}
}
Expand Down Expand Up @@ -140,6 +146,12 @@ impl<C> Backend<C> {
self.ext_http_stats.replace(stats);
}

/// Share the epoch-pause counter with the Store so that outbound HTTP
/// time can be excluded from the guest's execution-time budget.
pub fn set_epoch_pause_ms(&mut self, counter: Arc<AtomicU64>) {
self.epoch_pause_ms = counter;
}

pub fn propagate_header_names(&self) -> HeaderNameList {
self.propagate_header_names.clone()
}
Expand Down Expand Up @@ -321,48 +333,57 @@ where
.as_ref()
.map(|s| ExtStatsTimer::new(s.clone()));

let res = self.client.request(request).await.map_err(|error| {
warn!(cause=?error, "sending request to backend");
HttpError::RequestError
})?;

let status = res.status().as_u16();
let (parts, body) = res.into_parts();
let headers = if !parts.headers.is_empty() {
Some(
parts
.headers
.iter()
.filter_map(|(name, value)| match value.to_str() {
Ok(value) => Some((name.to_string(), value.to_string())),
Err(error) => {
warn!(cause=?error, "invalid value: {:?}", value);
None
}
})
.collect::<Vec<(String, String)>>(),
)
} else {
None
};

let body_bytes = body
.collect()
.await
.map_err(|error| {
warn!(cause=?error, "receiving body from backend");
// Time the network I/O so we can refund the equivalent epoch ticks
// to the guest. Both the request send and the body read count.
let started = Instant::now();
let result = async {
let res = self.client.request(request).await.map_err(|error| {
warn!(cause=?error, "sending request to backend");
HttpError::RequestError
})?
.to_bytes();
let body = Some(body_bytes.to_vec());

trace!(?status, ?headers, len = body_bytes.len(), "reply");
})?;

let status = res.status().as_u16();
let (parts, body) = res.into_parts();
let headers = if !parts.headers.is_empty() {
Some(
parts
.headers
.iter()
.filter_map(|(name, value)| match value.to_str() {
Ok(value) => Some((name.to_string(), value.to_string())),
Err(error) => {
warn!(cause=?error, "invalid value: {:?}", value);
None
}
})
.collect::<Vec<(String, String)>>(),
)
} else {
None
};

Ok(Response {
status,
headers,
body,
})
let body_bytes = body
.collect()
.await
.map_err(|error| {
warn!(cause=?error, "receiving body from backend");
HttpError::RequestError
})?
.to_bytes();
let body = Some(body_bytes.to_vec());

trace!(?status, ?headers, len = body_bytes.len(), "reply");

Ok::<_, HttpError>(Response {
status,
headers,
body,
})
}
.await;
self.epoch_pause_ms
.fetch_add(started.elapsed().as_millis() as u64, Ordering::Relaxed);
result
}
}

Expand Down
10 changes: 9 additions & 1 deletion crates/http-service/src/executor/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use reactor::gcore::fastedge;
use runtime::util::stats::{StatsTimer, StatsVisitor};
use runtime::{InstancePre, store::StoreBuilder};
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;
use wasmtime_wasi_http::body::HyperOutgoingBody;

Expand Down Expand Up @@ -73,14 +74,21 @@ where

let properties = executor::get_properties(&parts.headers);

let store_builder = self.store_builder.with_properties(properties);
// Shared counter so external HTTP time in `http_backend.send_request`
// refunds epoch ticks via the Store's `epoch_deadline_callback`.
let epoch_pause_ms = Arc::new(AtomicU64::new(0));
let store_builder = self
.store_builder
.with_properties(properties)
.epoch_pause_ms(epoch_pause_ms.clone());
Comment thread
ruslanti marked this conversation as resolved.
let mut http_backend = self.backend;

http_backend
.propagate_headers(parts.headers.clone())
.context("propagate headers")?;

http_backend.set_ext_http_stats(stats.clone());
http_backend.set_epoch_pause_ms(epoch_pause_ms);

let propagate_header_names = http_backend.propagate_header_names();
let backend_uri = http_backend.uri();
Expand Down
11 changes: 10 additions & 1 deletion crates/http-service/src/executor/wasi_http.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use crate::executor;
Expand Down Expand Up @@ -88,8 +89,16 @@ where
let body = body.boxed();

let properties = executor::get_properties(&parts.headers);
let store_builder = self.store_builder.with_properties(properties);
// Shared counter so wasi-http outbound calls (handled by
// `WasiHttpView::send_request` in the runtime) refund epoch ticks
// via the Store's `epoch_deadline_callback`.
let epoch_pause_ms = Arc::new(AtomicU64::new(0));
let store_builder = self
.store_builder
.with_properties(properties)
.epoch_pause_ms(epoch_pause_ms.clone());
let mut http_backend = self.backend;
http_backend.set_epoch_pause_ms(epoch_pause_ms);

http_backend
.propagate_headers(parts.headers.clone())
Expand Down
1 change: 1 addition & 0 deletions crates/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ lazy_static = { version = "1.5.0", optional = true }

[dev-dependencies]
claims = "0.8"
wat = "1"

[lints]
workspace = true
16 changes: 13 additions & 3 deletions crates/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use crate::app::KvStoreOption;
use crate::store::HasStats;
use http_backend::stats::ExtStatsTimer;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use std::{fmt::Debug, ops::Deref};
use utils::{Dictionary, Utils};
use wasmtime_wasi::ResourceTable;
Expand Down Expand Up @@ -97,6 +99,11 @@ pub struct Data<T: 'static> {
pub dictionary: Dictionary,
pub utils: Utils,
pub cache: cache::CacheImpl,
/// Milliseconds of host I/O that should not count against the epoch
/// deadline. The `epoch_deadline_callback` installed on the Store reads
/// and clears this counter, converting milliseconds into extra ticks to
/// extend the deadline.
pub epoch_pause_ms: Arc<AtomicU64>,
}

pub trait BackendRequest {
Expand Down Expand Up @@ -143,13 +150,16 @@ impl<T: Send + BackendRequest + HasStats> WasiHttpView for Data<T> {
let request = Request::from_parts(head, body);
// start external request stats timer
let stats = self.inner.get_stats();
let epoch_pause_ms = self.epoch_pause_ms.clone();

let handle = wasmtime_wasi::runtime::spawn(async move {
let _stats_timer = ExtStatsTimer::new(stats); // keep timer alive until request is done
Ok(
let started = Instant::now();
let resp =
default_send_request_handler(request, OutgoingRequestConfig { use_tls, ..config })
.await,
)
.await;
epoch_pause_ms.fetch_add(started.elapsed().as_millis() as u64, Ordering::Relaxed);
Ok(resp)
});
Ok(HostFutureIncomingResponse::pending(handle))
}
Expand Down
Loading
Loading