Skip to content

Commit fd2d0ef

Browse files
committed
chore: add logs to tests
1 parent 3cb50e7 commit fd2d0ef

File tree

4 files changed

+45
-17
lines changed

4 files changed

+45
-17
lines changed

.github/workflows/integration_tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
fail-fast: false
2020
matrix:
2121
python-version: [ "3.11", "3.12", "3.13" ]
22-
environment: [ "mysql", "pg" ]
22+
environment: [ "pg" ]
2323

2424
steps:
2525
- name: 'Clone repository'
@@ -73,7 +73,7 @@ jobs:
7373
fail-fast: false
7474
matrix:
7575
python-version: [ "3.11", "3.12", "3.13" ]
76-
environment: [ "mysql", "pg" ]
76+
environment: [ "pg" ]
7777

7878
steps:
7979
- name: 'Clone repository'

aws_advanced_python_wrapper/read_write_splitting_plugin.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
from copy import deepcopy
1818
from typing import TYPE_CHECKING, Any, Callable, Optional, Protocol, Set, Tuple
1919

20+
from aws_advanced_python_wrapper.sqlalchemy_driver_dialect import SqlAlchemyDriverDialect
21+
2022
if TYPE_CHECKING:
2123
from aws_advanced_python_wrapper.driver_dialect import DriverDialect
2224
from aws_advanced_python_wrapper.host_list_provider import HostListProviderService
@@ -402,13 +404,20 @@ def _close_connection_if_idle(self, internal_conn: Optional[Connection]):
402404
current_conn = self._plugin_service.current_connection
403405
driver_dialect = self._plugin_service.driver_dialect
404406

407+
logger.debug(f"current connection during close if idle: {current_conn}: {self._plugin_service.current_host_info}")
405408
if internal_conn == current_conn:
406409
# Connection is in use, do not close
410+
logger.debug(f"current connection is in use. do not close")
407411
return
408412

409413
try:
410414
if self._is_connection_usable(internal_conn, driver_dialect):
411-
driver_dialect.execute(DbApiMethod.CONNECTION_CLOSE.method_name, lambda: internal_conn.close())
415+
if isinstance(driver_dialect, SqlAlchemyDriverDialect):
416+
logger.debug(f"Invalidating pooled connection {internal_conn}")
417+
driver_dialect.execute(DbApiMethod.CONNECTION_CLOSE.method_name, lambda: internal_conn.invalidate(soft=True))
418+
else:
419+
logger.debug(f"ReadWriteSplittingPlugin.ClosingIdleConnection: {internal_conn}")
420+
driver_dialect.execute(DbApiMethod.CONNECTION_CLOSE.method_name, lambda: internal_conn.close())
412421
except Exception:
413422
# Ignore exceptions during cleanup - connection might already be dead
414423
pass

tests/integration/container/test_custom_endpoint.py

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def rds_utils(self):
6161
return RdsTestUtility(region)
6262

6363
@pytest.fixture(scope='class')
64-
def props(self):
64+
def props_with_failover(self):
6565
p: Properties = Properties(
6666
{"plugins": "custom_endpoint,read_write_splitting,failover", "connect_timeout": 10_000, "autocommit": True, "cluster_id": "cluster1"})
6767

@@ -77,6 +77,24 @@ def props(self):
7777

7878
return p
7979

80+
@pytest.fixture(scope='class')
81+
def props(self):
82+
p: Properties = Properties(
83+
{"plugins": "custom_endpoint,read_write_splitting", "connect_timeout": 10_000, "autocommit": True, "cluster_id": "cluster1"})
84+
85+
features = TestEnvironment.get_current().get_features()
86+
if TestEnvironmentFeatures.TELEMETRY_TRACES_ENABLED in features \
87+
or TestEnvironmentFeatures.TELEMETRY_METRICS_ENABLED in features:
88+
WrapperProperties.ENABLE_TELEMETRY.set(p, True)
89+
WrapperProperties.TELEMETRY_SUBMIT_TOPLEVEL.set(p, True)
90+
if TestEnvironmentFeatures.TELEMETRY_TRACES_ENABLED in features:
91+
WrapperProperties.TELEMETRY_TRACES_BACKEND.set(p, "XRAY")
92+
if TestEnvironmentFeatures.TELEMETRY_METRICS_ENABLED in features:
93+
WrapperProperties.TELEMETRY_METRICS_BACKEND.set(p, "OTLP")
94+
95+
return p
96+
97+
8098
@pytest.fixture(scope='class', autouse=True)
8199
def setup_and_teardown(self):
82100
env_info = TestEnvironment.get_current().get_info()
@@ -221,13 +239,13 @@ def wait_until_endpoint_has_members(self, rds_client, expected_members: Set[str]
221239
duration_sec = (perf_counter_ns() - start_ns) / 1_000_000_000
222240
self.logger.debug(f"wait_until_endpoint_has_specified_members took {duration_sec} seconds.")
223241

224-
def test_custom_endpoint_failover(self, test_driver: TestDriver, conn_utils, props, rds_utils):
225-
props["failover_mode"] = "reader_or_writer"
242+
def test_custom_endpoint_failover(self, test_driver: TestDriver, conn_utils, props_with_failover, rds_utils):
243+
props_with_failover["failover_mode"] = "reader_or_writer"
226244

227245
target_driver_connect = DriverHelper.get_connect_func(test_driver)
228246
kwargs = conn_utils.get_connect_params()
229247
kwargs["host"] = self.endpoint_info["Endpoint"]
230-
conn = AwsWrapperConnection.connect(target_driver_connect, **kwargs, **props)
248+
conn = AwsWrapperConnection.connect(target_driver_connect, **kwargs, **props_with_failover)
231249

232250
endpoint_members = self.endpoint_info["StaticMembers"]
233251
instance_id = rds_utils.query_instance_id(conn)
@@ -281,7 +299,7 @@ def _setup_custom_endpoint_role(self, target_driver_connect, conn_kwargs, rds_ut
281299
self.logger.debug("Custom endpoint instance successfully set to role: " + host_role.name)
282300

283301
def test_custom_endpoint_read_write_splitting__with_custom_endpoint_changes__with_reader_as_init_conn(
284-
self, test_driver: TestDriver, conn_utils, props, rds_utils):
302+
self, test_driver: TestDriver, conn_utils, props_with_failover, rds_utils):
285303
'''
286304
Will test for the following scenario:
287305
1. Initially connect to a reader instance via the custom endpoint.
@@ -297,13 +315,13 @@ def test_custom_endpoint_read_write_splitting__with_custom_endpoint_changes__wit
297315
kwargs["host"] = self.endpoint_info["Endpoint"]
298316
# This setting is not required for the test, but it allows us to also test re-creation of expired monitors since
299317
# it takes more than 30 seconds to modify the cluster endpoint (usually around 140s).
300-
props["custom_endpoint_idle_monitor_expiration_ms"] = 30_000
301-
props["wait_for_custom_endpoint_info_timeout_ms"] = 30_000
318+
props_with_failover["custom_endpoint_idle_monitor_expiration_ms"] = 30_000
319+
props_with_failover["wait_for_custom_endpoint_info_timeout_ms"] = 30_000
302320

303321
# Ensure that we are starting with a reader connection
304322
self._setup_custom_endpoint_role(target_driver_connect, kwargs, rds_utils, HostRole.READER)
305323

306-
conn = AwsWrapperConnection.connect(target_driver_connect, **kwargs, **props)
324+
conn = AwsWrapperConnection.connect(target_driver_connect, **kwargs, **props_with_failover)
307325
endpoint_members = self.endpoint_info["StaticMembers"]
308326
original_reader_id = rds_utils.query_instance_id(conn)
309327
assert original_reader_id in endpoint_members
@@ -350,28 +368,28 @@ def test_custom_endpoint_read_write_splitting__with_custom_endpoint_changes__wit
350368

351369
def test_custom_endpoint_read_write_splitting__with_custom_endpoint_changes__with_writer_as_init_conn(
352370
self, test_driver: TestDriver, conn_utils, props, rds_utils):
353-
'''
371+
"""
354372
Will test for the following scenario:
355-
1. Iniitially connect to the writer instance via the custom endpoint.
373+
1. Initially connect to the writer instance via the custom endpoint.
356374
2. Attempt to switch to reader instance - should succeed, but will still use writer instance as reader.
357375
3. Modify the custom endpoint to add a reader instance as a static member.
358376
4. Switch to reader instance - should succeed.
359377
5. Switch back to writer instance - should succeed.
360378
6. Modify the custom endpoint to remove the reader instance as a static member.
361379
7. Attempt to switch to reader instance - should fail since the custom endpoint no longer has the reader instance.
362-
'''
380+
"""
363381

364382
target_driver_connect = DriverHelper.get_connect_func(test_driver)
365383
kwargs = conn_utils.get_connect_params()
366384
kwargs["host"] = self.endpoint_info["Endpoint"]
367385
# This setting is not required for the test, but it allows us to also test re-creation of expired monitors since
368386
# it takes more than 30 seconds to modify the cluster endpoint (usually around 140s).
369-
props["custom_endpoint_idle_monitor_expiration_ms"] = 30_000
370-
props["wait_for_custom_endpoint_info_timeout_ms"] = 30_000
387+
props_with_failover["custom_endpoint_idle_monitor_expiration_ms"] = 30_000
388+
props_with_failover["wait_for_custom_endpoint_info_timeout_ms"] = 30_000
371389

372390
# Ensure that we are starting with a writer connection
373391
self._setup_custom_endpoint_role(target_driver_connect, kwargs, rds_utils, HostRole.WRITER)
374-
conn = AwsWrapperConnection.connect(target_driver_connect, **kwargs, **props)
392+
conn = AwsWrapperConnection.connect(target_driver_connect, **kwargs, **props_with_failover)
375393

376394
endpoint_members = self.endpoint_info["StaticMembers"]
377395
original_writer_id = str(rds_utils.query_instance_id(conn))

tests/integration/container/test_read_write_splitting.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,7 @@ def test_pooled_connection__cluster_url_failover(
832832
TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
833833
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
834834
@disable_on_engines([DatabaseEngine.MYSQL])
835+
@pytest.mark.repeat(10)
835836
def test_pooled_connection__failover_failed(
836837
self,
837838
test_environment: TestEnvironment,

0 commit comments

Comments
 (0)