Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions RLTest/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ def do_normal_conn(self, line):
'--cluster_node_timeout', default=5000,
help='sets the node timeout on cluster in milliseconds')

parser.add_argument(
'--cluster-start-timeout', default=40, type=int,
help='timeout in seconds to wait for cluster to be ready (default 40 seconds). '
'Increase for large shard counts (e.g., 99 shards).')

parser.add_argument(
'--cluster_credentials',
help='enterprise cluster cluster_credentials "username:password", relevent only when running with cluster_existing-env')
Expand Down Expand Up @@ -538,6 +543,7 @@ def __init__(self):
Defaults.tls_passphrase = self.args.tls_passphrase
Defaults.oss_password = self.args.oss_password
Defaults.cluster_node_timeout = self.args.cluster_node_timeout
Defaults.cluster_start_timeout = self.args.cluster_start_timeout
Defaults.enable_debug_command = True if self.args.allow_unsafe else self.args.enable_debug_command
Defaults.enable_protected_configs = True if self.args.allow_unsafe else self.args.enable_protected_configs
Defaults.enable_module_command = True if self.args.allow_unsafe else self.args.enable_module_command
Expand Down
4 changes: 3 additions & 1 deletion RLTest/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class Defaults:
randomize_ports = False
oss_password = None
cluster_node_timeout = None
cluster_start_timeout = 40
curr_test_name = None
port = 6379
enable_debug_command = False
Expand Down Expand Up @@ -369,7 +370,8 @@ def getEnvKwargs(self):
'terminateRetries': self.terminateRetries,
'terminateRetrySecs': self.terminateRetrySecs,
'redisConfigFile': self.redisConfigFile,
'dualTLS': self.dualTLS
'dualTLS': self.dualTLS,
'clusterStartTimeout': Defaults.cluster_start_timeout
}
return kwargs

Expand Down
65 changes: 54 additions & 11 deletions RLTest/redis_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import time
from RLTest.utils import Colors

# Interval in seconds between status updates during cluster wait
CLUSTER_STATUS_INTERVAL_SEC = 5


class ClusterEnv(object):
def __init__(self, **kwargs):
Expand All @@ -23,6 +26,7 @@ def __init__(self, **kwargs):
self.protocol = kwargs.get('protocol', 2)
self.terminateRetries = kwargs.get('terminateRetries', None)
self.terminateRetrySecs = kwargs.get('terminateRetrySecs', None)
self.clusterStartTimeout = kwargs.pop('clusterStartTimeout', 40)
Comment thread
fcostaoliveira marked this conversation as resolved.
startPort = kwargs.pop('port', 10000)
totalRedises = self.shardsCount * (2 if useSlaves else 1)
randomizePorts = kwargs.pop('randomizePorts', False)
Expand Down Expand Up @@ -50,7 +54,8 @@ def getInformationBeforeDispose(self):
def getInformationAfterDispose(self):
return [shard.getInformationAfterDispose() for shard in self.shards]

def _agreeOk(self):
def _countOk(self):
"""Returns count of shards reporting cluster_state:ok"""
ok = 0
for shard in self.shards:
con = shard.getConnection()
Expand All @@ -61,9 +66,10 @@ def _agreeOk(self):
continue
if 'cluster_state:ok' in str(status):
ok += 1
return ok == len(self.shards)
return ok

def _agreeSlots(self):
def _countAgreeSlots(self):
"""Returns count of shards that agree on slots view"""
ok = 0
first_view = None
for shard in self.shards:
Expand All @@ -77,35 +83,68 @@ def _agreeSlots(self):
first_view = slots_view
if slots_view == first_view:
ok += 1
return ok == len(self.shards)
return ok

def waitCluster(self, timeout_sec=40, verbose=True):
if timeout_sec < CLUSTER_STATUS_INTERVAL_SEC:
Comment thread
fcostaoliveira marked this conversation as resolved.
Outdated
raise ValueError(
"Cluster timeout (%d seconds) must be at least %d seconds (status interval)" %
(timeout_sec, CLUSTER_STATUS_INTERVAL_SEC))

def waitCluster(self, timeout_sec=40):
st = time.time()
last_status_time = st
total_shards = len(self.shards)

if verbose:
print(Colors.Yellow('Waiting for cluster to be ready (timeout: %d seconds, %d shards)...' %
(timeout_sec, total_shards)))

while st + timeout_sec > time.time():
if self._agreeOk() and self._agreeSlots():
ok_count = self._countOk()
slots_count = self._countAgreeSlots()

if ok_count == total_shards and slots_count == total_shards:
Comment thread
fcostaoliveira marked this conversation as resolved.
elapsed = time.time() - st
if verbose:
print(Colors.Green('Cluster is ready after %.1f seconds' % elapsed))
for shard in self.shards:
try:
shard.getConnection().execute_command('SEARCH.CLUSTERREFRESH')
except Exception:
pass
return

# Print periodic status update
now = time.time()
if verbose and (now - last_status_time) >= CLUSTER_STATUS_INTERVAL_SEC:
elapsed = now - st
print(Colors.Yellow(' Cluster wait: %.1fs elapsed - %d/%d shards OK, %d/%d agree on slots...' %
(elapsed, ok_count, total_shards, slots_count, total_shards)))
last_status_time = now

time.sleep(0.1)
raise RuntimeError(
"Cluster OK wait loop timed out after %s seconds" % timeout_sec)

def startEnv(self, masters=True, slaves=True):
if self.envIsUp == True:
return # env is already up

total_shards = len(self.shards)
print(Colors.Yellow('Starting cluster with %d shards...' % total_shards))

try:
for shard in self.shards:
for i, shard in enumerate(self.shards):
shard.startEnv(masters, slaves)
except Exception:
print(Colors.Yellow(' Started shard %d/%d' % (i + 1, total_shards)))
except Exception as e:
print(Colors.Bred('Error starting shard %d: %s' % (i + 1, str(e))))
print(Colors.Bred('Stopping all shards...'))
for shard in self.shards:
shard.stopEnv()
raise

print(Colors.Yellow('Configuring cluster topology...'))
slots_per_node = int(16384 / len(self.shards)) + 1
for i, shard in enumerate(self.shards):
con = shard.getConnection()
Expand All @@ -121,10 +160,14 @@ def startEnv(self, masters=True, slaves=True):
try:
con.execute_command('CLUSTER', 'ADDSLOTS', *(str(x)
for x in range(start_slot, end_slot)))
except Exception:
pass
except Exception as e:
print(Colors.Bred(' Error assigning slots %d-%d to shard %d: %s' %
(start_slot, end_slot - 1, i + 1, str(e))))

self.waitCluster()
print(Colors.Yellow(' Configured shard %d/%d (slots %d-%d)' %
(i + 1, total_shards, start_slot, min(end_slot - 1, 16383))))

self.waitCluster(timeout_sec=self.clusterStartTimeout)
self.envIsUp = True
self.envIsHealthy = True

Expand Down