From 04e7bef19f734e96526643f8539168b7c7779d67 Mon Sep 17 00:00:00 2001 From: Yaniv Michael Kaul Date: Sun, 15 Mar 2026 18:33:54 +0200 Subject: [PATCH 1/2] (improvement) row_parser: cache ParseDesc for prepared statements Cache the ParseDesc object constructed in recv_results_rows() so that repeated executions of the same prepared statement skip the list comprehensions, ColDesc construction, and make_deserializers() call. The cache is keyed by id(column_metadata). For prepared statements the result_metadata list is stored on PreparedStatement and reused, so id() is stable. On cache hit we verify object identity (cached_ref is column_metadata) and that session-level settings (column_encryption_policy, protocol_version) still match. Implementation details: - _get_or_build_parse_desc: cached path, used only when column_metadata comes from result_metadata (prepared statements with stable id()). - _build_parse_desc: uncached path, used for inline metadata from non-prepared queries, which create a fresh metadata list on every execution. - Cache is bounded to 256 entries; cleared entirely when full. - Returns only (desc, column_names, column_types) to the caller, avoiding exposure of internal cache fields. - Thread-safety: dict get/set are atomic in CPython (GIL), but concurrent cache misses may cause redundant construction (benign). The clear_parse_desc_cache() and get_parse_desc_cache_size() functions are exposed for testing. 
## Benchmark results (median, pytest-benchmark) ### ParseDesc construction only (reference benchmarks) | Columns | **Before** (original) | **After** (with cache) | |---------|-----------------------|------------------------| | 5 cols | 3,966 ns | 191 ns | | 10 cols | 5,730 ns | 175 ns | | 20 cols | 9,266 ns | 166 ns | | 50 cols | 19,388 ns | 193 ns | ### Full pipeline integration (recv_results_rows through Cython) | Scenario | **Before** (original) | **After** (with cache) | |-------------------|-----------------------|------------------------| | 1 row x 10 col | 40,867 ns | 2,977 ns | | 100 rows x 5 col | 145,584 ns | 73,206 ns | | 1000 rows x 5 col | 1,099,825 ns | 999,517 ns | For small result sets (single-row lookups common with prepared statements), ParseDesc construction is a large fraction of the total response-path cost. Caching eliminates it entirely after the first execution. All 623 unit tests pass (16 skipped - pre-existing). --- benchmarks/test_parse_desc_cache_benchmark.py | 542 ++++++++++++++++++ cassandra/row_parser.pyx | 102 +++- 2 files changed, 639 insertions(+), 5 deletions(-) create mode 100644 benchmarks/test_parse_desc_cache_benchmark.py diff --git a/benchmarks/test_parse_desc_cache_benchmark.py b/benchmarks/test_parse_desc_cache_benchmark.py new file mode 100644 index 0000000000..fdd391d266 --- /dev/null +++ b/benchmarks/test_parse_desc_cache_benchmark.py @@ -0,0 +1,542 @@ +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Benchmarks for ParseDesc construction with and without caching. + +The ParseDesc is built on every response in recv_results_rows(). For prepared +statements the column_metadata list is the same object every time, so caching +the ParseDesc (keyed by id(column_metadata)) avoids repeated list +comprehensions, ColDesc construction, and make_deserializers() calls. + +There are two benchmark tiers: + +1. **Integration benchmarks** (test_integration_*): Exercise the actual Cython + _get_or_build_parse_desc function through the real recv_results_rows closure + returned by make_recv_results_rows(). These use a mock ResultMessage with a + binary buffer simulating the prepared statement path (NO_METADATA_FLAG set). + +2. **Isolated benchmarks** (test_parse_desc_*, test_full_pipeline_*): Measure + ParseDesc construction and row parsing using a pure-Python cache replica. + Useful for understanding the breakdown of costs but do not exercise the + actual Cython cache code path. 
+ +Run with: + pytest benchmarks/test_parse_desc_cache_benchmark.py -v +""" + +import io +import struct +import pytest + +from cassandra import cqltypes +from cassandra.policies import ColDesc +from cassandra.parsing import ParseDesc +from cassandra.deserializers import make_deserializers +from cassandra.bytesio import BytesIOReader +from cassandra.obj_parser import ListParser +from cassandra.row_parser import ( + clear_parse_desc_cache, + get_parse_desc_cache_size, + make_recv_results_rows, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _build_column_metadata(ncols, cql_type=cqltypes.UTF8Type): + """Build a column_metadata list like the driver produces.""" + return [("ks", "tbl", "col_%d" % i, cql_type) for i in range(ncols)] + + +def _build_uncached_parse_desc( + column_metadata, column_encryption_policy, protocol_version +): + """Original uncached ParseDesc construction (baseline).""" + column_names = [md[2] for md in column_metadata] + column_types = [md[3] for md in column_metadata] + desc = ParseDesc( + column_names, + column_types, + column_encryption_policy, + [ColDesc(md[0], md[1], md[2]) for md in column_metadata], + make_deserializers(column_types), + protocol_version, + ) + return desc, column_names, column_types + + +def _build_binary_rows(nrows, ncols, col_value=b"hello world"): + """ + Build a binary buffer matching the Cassandra row format: + int32(rowcount) + for each row: + for each col: int32(len) + bytes + """ + parts = [struct.pack(">i", nrows)] + col_cell = struct.pack(">i", len(col_value)) + col_value + row_data = col_cell * ncols + for _ in range(nrows): + parts.append(row_data) + return b"".join(parts) + + +# --------------------------------------------------------------------------- +# Integration helpers — exercise the actual Cython recv_results_rows +# 
--------------------------------------------------------------------------- + +# NO_METADATA_FLAG as defined in ResultMessage +_NO_METADATA_FLAG = 0x0004 + + +class _MockResultMessage: + """ + Minimal mock of ResultMessage for the prepared-statement path. + + When NO_METADATA_FLAG is set in the binary stream, recv_results_metadata + reads just the flags + colcount and returns, leaving column_metadata as + None. The closure then falls through to result_metadata (the prepared + statement's stored metadata). + """ + + column_metadata = None + column_names = None + column_types = None + parsed_rows = None + paging_state = None + continuous_paging_seq = None + continuous_paging_last = None + result_metadata_id = None + + def recv_results_metadata(self, f, user_type_map): + """Simulate the prepared-statement path (NO_METADATA_FLAG is set).""" + # Read flags + colcount just like the real recv_results_metadata does + _flags = struct.unpack(">i", f.read(4))[0] + _colcount = struct.unpack(">i", f.read(4))[0] + # NO_METADATA_FLAG is set, so return immediately — column_metadata stays None + + +def _build_integration_binary_buf(nrows, ncols, col_value=b"hello world"): + """ + Build a full binary buffer for the integration benchmark. 
+ + Format for the prepared-statement path: + int32(flags=NO_METADATA_FLAG) -- read by recv_results_metadata + int32(colcount) -- read by recv_results_metadata + int32(rowcount) -- read by BytesIOReader in parse_rows + for each row: + for each col: int32(len) + bytes + """ + parts = [] + parts.append(struct.pack(">i", _NO_METADATA_FLAG)) # flags + parts.append(struct.pack(">i", ncols)) # colcount + parts.append(struct.pack(">i", nrows)) # rowcount + col_cell = struct.pack(">i", len(col_value)) + col_value + row_data = col_cell * ncols + for _ in range(nrows): + parts.append(row_data) + return b"".join(parts) + + +# The actual Cython recv_results_rows closure — this calls _get_or_build_parse_desc internally +_cython_recv_results_rows = make_recv_results_rows(ListParser()) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _clear_cython_cache(): + """Ensure the Cython ParseDesc cache is empty before and after each test.""" + clear_parse_desc_cache() + yield + clear_parse_desc_cache() + + +@pytest.fixture() +def py_cache(): + """Provide a fresh pure-Python cache dict for each test that needs it.""" + return {} + + +# --------------------------------------------------------------------------- +# Pure-Python cache replica (reference implementation) +# Useful for understanding cost breakdown but does NOT exercise the actual +# Cython cdef inline _get_or_build_parse_desc function. 
+# --------------------------------------------------------------------------- + +_PY_CACHE_MAX_SIZE = 256 + + +def _cached_parse_desc_py( + column_metadata, column_encryption_policy, protocol_version, cache +): + """Pure-Python replica of the Cython cache for reference comparison.""" + cache_key = id(column_metadata) + cached = cache.get(cache_key) + if cached is not None: + if ( + cached[0] is column_metadata + and cached[1] is column_encryption_policy + and cached[2] == protocol_version + ): + return cached[3], cached[4], cached[5] + + column_names = [md[2] for md in column_metadata] + column_types = [md[3] for md in column_metadata] + desc = ParseDesc( + column_names, + column_types, + column_encryption_policy, + [ColDesc(md[0], md[1], md[2]) for md in column_metadata], + make_deserializers(column_types), + protocol_version, + ) + + if len(cache) >= _PY_CACHE_MAX_SIZE: + cache.clear() + + cache[cache_key] = ( + column_metadata, + column_encryption_policy, + protocol_version, + desc, + column_names, + column_types, + ) + return desc, column_names, column_types + + +# --------------------------------------------------------------------------- +# Integration benchmarks: actual Cython recv_results_rows +# These exercise the real _get_or_build_parse_desc cdef inline function. 
+# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("nrows,ncols", [(1, 10), (100, 5), (1000, 5)]) +def test_integration_cython_cached(benchmark, nrows, ncols): + """Integration: Cython recv_results_rows with ParseDesc cache hit (prepared stmt).""" + col_meta = _build_column_metadata(ncols) + binary_buf = _build_integration_binary_buf(nrows, ncols) + + # Warm the Cython cache with the same col_meta object + warmup = _MockResultMessage() + _cython_recv_results_rows(warmup, io.BytesIO(binary_buf), 4, {}, col_meta, None) + + def run(): + msg = _MockResultMessage() + _cython_recv_results_rows(msg, io.BytesIO(binary_buf), 4, {}, col_meta, None) + return msg.parsed_rows + + rows = benchmark(run) + assert len(rows) == nrows + assert len(rows[0]) == ncols + + +@pytest.mark.parametrize("nrows,ncols", [(1, 10), (100, 5), (1000, 5)]) +def test_integration_cython_uncached(benchmark, nrows, ncols): + """Integration: Cython recv_results_rows with cache miss (fresh metadata each call).""" + binary_buf = _build_integration_binary_buf(nrows, ncols) + + def run(): + # Fresh metadata list each call forces a cache miss + fresh_meta = _build_column_metadata(ncols) + msg = _MockResultMessage() + _cython_recv_results_rows(msg, io.BytesIO(binary_buf), 4, {}, fresh_meta, None) + return msg.parsed_rows + + rows = benchmark(run) + assert len(rows) == nrows + assert len(rows[0]) == ncols + + +# --------------------------------------------------------------------------- +# Integration correctness tests: verify the actual Cython cache behavior +# --------------------------------------------------------------------------- + + +def test_integration_cython_cache_hit(): + """Cython cache returns same column_names/column_types on repeated calls.""" + col_meta = _build_column_metadata(5) + binary_buf = _build_integration_binary_buf(1, 5) + + msg1 = _MockResultMessage() + _cython_recv_results_rows(msg1, io.BytesIO(binary_buf), 4, {}, col_meta, 
None) + + msg2 = _MockResultMessage() + _cython_recv_results_rows(msg2, io.BytesIO(binary_buf), 4, {}, col_meta, None) + + # Same col_meta object -> cache hit -> same column_names/types objects + assert msg1.column_names is msg2.column_names + assert msg1.column_types is msg2.column_types + + +def test_integration_cython_cache_miss_different_metadata(): + """Different metadata list objects produce cache misses.""" + binary_buf = _build_integration_binary_buf(1, 5) + + col_meta_a = _build_column_metadata(5) + col_meta_b = _build_column_metadata(5) # same shape but different list object + + msg_a = _MockResultMessage() + _cython_recv_results_rows(msg_a, io.BytesIO(binary_buf), 4, {}, col_meta_a, None) + + msg_b = _MockResultMessage() + _cython_recv_results_rows(msg_b, io.BytesIO(binary_buf), 4, {}, col_meta_b, None) + + # Different list objects -> different id() -> cache miss + assert msg_a.column_names is not msg_b.column_names + # But values are equivalent + assert msg_a.column_names == msg_b.column_names + + +def test_integration_cython_cache_invalidation_protocol_version(): + """Changed protocol_version invalidates the Cython cache entry.""" + col_meta = _build_column_metadata(5) + binary_buf = _build_integration_binary_buf(1, 5) + + msg_v4 = _MockResultMessage() + _cython_recv_results_rows(msg_v4, io.BytesIO(binary_buf), 4, {}, col_meta, None) + + msg_v5 = _MockResultMessage() + _cython_recv_results_rows(msg_v5, io.BytesIO(binary_buf), 5, {}, col_meta, None) + + # Same col_meta but different protocol_version -> cache miss -> different objects + assert msg_v4.column_names is not msg_v5.column_names + + +def test_integration_cython_clear_cache(): + """clear_parse_desc_cache() invalidates cached entries.""" + col_meta = _build_column_metadata(5) + binary_buf = _build_integration_binary_buf(1, 5) + + msg1 = _MockResultMessage() + _cython_recv_results_rows(msg1, io.BytesIO(binary_buf), 4, {}, col_meta, None) + + clear_parse_desc_cache() + + msg2 = 
_MockResultMessage() + _cython_recv_results_rows(msg2, io.BytesIO(binary_buf), 4, {}, col_meta, None) + + # After cache clear, new ParseDesc is built -> different column_names object + assert msg1.column_names is not msg2.column_names + assert msg1.column_names == msg2.column_names + + +def test_integration_cython_parsed_rows_correctness(): + """Integration: verify parsed row data is correct through the Cython path.""" + ncols = 5 + nrows = 3 + col_meta = _build_column_metadata(ncols) + binary_buf = _build_integration_binary_buf(nrows, ncols, col_value=b"test_val") + + msg = _MockResultMessage() + _cython_recv_results_rows(msg, io.BytesIO(binary_buf), 4, {}, col_meta, None) + + assert len(msg.parsed_rows) == nrows + for row in msg.parsed_rows: + assert len(row) == ncols + for val in row: + assert val == "test_val" + assert msg.column_names == ["col_%d" % i for i in range(ncols)] + + +def test_integration_cython_cache_bounded_size(): + """Cython cache evicts entries when exceeding max size.""" + # Fill the cache with many distinct metadata lists + binary_buf = _build_integration_binary_buf(1, 5) + meta_lists = [_build_column_metadata(5) for _ in range(300)] + + for meta in meta_lists: + msg = _MockResultMessage() + _cython_recv_results_rows(msg, io.BytesIO(binary_buf), 4, {}, meta, None) + + # Cache should have been evicted at least once + cache_size = get_parse_desc_cache_size() + assert cache_size <= 256, ( + "Cache should be bounded to 256 entries, got %d" % cache_size + ) + + +# --------------------------------------------------------------------------- +# Isolated benchmarks: ParseDesc construction (reference, pure-Python) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("ncols", [5, 10, 20]) +def test_parse_desc_build_uncached(benchmark, ncols): + """Reference: build ParseDesc from scratch every time (original code path).""" + col_meta = _build_column_metadata(ncols) + + def run(): + return 
_build_uncached_parse_desc(col_meta, None, 4) + + result = benchmark(run) + desc, names, types = result + assert len(names) == ncols + assert len(desc.colnames) == ncols + + +@pytest.mark.parametrize("ncols", [5, 10, 20]) +def test_parse_desc_build_cached(benchmark, ncols, py_cache): + """Reference: cached second calls return cached ParseDesc (pure-Python replica).""" + col_meta = _build_column_metadata(ncols) + + # Warm the cache + _cached_parse_desc_py(col_meta, None, 4, py_cache) + + def run(): + return _cached_parse_desc_py(col_meta, None, 4, py_cache) + + result = benchmark(run) + desc, names, types = result + assert len(names) == ncols + assert len(desc.colnames) == ncols + + +# --------------------------------------------------------------------------- +# Isolated benchmarks: Full parse_rows pipeline (reference, pure-Python) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("nrows,ncols", [(1, 10), (100, 5), (1000, 5)]) +def test_full_pipeline_uncached(benchmark, nrows, ncols): + """Reference: build ParseDesc from scratch + parse rows (pure-Python desc).""" + col_meta = _build_column_metadata(ncols) + binary_buf = _build_binary_rows(nrows, ncols) + parser = ListParser() + + def run(): + desc, names, types = _build_uncached_parse_desc(col_meta, None, 4) + reader = BytesIOReader(binary_buf) + return parser.parse_rows(reader, desc) + + rows = benchmark(run) + assert len(rows) == nrows + assert len(rows[0]) == ncols + + +@pytest.mark.parametrize("nrows,ncols", [(1, 10), (100, 5), (1000, 5)]) +def test_full_pipeline_cached(benchmark, nrows, ncols, py_cache): + """Reference: cached ParseDesc + parse rows (pure-Python cache replica).""" + col_meta = _build_column_metadata(ncols) + binary_buf = _build_binary_rows(nrows, ncols) + parser = ListParser() + + # Warm cache + _cached_parse_desc_py(col_meta, None, 4, py_cache) + + def run(): + desc, names, types = _cached_parse_desc_py(col_meta, None, 4, py_cache) + 
reader = BytesIOReader(binary_buf) + return parser.parse_rows(reader, desc) + + rows = benchmark(run) + assert len(rows) == nrows + assert len(rows[0]) == ncols + + +# --------------------------------------------------------------------------- +# Isolated benchmarks: ParseDesc only (reference, varying column counts) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("ncols", [5, 10, 20, 50]) +def test_parse_desc_only_uncached(benchmark, ncols): + """Reference: isolated ParseDesc construction — uncached.""" + col_meta = _build_column_metadata(ncols) + + benchmark(_build_uncached_parse_desc, col_meta, None, 4) + + +@pytest.mark.parametrize("ncols", [5, 10, 20, 50]) +def test_parse_desc_only_cached(benchmark, ncols, py_cache): + """Reference: isolated ParseDesc construction — cached (pure-Python replica).""" + col_meta = _build_column_metadata(ncols) + _cached_parse_desc_py(col_meta, None, 4, py_cache) # warm + + benchmark(_cached_parse_desc_py, col_meta, None, 4, py_cache) + + +# --------------------------------------------------------------------------- +# Reference correctness tests (pure-Python replica) +# --------------------------------------------------------------------------- + + +def test_cached_same_result_as_uncached(py_cache): + """Verify pure-Python cached path produces identical results to uncached.""" + col_meta = _build_column_metadata(10) + + desc_u, names_u, types_u = _build_uncached_parse_desc(col_meta, None, 4) + desc_c, names_c, types_c = _cached_parse_desc_py(col_meta, None, 4, py_cache) + + assert names_u == names_c + assert types_u == types_c + assert len(desc_u.colnames) == len(desc_c.colnames) + assert desc_u.protocol_version == desc_c.protocol_version + + # Second call should be cache hit and return the same desc object + desc_c2, names_c2, types_c2 = _cached_parse_desc_py(col_meta, None, 4, py_cache) + assert desc_c2 is desc_c # same object from cache + + +def 
test_cache_invalidation_on_different_metadata(py_cache): + """Different column_metadata list should produce a new ParseDesc (pure-Python).""" + col_meta_a = _build_column_metadata(5) + col_meta_b = _build_column_metadata(10) + + desc_a, _, _ = _cached_parse_desc_py(col_meta_a, None, 4, py_cache) + desc_b, _, _ = _cached_parse_desc_py(col_meta_b, None, 4, py_cache) + + assert desc_a is not desc_b + assert len(desc_a.colnames) == 5 + assert len(desc_b.colnames) == 10 + + +def test_cache_invalidation_on_protocol_version_change(py_cache): + """Changed protocol_version should miss the cache (pure-Python).""" + col_meta = _build_column_metadata(5) + desc_v4, _, _ = _cached_parse_desc_py(col_meta, None, 4, py_cache) + desc_v5, _, _ = _cached_parse_desc_py(col_meta, None, 5, py_cache) + + assert desc_v4 is not desc_v5 + + +def test_clear_parse_desc_cache(): + """Verify the Cython cache can be cleared.""" + clear_parse_desc_cache() # should not raise + + +def test_full_pipeline_correctness(py_cache): + """End-to-end: parse rows with cached ParseDesc produces correct data (pure-Python).""" + ncols = 5 + nrows = 3 + col_meta = _build_column_metadata(ncols) + binary_buf = _build_binary_rows(nrows, ncols, col_value=b"test_val") + parser = ListParser() + + desc, names, types = _cached_parse_desc_py(col_meta, None, 4, py_cache) + reader = BytesIOReader(binary_buf) + rows = parser.parse_rows(reader, desc) + + assert len(rows) == nrows + for row in rows: + assert len(row) == ncols + for val in row: + assert val == "test_val" diff --git a/cassandra/row_parser.pyx b/cassandra/row_parser.pyx index 88277a4593..41182cd7e1 100644 --- a/cassandra/row_parser.pyx +++ b/cassandra/row_parser.pyx @@ -19,6 +19,92 @@ from cassandra.deserializers import make_deserializers include "ioutils.pyx" +# Maximum number of entries in the ParseDesc cache. Each entry corresponds +# to a distinct PreparedStatement's result_metadata. 
256 should be more +# than enough for most applications; when exceeded the entire cache is +# cleared (simple eviction strategy that avoids per-entry bookkeeping). +cdef int _PARSE_DESC_CACHE_MAX_SIZE = 256 + +# Cache for ParseDesc objects keyed by id(column_metadata). +# For prepared statements, result_metadata is stored on PreparedStatement +# and reused across executions, so id() is stable. The cache is only +# populated on the prepared-statement path (where column_metadata comes from +# result_metadata); inline metadata from non-prepared queries is always fresh +# and must not be cached to avoid unbounded growth. +# +# Cache value: (column_metadata_ref, column_encryption_policy_ref, +# protocol_version, desc, column_names, column_types) +# +# Thread safety: individual dict get/set operations are atomic under +# CPython's GIL and under free-threaded builds (PEP 703, which uses +# per-object locks on dicts). The get-miss-build-set sequence is NOT +# transactionally atomic: two threads may both miss and both build a +# ParseDesc for the same key, with the last write winning. This is +# benign -- no data corruption occurs, only redundant construction work +# on the first concurrent miss. No additional locking is needed. +cdef dict _parse_desc_cache = {} + +cdef inline tuple _get_or_build_parse_desc(object column_metadata, object column_encryption_policy, int protocol_version): + """Look up or build a ParseDesc for the given column_metadata (cached path). + + Returns (desc, column_names, column_types). + """ + cdef object cache_key = id(column_metadata) + cdef object cached_or_none = _parse_desc_cache.get(cache_key) + + if cached_or_none is not None: + # Verify identity -- the object at this id must be the same list + # and session-level settings must match. 
+ cached = cached_or_none + if (cached[0] is column_metadata and + cached[1] is column_encryption_policy and + cached[2] == protocol_version): + return (cached[3], cached[4], cached[5]) # hit: (desc, names, types) + + # Cache miss -- build everything + cdef list column_names = [md[2] for md in column_metadata] + cdef list column_types = [md[3] for md in column_metadata] + cdef object desc = ParseDesc( + column_names, column_types, column_encryption_policy, + [ColDesc(md[0], md[1], md[2]) for md in column_metadata], + make_deserializers(column_types), protocol_version) + + # Simple bounded eviction: if the cache is too large, clear it entirely. + # This avoids per-entry bookkeeping (LRU lists, timestamps) that would + # add overhead on the hot cache-hit path. In practice the cache holds + # one entry per prepared statement, so 256 is generous. + if len(_parse_desc_cache) >= _PARSE_DESC_CACHE_MAX_SIZE: + _parse_desc_cache.clear() + + _parse_desc_cache[cache_key] = (column_metadata, column_encryption_policy, + protocol_version, desc, column_names, column_types) + return (desc, column_names, column_types) + + +cdef inline tuple _build_parse_desc(object column_metadata, object column_encryption_policy, int protocol_version): + """Build a ParseDesc without caching (for non-prepared inline metadata). + + Returns (desc, column_names, column_types). + """ + cdef list column_names = [md[2] for md in column_metadata] + cdef list column_types = [md[3] for md in column_metadata] + cdef object desc = ParseDesc( + column_names, column_types, column_encryption_policy, + [ColDesc(md[0], md[1], md[2]) for md in column_metadata], + make_deserializers(column_types), protocol_version) + return (desc, column_names, column_types) + + +def clear_parse_desc_cache(): + """Clear the ParseDesc cache. Exposed for testing.""" + _parse_desc_cache.clear() + + +def get_parse_desc_cache_size(): + """Return the current number of entries in the ParseDesc cache. 
Exposed for testing.""" + return len(_parse_desc_cache) + + def make_recv_results_rows(ColumnParser colparser): def recv_results_rows(self, f, int protocol_version, user_type_map, result_metadata, column_encryption_policy): """ @@ -29,12 +115,18 @@ def make_recv_results_rows(ColumnParser colparser): column_metadata = self.column_metadata or result_metadata - self.column_names = [md[2] for md in column_metadata] - self.column_types = [md[3] for md in column_metadata] + # Only use the cache for prepared statements (self.column_metadata is + # None, so column_metadata comes from result_metadata which is a + # stable list stored on PreparedStatement). Inline metadata from + # non-prepared queries creates a fresh list every time and would + # cause unbounded cache growth. + if self.column_metadata is None and result_metadata is not None: + desc, self.column_names, self.column_types = _get_or_build_parse_desc( + column_metadata, column_encryption_policy, protocol_version) + else: + desc, self.column_names, self.column_types = _build_parse_desc( + column_metadata, column_encryption_policy, protocol_version) - desc = ParseDesc(self.column_names, self.column_types, column_encryption_policy, - [ColDesc(md[0], md[1], md[2]) for md in column_metadata], - make_deserializers(self.column_types), protocol_version) reader = BytesIOReader(f.read()) try: self.parsed_rows = colparser.parse_rows(reader, desc) From 203080ec4b72cc1e95a3aea66d9e7ca311fb94b2 Mon Sep 17 00:00:00 2001 From: Yaniv Michael Kaul Date: Fri, 20 Mar 2026 19:15:02 +0200 Subject: [PATCH 2/2] (improvement) row_parser: address PR review feedback - Fix copyright header in benchmark file (DataStax -> ScyllaDB) - Add pytest.importorskip guard for pytest-benchmark in benchmark file - Add unit tests for ParseDesc cache under tests/unit/cython: cache hit, miss, protocol version invalidation, clear, bounded eviction, correctness --- benchmarks/test_parse_desc_cache_benchmark.py | 9 +- 
tests/unit/cython/test_parse_desc_cache.py | 207 ++++++++++++++++++ 2 files changed, 215 insertions(+), 1 deletion(-) create mode 100644 tests/unit/cython/test_parse_desc_cache.py diff --git a/benchmarks/test_parse_desc_cache_benchmark.py b/benchmarks/test_parse_desc_cache_benchmark.py index fdd391d266..d45e821be1 100644 --- a/benchmarks/test_parse_desc_cache_benchmark.py +++ b/benchmarks/test_parse_desc_cache_benchmark.py @@ -1,4 +1,4 @@ -# Copyright DataStax, Inc. +# Copyright ScyllaDB, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -40,6 +40,13 @@ import struct import pytest +# Skip the entire module when pytest-benchmark is not installed. +# The benchmark fixture is provided by the pytest-benchmark plugin which +# is not in the project's dev dependencies. This guard prevents +# "fixture 'benchmark' not found" errors when running bare `pytest` from +# the repo root. +pytest.importorskip("pytest_benchmark") + from cassandra import cqltypes from cassandra.policies import ColDesc from cassandra.parsing import ParseDesc diff --git a/tests/unit/cython/test_parse_desc_cache.py b/tests/unit/cython/test_parse_desc_cache.py new file mode 100644 index 0000000000..256d98656e --- /dev/null +++ b/tests/unit/cython/test_parse_desc_cache.py @@ -0,0 +1,207 @@ +# Copyright ScyllaDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Unit tests for the ParseDesc cache in row_parser.pyx. + +Validates cache hit/miss behavior, protocol_version invalidation, cache +clearing, and bounded eviction — all exercised through the actual Cython +_get_or_build_parse_desc function via make_recv_results_rows(). +""" + +import io +import struct +import unittest + +from tests.unit.cython.utils import cythontest + +try: + from cassandra.row_parser import ( + clear_parse_desc_cache, + get_parse_desc_cache_size, + make_recv_results_rows, + ) + from cassandra.obj_parser import ListParser + + _HAS_ROW_PARSER = True + _recv_results_rows = make_recv_results_rows(ListParser()) +except ImportError: + _HAS_ROW_PARSER = False + _recv_results_rows = None + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _build_column_metadata(ncols): + """Build a column_metadata list like the driver produces.""" + from cassandra import cqltypes + + return [("ks", "tbl", "col_%d" % i, cqltypes.UTF8Type) for i in range(ncols)] + + +# NO_METADATA_FLAG as defined in ResultMessage +_NO_METADATA_FLAG = 0x0004 + + +class _MockResultMessage: + """Minimal mock of ResultMessage for the prepared-statement path.""" + + column_metadata = None + column_names = None + column_types = None + parsed_rows = None + paging_state = None + continuous_paging_seq = None + continuous_paging_last = None + result_metadata_id = None + + def recv_results_metadata(self, f, user_type_map): + """Simulate the prepared-statement path (NO_METADATA_FLAG is set).""" + _flags = struct.unpack(">i", f.read(4))[0] + _colcount = struct.unpack(">i", f.read(4))[0] + + +def _build_binary_buf(nrows, ncols, col_value=b"hello world"): + """Build a full binary buffer for the prepared-statement path.""" + parts = [] + parts.append(struct.pack(">i", _NO_METADATA_FLAG)) + parts.append(struct.pack(">i", ncols)) + parts.append(struct.pack(">i", nrows)) + 
col_cell = struct.pack(">i", len(col_value)) + col_value + row_data = col_cell * ncols + for _ in range(nrows): + parts.append(row_data) + return b"".join(parts) + + +def _recv(binary_buf, col_meta, protocol_version=4, ce_policy=None): + """Run recv_results_rows and return the MockResultMessage.""" + msg = _MockResultMessage() + _recv_results_rows( + msg, io.BytesIO(binary_buf), protocol_version, {}, col_meta, ce_policy + ) + return msg + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class ParseDescCacheTest(unittest.TestCase): + """Tests for the Cython ParseDesc cache in row_parser.pyx.""" + + def setUp(self): + if _HAS_ROW_PARSER: + clear_parse_desc_cache() + + def tearDown(self): + if _HAS_ROW_PARSER: + clear_parse_desc_cache() + + @cythontest + def test_cache_hit_returns_same_objects(self): + """Repeated calls with the same col_meta object should return + identical column_names and column_types objects (cache hit).""" + col_meta = _build_column_metadata(5) + buf = _build_binary_buf(1, 5) + + msg1 = _recv(buf, col_meta) + msg2 = _recv(buf, col_meta) + + self.assertIs(msg1.column_names, msg2.column_names) + self.assertIs(msg1.column_types, msg2.column_types) + + @cythontest + def test_cache_miss_different_metadata(self): + """Different metadata list objects should produce cache misses.""" + buf = _build_binary_buf(1, 5) + col_meta_a = _build_column_metadata(5) + col_meta_b = _build_column_metadata(5) + + msg_a = _recv(buf, col_meta_a) + msg_b = _recv(buf, col_meta_b) + + self.assertIsNot(msg_a.column_names, msg_b.column_names) + self.assertEqual(msg_a.column_names, msg_b.column_names) + + @cythontest + def test_protocol_version_invalidates_cache(self): + """Changed protocol_version should invalidate the cache entry.""" + col_meta = _build_column_metadata(5) + buf = _build_binary_buf(1, 5) + + msg_v4 = _recv(buf, col_meta, 
protocol_version=4) + msg_v5 = _recv(buf, col_meta, protocol_version=5) + + self.assertIsNot(msg_v4.column_names, msg_v5.column_names) + + @cythontest + def test_clear_cache_invalidates_entries(self): + """clear_parse_desc_cache() should invalidate cached entries.""" + col_meta = _build_column_metadata(5) + buf = _build_binary_buf(1, 5) + + msg1 = _recv(buf, col_meta) + clear_parse_desc_cache() + msg2 = _recv(buf, col_meta) + + self.assertIsNot(msg1.column_names, msg2.column_names) + self.assertEqual(msg1.column_names, msg2.column_names) + + @cythontest + def test_cache_bounded_size(self): + """Cache should evict entries when exceeding the max size (256).""" + buf = _build_binary_buf(1, 5) + meta_lists = [_build_column_metadata(5) for _ in range(300)] + + for meta in meta_lists: + _recv(buf, meta) + + cache_size = get_parse_desc_cache_size() + self.assertLessEqual( + cache_size, + 256, + "Cache should be bounded to 256 entries, got %d" % cache_size, + ) + + @cythontest + def test_parsed_rows_correctness(self): + """Verify parsed row data is correct through the cached path.""" + ncols, nrows = 5, 3 + col_meta = _build_column_metadata(ncols) + buf = _build_binary_buf(nrows, ncols, col_value=b"test_val") + + msg = _recv(buf, col_meta) + + self.assertEqual(len(msg.parsed_rows), nrows) + for row in msg.parsed_rows: + self.assertEqual(len(row), ncols) + for val in row: + self.assertEqual(val, "test_val") + self.assertEqual(msg.column_names, ["col_%d" % i for i in range(ncols)]) + + @cythontest + def test_get_cache_size(self): + """get_parse_desc_cache_size() reports correct count.""" + self.assertEqual(get_parse_desc_cache_size(), 0) + + col_meta = _build_column_metadata(5) + buf = _build_binary_buf(1, 5) + _recv(buf, col_meta) + + self.assertEqual(get_parse_desc_cache_size(), 1)