-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add support for server selection's deprioritized servers to all topologies. #1860
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
282bf57
cb8905c
74ae21f
f39b7a0
2a4cd70
fa35dd9
7b613e4
7e30f2a
f20b482
09f24dd
733ee91
f324ab9
103c4b0
021c262
08b3466
98caa34
a6587fa
2bc1b2e
8a8bf0a
cf5c484
32609f6
7d3d862
eff9e23
78d685d
ff279de
2f06765
cdee4fa
12ceda7
b16bfb0
a7fc3fc
1d15a73
3e2c0cd
9a84fc0
36c83c3
9b81ebd
2df4580
db936a1
64dd4f4
cd67117
4310894
f57616d
0a4f04f
c736e6a
c97cb65
012aa6b
471cadb
6767707
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -67,7 +67,9 @@ | |
| import static com.mongodb.connection.ServerDescription.MIN_DRIVER_SERVER_VERSION; | ||
| import static com.mongodb.connection.ServerDescription.MIN_DRIVER_WIRE_VERSION; | ||
| import static com.mongodb.internal.Locks.withInterruptibleLock; | ||
| import static com.mongodb.internal.VisibleForTesting.AccessModifier.PACKAGE; | ||
| import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; | ||
| import static com.mongodb.internal.VisibleForTesting.AccessModifier.PROTECTED; | ||
| import static com.mongodb.internal.connection.EventHelper.wouldDescriptionsGenerateEquivalentEvents; | ||
| import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener; | ||
| import static com.mongodb.internal.logging.LogMessage.Component.SERVER_SELECTION; | ||
|
|
@@ -94,7 +96,8 @@ | |
| import static java.util.concurrent.TimeUnit.NANOSECONDS; | ||
| import static java.util.stream.Collectors.toList; | ||
|
|
||
| abstract class BaseCluster implements Cluster { | ||
| @VisibleForTesting(otherwise = PROTECTED) | ||
| public abstract class BaseCluster implements Cluster { | ||
| private static final Logger LOGGER = Loggers.getLogger("cluster"); | ||
| private static final StructuredLogger STRUCTURED_LOGGER = new StructuredLogger("cluster"); | ||
|
|
||
|
|
@@ -112,10 +115,11 @@ abstract class BaseCluster implements Cluster { | |
| private volatile boolean isClosed; | ||
| private volatile ClusterDescription description; | ||
|
|
||
| BaseCluster(final ClusterId clusterId, | ||
| final ClusterSettings settings, | ||
| final ClusterableServerFactory serverFactory, | ||
| final ClientMetadata clientMetadata) { | ||
| @VisibleForTesting(otherwise = PACKAGE) | ||
| protected BaseCluster(final ClusterId clusterId, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The value of
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in: 08b3466 |
||
| final ClusterSettings settings, | ||
| final ClusterableServerFactory serverFactory, | ||
| final ClientMetadata clientMetadata) { | ||
| this.clusterId = notNull("clusterId", clusterId); | ||
| this.settings = notNull("settings", settings); | ||
| this.serverFactory = notNull("serverFactory", serverFactory); | ||
|
|
@@ -361,8 +365,7 @@ private static ServerSelector getCompleteServerSelector( | |
| final ClusterSettings settings) { | ||
| List<ServerSelector> selectors = Stream.of( | ||
| getRaceConditionPreFilteringSelector(serversSnapshot), | ||
| serverSelector, | ||
| serverDeprioritization.getServerSelector(), | ||
| serverDeprioritization.apply(serverSelector), | ||
| settings.getServerSelector(), // may be null | ||
| new LatencyMinimizingServerSelector(settings.getLocalThreshold(MILLISECONDS), MILLISECONDS), | ||
| AtMostTwoRandomServerSelector.instance(), | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,6 @@ | |
| import com.mongodb.ServerAddress; | ||
| import com.mongodb.ServerApi; | ||
| import com.mongodb.connection.ClusterDescription; | ||
| import com.mongodb.connection.ClusterType; | ||
| import com.mongodb.connection.ServerDescription; | ||
| import com.mongodb.internal.IgnorableRequestContext; | ||
| import com.mongodb.internal.TimeoutContext; | ||
|
|
@@ -40,6 +39,7 @@ | |
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| import static com.mongodb.internal.VisibleForTesting.AccessModifier.PACKAGE; | ||
| import static java.util.stream.Collectors.toList; | ||
|
|
||
| /** | ||
|
|
@@ -76,7 +76,7 @@ public OperationContext(final RequestContext requestContext, final SessionContex | |
| null); | ||
| } | ||
|
|
||
| public static OperationContext simpleOperationContext( | ||
| static OperationContext simpleOperationContext( | ||
| final TimeoutSettings timeoutSettings, @Nullable final ServerApi serverApi) { | ||
| return new OperationContext( | ||
| IgnorableRequestContext.INSTANCE, | ||
|
|
@@ -113,6 +113,15 @@ public OperationContext withOperationName(final String operationName) { | |
| operationName, tracingSpan); | ||
| } | ||
|
|
||
| /** | ||
| * TODO-JAVA-6058: This method enables overriding the ServerDeprioritization state. | ||
| * It is a temporary solution to handle cases where deprioritization state persists across operations. | ||
| */ | ||
| public OperationContext withNewServerDeprioritization() { | ||
| return new OperationContext(id, requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), tracingManager, serverApi, | ||
| operationName, tracingSpan); | ||
| } | ||
|
|
||
| public long getId() { | ||
| return id; | ||
| } | ||
|
|
@@ -152,8 +161,7 @@ public void setTracingSpan(final Span tracingSpan) { | |
| this.tracingSpan = tracingSpan; | ||
| } | ||
|
|
||
| @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE) | ||
| public OperationContext(final long id, | ||
| private OperationContext(final long id, | ||
| final RequestContext requestContext, | ||
| final SessionContext sessionContext, | ||
| final TimeoutContext timeoutContext, | ||
|
|
@@ -174,26 +182,6 @@ public OperationContext(final long id, | |
| this.tracingSpan = tracingSpan; | ||
| } | ||
|
|
||
| @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE) | ||
| public OperationContext(final long id, | ||
| final RequestContext requestContext, | ||
| final SessionContext sessionContext, | ||
| final TimeoutContext timeoutContext, | ||
| final TracingManager tracingManager, | ||
| @Nullable final ServerApi serverApi, | ||
| @Nullable final String operationName) { | ||
| this.id = id; | ||
| this.serverDeprioritization = new ServerDeprioritization(); | ||
| this.requestContext = requestContext; | ||
| this.sessionContext = sessionContext; | ||
| this.timeoutContext = timeoutContext; | ||
| this.tracingManager = tracingManager; | ||
| this.serverApi = serverApi; | ||
| this.operationName = operationName; | ||
| this.tracingSpan = null; | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * @return The same {@link ServerDeprioritization} if called on the same {@link OperationContext}. | ||
| */ | ||
|
|
@@ -228,24 +216,27 @@ public static final class ServerDeprioritization { | |
| @Nullable | ||
| private ServerAddress candidate; | ||
| private final Set<ServerAddress> deprioritized; | ||
| private final DeprioritizingSelector selector; | ||
|
|
||
| private ServerDeprioritization() { | ||
| candidate = null; | ||
| deprioritized = new HashSet<>(); | ||
| selector = new DeprioritizingSelector(); | ||
| } | ||
|
|
||
| /** | ||
| * The returned {@link ServerSelector} tries to {@linkplain ServerSelector#select(ClusterDescription) select} | ||
| * only the {@link ServerDescription}s that do not have deprioritized {@link ServerAddress}es. | ||
| * If no such {@link ServerDescription} can be selected, then it selects {@link ClusterDescription#getServerDescriptions()}. | ||
| * The returned {@link ServerSelector} wraps the provided selector and attempts | ||
| * {@linkplain ServerSelector#select(ClusterDescription) server selection} in two passes: | ||
| * <ol> | ||
| * <li>First pass: selects using the wrapped selector with only non-deprioritized {@link ServerDescription}s.</li> | ||
| * <li>Second pass: if the first pass selects no {@link ServerDescription}s, | ||
| * selects using the wrapped selector again with all {@link ServerDescription}s, including deprioritized ones.</li> | ||
| * </ol> | ||
| */ | ||
| ServerSelector getServerSelector() { | ||
| return selector; | ||
| ServerSelector apply(final ServerSelector wrappedSelector) { | ||
| return new DeprioritizingSelector(wrappedSelector); | ||
| } | ||
|
|
||
| void updateCandidate(final ServerAddress serverAddress) { | ||
| @VisibleForTesting(otherwise = PACKAGE) | ||
| public void updateCandidate(final ServerAddress serverAddress) { | ||
| candidate = serverAddress; | ||
| } | ||
|
|
||
|
|
@@ -263,24 +254,40 @@ public void onAttemptFailure(final Throwable failure) { | |
| * which indeed may be used concurrently. {@link DeprioritizingSelector} does not need to be thread-safe. | ||
| */ | ||
| private final class DeprioritizingSelector implements ServerSelector { | ||
| private DeprioritizingSelector() { | ||
| private final ServerSelector wrappedSelector; | ||
|
|
||
| private DeprioritizingSelector(final ServerSelector wrappedSelector) { | ||
| this.wrappedSelector = wrappedSelector; | ||
| } | ||
|
|
||
| @Override | ||
| public List<ServerDescription> select(final ClusterDescription clusterDescription) { | ||
| List<ServerDescription> serverDescriptions = clusterDescription.getServerDescriptions(); | ||
| if (!isEnabled(clusterDescription.getType())) { | ||
| return serverDescriptions; | ||
|
|
||
| // TODO-JAVA-5908: Evaluate whether using the early-return optimization has a meaningful performance impact on server selection. | ||
| if (serverDescriptions.size() == 1 || deprioritized.isEmpty()) { | ||
| return wrappedSelector.select(clusterDescription); | ||
| } | ||
|
|
||
| // TODO-JAVA-5908: Evaluate whether using a loop instead of Stream has a meaningful performance impact on server selection. | ||
| List<ServerDescription> nonDeprioritizedServerDescriptions = serverDescriptions | ||
| .stream() | ||
| .filter(serverDescription -> !deprioritized.contains(serverDescription.getAddress())) | ||
| .collect(toList()); | ||
|
Comment on lines
+268
to
276
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's leave a The open questions here are:
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| return nonDeprioritizedServerDescriptions.isEmpty() ? serverDescriptions : nonDeprioritizedServerDescriptions; | ||
| } | ||
|
|
||
| private boolean isEnabled(final ClusterType clusterType) { | ||
| return clusterType == ClusterType.SHARDED; | ||
| // TODO-JAVA-5908: Evaluate whether using the early-return optimization has a meaningful performance impact on server selection. | ||
| if (nonDeprioritizedServerDescriptions.isEmpty()) { | ||
| return wrappedSelector.select(clusterDescription); | ||
| } | ||
|
|
||
| List<ServerDescription> selected = wrappedSelector.select( | ||
| new ClusterDescription( | ||
| clusterDescription.getConnectionMode(), | ||
| clusterDescription.getType(), | ||
| nonDeprioritizedServerDescriptions, | ||
| clusterDescription.getClusterSettings(), | ||
| clusterDescription.getServerSettings())); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should use the public ClusterDescription(final ClusterConnectionMode connectionMode, final ClusterType type,
@Nullable final MongoException srvResolutionException,
final List<ServerDescription> serverDescriptions,
@Nullable final ClusterSettings clusterSettings,
@Nullable final ServerSettings serverSettings) {constructor. |
||
| return selected.isEmpty() ? wrappedSelector.select(clusterDescription) : selected; | ||
| } | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The value of
otherwiseis incorrect here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in: 0a4f04f