Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
282bf57
Add support for server selection's deprioritized servers to all topol…
vbabanin Jan 12, 2026
cb8905c
Remove global OperationContext in tests as it has mutable state.
vbabanin Jan 15, 2026
74ae21f
Merge branch 'main' into JAVA-6021
vbabanin Jan 15, 2026
f39b7a0
Merge branch 'main' into JAVA-6021
vbabanin Jan 17, 2026
2a4cd70
Add more test-cases.
vbabanin Jan 17, 2026
fa35dd9
Fix static checks.
vbabanin Jan 17, 2026
7b613e4
Allow invoking connect.
vbabanin Jan 18, 2026
7e30f2a
Merge branch 'main' into JAVA-6021
vbabanin Feb 10, 2026
f20b482
Bump specification commit.
vbabanin Feb 10, 2026
09f24dd
Update driver-core/src/main/com/mongodb/internal/connection/Operation…
vbabanin Mar 4, 2026
733ee91
Update driver-core/src/test/unit/com/mongodb/connection/ServerSelecti…
vbabanin Mar 5, 2026
f324ab9
Update driver-core/src/test/unit/com/mongodb/connection/ServerSelecti…
vbabanin Mar 5, 2026
103c4b0
Use "create" prefix consistently.
vbabanin Mar 3, 2026
021c262
Remove craeteNewOperationContext.
vbabanin Mar 4, 2026
08b3466
Change visibility modifier.
vbabanin Mar 4, 2026
98caa34
Rename applyDeprioritization method to apply.
vbabanin Mar 4, 2026
a6587fa
Add TODO comments for performance optimization.
vbabanin Mar 4, 2026
2bc1b2e
Add empty line.
vbabanin Mar 4, 2026
8a8bf0a
Return ALL_SERVERS from selector.
vbabanin Mar 4, 2026
cf5c484
Make heartbeatFrequencyMS a local variable.
vbabanin Mar 4, 2026
32609f6
Revert to assumeFalse.
vbabanin Mar 4, 2026
7d3d862
Use Junit4.
vbabanin Mar 5, 2026
eff9e23
Convert comment to a class documentation comment.
vbabanin Mar 5, 2026
78d685d
Remove deprioritizedServerAddresses field.
vbabanin Mar 5, 2026
ff279de
Move inLatencyWindowServers to pre-try.
vbabanin Mar 5, 2026
2f06765
Remove unnecessary symbol.
vbabanin Mar 5, 2026
cdee4fa
Add assertion context.
vbabanin Mar 5, 2026
12ceda7
Remove redundant comment.
vbabanin Mar 5, 2026
b16bfb0
Use immediate timeout.
vbabanin Mar 5, 2026
a7fc3fc
Use MongoException instead of MongoConfigurationException.
vbabanin Mar 5, 2026
1d15a73
Remove createTestCluster method.
vbabanin Mar 5, 2026
3e2c0cd
Use com.mongodb.assertions.Assertions.fail for a Fake cluster.
vbabanin Mar 5, 2026
9a84fc0
Change message.
vbabanin Mar 5, 2026
36c83c3
Update driver-core/src/test/unit/com/mongodb/internal/connection/Serv…
vbabanin Mar 5, 2026
9b81ebd
Use MongoMockito.
vbabanin Mar 5, 2026
2df4580
Remove TODO.
vbabanin Mar 5, 2026
db936a1
Use withNewServerDeprioritization.
vbabanin Mar 5, 2026
64dd4f4
Make ServerDeprioritization private.
vbabanin Mar 5, 2026
cd67117
Remove unused methods.
vbabanin Mar 5, 2026
4310894
Reuse OperationContext.
vbabanin Mar 5, 2026
f57616d
Use a documentation comment.
vbabanin Mar 5, 2026
0a4f04f
Use VisibleForTesting(otherwise = PROTECTED).
vbabanin Mar 5, 2026
c736e6a
Reuse OperatioContext.
vbabanin Mar 5, 2026
c97cb65
Change comments.
vbabanin Mar 5, 2026
012aa6b
Change comments.
vbabanin Mar 5, 2026
471cadb
Merge branch 'main' into JAVA-6021
vbabanin Mar 5, 2026
6767707
Use createOperationContext.
vbabanin Mar 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@
import static com.mongodb.connection.ServerDescription.MIN_DRIVER_SERVER_VERSION;
import static com.mongodb.connection.ServerDescription.MIN_DRIVER_WIRE_VERSION;
import static com.mongodb.internal.Locks.withInterruptibleLock;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PACKAGE;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PROTECTED;
import static com.mongodb.internal.connection.EventHelper.wouldDescriptionsGenerateEquivalentEvents;
import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener;
import static com.mongodb.internal.logging.LogMessage.Component.SERVER_SELECTION;
Expand All @@ -94,7 +96,8 @@
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.stream.Collectors.toList;

abstract class BaseCluster implements Cluster {
@VisibleForTesting(otherwise = PROTECTED)
public abstract class BaseCluster implements Cluster {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The value of otherwise is incorrect here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in: 0a4f04f

private static final Logger LOGGER = Loggers.getLogger("cluster");
private static final StructuredLogger STRUCTURED_LOGGER = new StructuredLogger("cluster");

Expand All @@ -112,10 +115,11 @@ abstract class BaseCluster implements Cluster {
private volatile boolean isClosed;
private volatile ClusterDescription description;

BaseCluster(final ClusterId clusterId,
final ClusterSettings settings,
final ClusterableServerFactory serverFactory,
final ClientMetadata clientMetadata) {
@VisibleForTesting(otherwise = PACKAGE)
protected BaseCluster(final ClusterId clusterId,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The value of otherwise is incorrect here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in: 08b3466

final ClusterSettings settings,
final ClusterableServerFactory serverFactory,
final ClientMetadata clientMetadata) {
this.clusterId = notNull("clusterId", clusterId);
this.settings = notNull("settings", settings);
this.serverFactory = notNull("serverFactory", serverFactory);
Expand Down Expand Up @@ -361,8 +365,7 @@ private static ServerSelector getCompleteServerSelector(
final ClusterSettings settings) {
List<ServerSelector> selectors = Stream.of(
getRaceConditionPreFilteringSelector(serversSnapshot),
serverSelector,
serverDeprioritization.getServerSelector(),
serverDeprioritization.apply(serverSelector),
settings.getServerSelector(), // may be null
new LatencyMinimizingServerSelector(settings.getLocalThreshold(MILLISECONDS), MILLISECONDS),
AtMostTwoRandomServerSelector.instance(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.mongodb.ServerAddress;
import com.mongodb.ServerApi;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterType;
import com.mongodb.connection.ServerDescription;
import com.mongodb.internal.IgnorableRequestContext;
import com.mongodb.internal.TimeoutContext;
Expand All @@ -40,6 +39,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static com.mongodb.internal.VisibleForTesting.AccessModifier.PACKAGE;
import static java.util.stream.Collectors.toList;

/**
Expand Down Expand Up @@ -76,7 +76,7 @@ public OperationContext(final RequestContext requestContext, final SessionContex
null);
}

public static OperationContext simpleOperationContext(
static OperationContext simpleOperationContext(
final TimeoutSettings timeoutSettings, @Nullable final ServerApi serverApi) {
return new OperationContext(
IgnorableRequestContext.INSTANCE,
Expand Down Expand Up @@ -113,6 +113,15 @@ public OperationContext withOperationName(final String operationName) {
operationName, tracingSpan);
}

/**
* TODO-JAVA-6058: This method enables overriding the ServerDeprioritization state.
* It is a temporary solution to handle cases where deprioritization state persists across operations.
*/
public OperationContext withNewServerDeprioritization() {
return new OperationContext(id, requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), tracingManager, serverApi,
operationName, tracingSpan);
}

public long getId() {
return id;
}
Expand Down Expand Up @@ -152,8 +161,7 @@ public void setTracingSpan(final Span tracingSpan) {
this.tracingSpan = tracingSpan;
}

@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
public OperationContext(final long id,
private OperationContext(final long id,
final RequestContext requestContext,
final SessionContext sessionContext,
final TimeoutContext timeoutContext,
Expand All @@ -174,26 +182,6 @@ public OperationContext(final long id,
this.tracingSpan = tracingSpan;
}

@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
public OperationContext(final long id,
final RequestContext requestContext,
final SessionContext sessionContext,
final TimeoutContext timeoutContext,
final TracingManager tracingManager,
@Nullable final ServerApi serverApi,
@Nullable final String operationName) {
this.id = id;
this.serverDeprioritization = new ServerDeprioritization();
this.requestContext = requestContext;
this.sessionContext = sessionContext;
this.timeoutContext = timeoutContext;
this.tracingManager = tracingManager;
this.serverApi = serverApi;
this.operationName = operationName;
this.tracingSpan = null;
}


/**
* @return The same {@link ServerDeprioritization} if called on the same {@link OperationContext}.
*/
Expand Down Expand Up @@ -228,24 +216,27 @@ public static final class ServerDeprioritization {
@Nullable
private ServerAddress candidate;
private final Set<ServerAddress> deprioritized;
private final DeprioritizingSelector selector;

private ServerDeprioritization() {
candidate = null;
deprioritized = new HashSet<>();
selector = new DeprioritizingSelector();
}

/**
* The returned {@link ServerSelector} tries to {@linkplain ServerSelector#select(ClusterDescription) select}
* only the {@link ServerDescription}s that do not have deprioritized {@link ServerAddress}es.
* If no such {@link ServerDescription} can be selected, then it selects {@link ClusterDescription#getServerDescriptions()}.
* The returned {@link ServerSelector} wraps the provided selector and attempts
* {@linkplain ServerSelector#select(ClusterDescription) server selection} in two passes:
* <ol>
* <li>First pass: selects using the wrapped selector with only non-deprioritized {@link ServerDescription}s.</li>
* <li>Second pass: if the first pass selects no {@link ServerDescription}s,
* selects using the wrapped selector again with all {@link ServerDescription}s, including deprioritized ones.</li>
* </ol>
*/
ServerSelector getServerSelector() {
return selector;
ServerSelector apply(final ServerSelector wrappedSelector) {
return new DeprioritizingSelector(wrappedSelector);
}

void updateCandidate(final ServerAddress serverAddress) {
@VisibleForTesting(otherwise = PACKAGE)
public void updateCandidate(final ServerAddress serverAddress) {
candidate = serverAddress;
}

Expand All @@ -263,24 +254,40 @@ public void onAttemptFailure(final Throwable failure) {
* which indeed may be used concurrently. {@link DeprioritizingSelector} does not need to be thread-safe.
*/
private final class DeprioritizingSelector implements ServerSelector {
private DeprioritizingSelector() {
private final ServerSelector wrappedSelector;

private DeprioritizingSelector(final ServerSelector wrappedSelector) {
this.wrappedSelector = wrappedSelector;
}

@Override
public List<ServerDescription> select(final ClusterDescription clusterDescription) {
List<ServerDescription> serverDescriptions = clusterDescription.getServerDescriptions();
if (!isEnabled(clusterDescription.getType())) {
return serverDescriptions;

// TODO-JAVA-5908: Evaluate whether using the early-return optimization has a meaningful performance impact on server selection.
if (serverDescriptions.size() == 1 || deprioritized.isEmpty()) {
return wrappedSelector.select(clusterDescription);
}

// TODO-JAVA-5908: Evaluate whether using a loop instead of Stream has a meaningful performance impact on server selection.
List<ServerDescription> nonDeprioritizedServerDescriptions = serverDescriptions
.stream()
.filter(serverDescription -> !deprioritized.contains(serverDescription.getAddress()))
.collect(toList());
Comment on lines +268 to 276
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave a TODO-JAVA-XXXX comment linking this code (the if optimization as well as the use of Stream) to the existing performance ticket about server selection. We should also mention in the description of that ticket that there are TODO-JAVA-XXXX comments that needs to be addressed.

The open questions here are:

  • whether the if optimization is worth itl
  • whether we should use a loop instead of using Stream.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return nonDeprioritizedServerDescriptions.isEmpty() ? serverDescriptions : nonDeprioritizedServerDescriptions;
}

private boolean isEnabled(final ClusterType clusterType) {
return clusterType == ClusterType.SHARDED;
// TODO-JAVA-5908: Evaluate whether using the early-return optimization has a meaningful performance impact on server selection.
if (nonDeprioritizedServerDescriptions.isEmpty()) {
return wrappedSelector.select(clusterDescription);
}

List<ServerDescription> selected = wrappedSelector.select(
new ClusterDescription(
clusterDescription.getConnectionMode(),
clusterDescription.getType(),
nonDeprioritizedServerDescriptions,
clusterDescription.getClusterSettings(),
clusterDescription.getServerSettings()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use the

public ClusterDescription(final ClusterConnectionMode connectionMode, final ClusterType type,
                          @Nullable final MongoException srvResolutionException,
                          final List<ServerDescription> serverDescriptions,
                          @Nullable final ClusterSettings clusterSettings,
                          @Nullable final ServerSettings serverSettings) {

constructor.

return selected.isEmpty() ? wrappedSelector.select(clusterDescription) : selected;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ final class ChangeStreamBatchCursor<T> implements AggregateResponseBatchCursor<T
final int maxWireVersion) {
this.changeStreamOperation = changeStreamOperation;
this.binding = binding.retain();
this.initialOperationContext = operationContext.withOverride(TimeoutContext::withMaxTimeAsMaxAwaitTimeOverride);
this.initialOperationContext = operationContext
.withOverride(TimeoutContext::withMaxTimeAsMaxAwaitTimeOverride)
.withNewServerDeprioritization();
this.wrapped = wrapped;
this.resumeToken = resumeToken;
this.maxWireVersion = maxWireVersion;
Expand Down
Loading