Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
* Hook for automatically registering AutoContextMemory integration with ReActAgent.
Expand Down Expand Up @@ -230,8 +229,7 @@ private Mono<PreReasoningEvent> handlePreReasoning(PreReasoningEvent event) {
ignored -> {
event.setInputMessages(buildInputMessages(event, autoContextMemory));
return event;
})
.subscribeOn(Schedulers.boundedElastic());
});
}

private List<Msg> buildInputMessages(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,15 @@ public boolean compressIfNeeded() {
return false;
}

Mono<Boolean> compressIfNeededAsync() {
/**
 * Wraps {@link #compressIfNeeded()} in a {@link Mono} that is subscribed on Reactor's bounded
 * elastic scheduler.
 *
 * <p>Reactive callers (for example WebFlux pipelines) should prefer this entry point: the
 * synchronous compression logic contains blocking model-stream waits, so it must not run on a
 * non-blocking Reactor thread. The compression call is deferred until the returned Mono is
 * subscribed.
 *
 * @return a Mono that emits {@code true} if compression was performed, {@code false} otherwise
 */
public Mono<Boolean> compressIfNeededAsync() {
    // Defer the blocking call so nothing runs at assembly time.
    Mono<Boolean> blockingCompression = Mono.fromCallable(this::compressIfNeeded);
    // Move the subscription (and therefore the callable) onto a thread that may block.
    return blockingCompression.subscribeOn(Schedulers.boundedElastic());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -588,6 +589,42 @@ void testPreReasoningEventOnNonBlockingScheduler() {
"Result should include system prompt plus compressed memory messages");
}

/**
 * Verifies that handling a {@link PreReasoningEvent} from a Reactor non-blocking thread still
 * runs the memory compression on a thread that is allowed to block.
 */
@Test
@DisplayName("Should offload PreReasoningEvent compression from non-blocking scheduler")
void testPreReasoningEventOffloadsCompressionFromNonBlockingScheduler() {
    // Memory double that records whether and where compressIfNeeded() is invoked.
    RecordingAutoContextMemory recordingMemory =
            new RecordingAutoContextMemory(createCompressionConfig(), mockModel);

    Msg latestRequest =
            Msg.builder()
                    .role(MsgRole.USER)
                    .name("user")
                    .content(TextBlock.builder().text("Latest request").build())
                    .build();
    recordingMemory.addMessage(latestRequest);

    ReActAgent agent =
            ReActAgent.builder()
                    .name("TestAgent")
                    .model(mockModel)
                    .memory(recordingMemory)
                    .toolkit(toolkit)
                    .build();

    PreReasoningEvent event =
            new PreReasoningEvent(
                    agent, "test-model", null, new ArrayList<>(recordingMemory.getMessages()));

    // Subscribe on the parallel scheduler so the hook is entered from a non-blocking thread.
    PreReasoningEvent handled =
            Mono.defer(() -> hook.onEvent(event)).subscribeOn(Schedulers.parallel()).block();

    assertNotNull(handled);
    assertTrue(
            recordingMemory.wasCompressCalled(),
            "Hook should trigger async compression before reasoning");
    assertFalse(
            recordingMemory.wasCompressCalledOnNonBlockingThread(),
            "Hook should offload compression from Reactor non-blocking threads");
}

@Test
@DisplayName("Should preserve existing system prompt on non-blocking scheduler")
void testPreReasoningEventPreservesSystemPromptOnNonBlockingScheduler() {
Expand Down Expand Up @@ -684,6 +721,31 @@ private void populateCompressibleConversation(AutoContextMemory memory, int roun
}
}

/**
 * Test double that replaces {@link #compressIfNeeded()} with a recorder.
 *
 * <p>It never compresses anything (always returns {@code false}); it only remembers whether the
 * method was invoked and whether any invocation happened on a Reactor non-blocking thread.
 */
private static final class RecordingAutoContextMemory extends AutoContextMemory {

    private final AtomicBoolean compressCalled = new AtomicBoolean(false);
    private final AtomicBoolean compressCalledOnNonBlockingThread = new AtomicBoolean(false);

    RecordingAutoContextMemory(AutoContextConfig config, Model model) {
        super(config, model);
    }

    /**
     * Records the invocation instead of compressing.
     *
     * @return always {@code false}, i.e. "no compression performed"
     */
    @Override
    public boolean compressIfNeeded() {
        compressCalled.set(true);
        // Sticky flag: a later call from a blocking-capable thread must not erase an earlier
        // violation, otherwise the assertion on this flag could pass spuriously.
        if (Schedulers.isInNonBlockingThread()) {
            compressCalledOnNonBlockingThread.set(true);
        }
        return false;
    }

    /** @return {@code true} once {@link #compressIfNeeded()} has been invoked at least once */
    boolean wasCompressCalled() {
        return compressCalled.get();
    }

    /**
     * @return {@code true} if any invocation of {@link #compressIfNeeded()} ran on a Reactor
     *     non-blocking thread
     */
    boolean wasCompressCalledOnNonBlockingThread() {
        return compressCalledOnNonBlockingThread.get();
    }
}

/**
* Simple Model implementation for testing compression.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
Expand Down Expand Up @@ -131,6 +132,23 @@ void testCompressionTriggeredByMessageCount() {
"Messages should be compressed or model should be called");
}

/**
 * Verifies that {@code compressIfNeededAsync()} moves the synchronous compression call off the
 * Reactor non-blocking thread it was subscribed from.
 */
@Test
@DisplayName("Should offload async compression from non-blocking callers")
void testCompressIfNeededAsyncOnNonBlockingScheduler() {
    RecordingAutoContextMemory asyncMemory = new RecordingAutoContextMemory(config, testModel);

    // The outer subscription happens on a parallel (non-blocking) thread; the async entry
    // point is expected to hop to a blocking-capable thread on its own.
    Mono<Boolean> subscribedOnParallel =
            Mono.defer(asyncMemory::compressIfNeededAsync).subscribeOn(Schedulers.parallel());
    Boolean compressed = subscribedOnParallel.block();

    assertNotNull(compressed);
    assertTrue(asyncMemory.wasCompressCalled(), "Async compression should invoke compression");
    assertFalse(
            asyncMemory.wasCompressCalledOnNonBlockingThread(),
            "Async compression should move blocking work off Reactor non-blocking threads");
}

@Test
@DisplayName("Should call summaryPreviousRoundConversation when summarizing previous rounds")
void testSummaryPreviousRoundConversation() {
Expand Down Expand Up @@ -970,37 +988,6 @@ void testCompressToolsInvocationFullCoverage() {
"Should have tool compression event with plan-aware hint");
}

/**
 * Runs a real asynchronous compression, subscribed from the parallel scheduler, and checks that
 * the model was called, the message list shrank and the offload context was populated.
 */
@Test
@DisplayName("Should compress asynchronously on non-blocking scheduler")
void testCompressIfNeededAsyncOnNonBlockingScheduler() {
    AutoContextConfig asyncConfig =
            AutoContextConfig.builder()
                    .msgThreshold(5)
                    .maxToken(10000)
                    .tokenRatio(0.9)
                    .lastKeep(2)
                    .minConsecutiveToolMessages(10)
                    .largePayloadThreshold(10000)
                    .minCompressionTokenThreshold(0)
                    .build();
    TestModel asyncModel = new TestModel("Conversation summary");
    AutoContextMemory asyncMemory = new AutoContextMemory(asyncConfig, asyncModel);

    // Fill the memory, then remember its size so shrinkage can be asserted afterwards.
    addCompressibleConversation(asyncMemory, 3);
    asyncMemory.addMessage(createTextMessage("Latest request", MsgRole.USER));
    int sizeBeforeCompression = asyncMemory.getMessages().size();

    Mono<Boolean> subscribedOnParallel =
            Mono.defer(asyncMemory::compressIfNeededAsync).subscribeOn(Schedulers.parallel());
    Boolean compressed = subscribedOnParallel.block();

    assertTrue(Boolean.TRUE.equals(compressed));
    assertTrue(asyncModel.getCallCount() >= 1, "Compression should invoke the model");
    assertTrue(asyncMemory.getMessages().size() < sizeBeforeCompression);
    assertFalse(asyncMemory.getOffloadContext().isEmpty());
}

@Test
@DisplayName("Should allow repeated async compression calls")
void testCompressIfNeededAsyncMultipleCalls() {
Expand Down Expand Up @@ -1059,6 +1046,31 @@ private Msg createTextMessage(String text, MsgRole role) {
.build();
}

/**
 * Test double that replaces {@link #compressIfNeeded()} with a recorder.
 *
 * <p>It never compresses anything (always returns {@code false}); it only remembers whether the
 * method was invoked and whether any invocation happened on a Reactor non-blocking thread.
 */
private static final class RecordingAutoContextMemory extends AutoContextMemory {

    private final AtomicBoolean compressCalled = new AtomicBoolean(false);
    private final AtomicBoolean compressCalledOnNonBlockingThread = new AtomicBoolean(false);

    RecordingAutoContextMemory(AutoContextConfig config, Model model) {
        super(config, model);
    }

    /**
     * Records the invocation instead of compressing.
     *
     * @return always {@code false}, i.e. "no compression performed"
     */
    @Override
    public boolean compressIfNeeded() {
        compressCalled.set(true);
        // Sticky flag: a later call from a blocking-capable thread must not erase an earlier
        // violation, otherwise the assertion on this flag could pass spuriously.
        if (Schedulers.isInNonBlockingThread()) {
            compressCalledOnNonBlockingThread.set(true);
        }
        return false;
    }

    /** @return {@code true} once {@link #compressIfNeeded()} has been invoked at least once */
    boolean wasCompressCalled() {
        return compressCalled.get();
    }

    /**
     * @return {@code true} if any invocation of {@link #compressIfNeeded()} ran on a Reactor
     *     non-blocking thread
     */
    boolean wasCompressCalledOnNonBlockingThread() {
        return compressCalledOnNonBlockingThread.get();
    }
}

private Msg createToolUseMessage(String toolName, String callId) {
return Msg.builder()
.role(MsgRole.ASSISTANT)
Expand Down
Loading