Skip to content

Commit 0a34412

Browse files
committed
chore: refactor read write splitting
1 parent f913149 commit 0a34412

14 files changed

Lines changed: 523 additions & 299 deletions

.github/workflows/main.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ on:
88
pull_request:
99
branches:
1010
- "*"
11+
- dev/v3
1112

1213
permissions:
1314
contents: read

common/lib/connection_info.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { ClientWrapper } from "./client_wrapper";
18+
19+
export class ConnectionInfo {
20+
private readonly _client: ClientWrapper;
21+
private readonly _isPooled: boolean;
22+
23+
constructor(client: ClientWrapper, isPooled: boolean) {
24+
this._client = client;
25+
this._isPooled = isPooled;
26+
}
27+
28+
get client(): ClientWrapper {
29+
return this._client;
30+
}
31+
32+
get isPooled(): boolean {
33+
return this._isPooled;
34+
}
35+
}

common/lib/connection_provider.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717
import { HostRole } from "./host_role";
1818
import { HostInfo } from "./host_info";
1919
import { PluginService } from "./plugin_service";
20-
import { ClientWrapper } from "./client_wrapper";
20+
import { ConnectionInfo } from "./connection_info";
2121

2222
export interface ConnectionProvider {
23-
connect(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>): Promise<ClientWrapper>;
23+
connect(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>): Promise<ConnectionInfo>;
2424
acceptsUrl(hostInfo: HostInfo, props: Map<string, any>): boolean;
2525
acceptsStrategy(role: HostRole, strategy: string): boolean;
2626
getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map<string, any>): HostInfo;

common/lib/driver_connection_provider.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import { logger } from "../logutils";
3030
import { ClientWrapper } from "./client_wrapper";
3131
import { RoundRobinHostSelector } from "./round_robin_host_selector";
3232
import { DriverDialect } from "./driver_dialect/driver_dialect";
33+
import { ConnectionInfo } from "./connection_info";
3334

3435
export class DriverConnectionProvider implements ConnectionProvider {
3536
private static readonly acceptedStrategies: Map<string, HostSelector> = new Map([
@@ -46,7 +47,7 @@ export class DriverConnectionProvider implements ConnectionProvider {
4647
return true;
4748
}
4849

49-
async connect(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>): Promise<ClientWrapper> {
50+
async connect(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>): Promise<ConnectionInfo> {
5051
let resultTargetClient;
5152
const resultProps = new Map(props);
5253
resultProps.set(WrapperProperties.HOST.name, hostInfo.host);
@@ -92,7 +93,7 @@ export class DriverConnectionProvider implements ConnectionProvider {
9293
resultTargetClient = driverDialect.connect(hostInfo, resultProps);
9394
}
9495
pluginService.attachErrorListener(resultTargetClient);
95-
return resultTargetClient;
96+
return new ConnectionInfo(resultTargetClient, false);
9697
}
9798

9899
getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map<string, any>): HostInfo {

common/lib/internal_pooled_connection_provider.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import { LeastConnectionsHostSelector } from "./least_connections_host_selector"
3939
import { PoolClientWrapper } from "./pool_client_wrapper";
4040
import { logger } from "../logutils";
4141
import { SlidingExpirationCacheWithCleanupTask } from "./utils/sliding_expiration_cache_with_cleanup_task";
42+
import { ConnectionInfo } from "./connection_info";
4243

4344
export class InternalPooledConnectionProvider implements PooledConnectionProvider, CanReleaseResources {
4445
static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(10 * 60_000_000_000); // 10 minutes
@@ -79,7 +80,7 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
7980
return RdsUrlType.RDS_INSTANCE === urlType;
8081
}
8182

82-
async connect(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>): Promise<ClientWrapper> {
83+
async connect(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>): Promise<ConnectionInfo> {
8384
const resultProps = new Map(props);
8485
resultProps.set(WrapperProperties.HOST.name, hostInfo.host);
8586
if (hostInfo.isPortSpecified()) {
@@ -122,7 +123,7 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
122123

123124
const poolClient = await this.getPoolConnection(connectionHostInfo, props);
124125
pluginService.attachErrorListener(poolClient);
125-
return poolClient;
126+
return new ConnectionInfo(poolClient, true);
126127
}
127128

128129
async getPoolConnection(hostInfo: HostInfo, props: Map<string, string>) {

common/lib/plugin_service.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,10 @@ export interface PluginService extends ErrorHandler {
150150
getStatus<T>(clazz: any, key: string): T;
151151

152152
isPluginInUse(plugin: any): boolean;
153+
154+
isPooledClient(): boolean;
155+
156+
setIsPooledClient(isPooledClient: boolean): void;
153157
}
154158

155159
export class PluginServiceImpl implements PluginService, HostListProviderService {
@@ -172,6 +176,8 @@ export class PluginServiceImpl implements PluginService, HostListProviderService
172176
protected static readonly statusesExpiringCache: CacheMap<string, any> = new CacheMap();
173177
protected static readonly DEFAULT_STATUS_CACHE_EXPIRE_NANO: number = 3_600_000_000_000; // 60 minutes
174178

179+
protected pooledClient: boolean | null = null;
180+
175181
constructor(
176182
container: PluginServiceManagerContainer,
177183
client: AwsClient,
@@ -782,4 +788,12 @@ export class PluginServiceImpl implements PluginService, HostListProviderService
782788
isPluginInUse(plugin: any) {
783789
return this.pluginServiceManagerContainer.pluginManager!.isPluginInUse(plugin);
784790
}
791+
792+
isPooledClient(): boolean | null {
793+
return this.pooledClient;
794+
}
795+
796+
setIsPooledClient(isPooledClient: boolean): void {
797+
this.pooledClient = isPooledClient;
798+
}
785799
}

common/lib/plugins/default_plugin.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import { AwsWrapperError } from "../utils/errors";
2929
import { HostAvailability } from "../host_availability/host_availability";
3030
import { ClientWrapper } from "../client_wrapper";
3131
import { TelemetryTraceLevel } from "../utils/telemetry/telemetry_trace_level";
32+
import { ConnectionInfo } from "../connection_info";
3233

3334
export class DefaultPlugin extends AbstractConnectionPlugin {
3435
id: string = uniqueId("_defaultPlugin");
@@ -79,10 +80,11 @@ export class DefaultPlugin extends AbstractConnectionPlugin {
7980
TelemetryTraceLevel.NESTED
8081
);
8182

82-
const result = await telemetryContext.start(async () => await connProvider.connect(hostInfo, this.pluginService, props));
83+
const result: ConnectionInfo = await telemetryContext.start(async () => await connProvider.connect(hostInfo, this.pluginService, props));
8384
this.pluginService.setAvailability(hostInfo.allAliases, HostAvailability.AVAILABLE);
84-
await this.pluginService.updateDialect(result);
85-
return result;
85+
this.pluginService.setIsPooledClient(result.isPooled);
86+
await this.pluginService.updateDialect(result.client);
87+
return result.client;
8688
}
8789

8890
override async execute<Type>(methodName: string, methodFunc: () => Promise<Type>): Promise<Type> {

0 commit comments

Comments
 (0)