From 0ef5929ab0ab4d3e8fa68aa1411fa806104f1dc6 Mon Sep 17 00:00:00 2001 From: El Fellah Meryem <104535388+merra3012@users.noreply.github.com> Date: Thu, 30 Apr 2026 16:14:42 +0100 Subject: [PATCH] fix: preserve latest Vertex AI session events under clock skew Stop filtering getSession events by session updateTime because updateTime is server-side and Event.timestamp is client-side, which can silently drop valid latest events. Add regression coverage for skewed timestamps and explicit getSession config filtering behavior. Made-with: Cursor --- .../adk/sessions/VertexAiSessionService.java | 26 ++---- .../sessions/VertexAiSessionServiceTest.java | 84 ++++++++++++++++++- 2 files changed, 90 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/com/google/adk/sessions/VertexAiSessionService.java b/core/src/main/java/com/google/adk/sessions/VertexAiSessionService.java index 99e7e3479..a9ca38e7a 100644 --- a/core/src/main/java/com/google/adk/sessions/VertexAiSessionService.java +++ b/core/src/main/java/com/google/adk/sessions/VertexAiSessionService.java @@ -225,7 +225,7 @@ public Maybe getSession( if (events.isEmpty()) { return sessionBuilder.build(); } - events = filterEvents(events, updateTimestamp, config); + events = filterEvents(events, config); return sessionBuilder.events(events).build(); }) .toMaybe(); @@ -233,15 +233,11 @@ public Maybe getSession( } private static List filterEvents( - List originalEvents, - @Nullable Instant updateTimestamp, - Optional config) { + List originalEvents, Optional config) { + // Do not pre-filter by session updateTime: updateTime comes from Vertex backend clock, + // while Event.timestamp is client-side. Comparing them can drop valid latest events. List events = originalEvents.stream() - .filter( - event -> - updateTimestamp == null - || Instant.ofEpochMilli(event.timestamp()).isBefore(updateTimestamp)) .sorted(Comparator.comparing(Event::timestamp)) .collect(toCollection(ArrayList::new)); @@ -253,16 +249,10 @@ private static List filterEvents( } } else if (config.get().afterTimestamp().isPresent()) { Instant afterTimestamp = config.get().afterTimestamp().get(); - int i = events.size() - 1; - while (i >= 0) { - if (Instant.ofEpochMilli(events.get(i).timestamp()).isBefore(afterTimestamp)) { - break; - } - i -= 1; - } - if (i >= 0) { - events = events.subList(i, events.size()); - } + events = + events.stream() + .filter(event -> !Instant.ofEpochMilli(event.timestamp()).isBefore(afterTimestamp)) + .collect(toCollection(ArrayList::new)); } } return events; diff --git a/core/src/test/java/com/google/adk/sessions/VertexAiSessionServiceTest.java b/core/src/test/java/com/google/adk/sessions/VertexAiSessionServiceTest.java index dd62263d7..e8caa8b8f 100644 --- a/core/src/test/java/com/google/adk/sessions/VertexAiSessionServiceTest.java +++ b/core/src/test/java/com/google/adk/sessions/VertexAiSessionServiceTest.java @@ -70,6 +70,17 @@ public class VertexAiSessionServiceTest { }\ """; + private static final String MOCK_SESSION_STRING_5 = + """ + { + "name" : "projects/test-project/locations/test-location/reasoningEngines/123/sessions/5", + "createTime" : "2026-04-29T11:00:05.000000Z", + "userId" : "skew_user", + "updateTime" : "2026-04-29T11:00:05.940523Z", + "sessionState" : {} + }\ + """; + private static final String MOCK_EVENT_STRING = """ [ @@ -103,6 +114,36 @@ public class VertexAiSessionServiceTest { ] """; + private static final String MOCK_EVENT_STRING_5 = + """ + [ + { + "name" : "projects/test-project/locations/test-location/reasoningEngines/123/sessions/5/events/500", + "invocationId" : "500", + "author" : "skew_user", + "timestamp" : "2026-04-29T11:00:05.900000Z", + "content" : { + "role" : "user", + "parts" : [ + { "text" : "before-update-time" } + ] + } + }, + { + "name" : "projects/test-project/locations/test-location/reasoningEngines/123/sessions/5/events/501", + "invocationId" : "501", + "author" : "model", + "timestamp" : "2026-04-29T11:00:06.103000Z", + "content" : { + "role" : "model", + "parts" : [ + { "text" : "after-update-time" } + ] + } + } + ] + """; + @SuppressWarnings("unchecked") private static Session getMockSession() throws Exception { Map sessionJson = @@ -154,8 +195,9 @@ public void setUp() throws Exception { ImmutableMap.of( "1", MOCK_SESSION_STRING_1, "2", MOCK_SESSION_STRING_2, - "3", MOCK_SESSION_STRING_3)); - eventMap = new HashMap<>(ImmutableMap.of("1", MOCK_EVENT_STRING)); + "3", MOCK_SESSION_STRING_3, + "5", MOCK_SESSION_STRING_5)); + eventMap = new HashMap<>(ImmutableMap.of("1", MOCK_EVENT_STRING, "5", MOCK_EVENT_STRING_5)); MockitoAnnotations.openMocks(this); vertexAiSessionService = @@ -352,6 +394,44 @@ public void listEmptySession_success() { .isEmpty(); } + @Test + public void getSession_clockSkewWithUpdateTime_doesNotDropRecentEvents() { + Session session = + vertexAiSessionService.getSession("123", "skew_user", "5", Optional.empty()).blockingGet(); + + assertThat(session.events()).hasSize(2); + ImmutableList eventIds = + session.events().stream().map(Event::id).collect(toImmutableList()); + assertThat(eventIds).containsExactly("500", "501").inOrder(); + } + + @Test + public void getSession_afterTimestamp_filtersAtOrAfterThreshold() { + Instant threshold = Instant.parse("2026-04-29T11:00:06.103000Z"); + GetSessionConfig config = GetSessionConfig.builder().afterTimestamp(threshold).build(); + + Session session = + vertexAiSessionService + .getSession("123", "skew_user", "5", Optional.of(config)) + .blockingGet(); + + assertThat(session.events()).hasSize(1); + assertThat(session.events().get(0).id()).isEqualTo("501"); + } + + @Test + public void getSession_numRecentEvents_returnsLatestEvents() { + GetSessionConfig config = GetSessionConfig.builder().numRecentEvents(1).build(); + + Session session = + vertexAiSessionService + .getSession("123", "skew_user", "5", Optional.of(config)) + .blockingGet(); + + assertThat(session.events()).hasSize(1); + assertThat(session.events().get(0).id()).isEqualTo("501"); + } + @Test public void appendEvent_withStateRemoved_updatesSessionState() { String userId = "userB";