diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 048dbd1352..3c75a33603 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -77,6 +77,12 @@ jobs: - name: Build driver run: uv sync + - name: Cache Scylla download + uses: actions/cache@v4 + with: + path: ~/.ccm/repository + key: scylla-${{ env.SCYLLA_VERSION }}-${{ runner.os }} + # This is to get honest accounting of test time vs download time vs build time. # Not strictly necessary for running tests. - name: Download Scylla diff --git a/pyproject.toml b/pyproject.toml index 7f60ed0b2a..1335027fcd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -121,6 +121,9 @@ log_level = "DEBUG" log_date_format = "%Y-%m-%d %H:%M:%S" xfail_strict = true addopts = "-rf" +markers = [ + "last: mark test to run last within its module group", +] [tool.setuptools_scm] version_file = "cassandra/_version.py" diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index a53e7aafa6..286561c291 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -442,7 +442,7 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=None, else: log.debug("Using unnamed external cluster") if set_keyspace and start: - setup_keyspace(ipformat=ipformat, wait=False) + setup_keyspace(ipformat=ipformat) return if is_current_cluster(cluster_name, nodes, workloads): @@ -600,7 +600,7 @@ def execute_with_long_wait_retry(session, query, timeout=30): del tb tries += 1 - raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(query)) + raise RuntimeError("Failed to execute query after 10 attempts: {0}".format(query)) def execute_with_retry_tolerant(session, query, retry_exceptions, escape_exception): @@ -632,11 +632,7 @@ def drop_keyspace_shutdown_cluster(keyspace_name, session, cluster): cluster.shutdown() -def setup_keyspace(ipformat=None, wait=True, protocol_version=None, 
port=9042): - # wait for nodes to startup - if wait: - time.sleep(10) - +def setup_keyspace(ipformat=None, protocol_version=None, port=9042): if protocol_version: _protocol_version = protocol_version else: diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index a682bcb608..5db8026675 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -17,7 +17,7 @@ def cleanup_clusters(): if not os.environ.get('DISABLE_CLUSTER_CLEANUP'): for cluster_name in [CLUSTER_NAME, SINGLE_NODE_CLUSTER_NAME, MULTIDC_CLUSTER_NAME, - 'shared_aware', 'sni_proxy', 'test_ip_change']: + 'cluster_tests', 'shared_aware', 'sni_proxy', 'test_ip_change', 'test_client_routes_replacement', 'test_schema_kill']: try: cluster = CCMClusterFactory.load(ccm_path, cluster_name) logging.debug("Using external CCM cluster {0}".format(cluster.name)) diff --git a/tests/integration/long/test_loadbalancingpolicies.py b/tests/integration/long/test_loadbalancingpolicies.py index fd8edde14c..072786dc23 100644 --- a/tests/integration/long/test_loadbalancingpolicies.py +++ b/tests/integration/long/test_loadbalancingpolicies.py @@ -45,7 +45,6 @@ class LoadBalancingPolicyTests(unittest.TestCase): def setUp(self): - remove_cluster() # clear ahead of test so it doesn't use one left in unknown state self.coordinator_stats = CoordinatorStats() self.prepared = None self.probe_cluster = None @@ -191,6 +190,7 @@ def test_token_aware_is_used_by_default(self): assert isinstance(cluster.profile_manager.default.load_balancing_policy, DCAwareRoundRobinPolicy) def test_roundrobin(self): + remove_cluster() use_singledc() keyspace = 'test_roundrobin' cluster, session = self._cluster_session_with_lbp(RoundRobinPolicy()) @@ -228,6 +228,7 @@ def test_roundrobin(self): self.coordinator_stats.assert_query_count_equals(3, 6) def test_roundrobin_two_dcs(self): + remove_cluster() use_multidc([2, 2]) keyspace = 'test_roundrobin_two_dcs' cluster, session = self._cluster_session_with_lbp(RoundRobinPolicy()) @@
-261,6 +262,7 @@ def test_roundrobin_two_dcs(self): self.coordinator_stats.assert_query_count_equals(5, 3) def test_roundrobin_two_dcs_2(self): + remove_cluster() use_multidc([2, 2]) keyspace = 'test_roundrobin_two_dcs_2' cluster, session = self._cluster_session_with_lbp(RoundRobinPolicy()) @@ -294,6 +296,7 @@ def test_roundrobin_two_dcs_2(self): self.coordinator_stats.assert_query_count_equals(5, 3) def test_dc_aware_roundrobin_two_dcs(self): + remove_cluster() use_multidc([3, 2]) keyspace = 'test_dc_aware_roundrobin_two_dcs' cluster, session = self._cluster_session_with_lbp(DCAwareRoundRobinPolicy('dc1')) @@ -311,6 +314,7 @@ def test_dc_aware_roundrobin_two_dcs(self): self.coordinator_stats.assert_query_count_equals(5, 0) def test_dc_aware_roundrobin_two_dcs_2(self): + remove_cluster() use_multidc([3, 2]) keyspace = 'test_dc_aware_roundrobin_two_dcs_2' cluster, session = self._cluster_session_with_lbp(DCAwareRoundRobinPolicy('dc2')) @@ -328,6 +332,7 @@ def test_dc_aware_roundrobin_two_dcs_2(self): self.coordinator_stats.assert_query_count_equals(5, 6) def test_dc_aware_roundrobin_one_remote_host(self): + remove_cluster() use_multidc([2, 2]) keyspace = 'test_dc_aware_roundrobin_one_remote_host' cluster, session = self._cluster_session_with_lbp(DCAwareRoundRobinPolicy('dc2', used_hosts_per_remote_dc=1)) @@ -410,6 +415,7 @@ def test_token_aware_prepared(self): self.token_aware(keyspace, True) def token_aware(self, keyspace, use_prepared=False): + remove_cluster() use_singledc() cluster, session = self._cluster_session_with_lbp(TokenAwarePolicy(RoundRobinPolicy())) self.addCleanup(cluster.shutdown) @@ -505,6 +511,7 @@ def test_token_aware_composite_key(self): assert results[0].i def test_token_aware_with_rf_2(self, use_prepared=False): + remove_cluster() use_singledc() keyspace = 'test_token_aware_with_rf_2' cluster, session = self._cluster_session_with_lbp(TokenAwarePolicy(RoundRobinPolicy())) @@ -617,6 +624,7 @@ def 
test_token_aware_with_transient_replication(self): @test_category policy """ + remove_cluster() # We can test this with a single dc when CASSANDRA-15670 is fixed use_multidc([3, 3]) @@ -647,6 +655,7 @@ def test_token_aware_with_transient_replication(self): def _set_up_shuffle_test(self, keyspace, replication_factor): + remove_cluster() use_singledc() cluster, session = self._cluster_session_with_lbp( TokenAwarePolicy(RoundRobinPolicy(), shuffle_replicas=True) @@ -678,6 +687,7 @@ def _check_query_order_changes(self, session, keyspace): self.coordinator_stats.reset_counts() def test_white_list(self): + remove_cluster() use_singledc() keyspace = 'test_white_list' @@ -723,6 +733,7 @@ def test_black_list_with_host_filter_policy(self): @test_category policy """ + remove_cluster() use_singledc() keyspace = 'test_black_list_with_hfp' ignored_address = (IP_FORMAT % 2) diff --git a/tests/integration/simulacron/test_connection.py b/tests/integration/simulacron/test_connection.py index 818d0b46b9..ceceea814f 100644 --- a/tests/integration/simulacron/test_connection.py +++ b/tests/integration/simulacron/test_connection.py @@ -23,7 +23,7 @@ from cassandra.policies import HostStateListener, RoundRobinPolicy, WhiteListRoundRobinPolicy from tests import connection_class, thread_pool_executor_class -from tests.util import late +from tests.util import late, wait_until_not_raised from tests.integration import requiressimulacron, libevtest from tests.integration.util import assert_quiescent_pool_state # important to import the patch PROTOCOL_VERSION from the simulacron module @@ -356,13 +356,15 @@ def test_retry_after_defunct(self): for _ in range(10): session.execute(query_to_prime) - # Might take some time to close the previous connections and reconnect - time.sleep(10) - assert_quiescent_pool_state(cluster) + # Wait for previous connections to close and pool to stabilize + wait_until_not_raised( + lambda: assert_quiescent_pool_state(cluster), + delay=1, max_attempts=30) 
clear_queries() - time.sleep(10) - assert_quiescent_pool_state(cluster) + wait_until_not_raised( + lambda: assert_quiescent_pool_state(cluster), + delay=1, max_attempts=30) def test_idle_connection_is_not_closed(self): """ diff --git a/tests/integration/simulacron/utils.py b/tests/integration/simulacron/utils.py index b6136e247a..2322319234 100644 --- a/tests/integration/simulacron/utils.py +++ b/tests/integration/simulacron/utils.py @@ -89,8 +89,14 @@ def start_simulacron(): SERVER_SIMULACRON.start() - # TODO improve this sleep, maybe check the logs like ccm - time.sleep(5) + # Poll the admin endpoint until simulacron is ready + from tests.util import wait_until_not_raised + def _check_simulacron_ready(): + opener = build_opener(HTTPHandler) + request = Request("http://127.0.0.1:8187/cluster") + opener.open(request, timeout=2) + + wait_until_not_raised(_check_simulacron_ready, delay=0.5, max_attempts=30) def stop_simulacron(): diff --git a/tests/integration/standard/conftest.py b/tests/integration/standard/conftest.py index 6028c2a06d..ea1dbafbdc 100644 --- a/tests/integration/standard/conftest.py +++ b/tests/integration/standard/conftest.py @@ -1,6 +1,73 @@ import pytest import logging +# Cluster topology groups for test ordering. +# Tests are sorted so that modules sharing the same CCM cluster run +# together, minimising expensive cluster teardown/restart cycles. +# Lower number = runs first. Modules not listed get a high default.
+_MODULE_CLUSTER_ORDER = { + # Group 0: default 3-node singledc (CLUSTER_NAME = 'test_cluster') + "test_metadata": 0, + "test_policies": 0, + "test_control_connection": 0, + "test_routing": 0, + "test_prepared_statements": 0, + "test_metrics": 0, + "test_connection": 0, + "test_concurrent": 0, + "test_custom_payload": 0, + "test_query_paging": 0, + "test_single_interface": 0, + "test_rate_limit_exceeded": 0, + # Group 1: 'cluster_tests' (--smp 2, 3 nodes) + "test_cluster": 1, + "test_shard_aware": 1, + # Group 2: 'shared_aware' (--smp 2 --memory 2048M, 3 nodes) + "test_use_keyspace": 2, + "test_client_routes": 2, + # Group 3: single-node cluster + "test_types": 3, + "test_cython_protocol_handlers": 3, + "test_custom_protocol_handler": 3, + "test_row_factories": 3, + "test_udts": 3, + "test_client_warnings": 3, + "test_application_info": 3, + # Group 4: destructive / special clusters (run last) + "test_ip_change": 4, + "test_authentication": 4, + "test_authentication_misconfiguration": 4, + "test_custom_cluster": 4, + "test_query": 4, + # Group 5: tablets (destructive — decommissions a node) + "test_tablets": 5, + # Group 6: schema change + node kill (destructive — kills node2) + "test_concurrent_schema_change_and_node_kill": 6, + # Group 7: multi-dc (7 nodes — most expensive to create) + "test_rack_aware_policy": 7, +} + + +def pytest_collection_modifyitems(items): + """Sort tests so modules with the same cluster topology are adjacent. + + Uses the original collection index as tie-breaker so that the + definition order inside each file is preserved (important for tests + that depend on running order, e.g. destructive tablet tests). 
+ """ + orig_order = {id(item): idx for idx, item in enumerate(items)} + + def _sort_key(item): + module_name = item.module.__name__.rsplit(".", 1)[-1] + return ( + _MODULE_CLUSTER_ORDER.get(module_name, 99), + item.fspath, + orig_order[id(item)], + ) + + items[:] = sorted(items, key=_sort_key) + + # from https://github.com/streamlit/streamlit/pull/5047/files def pytest_sessionfinish(): # We're not waiting for scriptrunner threads to cleanly close before ending the PyTest, @@ -10,4 +77,4 @@ def pytest_sessionfinish(): # * https://github.com/pytest-dev/pytest/issues/5282 # To prevent the exception from being raised on pytest_sessionfinish # we disable exception raising in logging module - logging.raiseExceptions = False \ No newline at end of file + logging.raiseExceptions = False diff --git a/tests/integration/standard/test_authentication.py b/tests/integration/standard/test_authentication.py index 0208909494..4a1bcd97d0 100644 --- a/tests/integration/standard/test_authentication.py +++ b/tests/integration/standard/test_authentication.py @@ -34,8 +34,12 @@ #This can be tested for remote hosts, but the cluster has to be configured accordingly #@local +_saved_scylla_ext_opts = None + def setup_module(): + global _saved_scylla_ext_opts + _saved_scylla_ext_opts = os.environ.get('SCYLLA_EXT_OPTS') os.environ['SCYLLA_EXT_OPTS'] = '--auth-superuser-name=cassandra --auth-superuser-salted-password=$6$x7IFjiX5VCpvNiFk$2IfjTvSyGL7zerpV.wbY7mJjaRCrJ/68dtT3UpT.sSmNYz1bPjtn3mH.kJKFvaZ2T4SbVeBijjmwGjcb83LlV/' if CASSANDRA_IP.startswith("127.0.0.") and not USE_CASS_EXTERNAL: use_singledc(start=False) @@ -49,13 +53,29 @@ def setup_module(): # PYTHON-1328 # - # Give the cluster enough time to startup (and perform necessary initialization) - # before executing the test. + # Wait for PasswordAuthenticator to finish initializing (creating the + # default superuser). Poll by attempting to authenticate rather than + # using a fixed sleep. 
if CASSANDRA_VERSION > Version('4.0-a'): - time.sleep(10) + from tests.util import wait_until_not_raised + + def _check_auth_ready(): + cluster = TestCluster(protocol_version=PROTOCOL_VERSION, + auth_provider=PlainTextAuthProvider('cassandra', 'cassandra')) + try: + session = cluster.connect() + session.execute("SELECT * FROM system.local WHERE key='local'") + finally: + cluster.shutdown() + + wait_until_not_raised(_check_auth_ready, delay=1, max_attempts=30) def teardown_module(): remove_cluster() # this test messes with config + if _saved_scylla_ext_opts is None: + os.environ.pop('SCYLLA_EXT_OPTS', None) + else: + os.environ['SCYLLA_EXT_OPTS'] = _saved_scylla_ext_opts class AuthenticationTests(unittest.TestCase): diff --git a/tests/integration/standard/test_client_routes.py b/tests/integration/standard/test_client_routes.py index a8a3c30f2c..b5e38dadd7 100644 --- a/tests/integration/standard/test_client_routes.py +++ b/tests/integration/standard/test_client_routes.py @@ -519,9 +519,21 @@ def assert_routes_direct(test, cluster, expected_node_ids, direct_port=9042): ) +_saved_scylla_ext_opts = None + + def setup_module(): + global _saved_scylla_ext_opts + _saved_scylla_ext_opts = os.environ.get('SCYLLA_EXT_OPTS') os.environ['SCYLLA_EXT_OPTS'] = "--smp 2 --memory 2048M" - use_cluster('test_client_routes', [3], start=True) + use_cluster('shared_aware', [3], start=True) + + +def teardown_module(): + if _saved_scylla_ext_opts is None: + os.environ.pop('SCYLLA_EXT_OPTS', None) + else: + os.environ['SCYLLA_EXT_OPTS'] = _saved_scylla_ext_opts @skip_scylla_version_lt(reason='scylladb/scylladb#26992 - system.client_routes is not yet supported', scylla_version="2026.1.0") @@ -1047,7 +1059,7 @@ def test_ssl_without_hostname_verification_through_nlb(self): def routes_visible(): with TestCluster( contact_points=["127.0.0.1"], - ssl_context=ssl_ctx, + ssl_context=ssl_ctx, connect_timeout=30, ) as c: session = c.connect() rs = session.execute( @@ -1059,7 +1071,7 @@ def 
routes_visible(): wait_until_not_raised( lambda: self.assertTrue(routes_visible()), - 0.5, 10, + 1, 30, ) with Cluster( @@ -1116,6 +1128,7 @@ class TestFullNodeReplacementThroughNlb(unittest.TestCase): @classmethod def setUpClass(cls): + cls._saved_scylla_ext_opts = os.environ.get('SCYLLA_EXT_OPTS') os.environ['SCYLLA_EXT_OPTS'] = "--smp 2 --memory 2048M" use_cluster('test_client_routes_replacement', [3], start=True) @@ -1133,6 +1146,10 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): cls.direct_cluster.shutdown() + if cls._saved_scylla_ext_opts is None: + os.environ.pop('SCYLLA_EXT_OPTS', None) + else: + os.environ['SCYLLA_EXT_OPTS'] = cls._saved_scylla_ext_opts def test_should_survive_full_node_replacement_through_nlb(self): """ diff --git a/tests/integration/standard/test_client_warnings.py b/tests/integration/standard/test_client_warnings.py index 781b5b7860..c18fa8cb1f 100644 --- a/tests/integration/standard/test_client_warnings.py +++ b/tests/integration/standard/test_client_warnings.py @@ -17,13 +17,13 @@ from cassandra.query import BatchStatement -from tests.integration import (use_singledc, PROTOCOL_VERSION, local, TestCluster, +from tests.integration import (use_single_node, PROTOCOL_VERSION, local, TestCluster, requires_custom_payload, xfail_scylla) from tests.util import assertRegex, assertDictEqual def setup_module(): - use_singledc() + use_single_node() @xfail_scylla('scylladb/scylladb#10196 - scylla does not report warnings') class ClientWarningTests(unittest.TestCase): diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index bf62f5df48..08b823d716 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -51,12 +51,24 @@ log = logging.getLogger(__name__) +_saved_scylla_ext_opts = None + + def setup_module(): - os.environ['SCYLLA_EXT_OPTS'] = "--smp 1" + global _saved_scylla_ext_opts + _saved_scylla_ext_opts = 
os.environ.get('SCYLLA_EXT_OPTS') + os.environ['SCYLLA_EXT_OPTS'] = "--smp 2" use_cluster("cluster_tests", [3], start=True, workloads=None) warnings.simplefilter("always") +def teardown_module(): + if _saved_scylla_ext_opts is None: + os.environ.pop('SCYLLA_EXT_OPTS', None) + else: + os.environ['SCYLLA_EXT_OPTS'] = _saved_scylla_ext_opts + + class IgnoredHostPolicy(RoundRobinPolicy): def __init__(self, ignored_hosts): @@ -720,10 +732,13 @@ def _warning_are_issued_when_auth(self, auth_provider): session = cluster.connect() assert session.execute("SELECT * from system.local WHERE key='local'") is not None - # Three conenctions to nodes plus the control connection + # Verify that auth warnings are issued for connections where + # auth is configured but the server does not send a challenge. + # At minimum one warning per node connection (3 for a 3-node + # cluster). The control connection and shard-aware connections + # may add more, so we only assert a lower bound. auth_warning = mock_handler.get_message_count('warning', "An authentication challenge was not sent") - assert auth_warning >= 4 - assert auth_warning == mock_handler.get_message_count("debug", "Got ReadyMessage on new connection") + assert auth_warning >= 3 def _wait_for_all_shard_connections(self, cluster, timeout=30): """Wait until all shard-aware connections are fully established.""" @@ -1121,8 +1136,7 @@ def test_stale_connections_after_shutdown(self): """ for _ in range(10): with TestCluster(protocol_version=3) as cluster: - cluster.connect().execute("SELECT * FROM system_schema.keyspaces") - time.sleep(1) + cluster.connect(wait_for_all_pools=True).execute("SELECT * FROM system_schema.keyspaces") with TestCluster(protocol_version=3) as cluster: session = cluster.connect() diff --git a/tests/integration/standard/test_concurrent_schema_change_and_node_kill.py b/tests/integration/standard/test_concurrent_schema_change_and_node_kill.py index aeda381c0d..910dcaa9fe 100644 --- 
a/tests/integration/standard/test_concurrent_schema_change_and_node_kill.py +++ b/tests/integration/standard/test_concurrent_schema_change_and_node_kill.py @@ -8,7 +8,7 @@ def setup_module(): - use_cluster('test_concurrent_schema_change_and_node_kill', [3], start=True) + use_cluster('test_schema_kill', [3], start=True) @local class TestConcurrentSchemaChangeAndNodeKill(unittest.TestCase): diff --git a/tests/integration/standard/test_connection.py b/tests/integration/standard/test_connection.py index 630e5e6ba0..df0f568c2c 100644 --- a/tests/integration/standard/test_connection.py +++ b/tests/integration/standard/test_connection.py @@ -32,6 +32,7 @@ from tests import is_monkey_patched from tests.integration import use_singledc, get_node, CASSANDRA_IP, local, \ requiresmallclockgranularity, greaterthancass20, TestCluster +from tests.util import wait_until try: import cassandra.io.asyncorereactor @@ -140,9 +141,10 @@ def test_heart_beat_timeout(self): # Wait for connections associated with this host go away self.wait_for_no_connections(host, self.cluster) - # Wait to seconds for the driver to be notified - time.sleep(2) - assert test_listener.host_down + # Wait for the driver to detect the host is down + wait_until( + lambda: test_listener.host_down, + delay=0.5, max_attempts=20) # Resume paused node finally: node.resume() diff --git a/tests/integration/standard/test_custom_protocol_handler.py b/tests/integration/standard/test_custom_protocol_handler.py index 239f7e7336..e123f2050e 100644 --- a/tests/integration/standard/test_custom_protocol_handler.py +++ b/tests/integration/standard/test_custom_protocol_handler.py @@ -20,7 +20,7 @@ ContinuousPagingOptions, NoHostAvailable) from cassandra import ProtocolVersion, ConsistencyLevel -from tests.integration import use_singledc, drop_keyspace_shutdown_cluster, \ +from tests.integration import use_single_node, drop_keyspace_shutdown_cluster, \ greaterthanorequalcass30, execute_with_long_wait_retry, 
greaterthanorequalcass3_10, \ TestCluster, greaterthanorequalcass40 from tests.integration.datatype_utils import update_datatypes, PRIMITIVE_DATATYPES @@ -32,7 +32,7 @@ def setup_module(): - use_singledc() + use_single_node() update_datatypes() diff --git a/tests/integration/standard/test_cython_protocol_handlers.py b/tests/integration/standard/test_cython_protocol_handlers.py index f44d613c64..9c94b2ac77 100644 --- a/tests/integration/standard/test_cython_protocol_handlers.py +++ b/tests/integration/standard/test_cython_protocol_handlers.py @@ -12,7 +12,7 @@ from cassandra.protocol import ProtocolHandler, LazyProtocolHandler, NumpyProtocolHandler from cassandra.query import tuple_factory from tests import VERIFY_CYTHON -from tests.integration import use_singledc, notprotocolv1, \ +from tests.integration import use_single_node, notprotocolv1, \ drop_keyspace_shutdown_cluster, BasicSharedKeyspaceUnitTestCase, greaterthancass21, TestCluster from tests.integration.datatype_utils import update_datatypes from tests.integration.standard.utils import ( @@ -21,7 +21,7 @@ def setup_module(): - use_singledc() + use_single_node() update_datatypes() diff --git a/tests/integration/standard/test_ip_change.py b/tests/integration/standard/test_ip_change.py index 6d23d30e04..53debfa1f5 100644 --- a/tests/integration/standard/test_ip_change.py +++ b/tests/integration/standard/test_ip_change.py @@ -10,11 +10,22 @@ LOGGER = logging.getLogger(__name__) +_saved_scylla_ext_opts = None + def setup_module(): + global _saved_scylla_ext_opts + _saved_scylla_ext_opts = os.environ.get('SCYLLA_EXT_OPTS') os.environ['SCYLLA_EXT_OPTS'] = "--smp 2 --memory 2048M" use_cluster('test_ip_change', [3], start=True) + +def teardown_module(): + if _saved_scylla_ext_opts is None: + os.environ.pop('SCYLLA_EXT_OPTS', None) + else: + os.environ['SCYLLA_EXT_OPTS'] = _saved_scylla_ext_opts + @local class TestIpAddressChange(unittest.TestCase): @classmethod diff --git a/tests/integration/standard/test_metrics.py 
b/tests/integration/standard/test_metrics.py index 7b502d91c3..7ebdded141 100644 --- a/tests/integration/standard/test_metrics.py +++ b/tests/integration/standard/test_metrics.py @@ -25,6 +25,7 @@ from cassandra.cluster import NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT from tests.integration import get_cluster, get_node, use_singledc, execute_until_pass, TestCluster +from tests.util import wait_until, wait_until_not_raised from cassandra import metrics from tests.integration import BasicSharedKeyspaceUnitTestCaseRF3WM, BasicExistingKeyspaceUnitTestCase, local @@ -75,8 +76,10 @@ def test_connection_error(self): self.session.execute(query) finally: get_cluster().start(wait_for_binary_proto=True, wait_other_notice=True) - # Give some time for the cluster to come back up, for the next test - time.sleep(5) + # Wait for the cluster to come back up for the next test + wait_until_not_raised( + lambda: self.session.execute("SELECT key FROM system.local WHERE key='local'"), + delay=0.5, max_attempts=30) assert self.cluster.metrics.stats.connection_errors > 0 @@ -156,7 +159,10 @@ def test_unavailable(self): # Sometimes this commands continues with the other nodes having not noticed # 1 is down, and a Timeout error is returned instead of an Unavailable get_node(1).stop(wait=True, wait_other_notice=True) - time.sleep(5) + wait_until( + lambda: not self.cluster.metadata.get_host('127.0.0.1') or + not self.cluster.metadata.get_host('127.0.0.1').is_up, + delay=0.5, max_attempts=30) try: # Test write query = SimpleStatement("INSERT INTO test (k, v) VALUES (2, 2)", consistency_level=ConsistencyLevel.ALL) @@ -171,8 +177,10 @@ def test_unavailable(self): assert self.cluster.metrics.stats.unavailables == 2 finally: get_node(1).start(wait_other_notice=True, wait_for_binary_proto=True) - # Give some time for the cluster to come back up, for the next test - time.sleep(5) + # Wait for the cluster to come back up for the next test + wait_until_not_raised( + lambda: 
self.session.execute("SELECT key FROM system.local WHERE key='local'"), + delay=0.5, max_attempts=30) self.cluster.shutdown() diff --git a/tests/integration/standard/test_query.py b/tests/integration/standard/test_query.py index 9cebc22b05..f9d3dc26bc 100644 --- a/tests/integration/standard/test_query.py +++ b/tests/integration/standard/test_query.py @@ -29,7 +29,7 @@ USE_CASS_EXTERNAL, greaterthanorequalcass40, TestCluster, xfail_scylla from tests import notwindows from tests.integration import greaterthanorequalcass30, get_node -from tests.util import assertListEqual +from tests.util import assertListEqual, wait_until import time import random @@ -1571,9 +1571,10 @@ def test_reprepare_after_host_is_down(self): get_node(1).start(wait_for_binary_proto=True, wait_other_notice=True) - # We wait for cluster._prepare_all_queries to be called - time.sleep(5) - assert 1 == mock_handler.get_message_count('debug', 'Preparing all known prepared statements') + # Wait for cluster._prepare_all_queries to be called + wait_until( + lambda: mock_handler.get_message_count('debug', 'Preparing all known prepared statements') >= 1, + delay=0.5, max_attempts=20) results = self.session.execute(prepared_statement, (1,), execution_profile="only_first") assert results.one() == (1, ) diff --git a/tests/integration/standard/test_rate_limit_exceeded.py b/tests/integration/standard/test_rate_limit_exceeded.py index 211f0c9930..ea7dfc7d61 100644 --- a/tests/integration/standard/test_rate_limit_exceeded.py +++ b/tests/integration/standard/test_rate_limit_exceeded.py @@ -4,13 +4,13 @@ from cassandra.cluster import Cluster from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy -from tests.integration import PROTOCOL_VERSION, use_cluster +from tests.integration import PROTOCOL_VERSION, use_singledc import pytest LOGGER = logging.getLogger(__name__) def setup_module(): - use_cluster('rate_limit', [3], start=True) + use_singledc() class 
TestRateLimitExceededException(unittest.TestCase): @classmethod diff --git a/tests/integration/standard/test_row_factories.py b/tests/integration/standard/test_row_factories.py index 187f35704a..818f11c061 100644 --- a/tests/integration/standard/test_row_factories.py +++ b/tests/integration/standard/test_row_factories.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from tests.integration import get_server_versions, use_singledc, \ +from tests.integration import get_server_versions, use_single_node, \ BasicSharedKeyspaceUnitTestCaseWFunctionTable, BasicSharedKeyspaceUnitTestCase, execute_until_pass, TestCluster import unittest @@ -24,7 +24,7 @@ def setup_module(): - use_singledc() + use_single_node() class NameTupleFactory(BasicSharedKeyspaceUnitTestCase): diff --git a/tests/integration/standard/test_shard_aware.py b/tests/integration/standard/test_shard_aware.py index 48d1aa3609..f4516ad9b4 100644 --- a/tests/integration/standard/test_shard_aware.py +++ b/tests/integration/standard/test_shard_aware.py @@ -27,13 +27,26 @@ from cassandra import OperationTimedOut, ConsistencyLevel from tests.integration import use_cluster, get_node, PROTOCOL_VERSION +from tests.util import wait_until_not_raised LOGGER = logging.getLogger(__name__) +_saved_scylla_ext_opts = None + + def setup_module(): + global _saved_scylla_ext_opts + _saved_scylla_ext_opts = os.environ.get('SCYLLA_EXT_OPTS') os.environ['SCYLLA_EXT_OPTS'] = "--smp 2" - use_cluster('shard_aware', [3], start=True) + use_cluster('cluster_tests', [3], start=True) + + +def teardown_module(): + if _saved_scylla_ext_opts is None: + os.environ.pop('SCYLLA_EXT_OPTS', None) + else: + os.environ['SCYLLA_EXT_OPTS'] = _saved_scylla_ext_opts class TestShardAwareIntegration(unittest.TestCase): @@ -178,11 +191,13 @@ def test_closing_connections(self): continue shard_id = random.choice(list(pool._connections.keys())) pool._connections.get(shard_id).close() - 
time.sleep(5) - self.query_data(self.session, verify_in_tracing=False) + wait_until_not_raised( + lambda: self.query_data(self.session, verify_in_tracing=False), + delay=0.5, max_attempts=30) - time.sleep(10) - self.query_data(self.session) + wait_until_not_raised( + lambda: self.query_data(self.session), + delay=0.5, max_attempts=60) @pytest.mark.skip def test_blocking_connections(self): @@ -212,13 +227,14 @@ def remove_iptables(): '--destination {node1_ip_address}/32 -j REJECT --reject-with icmp-port-unreachable' ).format(node1_ip_address=node1_ip_address, node1_port=node1_port).split(' ') ) - time.sleep(5) + time.sleep(2) # allow iptables rule to take effect try: self.query_data(self.session, verify_in_tracing=False) except OperationTimedOut: pass remove_iptables() - time.sleep(5) - self.query_data(self.session, verify_in_tracing=False) + wait_until_not_raised( + lambda: self.query_data(self.session, verify_in_tracing=False), + delay=0.5, max_attempts=30) self.query_data(self.session) diff --git a/tests/integration/standard/test_tablets.py b/tests/integration/standard/test_tablets.py index d9439e5c2c..8c8aebc1fb 100644 --- a/tests/integration/standard/test_tablets.py +++ b/tests/integration/standard/test_tablets.py @@ -1,11 +1,10 @@ -import time - import pytest from cassandra.cluster import Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy from tests.integration import PROTOCOL_VERSION, use_cluster, get_cluster +from tests.util import wait_until from tests.unit.test_host_connection_pool import LOGGER @@ -28,7 +27,7 @@ def teardown_class(cls): cls.cluster.shutdown() def verify_hosts_in_tracing(self, results, expected): - traces = results.get_query_trace() + traces = results.get_query_trace(max_wait=10) events = traces.events host_set = set() for event in events: @@ -54,7 +53,7 @@ def get_tablet_record(self, query): return 
metadata._tablets.get_tablet_for_key(query.keyspace, query.table, metadata.token_map.token_class.from_key(query.routing_key)) def verify_same_shard_in_tracing(self, results): - traces = results.get_query_trace() + traces = results.get_query_trace(max_wait=10) events = traces.events shard_set = set() for event in events: @@ -212,7 +211,10 @@ def test_tablets_invalidation_drop_ks(self): def drop_ks(_): # Drop and recreate ks and table to trigger tablets invalidation self.create_ks_and_cf(self.cluster.connect()) - time.sleep(3) + # Wait for tablet metadata to be refreshed + wait_until( + lambda: 'test1' in self.cluster.metadata.keyspaces, + delay=0.5, max_attempts=20) self.run_tablets_invalidation_test(drop_ks) @@ -233,7 +235,12 @@ def decommission_non_cc_node(rec): break else: assert False, "failed to find node to decommission" - time.sleep(10) + # Wait for decommission to complete and metadata to update + wait_until( + lambda: len([h for h in self.cluster.metadata.all_hosts() if h.is_up]) < 3, + delay=1, max_attempts=60) + # Tablet metadata invalidation may take additional time to propagate; + # run_tablets_invalidation_test will poll for the expected result. 
self.run_tablets_invalidation_test(decommission_non_cc_node) @@ -257,5 +264,7 @@ def run_tablets_invalidation_test(self, invalidate): invalidate(rec) - # Check if tablets information was purged - assert self.get_tablet_record(bound) is None, "tablet was not deleted, invalidation did not work" + # Wait for tablets information to be purged (invalidation is async) + wait_until( + lambda: self.get_tablet_record(bound) is None, + delay=0.5, max_attempts=20) diff --git a/tests/integration/standard/test_types.py b/tests/integration/standard/test_types.py index 1d66ce1ed9..559a6b3da0 100644 --- a/tests/integration/standard/test_types.py +++ b/tests/integration/standard/test_types.py @@ -38,7 +38,7 @@ from tests.unit.cython.utils import cythontest from tests.util import assertEqual -from tests.integration import use_singledc, execute_until_pass, notprotocolv1, \ +from tests.integration import use_single_node, execute_until_pass, notprotocolv1, \ BasicSharedKeyspaceUnitTestCase, greaterthancass21, lessthancass30, \ greaterthanorequalcass3_10, TestCluster, requires_composite_type, \ requires_vector_type @@ -48,7 +48,7 @@ def setup_module(): - use_singledc() + use_single_node() update_datatypes() diff --git a/tests/integration/standard/test_udts.py b/tests/integration/standard/test_udts.py index dd696ea0e9..e608a9610b 100644 --- a/tests/integration/standard/test_udts.py +++ b/tests/integration/standard/test_udts.py @@ -21,7 +21,7 @@ from cassandra.query import dict_factory from cassandra.util import OrderedMap -from tests.integration import use_singledc, execute_until_pass, \ +from tests.integration import use_single_node, execute_until_pass, \ BasicSegregatedKeyspaceUnitTestCase, greaterthancass20, lessthancass30, greaterthanorequalcass36, TestCluster from tests.integration.datatype_utils import update_datatypes, PRIMITIVE_DATATYPES, PRIMITIVE_DATATYPES_KEYS, \ COLLECTION_TYPES, get_sample, get_collection_sample @@ -32,7 +32,7 @@ def setup_module(): - use_singledc() + 
use_single_node() update_datatypes() diff --git a/tests/integration/standard/test_use_keyspace.py b/tests/integration/standard/test_use_keyspace.py index 25e954b956..80e7cfe5f3 100644 --- a/tests/integration/standard/test_use_keyspace.py +++ b/tests/integration/standard/test_use_keyspace.py @@ -14,12 +14,23 @@ LOGGER = logging.getLogger(__name__) +_saved_scylla_ext_opts = None + def setup_module(): + global _saved_scylla_ext_opts + _saved_scylla_ext_opts = os.environ.get('SCYLLA_EXT_OPTS') os.environ['SCYLLA_EXT_OPTS'] = "--smp 2 --memory 2048M" use_cluster('shared_aware', [3], start=True) +def teardown_module(): + if _saved_scylla_ext_opts is None: + os.environ.pop('SCYLLA_EXT_OPTS', None) + else: + os.environ['SCYLLA_EXT_OPTS'] = _saved_scylla_ext_opts + + @local class TestUseKeyspace(unittest.TestCase): @classmethod diff --git a/tests/integration/upgrade/__init__.py b/tests/integration/upgrade/__init__.py index a1c751bcbd..fab6fed34a 100644 --- a/tests/integration/upgrade/__init__.py +++ b/tests/integration/upgrade/__init__.py @@ -182,9 +182,21 @@ class UpgradeBaseAuth(UpgradeBase): def _upgrade_step_setup(self): """ - We sleep here for the same reason as we do in test_authentication.py: - there seems to be some race, with some versions of C* taking longer to - get the auth (and default user) setup. Sleep here to give it a chance + Wait for PasswordAuthenticator to finish initializing (creating the + default superuser). Poll by attempting to authenticate rather than + using a fixed sleep. 
""" super(UpgradeBaseAuth, self)._upgrade_step_setup() - time.sleep(10) + + from cassandra.auth import PlainTextAuthProvider + from tests.util import wait_until_not_raised + + def _check_auth_ready(): + c = Cluster(auth_provider=PlainTextAuthProvider('cassandra', 'cassandra')) + try: + s = c.connect() + s.execute("SELECT * FROM system.local WHERE key='local'") + finally: + c.shutdown() + + wait_until_not_raised(_check_auth_ready, delay=1, max_attempts=30) diff --git a/tests/integration/upgrade/test_upgrade.py b/tests/integration/upgrade/test_upgrade.py index fec9a38604..45827723b3 100644 --- a/tests/integration/upgrade/test_upgrade.py +++ b/tests/integration/upgrade/test_upgrade.py @@ -19,11 +19,22 @@ from cassandra.cluster import ConsistencyLevel, Cluster, DriverException, ExecutionProfile from cassandra.policies import ConstantSpeculativeExecutionPolicy from tests.integration.upgrade import UpgradeBase, UpgradeBaseAuth, UpgradePath, upgrade_paths +from tests.util import wait_until import unittest import pytest +def _wait_for_control_connection(cluster_driver, timeout=60): + """Wait for the driver's control connection to be established.""" + wait_until( + lambda: cluster_driver.control_connection._connection is not None + and not cluster_driver.control_connection._connection.is_closed, + delay=1, + max_attempts=timeout, + ) + + # Previous Cassandra upgrade two_to_three_path = upgrade_paths([ UpgradePath("2.2.9-3.11", {"version": "2.2.9"}, {"version": "3.11.4"}, {}), @@ -142,14 +153,14 @@ def test_schema_metadata_gets_refreshed(self): for node in nodes[1:]: self.upgrade_node(node) # Wait for the control connection to reconnect - time.sleep(20) + _wait_for_control_connection(self.cluster_driver) with pytest.raises(DriverException): self.cluster_driver.refresh_schema_metadata(max_schema_agreement_wait=10) self.upgrade_node(nodes[0]) # Wait for the control connection to reconnect - time.sleep(20) + _wait_for_control_connection(self.cluster_driver) 
self.cluster_driver.refresh_schema_metadata(max_schema_agreement_wait=40) assert original_meta != self.cluster_driver.metadata.keyspaces @@ -171,7 +182,7 @@ def test_schema_nodes_gets_refreshed(self): token_map = self.cluster_driver.metadata.token_map self.upgrade_node(node) # Wait for the control connection to reconnect - time.sleep(20) + _wait_for_control_connection(self.cluster_driver) self.cluster_driver.refresh_nodes(force_token_rebuild=True) self._assert_same_token_map(token_map, self.cluster_driver.metadata.token_map)