diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index 585b0eae..b2e480fb 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -5,7 +5,6 @@ on: push: branches: - main - - dev/v3 paths-ignore: - "**/*.md" - "**/*.jpg" @@ -14,6 +13,9 @@ on: - "docs/**" - "ISSUE_TEMPLATE/**" - "**/remove-old-artifacts.yml" + pull_request: + branches: + - dev/v3 permissions: id-token: write # This is required for requesting the JWT diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 69a7d034..5cbeaea7 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -8,6 +8,7 @@ on: pull_request: branches: - "*" + - dev/v3 permissions: contents: read diff --git a/common/lib/connection_info.ts b/common/lib/connection_info.ts new file mode 100644 index 00000000..2d195120 --- /dev/null +++ b/common/lib/connection_info.ts @@ -0,0 +1,35 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { ClientWrapper } from "./client_wrapper"; + +export class ConnectionInfo { + private readonly _client: ClientWrapper; + private readonly _isPooled: boolean; + + constructor(client: ClientWrapper, isPooled: boolean) { + this._client = client; + this._isPooled = isPooled; + } + + get client(): ClientWrapper { + return this._client; + } + + get isPooled(): boolean { + return this._isPooled; + } +} diff --git a/common/lib/connection_provider.ts b/common/lib/connection_provider.ts index 8da16f5d..9f60b31b 100644 --- a/common/lib/connection_provider.ts +++ b/common/lib/connection_provider.ts @@ -17,10 +17,10 @@ import { HostRole } from "./host_role"; import { HostInfo } from "./host_info"; import { PluginService } from "./plugin_service"; -import { ClientWrapper } from "./client_wrapper"; +import { ConnectionInfo } from "./connection_info"; export interface ConnectionProvider { - connect(hostInfo: HostInfo, pluginService: PluginService, props: Map): Promise; + connect(hostInfo: HostInfo, pluginService: PluginService, props: Map): Promise; acceptsUrl(hostInfo: HostInfo, props: Map): boolean; acceptsStrategy(role: HostRole, strategy: string): boolean; getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map): HostInfo; diff --git a/common/lib/driver_connection_provider.ts b/common/lib/driver_connection_provider.ts index 7affca52..6a4cd034 100644 --- a/common/lib/driver_connection_provider.ts +++ b/common/lib/driver_connection_provider.ts @@ -30,6 +30,7 @@ import { logger } from "../logutils"; import { ClientWrapper } from "./client_wrapper"; import { RoundRobinHostSelector } from "./round_robin_host_selector"; import { DriverDialect } from "./driver_dialect/driver_dialect"; +import { ConnectionInfo } from "./connection_info"; export class DriverConnectionProvider implements ConnectionProvider { private static readonly acceptedStrategies: Map = new Map([ @@ -46,7 +47,7 @@ export class DriverConnectionProvider implements ConnectionProvider { return true; } - async connect(hostInfo: HostInfo, pluginService: PluginService, props: Map): Promise { + async connect(hostInfo: HostInfo, pluginService: PluginService, props: Map): Promise { let resultTargetClient; const resultProps = new Map(props); resultProps.set(WrapperProperties.HOST.name, hostInfo.host); @@ -92,7 +93,7 @@ export class DriverConnectionProvider implements ConnectionProvider { resultTargetClient = driverDialect.connect(hostInfo, resultProps); } pluginService.attachErrorListener(resultTargetClient); - return resultTargetClient; + return new ConnectionInfo(resultTargetClient, false); } getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map): HostInfo { diff --git a/common/lib/internal_pooled_connection_provider.ts b/common/lib/internal_pooled_connection_provider.ts index 300442ad..fe028fe9 100644 --- a/common/lib/internal_pooled_connection_provider.ts +++ b/common/lib/internal_pooled_connection_provider.ts @@ -39,6 +39,7 @@ import { LeastConnectionsHostSelector } from "./least_connections_host_selector" import { PoolClientWrapper } from "./pool_client_wrapper"; import { logger } from "../logutils"; import { SlidingExpirationCacheWithCleanupTask } from "./utils/sliding_expiration_cache_with_cleanup_task"; +import { ConnectionInfo } from "./connection_info"; export class InternalPooledConnectionProvider implements PooledConnectionProvider, CanReleaseResources { static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(10 * 60_000_000_000); // 10 minutes @@ -79,7 +80,7 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide return RdsUrlType.RDS_INSTANCE === urlType; } - async connect(hostInfo: HostInfo, pluginService: PluginService, props: Map): Promise { + async connect(hostInfo: HostInfo, pluginService: PluginService, props: Map): Promise { const resultProps = new Map(props); resultProps.set(WrapperProperties.HOST.name, hostInfo.host); if (hostInfo.isPortSpecified()) { @@ -122,7 +123,7 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide const poolClient = await this.getPoolConnection(connectionHostInfo, props); pluginService.attachErrorListener(poolClient); - return poolClient; + return new ConnectionInfo(poolClient, true); } async getPoolConnection(hostInfo: HostInfo, props: Map) { diff --git a/common/lib/plugin_service.ts b/common/lib/plugin_service.ts index e2aae506..9aa5e732 100644 --- a/common/lib/plugin_service.ts +++ b/common/lib/plugin_service.ts @@ -150,6 +150,10 @@ export interface PluginService extends ErrorHandler { getStatus(clazz: any, key: string): T; isPluginInUse(plugin: any): boolean; + + isPooledClient(): boolean; + + setIsPooledClient(isPooledClient: boolean): void; } export class PluginServiceImpl implements PluginService, HostListProviderService { @@ -172,6 +176,8 @@ export class PluginServiceImpl implements PluginService, HostListProviderService protected static readonly statusesExpiringCache: CacheMap = new CacheMap(); protected static readonly DEFAULT_STATUS_CACHE_EXPIRE_NANO: number = 3_600_000_000_000; // 60 minutes + protected pooledClient: boolean | null = null; + constructor( container: PluginServiceManagerContainer, client: AwsClient, @@ -782,4 +788,12 @@ export class PluginServiceImpl implements PluginService, HostListProviderService isPluginInUse(plugin: any) { return this.pluginServiceManagerContainer.pluginManager!.isPluginInUse(plugin); } + + isPooledClient(): boolean | null { + return this.pooledClient; + } + + setIsPooledClient(isPooledClient: boolean): void { + this.pooledClient = isPooledClient; + } } diff --git a/common/lib/plugins/default_plugin.ts b/common/lib/plugins/default_plugin.ts index ddfcf5f9..3af9b3fc 100644 --- a/common/lib/plugins/default_plugin.ts +++ b/common/lib/plugins/default_plugin.ts @@ -29,6 +29,7 @@ import { AwsWrapperError } from "../utils/errors"; import { HostAvailability } from "../host_availability/host_availability"; import { ClientWrapper } from "../client_wrapper"; import { TelemetryTraceLevel } from "../utils/telemetry/telemetry_trace_level"; +import { ConnectionInfo } from "../connection_info"; export class DefaultPlugin extends AbstractConnectionPlugin { id: string = uniqueId("_defaultPlugin"); @@ -79,10 +80,11 @@ export class DefaultPlugin extends AbstractConnectionPlugin { TelemetryTraceLevel.NESTED ); - const result = await telemetryContext.start(async () => await connProvider.connect(hostInfo, this.pluginService, props)); + const result: ConnectionInfo = await telemetryContext.start(async () => await connProvider.connect(hostInfo, this.pluginService, props)); this.pluginService.setAvailability(hostInfo.allAliases, HostAvailability.AVAILABLE); - await this.pluginService.updateDialect(result); - return result; + this.pluginService.setIsPooledClient(result.isPooled); + await this.pluginService.updateDialect(result.client); + return result.client; } override async execute(methodName: string, methodFunc: () => Promise): Promise { diff --git a/common/lib/plugins/read_write_splitting/abstract_read_write_splitting_plugin.ts b/common/lib/plugins/read_write_splitting/abstract_read_write_splitting_plugin.ts new file mode 100644 index 00000000..19e36a1b --- /dev/null +++ b/common/lib/plugins/read_write_splitting/abstract_read_write_splitting_plugin.ts @@ -0,0 +1,318 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { CanReleaseResources } from "../../can_release_resources"; +import { AbstractConnectionPlugin } from "../../abstract_connection_plugin"; +import { ClientWrapper } from "../../client_wrapper"; +import { PluginService } from "../../plugin_service"; +import { HostListProviderService } from "../../host_list_provider_service"; +import { HostInfo } from "../../host_info"; +import { HostChangeOptions } from "../../host_change_options"; +import { OldConnectionSuggestionAction } from "../../old_connection_suggestion_action"; +import { Messages } from "../../utils/messages"; +import { logger } from "../../../logutils"; +import { HostRole } from "../../host_role"; +import { SqlMethodUtils } from "../../utils/sql_method_utils"; +import { FailoverError } from "../../utils/errors"; +import { WrapperProperties } from "../../wrapper_property"; +import { PoolClientWrapper } from "../../pool_client_wrapper"; +import { convertMsToNanos, getTimeInNanos, logAndThrowError } from "../../utils/utils"; +import { CacheItem } from "../../utils/cache_item"; + +export abstract class AbstractReadWriteSplittingPlugin extends AbstractConnectionPlugin implements CanReleaseResources { + private static readonly subscribedMethods: Set = new Set(["initHostPr,ovider", "connect", "notifyConnectionChanged", "query"]); + + protected _hostListProviderService: HostListProviderService | undefined; + protected pluginService: PluginService; + protected readonly _properties: Map; + protected readerHostInfo?: HostInfo = undefined; + protected writerHostInfo?: HostInfo = undefined; + protected isReaderClientFromInternalPool: boolean = false; + protected isWriterClientFromInternalPool: boolean = false; + + protected writerTargetClient: ClientWrapper | undefined; + protected readerCacheItem: CacheItem; + protected readonly readerSelectorStrategy: string = ""; + + private _inReadWriteSplit = false; + + protected constructor(pluginService: PluginService, properties: Map) { + super(); + this.pluginService = pluginService; + this._properties = properties; + this.readerSelectorStrategy = WrapperProperties.READER_HOST_SELECTOR_STRATEGY.get(properties); + } + + override getSubscribedMethods(): Set { + return AbstractReadWriteSplittingPlugin.subscribedMethods; + } + + override initHostProvider( + hostInfo: HostInfo, + props: Map, + hostListProviderService: HostListProviderService, + initHostProviderFunc: () => void + ) { + this._hostListProviderService = hostListProviderService; + initHostProviderFunc(); + } + + override notifyConnectionChanged(changes: Set): Promise { + try { + this.updateInternalClientInfo(); + } catch (e) { + // pass + } + if (this._inReadWriteSplit) { + return Promise.resolve(OldConnectionSuggestionAction.PRESERVE); + } + return Promise.resolve(OldConnectionSuggestionAction.NO_OPINION); + } + + updateInternalClientInfo(): void { + const currentTargetClient = this.pluginService.getCurrentClient().targetClient; + const currentHost = this.pluginService.getCurrentHostInfo(); + if (currentHost === null || currentTargetClient === null) { + return; + } + + if (this.shouldUpdateWriterClient(currentTargetClient, currentHost)) { + this.setWriterClient(currentTargetClient, currentHost); + } else if (this.shouldUpdateReaderClient(currentTargetClient, currentHost)) { + this.setReaderClient(currentTargetClient, currentHost); + } + } + + setWriterClient(writerTargetClient: ClientWrapper | undefined, writerHostInfo: HostInfo): void { + this.writerTargetClient = writerTargetClient; + this.writerHostInfo = writerHostInfo; + logger.debug(Messages.get("ReadWriteSplittingPlugin.setWriterClient", writerHostInfo.getHostAndPort())); + } + + async setReaderClient(readerTargetClient: ClientWrapper | undefined, readerHost: HostInfo): Promise { + await this.closeReaderClientIfIdle(); + this.readerCacheItem = new CacheItem(readerTargetClient, this.getKeepAliveTimeout(this.isReaderClientFromInternalPool)); + this.readerHostInfo = readerHost; + logger.debug(Messages.get("ReadWriteSplittingPlugin.setReaderClient", readerHost.getHostAndPort())); + } + + async switchClientIfRequired(readOnly: boolean) { + const currentClient = this.pluginService.getCurrentClient(); + if (!(await currentClient.isValid())) { + logAndThrowError(Messages.get("ReadWriteSplittingPlugin.setReadOnlyOnClosedClient", currentClient.targetClient?.id ?? "undefined client")); + } + + await this.refreshAndStoreTopology(currentClient.targetClient); + + const currentHost = this.pluginService.getCurrentHostInfo(); + if (currentHost == null) { + logAndThrowError(Messages.get("ReadWriteSplittingPlugin.unavailableHostInfo")); + } else if (readOnly) { + if (!this.pluginService.isInTransaction() && currentHost.role != HostRole.READER) { + try { + await this.switchToReaderTargetClient(); + } catch (error: any) { + if (!(await currentClient.isValid())) { + logAndThrowError(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToReader", error.message)); + } + logger.warn(Messages.get("ReadWriteSplittingPlugin.fallbackToWriter", currentHost.url)); + } + } + } else if (currentHost.role != HostRole.WRITER) { + if (this.pluginService.isInTransaction()) { + logAndThrowError(Messages.get("ReadWriteSplittingPlugin.setReadOnlyFalseInTransaction")); + } + try { + await this.switchToWriterTargetClient(); + } catch (error: any) { + logAndThrowError(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToWriter", error.message)); + } + } + } + + override async execute(methodName: string, executeFunc: () => Promise, methodArgs: any): Promise { + const statement = SqlMethodUtils.parseMethodArgs(methodArgs, this.pluginService.getDriverDialect()); + const statements = SqlMethodUtils.parseMultiStatementQueries(statement); + + const updateReadOnly: boolean | undefined = SqlMethodUtils.doesSetReadOnly(statements, this.pluginService.getDialect()); + if (updateReadOnly !== undefined) { + try { + await this.switchClientIfRequired(updateReadOnly); + } catch (error) { + await this.closeIdleClients(); + throw error; + } + } + + try { + return await executeFunc(); + } catch (error: any) { + if (error instanceof FailoverError) { + logger.debug(Messages.get("ReadWriteSplittingPlugin.failoverErrorWhileExecutingCommand", methodName)); + await this.closeIdleClients(); + } else { + logger.debug(Messages.get("ReadWriteSplittingPlugin.errorWhileExecutingCommand", methodName, error.message)); + } + + throw error; + } + } + + async getNewWriterClient(writerHost: HostInfo) { + const props = new Map(this._properties); + props.set(WrapperProperties.HOST.name, writerHost.host); + try { + const copyProps = new Map(props); + WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.set(copyProps, false); + const targetClient = await this.pluginService.connect(writerHost, copyProps, this); + this.isWriterClientFromInternalPool = targetClient instanceof PoolClientWrapper; + this.setWriterClient(targetClient, writerHost); + await this.switchCurrentTargetClientTo(this.writerTargetClient, writerHost); + } catch (any) { + logger.warn(Messages.get("ReadWriteSplittingPlugin.failedToConnectToWriter", writerHost.url)); + } + } + + async switchCurrentTargetClientTo(newTargetClient: ClientWrapper | undefined, newClientHost: HostInfo | undefined) { + const currentTargetClient = this.pluginService.getCurrentClient().targetClient; + + if (currentTargetClient === newTargetClient) { + return; + } + if (newClientHost && newTargetClient) { + try { + await this.pluginService.setCurrentClient(newTargetClient, newClientHost); + logger.debug(Messages.get("ReadWriteSplittingPlugin.settingCurrentClient", newTargetClient.id, newClientHost.url)); + } catch (error) { + // pass + } + } + } + + async switchToWriterTargetClient() { + const currentHost = this.pluginService.getCurrentHostInfo(); + const currentClient = this.pluginService.getCurrentClient(); + if (this.isWriter(currentHost) && (await this.isTargetClientUsable(currentClient.targetClient))) { + // Already connected to writer. + return; + } + this._inReadWriteSplit = true; + if (!(await this.isTargetClientUsable(this.writerTargetClient))) { + await this.initializeWriterClient(); + } else { + await this.switchCurrentTargetClientTo(this.writerTargetClient, this.writerHostInfo); + } + + if (this.isReaderClientFromInternalPool) { + await this.closeReaderClientIfIdle(); + } + + logger.debug(Messages.get("ReadWriteSplittingPlugin.switchedFromReaderToWriter", this.writerHostInfo.getHostAndPort())); + } + + async switchToReaderTargetClient() { + const currentHost = this.pluginService.getCurrentHostInfo(); + const currentClient = this.pluginService.getCurrentClient(); + if (currentHost !== null && currentHost?.role === HostRole.READER && currentClient) { + // Already connected to reader. + return; + } + + await this.closeReaderIfNecessary(); + + this._inReadWriteSplit = true; + if (this.readerCacheItem == null || !(await this.isTargetClientUsable(this.readerCacheItem.get()))) { + await this.initializeReaderClient(); + } else { + try { + await this.switchCurrentTargetClientTo(this.readerCacheItem.get(), this.readerHostInfo); + logger.debug(Messages.get("ReadWriteSplittingPlugin.switchedFromWriterToReader", this.readerHostInfo.getHostAndPort())); + } catch (error: any) { + logger.debug(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToCachedReader", this.readerHostInfo.getHostAndPort(), error.message)); + await this.closeReaderClientIfIdle(); + await this.initializeReaderClient(); + } + } + if (this.isWriterClientFromInternalPool) { + await this.closeWriterClientIfIdle(); + } + } + + async isTargetClientUsable(targetClient: ClientWrapper | undefined): Promise { + if (!targetClient) { + return Promise.resolve(false); + } + return await this.pluginService.isClientValid(targetClient); + } + + async closeWriterClientIfIdle() { + const currentTargetClient = this.pluginService.getCurrentClient().targetClient; + try { + if ( + this.writerTargetClient != null && + this.writerTargetClient !== currentTargetClient && + (await this.isTargetClientUsable(this.writerTargetClient)) + ) { + await this.pluginService.abortTargetClient(this.writerTargetClient); + } + } catch (error) { + // ignore + } + this.writerTargetClient = undefined; + } + + async closeReaderClientIfIdle(): Promise { + const currentTargetClient = this.pluginService.getCurrentClient().targetClient; + const readerClient = this.readerCacheItem?.get(true); + try { + if (readerClient != null && readerClient !== currentTargetClient && (await this.isTargetClientUsable(readerClient))) { + await this.pluginService.abortTargetClient(readerClient); + } + } catch (error) { + // ignore + } + this.readerCacheItem = null; + this.readerHostInfo = undefined; + } + + async closeIdleClients() { + logger.debug(Messages.get("ReadWriteSplittingPlugin.closingInternalClients")); + await this.closeReaderClientIfIdle(); + await this.closeWriterClientIfIdle(); + } + + protected getKeepAliveTimeout(isPooledClient: boolean): bigint { + if (isPooledClient) { + return BigInt(0); + } + const keepAliveMs = WrapperProperties.CACHED_READER_KEEP_ALIVE_TIMEOUT.get(this._properties); + + return keepAliveMs > 0 ? getTimeInNanos() + convertMsToNanos(keepAliveMs) : BigInt(0); + } + + async releaseResources() { + await this.closeIdleClients(); + } + + protected abstract shouldUpdateReaderClient(currentClient: ClientWrapper | undefined, host: HostInfo): boolean; + protected abstract shouldUpdateWriterClient(currentClient: ClientWrapper | undefined, host: HostInfo): boolean; + protected abstract isWriter(currentHost: HostInfo): boolean; + protected abstract isReader(currentHost: HostInfo): boolean; + protected abstract refreshAndStoreTopology(currentClient: ClientWrapper | undefined): Promise; + protected abstract initializeWriterClient(): Promise; + protected abstract initializeReaderClient(): Promise; + protected abstract closeReaderIfNecessary(): Promise; +} diff --git a/common/lib/plugins/read_write_splitting/read_write_splitting_plugin.ts b/common/lib/plugins/read_write_splitting/read_write_splitting_plugin.ts index c48fd009..bd232540 100644 --- a/common/lib/plugins/read_write_splitting/read_write_splitting_plugin.ts +++ b/common/lib/plugins/read_write_splitting/read_write_splitting_plugin.ts @@ -14,38 +14,21 @@ * limitations under the License. */ -import { AbstractConnectionPlugin } from "../../abstract_connection_plugin"; -import { - HostInfo, - FailoverError, - HostRole -} from "../../index"; +import { HostInfo, HostRole } from "../../index"; import { PluginService } from "../../plugin_service"; import { HostListProviderService } from "../../host_list_provider_service"; -import { OldConnectionSuggestionAction } from "../../old_connection_suggestion_action"; -import { HostChangeOptions } from "../../host_change_options"; -import { WrapperProperties } from "../../wrapper_property"; import { Messages } from "../../utils/messages"; -import { logger } from "../../../logutils"; -import { SqlMethodUtils } from "../../utils/sql_method_utils"; import { ClientWrapper } from "../../client_wrapper"; -import { getWriter, logAndThrowError } from "../../utils/utils"; +import { containsHostAndPort, getWriter, logAndThrowError, logTopology } from "../../utils/utils"; import { CanReleaseResources } from "../../can_release_resources"; +import { AbstractReadWriteSplittingPlugin } from "./abstract_read_write_splitting_plugin"; +import { WrapperProperties } from "../../wrapper_property"; import { PoolClientWrapper } from "../../pool_client_wrapper"; +import { logger } from "../../../logutils"; +import { CacheItem } from "../../utils/cache_item"; -export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin implements CanReleaseResources { - private static readonly subscribedMethods: Set = new Set(["initHostProvider", "connect", "notifyConnectionChanged", "query"]); - private readonly readerSelectorStrategy: string = ""; - - private _hostListProviderService: HostListProviderService | undefined; - private pluginService: PluginService; - private readonly _properties: Map; - private _readerHostInfo?: HostInfo = undefined; - private isReaderClientFromInternalPool: boolean = false; - private isWriterClientFromInternalPool: boolean = false; - private _inReadWriteSplit = false; - writerTargetClient: ClientWrapper | undefined; - readerTargetClient: ClientWrapper | undefined; +export class ReadWriteSplittingPlugin extends AbstractReadWriteSplittingPlugin implements CanReleaseResources { + protected hosts: HostInfo[] = []; constructor(pluginService: PluginService, properties: Map); constructor( @@ -62,53 +45,10 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin implement writerClient?: ClientWrapper, readerClient?: ClientWrapper ) { - super(); - this.pluginService = pluginService; - this._properties = properties; - this.readerSelectorStrategy = WrapperProperties.READER_HOST_SELECTOR_STRATEGY.get(properties); + super(pluginService, properties); this._hostListProviderService = hostListProviderService; this.writerTargetClient = writerClient; - this.readerTargetClient = readerClient; - } - - override getSubscribedMethods(): Set { - return ReadWriteSplittingPlugin.subscribedMethods; - } - - override initHostProvider( - hostInfo: HostInfo, - props: Map, - hostListProviderService: HostListProviderService, - initHostProviderFunc: () => void - ) { - this._hostListProviderService = hostListProviderService; - initHostProviderFunc(); - } - - override notifyConnectionChanged(changes: Set): Promise { - try { - this.updateInternalClientInfo(); - } catch (e) { - // pass - } - if (this._inReadWriteSplit) { - return Promise.resolve(OldConnectionSuggestionAction.PRESERVE); - } - return Promise.resolve(OldConnectionSuggestionAction.NO_OPINION); - } - - updateInternalClientInfo(): void { - const currentTargetClient = this.pluginService.getCurrentClient().targetClient; - const currentHost = this.pluginService.getCurrentHostInfo(); - if (currentHost === null || currentTargetClient === null) { - return; - } - - if (currentHost.role === HostRole.WRITER) { - this.setWriterClient(currentTargetClient, currentHost); - } else { - this.setReaderClient(currentTargetClient, currentHost); - } + this.readerCacheItem = new CacheItem(readerClient, BigInt(0)); } override async connect( @@ -143,145 +83,73 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin implement return result; } - override async execute(methodName: string, executeFunc: () => Promise, methodArgs: any): Promise { - const statement = SqlMethodUtils.parseMethodArgs(methodArgs, this.pluginService.getDriverDialect()); - const statements = SqlMethodUtils.parseMultiStatementQueries(statement); - - const updateReadOnly: boolean | undefined = SqlMethodUtils.doesSetReadOnly(statements, this.pluginService.getDialect()); - if (updateReadOnly !== undefined) { - try { - await this.switchClientIfRequired(updateReadOnly); - } catch (error) { - await this.closeIdleClients(); - throw error; - } - } - - try { - return await executeFunc(); - } catch (error: any) { - if (error instanceof FailoverError) { - logger.debug(Messages.get("ReadWriteSplittingPlugin.failoverErrorWhileExecutingCommand", methodName)); - await this.closeIdleClients(); - } else { - logger.debug(Messages.get("ReadWriteSplittingPlugin.errorWhileExecutingCommand", methodName, error.message)); - } - - throw error; - } - } - - setWriterClient(writerTargetClient: ClientWrapper | undefined, writerHostInfo: HostInfo): void { - this.writerTargetClient = writerTargetClient; - logger.debug(Messages.get("ReadWriteSplittingPlugin.setWriterClient", writerHostInfo.url)); + protected isWriter(currentHost: HostInfo): boolean { + return HostRole.WRITER === currentHost.role; } - setReaderClient(readerTargetClient: ClientWrapper | undefined, readerHost: HostInfo): void { - this.readerTargetClient = readerTargetClient; - this._readerHostInfo = readerHost; - logger.debug(Messages.get("ReadWriteSplittingPlugin.setReaderClient", readerHost.url)); + protected isReader(currentHost: HostInfo): boolean { + return HostRole.READER === currentHost.role; } - async getNewWriterClient(writerHost: HostInfo) { - const props = new Map(this._properties); - props.set(WrapperProperties.HOST.name, writerHost.host); - try { - const copyProps = new Map(props); - WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.set(copyProps, false); - const targetClient = await this.pluginService.connect(writerHost, copyProps, this); - this.isWriterClientFromInternalPool = targetClient instanceof PoolClientWrapper; - this.setWriterClient(targetClient, writerHost); - await this.switchCurrentTargetClientTo(this.writerTargetClient, writerHost); - } catch (any) { - logger.warn(Messages.get("ReadWriteSplittingPlugin.failedToConnectToWriter", writerHost.url)); - } - } - - async switchClientIfRequired(readOnly: boolean) { - const currentClient = this.pluginService.getCurrentClient(); - if (!(await currentClient.isValid())) { - logAndThrowError(Messages.get("ReadWriteSplittingPlugin.setReadOnlyOnClosedClient", currentClient.targetClient?.id ?? "undefined client")); - } + protected async refreshAndStoreTopology(currentClient: ClientWrapper | undefined): Promise { try { await this.pluginService.refreshHostList(); } catch { // pass } - const hosts: HostInfo[] = this.pluginService.getHosts(); - if (hosts == null || hosts.length === 0) { + this.hosts = this.pluginService.getHosts(); + if (this.hosts == null || this.hosts.length === 0) { logAndThrowError(Messages.get("ReadWriteSplittingPlugin.emptyHostList")); } - const currentHost = this.pluginService.getCurrentHostInfo(); - if (currentHost == null) { - logAndThrowError(Messages.get("ReadWriteSplittingPlugin.unavailableHostInfo")); - } else if (readOnly) { - if (!this.pluginService.isInTransaction() && currentHost.role != HostRole.READER) { - try { - await this.switchToReaderTargetClient(hosts); - } catch (error: any) { - if (!(await currentClient.isValid())) { - logAndThrowError(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToReader", error.message)); - } - logger.warn(Messages.get("ReadWriteSplittingPlugin.fallbackToWriter", currentHost.url)); - } - } - } else if (currentHost.role != HostRole.WRITER) { - if (this.pluginService.isInTransaction()) { - logAndThrowError(Messages.get("ReadWriteSplittingPlugin.setReadOnlyFalseInTransaction")); - } - try { - await this.switchToWriterTargetClient(hosts); - } catch (error: any) { - logAndThrowError(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToWriter", error.message)); - } - } - } - async switchCurrentTargetClientTo(newTargetClient: ClientWrapper | undefined, newClientHost: HostInfo | undefined) { - const currentTargetClient = this.pluginService.getCurrentClient().targetClient; + this.writerHostInfo = getWriter(this.hosts, Messages.get("ReadWriteSplittingPlugin.noWriterFound")); + } - if (currentTargetClient === newTargetClient) { - return; - } - if (newClientHost && newTargetClient) { - try { - await this.pluginService.setCurrentClient(newTargetClient, newClientHost); - logger.debug(Messages.get("ReadWriteSplittingPlugin.settingCurrentClient", newTargetClient.id, newClientHost.url)); - } catch (error) { - // pass - } - } + override async initializeWriterClient(): Promise { + const client: ClientWrapper = await this.pluginService.connect(this.writerHostInfo, this._properties, this); + this.isWriterClientFromInternalPool = this.pluginService.isPooledClient(); + this.setWriterClient(client, this.writerHostInfo); + await this.switchCurrentTargetClientTo(this.writerTargetClient, this.writerHostInfo); } - async initializeReaderClient(hosts: HostInfo[]) { - if (hosts.length === 1) { - const writerHost = getWriter(hosts, Messages.get("ReadWriteSplittingPlugin.noWriterFound")); + override async initializeReaderClient() { + if (this.hosts.length === 1) { + const writerHost = getWriter(this.hosts, Messages.get("ReadWriteSplittingPlugin.noWriterFound")); if (writerHost) { if (!(await this.isTargetClientUsable(this.writerTargetClient))) { await this.getNewWriterClient(writerHost); } - logger.warn(Messages.get("ReadWriteSplittingPlugin.noReadersFound", writerHost.url)); + logger.warn(Messages.get("ReadWriteSplittingPlugin.noReadersFound", writerHost.getHostAndPort())); } } else { await this.getNewReaderClient(); + logger.debug(Messages.get("ReadWriteSplittingPlugin.switchedFromWriterToReader", this.readerHostInfo.getHostAndPort())); } } - async getNewReaderClient() { + override shouldUpdateReaderClient(currentClient: ClientWrapper | undefined, host: HostInfo): boolean { + return this.isReader(host); + } + + override shouldUpdateWriterClient(currentClient: ClientWrapper | undefined, host: HostInfo): boolean { + return this.isWriter(host); + } + + protected async getNewReaderClient() { let targetClient = undefined; let readerHost: HostInfo | undefined = undefined; - const connectAttempts = this.pluginService.getHosts().length; + + const hostCandidates: HostInfo[] = this.getReaderHostCandidates(); + + const connectAttempts = hostCandidates.length * 2; for (let i = 0; i < connectAttempts; i++) { const host = this.pluginService.getHostInfoByStrategy(HostRole.READER, this.readerSelectorStrategy); if (host) { - const props = new Map(this._properties); - props.set(WrapperProperties.HOST.name, host.host); - try { - const copyProps = new Map(props); - WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.set(copyProps, false); + const copyProps = new Map(this._properties); + copyProps.set(WrapperProperties.HOST.name, host.host); targetClient = await this.pluginService.connect(host, copyProps, this); this.isReaderClientFromInternalPool = targetClient instanceof PoolClientWrapper; readerHost = host; @@ -296,98 +164,18 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin implement return; } logger.debug(Messages.get("ReadWriteSplittingPlugin.successfullyConnectedToReader", readerHost.url)); - this.setReaderClient(targetClient, readerHost); - await this.switchCurrentTargetClientTo(this.readerTargetClient, this._readerHostInfo); + await this.setReaderClient(targetClient, readerHost); + await this.switchCurrentTargetClientTo(this.readerCacheItem.get(), this.readerHostInfo); } - async switchToWriterTargetClient(hosts: HostInfo[]) { - const currentHost = this.pluginService.getCurrentHostInfo(); - const currentClient = this.pluginService.getCurrentClient(); - if (currentHost !== null && currentHost?.role === HostRole.WRITER && (await currentClient.isValid())) { - return; + protected async closeReaderIfNecessary(): Promise { + if (this.readerHostInfo != null && containsHostAndPort(this.hosts, this.readerHostInfo.getHostAndPort())) { + logger.debug(Messages.get("ReadWriteSplittingPlugin.previousReaderNotAllowed", this.readerHostInfo.toString(), logTopology(this.hosts, ""))); } - this._inReadWriteSplit = true; - const writerHost = getWriter(hosts, Messages.get("ReadWriteSplittingPlugin.noWriterFound")); - if (!writerHost) { - return; - } - if (!(await this.isTargetClientUsable(this.writerTargetClient))) { - await this.getNewWriterClient(writerHost); - } else if (this.writerTargetClient) { - await this.switchCurrentTargetClientTo(this.writerTargetClient, writerHost); - } - - logger.debug(Messages.get("ReadWriteSplittingPlugin.switchedFromReaderToWriter", writerHost.url)); - if (this.isReaderClientFromInternalPool) { - await this.closeTargetClientIfIdle(this.readerTargetClient); - } - } - - async switchToReaderTargetClient(hosts: HostInfo[]) { - const currentHost = this.pluginService.getCurrentHostInfo(); - const currentClient = this.pluginService.getCurrentClient(); - if (currentHost !== null && currentHost?.role === HostRole.READER && currentClient) { - return; - } - - if (this._readerHostInfo && !hosts.some((hostInfo: HostInfo) => hostInfo.host === this._readerHostInfo?.host)) { - // The old reader cannot be used anymore because it is no longer in the list of allowed hosts. - await this.closeTargetClientIfIdle(this.readerTargetClient); - } - - this._inReadWriteSplit = true; - if (!(await this.isTargetClientUsable(this.readerTargetClient))) { - await this.initializeReaderClient(hosts); - } else if (this.readerTargetClient != null && this._readerHostInfo != null) { - try { - await this.switchCurrentTargetClientTo(this.readerTargetClient, this._readerHostInfo); - logger.debug(Messages.get("ReadWriteSplittingPlugin.switchedFromWriterToReader", this._readerHostInfo.url)); - } catch (error: any) { - logger.debug(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToCachedReader", this._readerHostInfo.url)); - await this.pluginService.abortTargetClient(this.readerTargetClient); - this.readerTargetClient = undefined; - this._readerHostInfo = undefined; - await this.initializeReaderClient(hosts); - } - } - if (this.isWriterClientFromInternalPool) { - await this.closeTargetClientIfIdle(this.writerTargetClient); - } - } - - async isTargetClientUsable(targetClient: ClientWrapper | undefined): Promise { - if (!targetClient) { - return Promise.resolve(false); - } - return await this.pluginService.isClientValid(targetClient); - } - - async closeTargetClientIfIdle(internalTargetClient: ClientWrapper | undefined) { - const currentTargetClient = this.pluginService.getCurrentClient().targetClient; - try { - if (internalTargetClient != null && internalTargetClient !== currentTargetClient && (await this.isTargetClientUsable(internalTargetClient))) { - await this.pluginService.abortTargetClient(internalTargetClient); - - if (internalTargetClient === this.writerTargetClient) { - this.writerTargetClient = undefined; - } - if (internalTargetClient === this.readerTargetClient) { - this.readerTargetClient = undefined; - this._readerHostInfo = undefined; - } - } - } catch (error) { - // ignore - } - } - - async closeIdleClients() { - logger.debug(Messages.get("ReadWriteSplittingPlugin.closingInternalClients")); - await this.closeTargetClientIfIdle(this.readerTargetClient); - await this.closeTargetClientIfIdle(this.writerTargetClient); + await this.closeWriterClientIfIdle(); } - async releaseResources() { - await this.closeIdleClients(); + protected getReaderHostCandidates(): HostInfo[] | undefined { + return this.pluginService.getHosts(); } } diff --git a/common/lib/utils/cache_item.ts b/common/lib/utils/cache_item.ts new file mode 100644 index 00000000..1abf2b76 --- /dev/null +++ b/common/lib/utils/cache_item.ts @@ -0,0 +1,38 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export class CacheItem { + constructor( + readonly item: T, + readonly expirationTime: bigint + ) {} + + isExpired(): boolean { + if (this.expirationTime <= 0) { + // No expiration time. + return false; + } + return process.hrtime.bigint() > this.expirationTime; + } + + get(returnExpired: boolean = false): T | null { + return this.isExpired() && !returnExpired ? null : this.item; + } + + toString(): string { + return `CacheItem [item=${this.item}, expirationTime=${this.expirationTime}]`; + } +} diff --git a/common/lib/utils/messages.ts b/common/lib/utils/messages.ts index 5c0af393..fc145146 100644 --- a/common/lib/utils/messages.ts +++ b/common/lib/utils/messages.ts @@ -116,7 +116,7 @@ const MESSAGES: Record = { "AuroraStaleDnsHelper.staleDnsDetected": "Stale DNS data detected. Opening a connection to '%s'.", "ReadWriteSplittingPlugin.setReadOnlyOnClosedClient": "setReadOnly cannot be called on a closed client '%s'.", "ReadWriteSplittingPlugin.errorSwitchingToCachedReader": - "An error occurred while trying to switch to a cached reader client: '%s'. The driver will attempt to establish a new reader client.", + "An error occurred while trying to switch to a cached reader client: '%s'. Error message: '%s'. The driver will attempt to establish a new reader client.", "ReadWriteSplittingPlugin.errorSwitchingToReader": "An error occurred while trying to switch to a reader client: '%s'.", "ReadWriteSplittingPlugin.errorSwitchingToWriter": "An error occurred while trying to switch to a writer client: '%s'.", "ReadWriteSplittingPlugin.closingInternalClients": "Closing all internal clients except for the current one.", @@ -137,6 +137,7 @@ const MESSAGES: Record = { "ReadWriteSplittingPlugin.failoverErrorWhileExecutingCommand": "Detected a failover error while executing a command: '%s'", "ReadWriteSplittingPlugin.noReadersAvailable": "The plugin was unable to establish a reader client to any reader instance.", "ReadWriteSplittingPlugin.successfullyConnectedToReader": "Successfully connected to a new reader host: '%s'", + "ReadWriteSplittingPlugin.previousReaderNotAllowed": "The previous reader connection cannot be used because it is no longer in the list of allowed hosts. Previous reader: %s. Allowed hosts: %s", "ReadWriteSplittingPlugin.failedToConnectToReader": "Failed to connect to reader host: '%s'", "ReadWriteSplittingPlugin.unsupportedHostSelectorStrategy": "Unsupported host selection strategy '%s' specified in plugin configuration parameter 'readerHostSelectorStrategy'. Please visit the Read/Write Splitting Plugin documentation for all supported strategies.", diff --git a/common/lib/utils/utils.ts b/common/lib/utils/utils.ts index 32dd0a47..244967c8 100644 --- a/common/lib/utils/utils.ts +++ b/common/lib/utils/utils.ts @@ -117,6 +117,14 @@ export function isDialectTopologyAware(dialect: any): dialect is TopologyAwareDa return dialect; } +export function containsHostAndPort(hosts: HostInfo[] | null | undefined, hostAndPort: string): boolean { + if (!hosts || hosts.length === 0) { + return false; + } + + return hosts.some((host) => host.hostAndPort === hostAndPort); +} + export class Pair { private readonly _left: K; private readonly _right: V; diff --git a/common/lib/wrapper_property.ts b/common/lib/wrapper_property.ts index 215714c7..3718693f 100644 --- a/common/lib/wrapper_property.ts +++ b/common/lib/wrapper_property.ts @@ -473,6 +473,13 @@ export class WrapperProperties { false ); + static readonly CACHED_READER_KEEP_ALIVE_TIMEOUT = new WrapperProperty( + "cachedReaderKeepAliveTimeoutMs", + "The time in milliseconds to keep a reader connection alive in the cache. " + + "Default value 0 means the Wrapper will keep reusing the same cached reader connection.", + 0 + ); + private static readonly PREFIXES = [ WrapperProperties.MONITORING_PROPERTY_PREFIX, ClusterTopologyMonitorImpl.MONITORING_PROPERTY_PREFIX, diff --git a/tests/unit/read_write_splitting.test.ts b/tests/unit/read_write_splitting.test.ts index aba156d7..3a707b35 100644 --- a/tests/unit/read_write_splitting.test.ts +++ b/tests/unit/read_write_splitting.test.ts @@ -82,6 +82,16 @@ const mockExecuteFuncThrowsFailoverSuccessError = jest.fn(() => { throw new FailoverSuccessError("test"); }); +class TestReadWriteSplitting extends ReadWriteSplittingPlugin { + getWriterTargetClient(): ClientWrapper | undefined { + return this.writerTargetClient; + } + + getReaderTargetClient(): ClientWrapper | undefined { + return this.readerCacheItem?.get(); + } +} + describe("reader write splitting test", () => { beforeEach(() => { when(mockPluginService.getHostListProvider()).thenReturn(instance(mockHostListProvider)); @@ -116,7 +126,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); when(mockPluginService.connect(anything(), anything(), anything())).thenResolve(mockReaderWrapper); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderService, @@ -127,7 +137,7 @@ describe("reader write splitting test", () => { await target.switchClientIfRequired(true); verify(mockPluginService.refreshHostList()).once(); verify(mockPluginService.setCurrentClient(mockReaderWrapper, readerHost1)).once(); - expect(target.readerTargetClient).toBe(mockReaderWrapper); + expect(target.getReaderTargetClient()).toBe(mockReaderWrapper); }); it("test set read only false", async () => { @@ -140,7 +150,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(readerHost1); when(mockPluginService.connect(anything(), anything(), anything())).thenResolve(mockWriterWrapper); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderService, @@ -150,7 +160,7 @@ describe("reader write splitting test", () => { await target.switchClientIfRequired(false); verify(mockPluginService.setCurrentClient(mockWriterWrapper, writerHost)).once(); - expect(target.writerTargetClient).toEqual(mockWriterWrapper); + expect(target.getWriterTargetClient()).toEqual(mockWriterWrapper); }); it("test set read only true already on reader", async () => { @@ -164,7 +174,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(readerHost1); when(mockPluginService.connect(anything(), anything(), anything())).thenResolve(mockReaderWrapper); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderServiceInstance, @@ -174,8 +184,8 @@ describe("reader write splitting test", () => { await target.switchClientIfRequired(true); verify(mockPluginService.setCurrentClient(anything(), anything())).never(); - expect(target.readerTargetClient).toEqual(mockReaderWrapper); - expect(target.writerTargetClient).toEqual(undefined); + expect(target.getReaderTargetClient()).toEqual(mockReaderWrapper); + expect(target.getWriterTargetClient()).toEqual(undefined); }); it("test set read only false already on reader", async () => { @@ -188,7 +198,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); when(mockPluginService.connect(anything(), anything(), anything())).thenResolve(mockReaderWrapper); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderServiceInstance, @@ -198,8 +208,8 @@ describe("reader write splitting test", () => { await target.switchClientIfRequired(false); verify(mockPluginService.setCurrentClient(anything(), anything())).never(); - expect(target.writerTargetClient).toEqual(mockWriterWrapper); - expect(target.readerTargetClient).toEqual(undefined); + expect(target.getWriterTargetClient()).toEqual(mockWriterWrapper); + expect(target.getReaderTargetClient()).toEqual(undefined); }); it("test set read only true one host", async () => { @@ -214,7 +224,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); when(mockPluginService.connect(anything(), anything(), anything())).thenReturn(Promise.resolve(mockWriterWrapper)); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderService, @@ -225,8 +235,8 @@ describe("reader write splitting test", () => { await target.switchClientIfRequired(true); verify(mockPluginService.setCurrentClient(anything(), anything())).never(); - expect(target.readerTargetClient).toEqual(undefined); - expect(target.writerTargetClient).toEqual(mockWriterWrapper); + expect(target.getReaderTargetClient()).toEqual(undefined); + expect(target.getWriterTargetClient()).toEqual(mockWriterWrapper); }); it("test connect incorrect host role", async () => { @@ -242,7 +252,7 @@ describe("reader write splitting test", () => { when(mockHostListProviderService.isStaticHostListProvider()).thenReturn(false); when(mockHostListProviderService.getHostListProvider()).thenReturn(mockHostListProviderInstance); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderServiceInstance, @@ -264,7 +274,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(readerHost1); when(await mockPluginService.connect(writerHost, properties)).thenReject(); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderService, @@ -287,7 +297,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); when(mockPluginService.connect(readerHost1 || readerHost2, properties)).thenReject(); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderServiceInstance, @@ -297,7 +307,7 @@ describe("reader write splitting test", () => { await target.switchClientIfRequired(true); verify(mockPluginService.setCurrentClient(anything(), anything())).never(); - expect(target.readerTargetClient).toEqual(undefined); + expect(target.getReaderTargetClient()).toEqual(undefined); }); it("test set read only on closed connection", async () => { @@ -308,7 +318,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentClient()).thenReturn(instance(mockClosedWriterClient)); when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderService, @@ -318,7 +328,7 @@ describe("reader write splitting test", () => { await expect(async () => await target.switchClientIfRequired(true)).rejects.toThrow(AwsWrapperError); verify(mockPluginService.setCurrentClient(anything(), anything())).never(); - expect(target.readerTargetClient).toEqual(undefined); + expect(target.getReaderTargetClient()).toEqual(undefined); }); it("test execute failover to new writer", async () => { @@ -330,7 +340,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentClient()).thenReturn(mockNewWriterClient); when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); when(await mockPluginService.isClientValid(mockWriterWrapper)).thenReturn(true); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderService, @@ -352,7 +362,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentClient()).thenReturn(mockWriterClient); when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderService, @@ -372,7 +382,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); when(mockPluginService.acceptsStrategy(anything(), anything())).thenReturn(true); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderService, @@ -394,7 +404,7 @@ describe("reader write splitting test", () => { when(mockPluginService.acceptsStrategy(anything(), anything())).thenReturn(true); when(mockHostListProviderService.isStaticHostListProvider()).thenReturn(false); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderServiceInstance, @@ -431,7 +441,7 @@ describe("reader write splitting test", () => { const spyTarget = instance(target); await spyTarget.switchClientIfRequired(true); await spyTarget.switchClientIfRequired(false); - verify(target.closeTargetClientIfIdle(poolClientWrapper)).once(); + verify(target.closeWriterClientIfIdle()).once(); }); it("test pooled writer connection after set read only", async () => { @@ -480,6 +490,6 @@ describe("reader write splitting test", () => { await spyTarget.switchClientIfRequired(false); await spyTarget.switchClientIfRequired(true); - verify(target.closeTargetClientIfIdle(poolClientWrapper)).twice(); + verify(target.closeWriterClientIfIdle()).twice(); }); });