Skip to content

Commit 916a7e8

Browse files
committed
(improvement) unit tests for benchmarking query planning.
Not a very scientific one, but reasonable to get some measurements in terms of how different optimizations work. Example run (on scylladb#650 branch): ykaul@ykaul:~/github/python-driver$ pytest -s tests/unit/test_policy_performance.py /usr/lib/python3.14/site-packages/pytest_asyncio/plugin.py:211: PytestDeprecationWarning: The configuration option "asyncio_default_fixture_loop_scope" is unset. The event loop scope for asynchronous fixtures will default to the fixture caching scope. Future versions of pytest-asyncio will default the loop scope for asynchronous fixtures to function scope. Set the default fixture loop scope explicitly in order to avoid unexpected behavior in the future. Valid fixture loop scopes are: "function", "class", "module", "package", "session" warnings.warn(PytestDeprecationWarning(_DEFAULT_FIXTURE_LOOP_SCOPE_UNSET)) ============================================================================================================ test session starts ============================================================================================================= platform linux -- Python 3.14.2, pytest-8.3.5, pluggy-1.6.0 rootdir: /home/ykaul/github/python-driver configfile: pyproject.toml plugins: asyncio-1.1.0, anyio-4.12.1 asyncio: mode=Mode.STRICT, asyncio_default_fixture_loop_scope=None, asyncio_default_test_loop_scope=function collected 4 items tests/unit/test_policy_performance.py Pinned to CPU 0 .... === Performance Benchmarks === Policy | Ops | Time (s) | Kops/s ---------------------------------------------------------------------- DCAware | 100000 | 0.2328 | 429 RackAware | 100000 | 0.3637 | 274 TokenAware(DCAware) | 100000 | 1.5884 | 62 TokenAware(RackAware) | 100000 | 1.6816 | 59 ---------------------------------------------------------------------- Signed-off-by: Yaniv Kaul <yaniv.kaul@scylladb.com>
1 parent d1418c7 commit 916a7e8

1 file changed

Lines changed: 187 additions & 0 deletions

File tree

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
import unittest
2+
import time
3+
import uuid
4+
import struct
5+
import os
6+
import statistics
7+
from unittest.mock import Mock
8+
9+
from cassandra.policies import (
10+
DCAwareRoundRobinPolicy,
11+
RackAwareRoundRobinPolicy,
12+
TokenAwarePolicy
13+
)
14+
from cassandra.pool import Host
15+
from cassandra.cluster import SimpleConvictionPolicy
16+
17+
# Mock for Connection/EndPoint since Host expects it
18+
class MockEndPoint(object):
19+
__slots__ = ('address',)
20+
21+
def __init__(self, address):
22+
self.address = address
23+
def __str__(self):
24+
return self.address
25+
26+
class MockStatement(object):
27+
__slots__ = ('routing_key', 'keyspace', 'table')
28+
29+
def __init__(self, routing_key, keyspace="ks", table="tbl"):
30+
self.routing_key = routing_key
31+
self.keyspace = keyspace
32+
self.table = table
33+
34+
def is_lwt(self):
35+
return False
36+
37+
class MockTokenMap(object):
38+
__slots__ = ('token_class',)
39+
def __init__(self):
40+
self.token_class = Mock()
41+
self.token_class.from_key = lambda k: k
42+
43+
class MockTablets(object):
44+
__slots__ = ()
45+
def get_tablet_for_key(self, keyspace, table, key):
46+
return None
47+
48+
class MockMetadata(object):
49+
__slots__ = ('_tablets', 'token_map', 'get_replicas_func')
50+
def __init__(self, get_replicas_func):
51+
self._tablets = MockTablets()
52+
self.token_map = MockTokenMap()
53+
self.get_replicas_func = get_replicas_func
54+
55+
def can_support_partitioner(self):
56+
return True
57+
58+
def get_replicas(self, keyspace, key):
59+
return self.get_replicas_func(keyspace, key)
60+
61+
class MockCluster(object):
62+
__slots__ = ('metadata',)
63+
def __init__(self, metadata):
64+
self.metadata = metadata
65+
66+
class TestPolicyPerformance(unittest.TestCase):
67+
@classmethod
68+
def setUpClass(cls):
69+
if hasattr(os, 'sched_setaffinity'):
70+
try:
71+
# Pin to the first available CPU
72+
cpu = list(os.sched_getaffinity(0))[0]
73+
os.sched_setaffinity(0, {cpu})
74+
print(f"Pinned to CPU {cpu}")
75+
except Exception as e:
76+
print(f"Could not pin CPU: {e}")
77+
78+
# 1. Topology: 5 DCs, 3 Racks/DC, 3 Nodes/Rack = 45 Nodes
79+
cls.hosts = []
80+
cls.hosts_map = {} # host_id -> Host
81+
cls.replicas_map = {} # routing_key -> list of replica hosts
82+
83+
# Deterministic generation
84+
dcs = ['dc{}'.format(i) for i in range(5)]
85+
racks = ['rack{}'.format(i) for i in range(3)]
86+
nodes_per_rack = 3
87+
88+
ip_counter = 0
89+
subnet_counter = 0
90+
for dc in dcs:
91+
for rack in racks:
92+
subnet_counter += 1
93+
for node_idx in range(nodes_per_rack):
94+
ip_counter += 1
95+
address = "127.0.{}.{}".format(subnet_counter, node_idx + 1)
96+
h_id = uuid.UUID(int=ip_counter)
97+
h = Host(MockEndPoint(address), SimpleConvictionPolicy, host_id=h_id)
98+
h.set_location_info(dc, rack)
99+
cls.hosts.append(h)
100+
cls.hosts_map[h_id] = h
101+
102+
# 2. Queries: 100,000 deterministic queries
103+
cls.query_count = 100000
104+
cls.queries = []
105+
cls.results = []
106+
# We'll use simple packed integers as routing keys
107+
for i in range(cls.query_count):
108+
key = struct.pack('>I', i)
109+
cls.queries.append(MockStatement(routing_key=key))
110+
111+
# Pre-calculate replicas for TokenAware:
112+
# Deterministically pick 3 replicas based on the key index
113+
# This simulates the metadata.get_replicas behavior
114+
# We pick index i, i+1, i+2 mod 45
115+
replicas = []
116+
for r in range(3):
117+
idx = (i + r) % len(cls.hosts)
118+
replicas.append(cls.hosts[idx])
119+
cls.replicas_map[key] = replicas
120+
121+
def _get_replicas_side_effect(self, keyspace, key):
122+
return self.replicas_map.get(key, [])
123+
124+
def _setup_cluster_mock(self):
125+
metadata = MockMetadata(self._get_replicas_side_effect)
126+
return MockCluster(metadata)
127+
128+
def _run_benchmark(self, name, policy):
129+
# Setup
130+
cluster = self._setup_cluster_mock()
131+
policy.populate(cluster, self.hosts)
132+
133+
# Warmup
134+
for _ in range(100):
135+
list(policy.make_query_plan(working_keyspace="ks", query=self.queries[0]))
136+
137+
# Run multiple iterations to reduce noise
138+
iterations = 5
139+
timings = []
140+
141+
for _ in range(iterations):
142+
start_time = time.perf_counter()
143+
for q in self.queries:
144+
# We consume the iterator to ensure full plan generation cost is paid
145+
for _ in policy.make_query_plan(working_keyspace="ks", query=q):
146+
pass
147+
end_time = time.perf_counter()
148+
timings.append(end_time - start_time)
149+
150+
# Use median to filter outliers
151+
duration = statistics.median(timings)
152+
153+
count = len(self.queries)
154+
ops_per_sec = count / duration
155+
kops = int(ops_per_sec / 1000)
156+
157+
self.results.append((name, count, duration, kops))
158+
return ops_per_sec
159+
160+
@classmethod
161+
def tearDownClass(cls):
162+
print("\n\n=== Performance Benchmarks ===")
163+
print(f"{'Policy':<30} | {'Ops':<10} | {'Time (s)':<10} | {'Kops/s':<10}")
164+
print("-" * 70)
165+
for name, count, duration, kops in cls.results:
166+
print(f"{name:<30} | {count:<10} | {duration:<10.4f} | {kops:<10}")
167+
print("-" * 70)
168+
169+
def test_dc_aware(self):
170+
# Local DC = dc0, 1 remote host per DC
171+
policy = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1)
172+
self._run_benchmark("DCAware", policy)
173+
174+
def test_rack_aware(self):
175+
# Local DC = dc0, Local Rack = rack0, 1 remote host per DC
176+
policy = RackAwareRoundRobinPolicy(local_dc='dc0', local_rack='rack0', used_hosts_per_remote_dc=1)
177+
self._run_benchmark("RackAware", policy)
178+
179+
def test_token_aware_wrapping_dc_aware(self):
180+
child = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1)
181+
policy = TokenAwarePolicy(child, shuffle_replicas=False) # False for strict determinism in test if needed
182+
self._run_benchmark("TokenAware(DCAware)", policy)
183+
184+
def test_token_aware_wrapping_rack_aware(self):
185+
child = RackAwareRoundRobinPolicy(local_dc='dc0', local_rack='rack0', used_hosts_per_remote_dc=1)
186+
policy = TokenAwarePolicy(child, shuffle_replicas=False)
187+
self._run_benchmark("TokenAware(RackAware)", policy)

0 commit comments

Comments
 (0)