
Commit 8f8f23f

committed
Added cumulative sum tracking in ContiguousStorage
1 parent e8fdfa4 commit 8f8f23f

5 files changed

Lines changed: 150 additions & 41 deletions


QuantileFlow/ddsketch/core.py

Lines changed: 15 additions & 9 deletions
@@ -1,6 +1,6 @@
 """Core DDSketch implementation.
 
-Optimized for high throughput to match or exceed Datadog's implementation.
+Optimized for high throughput with efficient bucket indexing and quantile queries.
 """
 
 from typing import Literal, Union
@@ -80,7 +80,7 @@ def __init__(
         self.count = 0.0
         self.zero_count = 0.0
 
-        # Summary stats (like Datadog)
+        # Summary stats
         self._min = float('+inf')
         self._max = float('-inf')
         self._sum = 0.0
@@ -96,27 +96,33 @@ def insert(self, value: Union[int, float], weight: float = 1.0) -> None:
         Raises:
             ValueError: If value is negative and cont_neg is False.
         """
+        # Cache method lookups for hot-path optimization
         if value > 0:
-            self.positive_store.add(self.mapping.compute_bucket_index(value), weight)
+            # Most common case: positive values
+            # Inline the hot path with cached local references
+            compute_idx = self.mapping.compute_bucket_index
+            self.positive_store.add(compute_idx(value), weight)
         elif value < 0:
             if self.cont_neg:
-                self.negative_store.add(self.mapping.compute_bucket_index(-value), weight)
+                compute_idx = self.mapping.compute_bucket_index
+                self.negative_store.add(compute_idx(-value), weight)
             else:
                 raise ValueError("Negative values not supported when cont_neg is False")
         else:
             self.zero_count += weight
 
-        # Track summary stats
+        # Track summary stats in one combined update
         self.count += weight
         self._sum += value * weight
+        # Update min/max
         if value < self._min:
             self._min = value
         if value > self._max:
             self._max = value
 
-    # Alias for compatibility with Datadog's API
+    # Alias for API compatibility
     def add(self, value: Union[int, float], weight: float = 1.0) -> None:
-        """Alias for insert() to match Datadog's API."""
+        """Alias for insert()."""
         self.insert(value, weight)
 
     def delete(self, value: Union[int, float]) -> None:
@@ -186,9 +192,9 @@ def quantile(self, q: float) -> float:
         key = self.positive_store.key_at_rank(rank)
         return self.mapping.compute_value_from_index(key)
 
-    # Alias for Datadog compatibility
+    # Alias for API compatibility
     def get_quantile_value(self, quantile: float) -> float:
-        """Alias for quantile() to match Datadog's API."""
+        """Alias for quantile()."""
         try:
             return self.quantile(quantile)
         except ValueError:
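
The insert() change binds self.mapping.compute_bucket_index to a local name before calling it, which skips a repeated attribute lookup on the hot path. A minimal standalone sketch of that pattern, not taken from the repository (the Mapping class and numbers here are illustrative); the win is largest when the bound method is reused many times inside a loop:

import timeit

class Mapping:
    def compute_bucket_index(self, value: float) -> int:
        return int(value)  # stand-in for the real index math

mapping = Mapping()
values = [float(i % 1000 + 1) for i in range(10_000)]

def without_local():
    for v in values:
        mapping.compute_bucket_index(v)  # attribute lookup on every call

def with_local():
    compute_idx = mapping.compute_bucket_index  # bind once, reuse in the loop
    for v in values:
        compute_idx(v)

print(timeit.timeit(without_local, number=100))
print(timeit.timeit(with_local, number=100))
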

QuantileFlow/ddsketch/mapping/cubic_interpolation.py

Lines changed: 3 additions & 9 deletions
@@ -1,10 +1,5 @@
 """
-This file contains a Python implementation of the cubic interpolation mapping algorithm
-described in Datadog's Java DDSketch implementation (https://github.com/DataDog/sketches-java).
-
-Original work Copyright 2021 Datadog, Inc.
-Licensed under Apache License 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
-
+Cubic interpolation mapping scheme for DDSketch.
 
 This implementation approximates the memory-optimal logarithmic mapping by:
 1. Extracting the floor value of log2 from binary representation
@@ -34,8 +29,7 @@ def __init__(self, relative_accuracy: float):
 
         # Multiplier m = 7/(10*log(2)) ≈ 1.01
         # This gives us the minimum multiplier that maintains relative accuracy guarantee
-        # Divide by C as per Datadog's implementation
-        self.m = 1/ (self.C * math.log(2))
+        self.m = 1 / (self.C * math.log(2))
 
     def _extract_exponent_and_significand(self, value: float) -> tuple[int, float]:
         """
@@ -55,7 +49,7 @@ def _cubic_interpolation(self, s: float) -> float:
         Compute the cubic interpolation P(s) = As³ + Bs² + Cs
         where s is the normalized significand in [0, 1).
         """
-        # Use Datadog's order of operations for better numerical stability
+        # Use Horner's method for better numerical stability
         return s * (self.C + s * (self.B + s * self.A))
 
     def compute_bucket_index(self, value: float) -> int:
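
The Horner form named in the new comment evaluates the same cubic as the naive expansion A*s³ + B*s² + C*s, just factored to use fewer multiplications. A small self-contained check of that equivalence (the coefficient values below are illustrative, not read from the class):

A, B, C = 6/35, -3/5, 10/7  # example cubic coefficients

def naive(s: float) -> float:
    return A * s**3 + B * s**2 + C * s

def horner(s: float) -> float:
    # Same polynomial, factored as s*(C + s*(B + s*A))
    return s * (C + s * (B + s * A))

for s in (0.0, 0.25, 0.5, 0.999):
    assert abs(naive(s) - horner(s)) < 1e-12
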

QuantileFlow/ddsketch/mapping/logarithmic.py

Lines changed: 2 additions & 2 deletions
@@ -19,11 +19,11 @@ def __init__(self, relative_accuracy: float):
         self.multiplier = 1 / math.log(self.gamma)
 
     def key(self, value: float) -> int:
-        """Alias for compute_bucket_index for Datadog API compatibility."""
+        """Alias for compute_bucket_index for API compatibility."""
         return self.compute_bucket_index(value)
 
     def value(self, key: int) -> float:
-        """Alias for compute_value_from_index for Datadog API compatibility."""
+        """Alias for compute_value_from_index for API compatibility."""
         return self.compute_value_from_index(key)
 
     def compute_bucket_index(self, value: float) -> int:
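
The multiplier set in __init__ is 1/ln(gamma), so a bucket index is essentially log(value) scaled by that multiplier. A rough standalone sketch of the logarithmic mapping these aliases sit on top of; the rounding convention and the bucket-value reconstruction below are assumptions for illustration, not necessarily what the class does:

import math

alpha = 0.01                           # target relative accuracy
gamma = (1 + alpha) / (1 - alpha)
multiplier = 1 / math.log(gamma)

def key(value: float) -> int:
    return math.ceil(math.log(value) * multiplier)

def value(key_: int) -> float:
    # representative value for the bucket, chosen so the relative error stays within alpha
    return 2 * gamma ** key_ / (1 + gamma)

x = 123.456
rebuilt = value(key(x))
assert abs(rebuilt - x) / x <= alpha
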

QuantileFlow/ddsketch/storage/contiguous.py

Lines changed: 73 additions & 14 deletions
@@ -1,15 +1,15 @@
 """Contiguous array storage implementation for DDSketch using offset-based indexing.
 
 Optimized for high throughput by using Python lists instead of numpy arrays
-and adopting Datadog's chunk-based dynamic growth pattern.
+and a chunk-based dynamic growth pattern.
 """
 
 import math
 import warnings
 from .base import Storage
 
 
-# Chunk size for dynamic growth (matches Datadog's default)
+# Chunk size for dynamic growth
 CHUNK_SIZE = 128
 
 
@@ -32,7 +32,8 @@ class ContiguousStorage(Storage):
 
     __slots__ = ('count', 'bins', 'min_key', 'max_key',
                  'offset', 'collapse_count', 'bin_limit',
-                 'chunk_size', 'is_collapsed')
+                 'chunk_size', 'is_collapsed',
+                 '_cumulative_sums', '_cumulative_valid')
 
     def __init__(self, bin_limit: int = 2048, chunk_size: int = CHUNK_SIZE, max_buckets: int = None):
         """
@@ -51,7 +52,7 @@ def __init__(self, bin_limit: int = 2048, chunk_size: int = CHUNK_SIZE, max_buck
             raise ValueError("bin_limit must be positive for ContiguousStorage")
 
         # Don't call super().__init__ to avoid overhead - inline what we need
-        self.count = 0.0  # Use float like Datadog for weighted values
+        self.count = 0.0  # Use float for weighted values
         self.bins = []  # Start empty, grow dynamically
         self.bin_limit = bin_limit
         self.chunk_size = chunk_size
@@ -60,6 +61,9 @@ def __init__(self, bin_limit: int = 2048, chunk_size: int = CHUNK_SIZE, max_buck
         self.offset = 0
         self.collapse_count = 0
         self.is_collapsed = False
+        # Lazy cumulative sums for O(log n) quantile queries
+        self._cumulative_sums = []
+        self._cumulative_valid = False
 
     @property
     def total_count(self):
@@ -118,19 +122,29 @@ def add(self, key, weight=1.0):
         idx = self._get_index(key)
         self.bins[idx] += weight
         self.count += weight
+        self._cumulative_valid = False
 
     def _get_index(self, key):
-        """Calculate the bin index for the key, extending the range if necessary."""
-        if self.min_key is None:
+        """Calculate the bin index for the key, extending the range if necessary.
+
+        Optimized for the common case where the key is within the existing range.
+        """
+        # Fast path: key is within the existing range (most common case)
+        min_key = self.min_key
+        if min_key is not None and min_key <= key <= self.max_key:
+            return key - self.offset
+
+        # Slow path: extend the range or handle edge cases
+        if min_key is None:
             # First insertion
             self._extend_range(key)
-        elif key < self.min_key:
+        elif key < min_key:
             if self.is_collapsed:
                 return 0
             self._extend_range(key)
             if self.is_collapsed:
                 return 0
-        elif key > self.max_key:
+        else:  # key > self.max_key
             self._extend_range(key)
 
         return key - self.offset
@@ -241,6 +255,7 @@ def remove(self, bucket_index: int, count: int = 1) -> bool:
 
         self.bins[pos] = max(0, old_count - count)
         self.count = max(0, self.count - count)
+        self._cumulative_valid = False
 
         # Update min/max keys if we emptied a boundary bucket
         if old_count > 0 and self.bins[pos] == 0:
@@ -282,11 +297,27 @@ def get_count(self, bucket_index: int) -> int:
             return 0
         return int(self.bins[pos])
 
+    def _rebuild_cumulative_sums(self):
+        """Rebuild the cumulative sums array for O(log n) rank queries."""
+        bins = self.bins
+        n = len(bins)
+        if n == 0:
+            self._cumulative_sums = []
+        else:
+            # Build cumulative sums
+            cumsum = [0.0] * n
+            running = 0.0
+            for i in range(n):
+                running += bins[i]
+                cumsum[i] = running
+            self._cumulative_sums = cumsum
+        self._cumulative_valid = True
+
     def key_at_rank(self, rank, lower=True):
         """
         Return the key for the value at given rank.
 
-        This method is compatible with Datadog's interface.
+        Uses lazy cumulative sums and binary search for O(log n) performance.
 
         Args:
             rank: The rank to find.
@@ -296,11 +327,37 @@ def key_at_rank(self, rank, lower=True):
         Returns:
             The key at the specified rank.
         """
-        running_ct = 0.0
-        for i, bin_ct in enumerate(self.bins):
-            running_ct += bin_ct
-            if (lower and running_ct > rank) or (not lower and running_ct >= rank + 1):
-                return i + self.offset
+        if not self._cumulative_valid:
+            self._rebuild_cumulative_sums()
+
+        cumsum = self._cumulative_sums
+        n = len(cumsum)
+        if n == 0:
+            return self.max_key if self.max_key is not None else 0
+
+        # Use binary search for O(log n) lookup:
+        # find the first index where the rank condition holds
+        lo, hi = 0, n
+        if lower:
+            # Find the first index where cumsum[i] > rank
+            while lo < hi:
+                mid = (lo + hi) >> 1
+                if cumsum[mid] > rank:
+                    hi = mid
+                else:
+                    lo = mid + 1
+        else:
+            # Find the first index where cumsum[i] >= rank + 1
+            target = rank + 1
+            while lo < hi:
+                mid = (lo + hi) >> 1
+                if cumsum[mid] >= target:
+                    hi = mid
+                else:
+                    lo = mid + 1
+
+        if lo < n:
+            return lo + self.offset
 
         return self.max_key if self.max_key is not None else 0
@@ -329,6 +386,7 @@ def merge(self, other: 'ContiguousStorage'):
             self.bins[self_idx] += other.bins[other_idx]
 
         self.count += other.count
+        self._cumulative_valid = False
 
     def copy(self, store: 'ContiguousStorage'):
         """Copy another storage into this one."""
@@ -339,3 +397,4 @@ def copy(self, store: 'ContiguousStorage'):
         self.offset = store.offset
         self.is_collapsed = store.is_collapsed
         self.collapse_count = store.collapse_count
+        self._cumulative_valid = False
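
The hand-rolled loops above are "find the first index satisfying a predicate over a sorted prefix-sum array" searches, so they behave like bisect over the cumulative sums. A small sketch of that equivalence with made-up bucket counts, purely for illustration (the commit keeps its own loop rather than calling bisect):

from bisect import bisect_left, bisect_right

bins = [3.0, 0.0, 5.0, 2.0]          # per-bucket counts
cumsum = []
running = 0.0
for c in bins:
    running += c
    cumsum.append(running)           # [3.0, 3.0, 8.0, 10.0]

def index_at_rank(rank: float, lower: bool = True) -> int:
    if lower:
        # first i with cumsum[i] > rank
        return bisect_right(cumsum, rank)
    # first i with cumsum[i] >= rank + 1
    return bisect_left(cumsum, rank + 1)

assert index_at_rank(0) == 0      # rank 0 falls in the first bucket
assert index_at_rank(3) == 2      # ranks 3..7 fall in the third bucket
assert index_at_rank(9) == 3
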

QuantileFlow/ddsketch/storage/sparse.py

Lines changed: 57 additions & 7 deletions
@@ -1,6 +1,6 @@
 """Sparse storage implementation for DDSketch using dictionary."""
 
-from typing import Dict
+from typing import Dict, List
 from .base import Storage, BucketManagementStrategy
 
 class SparseStorage(Storage):
@@ -26,6 +26,10 @@ def __init__(self, max_buckets: int = 2048,
         self.counts: Dict[int, int] = {}
         self.min_index = None  # Minimum bucket index seen
         self.max_index = None  # Maximum bucket index seen
+        # Cached sorted keys and cumulative sums for O(log n) quantile queries
+        self._sorted_keys: List[int] = []
+        self._cumulative_sums: List[float] = []
+        self._cache_valid: bool = False
 
     @property
     def count(self):
@@ -55,6 +59,7 @@ def add(self, bucket_index: int, count: int = 1):
 
         self.counts[bucket_index] = self.counts.get(bucket_index, 0) + count
         self.total_count += count
+        self._cache_valid = False
 
         # Update min and max indices
         if self.min_index is None or bucket_index < self.min_index:
@@ -85,6 +90,7 @@ def remove(self, bucket_index: int, count: int = 1) -> bool:
 
         self.counts[bucket_index] = max(0, self.counts[bucket_index] - count)
         self.total_count = max(0, self.total_count - count)
+        self._cache_valid = False
 
         if self.counts[bucket_index] == 0:
             del self.counts[bucket_index]
@@ -139,11 +145,30 @@ def collapse_smallest_buckets(self):
         # Merge buckets
         self.counts[i1] += self.counts[i0]
         del self.counts[i0]
+        self._cache_valid = False
+
+    def _rebuild_cache(self):
+        """Rebuild sorted keys and cumulative sums for O(log n) rank queries."""
+        if not self.counts:
+            self._sorted_keys = []
+            self._cumulative_sums = []
+        else:
+            self._sorted_keys = sorted(self.counts.keys())
+            # Build cumulative sums
+            cumsum = []
+            running = 0.0
+            for key in self._sorted_keys:
+                running += self.counts[key]
+                cumsum.append(running)
+            self._cumulative_sums = cumsum
+        self._cache_valid = True
 
     def key_at_rank(self, rank, lower=True):
         """
         Return the key for the value at given rank.
 
+        Uses cached sorted keys and binary search for O(log n) performance.
+
         Args:
             rank: The rank to find.
             lower: If True, return key where running_count > rank.
@@ -155,10 +180,35 @@ def key_at_rank(self, rank, lower=True):
         if not self.counts:
             return 0
 
-        running_ct = 0.0
-        for key in sorted(self.counts.keys()):
-            running_ct += self.counts[key]
-            if (lower and running_ct > rank) or (not lower and running_ct >= rank + 1):
-                return key
+        if not self._cache_valid:
+            self._rebuild_cache()
+
+        cumsum = self._cumulative_sums
+        n = len(cumsum)
+        if n == 0:
+            return self.max_index if self.max_index is not None else 0
+
+        # Use binary search for O(log n) lookup
+        lo, hi = 0, n
+        if lower:
+            # Find the first index where cumsum[i] > rank
+            while lo < hi:
+                mid = (lo + hi) >> 1
+                if cumsum[mid] > rank:
+                    hi = mid
+                else:
+                    lo = mid + 1
+        else:
+            # Find the first index where cumsum[i] >= rank + 1
+            target = rank + 1
+            while lo < hi:
+                mid = (lo + hi) >> 1
+                if cumsum[mid] >= target:
+                    hi = mid
+                else:
+                    lo = mid + 1
+
+        if lo < len(self._sorted_keys):
+            return self._sorted_keys[lo]
 
-        return self.max_index if self.max_index is not None else 0
+        return self.max_index if self.max_index is not None else 0
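
The sparse store uses the same lazy pattern as the contiguous one: mutate the dict freely, flip the cache flag, and rebuild the sorted keys and cumulative sums only when a rank query arrives. A standalone cross-check of the cached lookup against the old linear scan, using toy data rather than the SparseStorage class itself:

from bisect import bisect_right

counts = {5: 2, -3: 1, 10: 4}        # bucket index -> count

# Old approach: linear scan in key order
def key_at_rank_scan(rank: float) -> int:
    running = 0.0
    for key in sorted(counts):
        running += counts[key]
        if running > rank:
            return key
    return max(counts)

# New approach: cached sorted keys + cumulative sums + binary search
sorted_keys = sorted(counts)                       # [-3, 5, 10]
cumsum = []
running = 0.0
for key in sorted_keys:
    running += counts[key]
    cumsum.append(running)                         # [1.0, 3.0, 7.0]

def key_at_rank_cached(rank: float) -> int:
    i = bisect_right(cumsum, rank)                 # first index with cumsum[i] > rank
    return sorted_keys[i] if i < len(sorted_keys) else max(counts)

for r in range(7):
    assert key_at_rank_scan(r) == key_at_rank_cached(r)
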
