Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lambda-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pin-project-lite = { workspace = true }
tracing-appender = "0.2"
tracing-capture = "0.1.0"
tracing-subscriber = { version = "0.3", features = ["registry"] }
tracing-test = "0.2"

[package.metadata.docs.rs]
all-features = true
220 changes: 218 additions & 2 deletions lambda-runtime/src/layers/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,226 @@ where
.boxed();
self.set(RuntimeApiClientFuture::Second(next_fut));
}
Err(err) => break Err(err),
Err(err) => {
log_or_print!(
tracing: tracing::error!(error = ?err, "failed to build Lambda Runtime API request"),
fallback: eprintln!("failed to build Lambda Runtime API request: {err:?}")
);
break Err(err);
}
},
RuntimeApiClientFutureProj::Second(fut) => match ready!(fut.poll(cx)) {
Ok(resp) if !resp.status().is_success() => {
let status = resp.status();

log_or_print!(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: it would be worth adding a TODO comment here linking to the follow-up issue about surfacing the response body.

tracing: tracing::error!(status = %status, "Lambda Runtime API returned non-200 response"),
fallback: eprintln!("Lambda Runtime API returned non-200 response: status={status}")
);

// Adding more information on top of 410 Gone, to make it more clear since we cannot access the body of the message
if status == 410 {
log_or_print!(
tracing: tracing::error!("Lambda function timeout!"),
fallback: eprintln!("Lambda function timeout!")
);
}

// Return Ok to maintain existing contract - runtime continues despite API errors
break Ok(());
}
Ok(_) => break Ok(()),
Err(err) => {
log_or_print!(
tracing: tracing::error!(error = ?err, "Lambda Runtime API request failed"),
fallback: eprintln!("Lambda Runtime API request failed: {err:?}")
);
break Err(err);
}
},
RuntimeApiClientFutureProj::Second(fut) => break ready!(fut.poll(cx)).map(|_| ()),
}
})
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use http::StatusCode;
    use http_body_util::Full;
    use hyper::body::Bytes;
    use lambda_runtime_api_client::body::Body;
    use std::convert::Infallible;
    use tokio::net::TcpListener;
    use tracing_test::traced_test;

    /// Binds an ephemeral-port HTTP server that serves exactly one connection,
    /// always answering with `status` and a small fixed body, and returns its
    /// base URL.
    ///
    /// The listener is bound before the accept task is spawned, so clients may
    /// connect immediately: incoming connections queue in the OS accept
    /// backlog. No startup delay is needed.
    async fn start_mock_server(status: StatusCode) -> String {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let url = format!("http://{addr}");

        tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            let io = hyper_util::rt::TokioIo::new(stream);

            let service = hyper::service::service_fn(move |_req| async move {
                Ok::<_, Infallible>(
                    http::Response::builder()
                        .status(status)
                        .body(Full::new(Bytes::from("test response")))
                        .unwrap(),
                )
            });

            let _ = hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new())
                .serve_connection(io, service)
                .await;
        });

        url
    }

    /// Builds a Runtime API client pointed at `endpoint`.
    fn make_client(endpoint: &str) -> Arc<lambda_runtime_api_client::Client> {
        Arc::new(
            lambda_runtime_api_client::Client::builder()
                .with_endpoint(endpoint.parse().unwrap())
                .build()
                .unwrap(),
        )
    }

    /// A request future that always succeeds with `GET /test`.
    async fn ok_request() -> Result<http::Request<Body>, BoxError> {
        Ok(http::Request::builder().uri("/test").body(Body::empty()).unwrap())
    }

    #[tokio::test]
    #[traced_test]
    async fn test_successful_response() {
        let url = start_mock_server(StatusCode::OK).await;

        let result = RuntimeApiClientFuture::First(ok_request(), make_client(&url)).await;

        assert!(result.is_ok());
        // A 2xx response must not produce any error logs.
        assert!(!logs_contain("Lambda Runtime API returned non-200 response"));
    }

    #[tokio::test]
    #[traced_test]
    async fn test_410_timeout_error() {
        let url = start_mock_server(StatusCode::GONE).await;

        let result = RuntimeApiClientFuture::First(ok_request(), make_client(&url)).await;

        // Non-success statuses still resolve to Ok to preserve the existing
        // contract (the runtime keeps polling), but the failure is logged.
        assert!(result.is_ok());
        assert!(logs_contain("Lambda Runtime API returned non-200 response"));
        // 410 Gone additionally logs the function-timeout hint.
        assert!(logs_contain("Lambda function timeout!"));
    }

    #[tokio::test]
    #[traced_test]
    async fn test_500_error() {
        let url = start_mock_server(StatusCode::INTERNAL_SERVER_ERROR).await;

        let result = RuntimeApiClientFuture::First(ok_request(), make_client(&url)).await;

        // Returns Ok to maintain the contract, but logs the error with status.
        assert!(result.is_ok());
        assert!(logs_contain("Lambda Runtime API returned non-200 response"));
    }

    #[tokio::test]
    #[traced_test]
    async fn test_404_error() {
        let url = start_mock_server(StatusCode::NOT_FOUND).await;

        let result = RuntimeApiClientFuture::First(ok_request(), make_client(&url)).await;

        // Returns Ok to maintain the contract, but logs the error.
        assert!(result.is_ok());
        assert!(logs_contain("Lambda Runtime API returned non-200 response"));
    }

    #[tokio::test]
    #[traced_test]
    async fn test_request_build_error() {
        let client = make_client("http://localhost:9001");
        let request_fut = async { Err::<http::Request<Body>, BoxError>("Request build error".into()) };

        let result = RuntimeApiClientFuture::First(request_fut, client).await;

        // A failure to build the request propagates as Err and is logged.
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Request build error"));
        assert!(logs_contain("failed to build Lambda Runtime API request"));
    }

    #[tokio::test]
    #[traced_test]
    async fn test_network_error() {
        // Port 1 should be unreachable, so the connection itself fails.
        let client = make_client("http://127.0.0.1:1");

        let result = RuntimeApiClientFuture::First(ok_request(), client).await;

        // Transport-level errors propagate as Err (unlike HTTP error statuses,
        // which resolve to Ok).
        assert!(result.is_err());
        assert!(logs_contain("Lambda Runtime API request failed"));
    }
}
3 changes: 3 additions & 0 deletions lambda-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ use tokio_stream::Stream;
use tower::util::ServiceFn;
pub use tower::{self, service_fn, Service};

// Internal logging helpers (`log_or_print!`). Declared with `#[macro_use]`
// so the macro is visible to the sibling modules declared below it.
#[macro_use]
mod macros;

/// Diagnostic utilities to convert Rust types into Lambda Error types.
pub mod diagnostic;
pub use diagnostic::Diagnostic;
Expand Down
10 changes: 10 additions & 0 deletions lambda-runtime/src/macros.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Logs using the tracing expression if a tracing dispatcher has been set,
// otherwise falls back to the `eprintln!` expression so messages are never
// silently dropped when no subscriber is installed.
//
// Usage:
//     log_or_print!(
//         tracing: tracing::error!("something failed"),
//         fallback: eprintln!("something failed")
//     );
macro_rules! log_or_print {
    (tracing: $tracing_expr:expr, fallback: $fallback_expr:expr) => {
        if tracing::dispatcher::has_been_set() {
            $tracing_expr;
        } else {
            $fallback_expr;
        }
    };
}
13 changes: 7 additions & 6 deletions lambda-runtime/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,13 +363,12 @@ where
/// unset.
pub async fn run(self) -> Result<(), BoxError> {
if let Some(raw) = concurrency_env_value() {
if tracing::dispatcher::has_been_set() {
tracing::warn!(
log_or_print!(
tracing: tracing::warn!(
"AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but the concurrency-tokio feature is not enabled; running sequentially",
);
} else {
eprintln!("AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but the concurrency-tokio feature is not enabled; running sequentially");
}
),
fallback: eprintln!("AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but the concurrency-tokio feature is not enabled; running sequentially")
);
}
let incoming = incoming(&self.client);
Self::run_with_incoming(self.service, self.config, incoming).await
Expand Down Expand Up @@ -938,6 +937,8 @@ mod endpoint_tests {

#[tokio::test]
#[cfg(feature = "concurrency-tokio")]
#[traced_test]
#[cfg(feature = "tokio-concurrent-runtime")]
async fn test_concurrent_structured_logging_isolation() -> Result<(), Error> {
use std::collections::HashSet;
use tracing::info;
Expand Down