Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lambda-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pin-project-lite = { workspace = true }
tracing-appender = "0.2"
tracing-capture = "0.1.0"
tracing-subscriber = { version = "0.3", features = ["registry"] }
tracing-test = "0.2"

[package.metadata.docs.rs]
all-features = true
220 changes: 218 additions & 2 deletions lambda-runtime/src/layers/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,226 @@ where
.boxed();
self.set(RuntimeApiClientFuture::Second(next_fut));
}
Err(err) => break Err(err),
Err(err) => {
log_or_print!(
tracing: tracing::error!(error = ?err, "failed to build Lambda Runtime API request"),
fallback: eprintln!("failed to build Lambda Runtime API request: {err:?}")
);
break Err(err);
}
},
RuntimeApiClientFutureProj::Second(fut) => match ready!(fut.poll(cx)) {
Ok(resp) if !resp.status().is_success() => {
let status = resp.status();

log_or_print!(
Comment thread
darklight3it marked this conversation as resolved.
tracing: tracing::error!(status = %status, "Lambda Runtime API returned non-200 response"),
fallback: eprintln!("Lambda Runtime API returned non-200 response: status={status}")
);

// Adding more information on top of 410 Gone, to make it more clear since we cannot access the body of the message
if status == 410 {
log_or_print!(
tracing: tracing::error!("Lambda function timeout!"),
fallback: eprintln!("Lambda function timeout!")
);
}

// Return Ok to maintain existing contract - runtime continues despite API errors
break Ok(());
}
Ok(_) => break Ok(()),
Err(err) => {
log_or_print!(
tracing: tracing::error!(error = ?err, "Lambda Runtime API request failed"),
fallback: eprintln!("Lambda Runtime API request failed: {err:?}")
);
break Err(err);
}
Comment on lines +96 to +127
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly what I had in mind.

},
RuntimeApiClientFutureProj::Second(fut) => break ready!(fut.poll(cx)).map(|_| ()),
}
})
}
}

#[cfg(test)]
mod tests {
use super::*;
use http::StatusCode;
use http_body_util::Full;
use hyper::body::Bytes;
use lambda_runtime_api_client::body::Body;
use std::convert::Infallible;
use tokio::net::TcpListener;
use tracing_test::traced_test;

async fn start_mock_server(status: StatusCode) -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let url = format!("http://{}", addr);
Comment on lines +145 to +148
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much easier than harness tests ;)


tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let io = hyper_util::rt::TokioIo::new(stream);

let service = hyper::service::service_fn(move |_req| async move {
Ok::<_, Infallible>(
http::Response::builder()
.status(status)
.body(Full::new(Bytes::from("test response")))
.unwrap(),
)
});

let _ = hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new())
.serve_connection(io, service)
.await;
});

// Give the server a moment to start
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
url
}

#[tokio::test]
#[traced_test]
async fn test_successful_response() {
let url = start_mock_server(StatusCode::OK).await;
let client = Arc::new(
lambda_runtime_api_client::Client::builder()
.with_endpoint(url.parse().unwrap())
.build()
.unwrap(),
);

let request_fut =
async { Ok::<_, BoxError>(http::Request::builder().uri("/test").body(Body::empty()).unwrap()) };

let future = RuntimeApiClientFuture::First(request_fut, client);
let result = future.await;

assert!(result.is_ok());
// No error logs should be present
assert!(!logs_contain("Lambda Runtime API returned non-200 response"));
}

#[tokio::test]
#[traced_test]
async fn test_410_timeout_error() {
let url = start_mock_server(StatusCode::GONE).await;
let client = Arc::new(
lambda_runtime_api_client::Client::builder()
.with_endpoint(url.parse().unwrap())
.build()
.unwrap(),
);

let request_fut =
async { Ok::<_, BoxError>(http::Request::builder().uri("/test").body(Body::empty()).unwrap()) };

let future = RuntimeApiClientFuture::First(request_fut, client);
let result = future.await;

// Returns Ok to maintain contract, but logs the error
assert!(result.is_ok());

// Verify the error was logged
assert!(logs_contain("Lambda Runtime API returned non-200 response"));
assert!(logs_contain("Lambda function timeout!"));
}

#[tokio::test]
#[traced_test]
async fn test_500_error() {
let url = start_mock_server(StatusCode::INTERNAL_SERVER_ERROR).await;
let client = Arc::new(
lambda_runtime_api_client::Client::builder()
.with_endpoint(url.parse().unwrap())
.build()
.unwrap(),
);

let request_fut =
async { Ok::<_, BoxError>(http::Request::builder().uri("/test").body(Body::empty()).unwrap()) };

let future = RuntimeApiClientFuture::First(request_fut, client);
let result = future.await;

// Returns Ok to maintain contract, but logs the error
assert!(result.is_ok());

// Verify the error was logged with status code
assert!(logs_contain("Lambda Runtime API returned non-200 response"));
}

#[tokio::test]
#[traced_test]
async fn test_404_error() {
let url = start_mock_server(StatusCode::NOT_FOUND).await;
let client = Arc::new(
lambda_runtime_api_client::Client::builder()
.with_endpoint(url.parse().unwrap())
.build()
.unwrap(),
);

let request_fut =
async { Ok::<_, BoxError>(http::Request::builder().uri("/test").body(Body::empty()).unwrap()) };

let future = RuntimeApiClientFuture::First(request_fut, client);
let result = future.await;

// Returns Ok to maintain contract, but logs the error
assert!(result.is_ok());

// Verify the error was logged
assert!(logs_contain("Lambda Runtime API returned non-200 response"));
}

#[tokio::test]
#[traced_test]
async fn test_request_build_error() {
let client = Arc::new(
lambda_runtime_api_client::Client::builder()
.with_endpoint("http://localhost:9001".parse().unwrap())
.build()
.unwrap(),
);

let request_fut = async { Err::<http::Request<Body>, BoxError>("Request build error".into()) };

let future = RuntimeApiClientFuture::First(request_fut, client);
let result = future.await;

assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.to_string().contains("Request build error"));

// Verify the error was logged
assert!(logs_contain("failed to build Lambda Runtime API request"));
}

#[tokio::test]
#[traced_test]
async fn test_network_error() {
// Use an invalid endpoint that will fail to connect
let client = Arc::new(
lambda_runtime_api_client::Client::builder()
.with_endpoint("http://127.0.0.1:1".parse().unwrap()) // Port 1 should be unreachable
.build()
.unwrap(),
);

let request_fut =
async { Ok::<_, BoxError>(http::Request::builder().uri("/test").body(Body::empty()).unwrap()) };

let future = RuntimeApiClientFuture::First(request_fut, client);
let result = future.await;

// Network errors should propagate as Err
assert!(result.is_err());

// Verify the error was logged
assert!(logs_contain("Lambda Runtime API request failed"));
}
}
3 changes: 3 additions & 0 deletions lambda-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ use tokio_stream::Stream;
use tower::util::ServiceFn;
pub use tower::{self, service_fn, Service};

#[macro_use]
mod macros;

/// Diagnostic utilities to convert Rust types into Lambda Error types.
pub mod diagnostic;
pub use diagnostic::Diagnostic;
Expand Down
10 changes: 10 additions & 0 deletions lambda-runtime/src/macros.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Logs using tracing `error!` if a dispatcher is set, otherwise falls back to `eprintln!`.
macro_rules! log_or_print {
Comment thread
darklight3it marked this conversation as resolved.
(tracing: $tracing_expr:expr, fallback: $fallback_expr:expr) => {
if tracing::dispatcher::has_been_set() {
$tracing_expr;
} else {
$fallback_expr;
}
};
}
13 changes: 7 additions & 6 deletions lambda-runtime/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,13 +363,12 @@ where
/// unset.
pub async fn run(self) -> Result<(), BoxError> {
if let Some(raw) = concurrency_env_value() {
if tracing::dispatcher::has_been_set() {
tracing::warn!(
log_or_print!(
tracing: tracing::warn!(
"AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but the concurrency-tokio feature is not enabled; running sequentially",
);
} else {
eprintln!("AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but the concurrency-tokio feature is not enabled; running sequentially");
}
),
fallback: eprintln!("AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but the concurrency-tokio feature is not enabled; running sequentially")
);
}
let incoming = incoming(&self.client);
Self::run_with_incoming(self.service, self.config, incoming).await
Expand Down Expand Up @@ -938,6 +937,8 @@ mod endpoint_tests {

#[tokio::test]
#[cfg(feature = "concurrency-tokio")]
#[traced_test]
#[cfg(feature = "tokio-concurrent-runtime")]
async fn test_concurrent_structured_logging_isolation() -> Result<(), Error> {
use std::collections::HashSet;
use tracing::info;
Expand Down