Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bottlecap/src/lifecycle/invocation/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ pub struct Context {
///
/// This span is only present if the function is using `SnapStart` and being invoked for the first time.
pub snapstart_restore_span: Option<Span>,
/// The timestamp (in nanoseconds since UNIX epoch) when `SnapStart` restore started.
///
/// This is used to detect and adjust tracer spans that have stale timestamps
/// from the snapshot creation phase.
pub snapstart_restore_time: Option<i64>,
/// The extracted span context from the incoming request, used for distributed
/// tracing.
///
Expand Down Expand Up @@ -92,6 +97,7 @@ impl Default for Context {
runtime_done_received: false,
cold_start_span: None,
snapstart_restore_span: None,
snapstart_restore_time: None,
tracer_span: None,
extracted_span_context: None,
}
Expand Down
12 changes: 12 additions & 0 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,8 @@ impl Processor {
return;
};

context.snapstart_restore_time = Some(start_time);

// Create a SnapStart restore span
let mut snapstart_restore_span = create_empty_span(
String::from("aws.lambda.snapstart_restore"),
Expand Down Expand Up @@ -1342,6 +1344,16 @@ impl Processor {
self.context_buffer.add_tracer_span(request_id, span);
}
}

/// Looks up the `SnapStart` restore time stored on the context for `request_id`.
///
/// Yields `None` when the context buffer has no entry for `request_id`, or when
/// that entry has no restore time recorded.
#[must_use]
pub fn get_snapstart_restore_time(&self, request_id: &String) -> Option<i64> {
    // Bail out early when no context is buffered for this request.
    let ctx = self.context_buffer.get(request_id)?;
    ctx.snapstart_restore_time
}
}

#[cfg(test)]
Expand Down
30 changes: 30 additions & 0 deletions bottlecap/src/lifecycle/invocation/processor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ pub enum ProcessorCommand {
AddTracerSpan {
span: Box<Span>,
},
GetSnapstartRestoreTime {
request_id: String,
response: oneshot::Sender<Result<Option<i64>, ProcessorError>>,
},
OnOutOfMemoryError {
timestamp: i64,
},
Expand Down Expand Up @@ -380,6 +384,25 @@ impl InvocationProcessorHandle {
.await
}

/// Asks the processor for the `SnapStart` restore time of `request_id`.
///
/// Sends a `ProcessorCommand::GetSnapstartRestoreTime` carrying a oneshot
/// reply channel, then waits for the reply and forwards it to the caller.
///
/// # Errors
///
/// Returns `ProcessorError::ChannelSend` if the command cannot be sent, and
/// `ProcessorError::ChannelReceive` if waiting on the reply channel fails.
/// An error carried inside the reply itself is passed through unchanged.
pub async fn get_snapstart_restore_time(
    &self,
    request_id: String,
) -> Result<Option<i64>, ProcessorError> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let command = ProcessorCommand::GetSnapstartRestoreTime {
        request_id,
        response: reply_tx,
    };

    if let Err(send_err) = self.sender.send(command).await {
        return Err(ProcessorError::ChannelSend(send_err.to_string()));
    }

    match reply_rx.await {
        Ok(reply) => reply,
        Err(recv_err) => Err(ProcessorError::ChannelReceive(recv_err.to_string())),
    }
}

pub async fn on_out_of_memory_error(
&self,
timestamp: i64,
Expand Down Expand Up @@ -588,6 +611,13 @@ impl InvocationProcessorService {
ProcessorCommand::AddTracerSpan { span } => {
self.processor.add_tracer_span(&span);
}
ProcessorCommand::GetSnapstartRestoreTime {
request_id,
response,
} => {
let result = Ok(self.processor.get_snapstart_restore_time(&request_id));
let _ = response.send(result);
}
ProcessorCommand::OnOutOfMemoryError { timestamp } => {
self.processor.on_out_of_memory_error(timestamp);
}
Expand Down
40 changes: 39 additions & 1 deletion bottlecap/src/traces/trace_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,9 +593,47 @@ impl TraceAgent {
}
handle_reparenting(&mut reparenting_info, &mut span);

// Keep the span
chunk.push(span);
}

// SnapStart spans may have timestamps from when the snapshot was created,
// not when the Lambda was restored. Detect and adjust these stale timestamps.
// Find request_id from any span in the chunk to look up restore time.
let restore_time = {
let mut found_restore_time = None;
for span in chunk.iter() {
if let Some(request_id) = span.meta.get("request_id")
&& let Ok(Some(time)) = invocation_processor_handle
.get_snapstart_restore_time(request_id.clone())
.await
{
found_restore_time = Some(time);
break;
}
}
found_restore_time
};

// Apply timestamp adjustment to ALL spans with stale timestamps
if let Some(restore_time) = restore_time {
const SIXTY_SECONDS_NS: i64 = 60 * 1_000_000_000;
let threshold = restore_time - SIXTY_SECONDS_NS;
for span in chunk.iter_mut() {
if span.start < threshold {
debug!(
"Adjusting SnapStart span timestamp: original start {} is before restore time {}, shifting forward",
span.start, restore_time
);
span.meta
.insert("_dd.snapstart_adjusted".to_string(), "true".to_string());
span.meta.insert(
"_dd.snapstart_original_start".to_string(),
span.start.to_string(),
);
span.start = restore_time;
}
}
}
}

// Remove empty chunks
Expand Down
4 changes: 4 additions & 0 deletions integration-tests/bin/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as cdk from 'aws-cdk-lib';
import {OnDemand} from '../lib/stacks/on-demand';
import {Otlp} from '../lib/stacks/otlp';
import {Snapstart} from '../lib/stacks/snapstart';
import {SnapstartTiming} from '../lib/stacks/snapstart-timing';
import {LambdaManagedInstancesStack} from '../lib/stacks/lmi';
import {ACCOUNT, getIdentifier, REGION} from '../config';
import {CapacityProviderStack} from "../lib/capacity-provider";
Expand Down Expand Up @@ -31,6 +32,9 @@ const stacks = [
new Snapstart(app, `integ-${identifier}-snapstart`, {
env,
}),
new SnapstartTiming(app, `integ-${identifier}-snapstart-timing`, {
env,
}),
new LambdaManagedInstancesStack(app, `integ-${identifier}-lmi`, {
env,
}),
Expand Down
61 changes: 61 additions & 0 deletions integration-tests/lambda/snapstart-timing-java/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Build definition for the Java Lambda used by the SnapStart timing integration test. -->
<modelVersion>4.0.0</modelVersion>

<groupId>example</groupId>
<artifactId>snapstart-timing-java</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>

<name>SnapStart Timing Test Lambda</name>
<description>Java Lambda function for testing SnapStart timestamp adjustment</description>

<!-- Compile source and bytecode at Java 21; sources are read as UTF-8. -->
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<!-- Lambda programming model types (RequestHandler, Context) implemented by example.Handler -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-core</artifactId>
<version>1.4.0</version>
</dependency>
<!-- ObjectMapper, used by the handler to serialize the JSON response body -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- OkHttp is auto-instrumented by dd-trace-java -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.12.0</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Bundle the handler and its dependencies into one jar during the package phase. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- Output is named function.jar; the CDK stack loads it from target/function.jar. -->
<finalName>function</finalName>
<!-- Skip generating dependency-reduced-pom.xml next to this file. -->
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package example;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Lambda entry point for the SnapStart timestamp-adjustment integration test.
 *
 * Every invocation issues a single OkHttp GET request. The test relies on
 * dd-trace-java auto-instrumenting OkHttp, so that call yields a span whose
 * timestamps can be verified.
 */
public class Handler implements RequestHandler<Map<String, Object>, Map<String, Object>> {

    /** Shared JSON serializer for the response body. */
    private static final ObjectMapper JSON = new ObjectMapper();

    /** Shared HTTP client, built once when the class is loaded. */
    private static final OkHttpClient HTTP = new OkHttpClient.Builder()
            .connectTimeout(10, TimeUnit.SECONDS)
            .readTimeout(10, TimeUnit.SECONDS)
            .build();

    /**
     * Performs one outbound HTTP GET and reports its outcome in an
     * API-Gateway-style response map.
     *
     * @param event   the incoming event; not inspected
     * @param context the Lambda context, used for logging and the request id
     * @return a map with {@code statusCode} 200 and a JSON {@code body} holding
     *         the message, request id, and the outcome of the HTTP call
     */
    @Override
    public Map<String, Object> handleRequest(Map<String, Object> event, Context context) {
        context.getLogger().log("SnapStart timing test handler invoked");

        // Outbound call made during the invocation itself; failures are logged, never rethrown.
        int statusCode = 0;
        boolean requestSucceeded = false;
        try (Response httpResponse = HTTP.newCall(
                new Request.Builder().url("https://httpbin.org/get").build()).execute()) {
            statusCode = httpResponse.code();
            requestSucceeded = statusCode == 200;
            context.getLogger().log("HTTP request completed with status: " + statusCode);
        } catch (Exception e) {
            context.getLogger().log("HTTP request failed: " + e.getMessage());
        }

        Map<String, Object> payload = new HashMap<>();
        payload.put("message", "Success");
        payload.put("requestId", context.getAwsRequestId());
        payload.put("invokeRequestSuccess", requestSucceeded);
        payload.put("invokeStatusCode", statusCode);

        // Fall back to an empty JSON object if the payload cannot be serialized.
        String serializedBody;
        try {
            serializedBody = JSON.writeValueAsString(payload);
        } catch (Exception e) {
            serializedBody = "{}";
        }

        Map<String, Object> result = new HashMap<>();
        result.put("statusCode", 200);
        result.put("body", serializedBody);
        return result;
    }
}
50 changes: 50 additions & 0 deletions integration-tests/lib/stacks/snapstart-timing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';
import {
createLogGroup,
defaultDatadogEnvVariables,
defaultDatadogSecretPolicy,
getExtensionLayer,
getDefaultJavaLayer,
defaultJavaRuntime
} from '../util';

/**
 * Integration-test stack for the SnapStart timestamp adjustment.
 *
 * Deploys a single Java Lambda (`example.Handler::handleRequest`, packaged as
 * `lambda/snapstart-timing-java/target/function.jar`) with SnapStart enabled
 * on published versions. The function runs behind the Datadog wrapper with
 * tracing enabled and has the extension layer and the Java layer attached,
 * so the extension's handling of span timestamps after a restore can be
 * exercised.
 */
export class SnapstartTiming extends cdk.Stack {
  constructor(scope: Construct, id: string, props: cdk.StackProps) {
    super(scope, id, props);

    const javaLayer = getDefaultJavaLayer(this);
    const extensionLayer = getExtensionLayer(this);

    // The function name doubles as the construct id and the Datadog service name.
    const functionName = `${id}-java`;

    const environment = {
      ...defaultDatadogEnvVariables,
      DD_SERVICE: functionName,
      AWS_LAMBDA_EXEC_WRAPPER: '/opt/datadog_wrapper',
      DD_TRACE_ENABLED: 'true',
    };

    const fn = new lambda.Function(this, functionName, {
      functionName,
      runtime: defaultJavaRuntime,
      architecture: lambda.Architecture.ARM_64,
      handler: 'example.Handler::handleRequest',
      code: lambda.Code.fromAsset('./lambda/snapstart-timing-java/target/function.jar'),
      timeout: cdk.Duration.seconds(30),
      memorySize: 512,
      snapStart: lambda.SnapStartConf.ON_PUBLISHED_VERSIONS,
      environment,
      logGroup: createLogGroup(this, functionName),
    });

    fn.addToRolePolicy(defaultDatadogSecretPolicy);
    // Extension layer first, then the Java layer.
    fn.addLayers(extensionLayer, javaLayer);
  }
}
Loading
Loading