Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,16 @@ public Flux<ReactiveSessionInformation> getAllSessions(Object principal) {
@Override
public Mono<Void> saveSessionInformation(ReactiveSessionInformation information) {
this.sessionById.put(information.getSessionId(), information);
this.sessionIdsByPrincipal.computeIfAbsent(information.getPrincipal(), (key) -> new CopyOnWriteArraySet<>())
.add(information.getSessionId());
// Add the session id inside the compute so that it cannot race with the key
// removal performed by removeSessionInformation (which could otherwise drop a
// concurrently added session). This mirrors the blocking SessionRegistryImpl.
this.sessionIdsByPrincipal.compute(information.getPrincipal(), (key, sessionsUsedByPrincipal) -> {
if (sessionsUsedByPrincipal == null) {
sessionsUsedByPrincipal = new CopyOnWriteArraySet<>();
}
sessionsUsedByPrincipal.add(information.getSessionId());
return sessionsUsedByPrincipal;
});
return Mono.empty();
}

Expand All @@ -73,13 +81,14 @@ public Mono<ReactiveSessionInformation> getSessionInformation(String sessionId)
public Mono<ReactiveSessionInformation> removeSessionInformation(String sessionId) {
return getSessionInformation(sessionId).doOnNext((sessionInformation) -> {
this.sessionById.remove(sessionId);
Set<String> sessionsUsedByPrincipal = this.sessionIdsByPrincipal.get(sessionInformation.getPrincipal());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sessionsUsedByPrincipal-> sessions

@junhyeong9812 junhyeong9812 Jun 15, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right that sessionsUsedByPrincipal here is the compute/computeIfPresent remapping parameter. I kept that name intentionally to mirror the blocking SessionRegistryImpl, which uses the exact same compute/computeIfPresent shape and the same sessionsUsedByPrincipal parameter name — this PR is largely about aligning the reactive registry with that sibling, so I'd lean towards keeping it for consistency. That said, it's purely stylistic, so I'm happy to shorten it to sessions in both lambdas if maintainers prefer.

if (sessionsUsedByPrincipal != null) {
sessionsUsedByPrincipal.remove(sessionId);
if (sessionsUsedByPrincipal.isEmpty()) {
this.sessionIdsByPrincipal.remove(sessionInformation.getPrincipal());
}
}
// Remove and prune atomically so the principal key is dropped only while its
// set is empty; otherwise a session added concurrently could be lost. Mirrors
// the blocking SessionRegistryImpl.
this.sessionIdsByPrincipal.computeIfPresent(sessionInformation.getPrincipal(),
(key, sessionsUsedByPrincipal) -> {
sessionsUsedByPrincipal.remove(sessionId);
return sessionsUsedByPrincipal.isEmpty() ? null : sessionsUsedByPrincipal;
});

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private Set<String> addSessionToSet(String principal, String sessionId, Set<String> existing) {
    if (existing == null) {
        existing = new CopyOnWriteArraySet<>();
    }
    existing.add(sessionId);
    return existing;
}

private Set<String> removeSessionFromSet(Set<String> sessions, String sessionId) {
    sessions.remove(sessionId);
    return sessions.isEmpty() ? null : sessions;
}

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking the time to review this, I appreciate it! I'd prefer to keep the add/remove logic inline here, though. This change is specifically about bringing the reactive registry into line with the blocking SessionRegistryImpl, which performs the exact same mutation inline inside compute/computeIfPresent (same shape, same sessionsUsedByPrincipal parameter name). Keeping it inline mirrors that sibling and keeps the diff minimal.

Extracting also has a couple of downsides here: the proposed addSessionToSet(String principal, ...) leaves principal unused, and pulling the body out of the remapping function makes the "this must run inside the atomic compute block" intent a bit less visible (it doesn't break atomicity as long as the helper is used as the remapping function, but it's easier to misuse later). Happy to revisit if a maintainer prefers the extracted form.

});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -103,4 +107,55 @@ void updateLastAccessTimeThenUpdated() {
assertThat(saved.getLastAccessTime()).isAfter(lastAccessTimeBefore);
}

// Removing the last session of a principal must not race with a concurrent save for
// the same principal. Previously remove pruned the principal key with a non-atomic
// isEmpty()-then-remove, which could drop a session added concurrently.
@Test
void saveAndRemoveConcurrentlyThenAddedSessionNotLost() throws Exception {
Authentication authentication = TestAuthentication.authenticatedUser();
Object principal = authentication.getPrincipal();
ExecutorService executor = Executors.newFixedThreadPool(2);
try {
for (int i = 0; i < 1000; i++) {
String existing = "existing-" + i;
String added = "added-" + i;
this.sessionRegistry
.saveSessionInformation(new ReactiveSessionInformation(principal, existing, this.now))
.block();
CountDownLatch start = new CountDownLatch(1);
Future<?> remove = executor.submit(() -> {
awaitUninterruptibly(start);
this.sessionRegistry.removeSessionInformation(existing).block();
});
Future<?> save = executor.submit(() -> {
awaitUninterruptibly(start);
this.sessionRegistry
.saveSessionInformation(new ReactiveSessionInformation(principal, added, this.now))
.block();
});
start.countDown();
remove.get();
save.get();
List<ReactiveSessionInformation> sessions = this.sessionRegistry.getAllSessions(principal)
.collectList()
.block();
assertThat(sessions).extracting(ReactiveSessionInformation::getSessionId).contains(added);
this.sessionRegistry.removeSessionInformation(added).block();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this iterative block can be declared as a function - would be modular

@junhyeong9812 junhyeong9812 Jun 15, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I left the loop body inline because it's a single save/remove-concurrently/assert race scenario run under load, and only this one test uses it — keeping the setup, the concurrent actions, and the assertion together makes the race being exercised easier to audit, and a failure points straight at the relevant lines. I can extract it into a helper if you feel strongly, but I'm not sure it adds much here.

}
}
finally {
executor.shutdownNow();
}
}

private static void awaitUninterruptibly(CountDownLatch latch) {
try {
latch.await();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new RuntimeException(ex);
}
}

}