-
Notifications
You must be signed in to change notification settings - Fork 6.3k
Make InMemoryReactiveSessionRegistry updates atomic #19339
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,8 +59,16 @@ public Flux<ReactiveSessionInformation> getAllSessions(Object principal) { | |
| @Override | ||
| public Mono<Void> saveSessionInformation(ReactiveSessionInformation information) { | ||
| this.sessionById.put(information.getSessionId(), information); | ||
| this.sessionIdsByPrincipal.computeIfAbsent(information.getPrincipal(), (key) -> new CopyOnWriteArraySet<>()) | ||
| .add(information.getSessionId()); | ||
| // Add the session id inside the compute so that it cannot race with the key | ||
| // removal performed by removeSessionInformation (which could otherwise drop a | ||
| // concurrently added session). This mirrors the blocking SessionRegistryImpl. | ||
| this.sessionIdsByPrincipal.compute(information.getPrincipal(), (key, sessionsUsedByPrincipal) -> { | ||
| if (sessionsUsedByPrincipal == null) { | ||
| sessionsUsedByPrincipal = new CopyOnWriteArraySet<>(); | ||
| } | ||
| sessionsUsedByPrincipal.add(information.getSessionId()); | ||
| return sessionsUsedByPrincipal; | ||
| }); | ||
| return Mono.empty(); | ||
| } | ||
|
|
||
|
|
@@ -73,13 +81,14 @@ public Mono<ReactiveSessionInformation> getSessionInformation(String sessionId) | |
| public Mono<ReactiveSessionInformation> removeSessionInformation(String sessionId) { | ||
| return getSessionInformation(sessionId).doOnNext((sessionInformation) -> { | ||
| this.sessionById.remove(sessionId); | ||
| Set<String> sessionsUsedByPrincipal = this.sessionIdsByPrincipal.get(sessionInformation.getPrincipal()); | ||
| if (sessionsUsedByPrincipal != null) { | ||
| sessionsUsedByPrincipal.remove(sessionId); | ||
| if (sessionsUsedByPrincipal.isEmpty()) { | ||
| this.sessionIdsByPrincipal.remove(sessionInformation.getPrincipal()); | ||
| } | ||
| } | ||
| // Remove and prune atomically so the principal key is dropped only while its | ||
| // set is empty; otherwise a session added concurrently could be lost. Mirrors | ||
| // the blocking SessionRegistryImpl. | ||
| this.sessionIdsByPrincipal.computeIfPresent(sessionInformation.getPrincipal(), | ||
| (key, sessionsUsedByPrincipal) -> { | ||
| sessionsUsedByPrincipal.remove(sessionId); | ||
| return sessionsUsedByPrincipal.isEmpty() ? null : sessionsUsedByPrincipal; | ||
| }); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for taking the time to review this, I appreciate it! I'd prefer to keep the add/remove logic inline here, though. This change is specifically about bringing the reactive registry into line with the blocking Extracting also has a couple of downsides here: the proposed |
||
| }); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,10 @@ | |
| import java.time.LocalDate; | ||
| import java.time.ZoneOffset; | ||
| import java.util.List; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
|
|
||
| import org.junit.jupiter.api.Test; | ||
|
|
||
|
|
@@ -103,4 +107,55 @@ void updateLastAccessTimeThenUpdated() { | |
| assertThat(saved.getLastAccessTime()).isAfter(lastAccessTimeBefore); | ||
| } | ||
|
|
||
| // Removing the last session of a principal must not race with a concurrent save for | ||
| // the same principal. Previously remove pruned the principal key with a non-atomic | ||
| // isEmpty()-then-remove, which could drop a session added concurrently. | ||
| @Test | ||
| void saveAndRemoveConcurrentlyThenAddedSessionNotLost() throws Exception { | ||
| Authentication authentication = TestAuthentication.authenticatedUser(); | ||
| Object principal = authentication.getPrincipal(); | ||
| ExecutorService executor = Executors.newFixedThreadPool(2); | ||
| try { | ||
| for (int i = 0; i < 1000; i++) { | ||
| String existing = "existing-" + i; | ||
| String added = "added-" + i; | ||
| this.sessionRegistry | ||
| .saveSessionInformation(new ReactiveSessionInformation(principal, existing, this.now)) | ||
| .block(); | ||
| CountDownLatch start = new CountDownLatch(1); | ||
| Future<?> remove = executor.submit(() -> { | ||
| awaitUninterruptibly(start); | ||
| this.sessionRegistry.removeSessionInformation(existing).block(); | ||
| }); | ||
| Future<?> save = executor.submit(() -> { | ||
| awaitUninterruptibly(start); | ||
| this.sessionRegistry | ||
| .saveSessionInformation(new ReactiveSessionInformation(principal, added, this.now)) | ||
| .block(); | ||
| }); | ||
| start.countDown(); | ||
| remove.get(); | ||
| save.get(); | ||
| List<ReactiveSessionInformation> sessions = this.sessionRegistry.getAllSessions(principal) | ||
| .collectList() | ||
| .block(); | ||
| assertThat(sessions).extracting(ReactiveSessionInformation::getSessionId).contains(added); | ||
| this.sessionRegistry.removeSessionInformation(added).block(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this iterative block can be declared as a function - would be modular
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. I left the loop body inline because it's a single save/remove-concurrently/assert race scenario run under load, and only this one test uses it — keeping the setup, the concurrent actions, and the assertion together makes the race being exercised easier to audit, and a failure points straight at the relevant lines. I can extract it into a helper if you feel strongly, but I'm not sure it adds much here. |
||
| } | ||
| } | ||
| finally { | ||
| executor.shutdownNow(); | ||
| } | ||
| } | ||
|
|
||
| private static void awaitUninterruptibly(CountDownLatch latch) { | ||
| try { | ||
| latch.await(); | ||
| } | ||
| catch (InterruptedException ex) { | ||
| Thread.currentThread().interrupt(); | ||
| throw new RuntimeException(ex); | ||
| } | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sessionsUsedByPrincipal->sessionsUh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right that
sessionsUsedByPrincipalhere is thecompute/computeIfPresentremapping parameter. I kept that name intentionally to mirror the blockingSessionRegistryImpl, which uses the exact samecompute/computeIfPresentshape and the samesessionsUsedByPrincipalparameter name — this PR is largely about aligning the reactive registry with that sibling, so I'd lean towards keeping it for consistency. That said, it's purely stylistic, so I'm happy to shorten it tosessionsin both lambdas if maintainers prefer.