Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
114 commits
Select commit Hold shift + click to select a range
d2175f1
use replaceable channel wrapper
daniel-sanche Jul 25, 2025
5e107fc
got unit tests working
daniel-sanche Jul 26, 2025
c4a97e1
put back in cache invalidation
daniel-sanche Jul 26, 2025
e71b1d5
added wrapped multicallables to avoid cache invalidation
daniel-sanche Jul 26, 2025
b81a9be
added crosssync, moved close logic back to client
daniel-sanche Jul 29, 2025
a1dffb5
generated sync code
daniel-sanche Jul 29, 2025
e3ec02b
got tests running
daniel-sanche Jul 29, 2025
4e13783
fixed tests
daniel-sanche Jul 29, 2025
7d90a04
remove extra wrapper; added invalidate_stubs helper
daniel-sanche Jul 29, 2025
26cd601
fixed lint
daniel-sanche Jul 29, 2025
375332f
fixed lint
daniel-sanche Jul 29, 2025
428d75a
renamed replaceablechannel to swappablechannel
daniel-sanche Jul 29, 2025
4b39bc5
added tests
daniel-sanche Jul 29, 2025
3f090c2
added docstrings
daniel-sanche Jul 29, 2025
883ceab
Merge branch 'main' into refactor_refresh
daniel-sanche Jul 29, 2025
04c762a
initial commit
daniel-sanche Jul 29, 2025
29dff4d
added back interceptor
daniel-sanche Jul 29, 2025
e4f8238
added metrics to client
daniel-sanche Jul 29, 2025
fcb062e
fixed lint
daniel-sanche Aug 1, 2025
ac8dbe4
Merge branch 'refactor_refresh' into csm_1_data_model
daniel-sanche Aug 1, 2025
d155f8a
set up channel interceptions
daniel-sanche Aug 2, 2025
9fece96
added TrackedBackoffGenerator
daniel-sanche Aug 2, 2025
aec2577
fixed lint
daniel-sanche Aug 2, 2025
ec4e847
fixed import
daniel-sanche Aug 2, 2025
8f99e4e
added operation.cancel
daniel-sanche Aug 6, 2025
f8e6603
added operation cancelled to interceptor
daniel-sanche Aug 6, 2025
f5e057e
gave each operation a uuid
daniel-sanche Aug 6, 2025
8c397bb
return attempt metric on new attempt
daniel-sanche Aug 7, 2025
2c34198
use standard context manager
daniel-sanche Aug 7, 2025
9bd1e07
use default backoff generator
daniel-sanche Aug 7, 2025
96d1355
require backoff; refactor check
daniel-sanche Aug 7, 2025
de5d07b
fixed context manager naming; lint
daniel-sanche Aug 7, 2025
d73f379
moved first_response_latency to operation
daniel-sanche Aug 7, 2025
a2070f9
Merge branch 'main' into refactor_refresh
daniel-sanche Aug 8, 2025
4a4f80a
fixed import
daniel-sanche Aug 8, 2025
5ea9f0e
Merge branch 'main' into csm_1_data_model
daniel-sanche Aug 11, 2025
708a35a
fixed broken unit tests
daniel-sanche Aug 11, 2025
67c08fd
Merge branch 'refactor_refresh' into csm_1_data_model
daniel-sanche Aug 11, 2025
5ae7acc
added set_next to TrackedBackoffGenerator
daniel-sanche Aug 11, 2025
cb32296
added assertions to test_client
daniel-sanche Aug 12, 2025
f07e765
added new test metrics interceptor file
daniel-sanche Aug 26, 2025
a34c01e
first round of tests
daniel-sanche Aug 26, 2025
84f61ee
added metadata capture for failed rpcs
daniel-sanche Aug 26, 2025
d4ae637
added test for starting attempts
daniel-sanche Aug 26, 2025
1fbcadd
added sync tests
daniel-sanche Aug 26, 2025
edacd04
got tests passing
daniel-sanche Aug 26, 2025
c628d21
removed helper class
daniel-sanche Aug 26, 2025
6d585ec
refactoring
daniel-sanche Aug 26, 2025
01e6b36
refactored interceptor
daniel-sanche Aug 26, 2025
05fe577
added unit tests for interceptor
daniel-sanche Aug 26, 2025
4871abd
Merge branch 'main' into csm_1_data_model
daniel-sanche Aug 26, 2025
b6eac6c
fixed lint
daniel-sanche Aug 26, 2025
07b3295
feat: added metrics interceptor
daniel-sanche Sep 9, 2025
e3ac131
pulled out operation logic
daniel-sanche Sep 9, 2025
557b54a
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 9, 2025
3306ad7
fixed missing convert
daniel-sanche Sep 9, 2025
65f15de
updated system test
daniel-sanche Sep 9, 2025
16c5b6a
fixed lint
daniel-sanche Sep 9, 2025
00cc52f
fixed annotation
daniel-sanche Sep 9, 2025
019a8c2
Merge branch 'csm_interceptor' into csm_1_data_model
daniel-sanche Sep 9, 2025
4ccfdab
removed duplicate import
daniel-sanche Sep 9, 2025
bd9ab70
added more tests
daniel-sanche Sep 9, 2025
50b3e48
remove operation metadata key
daniel-sanche Sep 9, 2025
2b35127
assign metadata directly
daniel-sanche Sep 9, 2025
9cbda99
added test
daniel-sanche Sep 9, 2025
73f4b3c
replace details mocks with real type
daniel-sanche Sep 9, 2025
d5e012d
strip operation id from metadata before request
daniel-sanche Sep 9, 2025
486068b
added try; generated sync
daniel-sanche Sep 9, 2025
bb00b8b
re-generated sync classes; removed test
daniel-sanche Sep 15, 2025
c89f6d4
loosen test
daniel-sanche Sep 15, 2025
bebeb70
use contextvars
daniel-sanche Sep 30, 2025
5ba2bbe
pulled in improvements to data model
daniel-sanche Sep 30, 2025
4098fd9
removed cancel from spec
daniel-sanche Sep 30, 2025
144d75e
broke out streaming wrapper into static function
daniel-sanche Sep 30, 2025
e8785ac
Merge branch 'csm_interceptor' into csm_1_data_model
daniel-sanche Sep 30, 2025
c1cc24d
fixed tests
daniel-sanche Sep 30, 2025
c433f3c
fixed lint
daniel-sanche Sep 30, 2025
85d4cf0
Merge branch 'csm_interceptor' into csm_1_data_model
daniel-sanche Sep 30, 2025
ed9d3cf
fixed lint
daniel-sanche Oct 1, 2025
bc6036e
added close to metric spec
daniel-sanche Oct 2, 2025
18ec330
fixed lint
daniel-sanche Oct 2, 2025
f1be54a
added test
daniel-sanche Oct 3, 2025
9a33d86
Merge branch 'main' into csm_1_data_model
daniel-sanche Nov 22, 2025
e94e143
renamed methods
daniel-sanche Nov 22, 2025
596e54e
improved from_context
daniel-sanche Nov 22, 2025
16c8ed8
changed buffer time
daniel-sanche Nov 26, 2025
4887830
ran blacken
daniel-sanche Nov 26, 2025
5c80c79
use time mocks
daniel-sanche Nov 26, 2025
8de4875
fixed lint
daniel-sanche Nov 26, 2025
6a4d742
loosened test tolerances
daniel-sanche Nov 26, 2025
a474560
removed metrics superclass from interceptor
daniel-sanche Nov 26, 2025
5390813
fixed lint
daniel-sanche Nov 26, 2025
3e0d134
improved comments
daniel-sanche Nov 26, 2025
6b48242
moved interceptor into _metrics
daniel-sanche Nov 26, 2025
dcf3d0a
pulled out tracking into new file
daniel-sanche Nov 26, 2025
c94e4ff
simplified wrapper method
daniel-sanche Nov 26, 2025
3a87a35
Revert "moved interceptor into _metrics"
daniel-sanche Nov 26, 2025
b5c361b
moved tracked retry out of autogen folder
daniel-sanche Nov 26, 2025
1b0b857
fixed typing
daniel-sanche Nov 26, 2025
ac315d0
added tests
daniel-sanche Nov 27, 2025
87e78d1
removed unneeded imports
daniel-sanche Nov 27, 2025
54b7208
ran blacken
daniel-sanche Nov 27, 2025
819e1ae
Moved retry trackers into own file
daniel-sanche Nov 27, 2025
22eb2e1
added docstring
daniel-sanche Nov 27, 2025
0ec8d14
fixed type
daniel-sanche Nov 27, 2025
fa25c2b
import annotations
daniel-sanche Dec 3, 2025
f9ac548
Update google/cloud/bigtable/data/_metrics/data_model.py
daniel-sanche Dec 19, 2025
284c8a6
addressed PR comments
daniel-sanche Jan 6, 2026
16f7d57
improved state machine
daniel-sanche Jan 6, 2026
d167487
added negative check
daniel-sanche Jan 6, 2026
c408e14
removed uuid
daniel-sanche Jan 6, 2026
a70824b
fixed lint
daniel-sanche Jan 6, 2026
4ceab60
added cryptography to prerelease deps
daniel-sanche Jan 6, 2026
78c640b
updated state diagram
daniel-sanche Jan 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 33 additions & 21 deletions google/cloud/bigtable/data/_metrics/data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,23 @@ class OperationType(Enum):

Comment thread
mutianf marked this conversation as resolved.

class OperationState(Enum):
"""Enum for the state of the active operation."""
"""Enum for the state of the active operation.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in end_with_status, it seems like CREATED can transition to COMPLETED, which I think it's valid if the rpc timed out before attempt is started (but probably rare), I think we should update the comment here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I added it to the diagram

┌───────────┐
│ CREATED │
└─────┬─────┘
┌▶ ACTIVE_ATTEMPT ───┐
│ │ │
│ ▼ │
└─ BETWEEN_ATTEMPTS │
│ │
▼ ▼
┌───────────┐ │
│ COMPLETED │ ◀─────┘
└───────────┘
"""

CREATED = 0
ACTIVE_ATTEMPT = 1
Expand Down Expand Up @@ -138,6 +154,7 @@ class ActiveOperationMetric:

op_type: OperationType
uuid: str = field(default_factory=lambda: str(uuid.uuid4()))
state: OperationState = OperationState.CREATED
# create a default backoff generator, initialized with standard default backoff values
backoff_generator: TrackedBackoffGenerator = field(
default_factory=lambda: TrackedBackoffGenerator(
Expand All @@ -151,7 +168,6 @@ class ActiveOperationMetric:
zone: str | None = None
completed_attempts: list[CompletedAttemptMetric] = field(default_factory=list)
is_streaming: bool = False # only True for read_rows operations
was_completed: bool = False
handlers: list[MetricsHandler] = field(default_factory=list)
# the time it takes to recieve the first response from the server, in nanoseconds
# attached by interceptor
Expand Down Expand Up @@ -186,18 +202,6 @@ def from_context(cls) -> ActiveOperationMetric | None:
return None
return op

@property
def state(self) -> OperationState:
if self.was_completed:
return OperationState.COMPLETED
elif self.active_attempt is None:
if self.completed_attempts:
return OperationState.BETWEEN_ATTEMPTS
else:
return OperationState.CREATED
else:
return OperationState.ACTIVE_ATTEMPT

def __post_init__(self):
"""
Save new instances to contextvars on init
Expand All @@ -209,7 +213,8 @@ def start(self) -> None:
Optionally called to mark the start of the operation. If not called,
the operation will be started at initialization.

Assumes operation is in CREATED state.
StartState: CREATED
EndState: CREATED
"""
if self.state != OperationState.CREATED:
return self._handle_error(INVALID_STATE_ERROR.format("start", self.state))
Expand All @@ -221,7 +226,8 @@ def start_attempt(self) -> ActiveAttemptMetric | None:
"""
Called to initiate a new attempt for the operation.

Assumes operation is in either CREATED or BETWEEN_ATTEMPTS states
StartState: CREATED | BETWEEN_ATTEMPTS
EndState: ACTIVE_ATTEMPT
"""
if (
self.state != OperationState.BETWEEN_ATTEMPTS
Expand All @@ -244,6 +250,7 @@ def start_attempt(self) -> ActiveAttemptMetric | None:
backoff_ns = 0

self.active_attempt = ActiveAttemptMetric(backoff_before_attempt_ns=backoff_ns)
self.state = OperationState.ACTIVE_ATTEMPT
return self.active_attempt

def add_response_metadata(self, metadata: dict[str, bytes | str]) -> None:
Expand All @@ -252,7 +259,8 @@ def add_response_metadata(self, metadata: dict[str, bytes | str]) -> None:

If not called, default values for the metadata will be used.

Assumes operation is in ACTIVE_ATTEMPT state.
StartState: ACTIVE_ATTEMPT
EndState: ACTIVE_ATTEMPT

Args:
- metadata: the metadata as extracted from the grpc call
Expand Down Expand Up @@ -312,7 +320,8 @@ def end_attempt_with_status(self, status: StatusCode | BaseException) -> None:
be attempted, `end_with_status` or `end_with_success` should be used
to finalize the operation along with the attempt.

Assumes operation is in ACTIVE_ATTEMPT state.
StartState: ACTIVE_ATTEMPT
EndState: BETWEEN_ATTEMPTS

Args:
- status: The status of the attempt.
Expand All @@ -332,6 +341,7 @@ def end_attempt_with_status(self, status: StatusCode | BaseException) -> None:
)
self.completed_attempts.append(complete_attempt)
self.active_attempt = None
self.state = OperationState.BETWEEN_ATTEMPTS
for handler in self.handlers:
handler.on_attempt_complete(complete_attempt, self)

Expand All @@ -340,7 +350,8 @@ def end_with_status(self, status: StatusCode | BaseException) -> None:
Called to mark the end of the operation. If there is an active attempt,
end_attempt_with_status will be called with the same status.

Assumes operation is not already in COMPLETED state.
StartState: CREATED | ACTIVE_ATTEMPT | BETWEEN_ATTEMPTS
EndState: COMPLETED

Causes on_operation_completed to be called for each registered handler.

Expand All @@ -356,7 +367,7 @@ def end_with_status(self, status: StatusCode | BaseException) -> None:
)
if self.state == OperationState.ACTIVE_ATTEMPT:
self.end_attempt_with_status(final_status)
self.was_completed = True
self.state = OperationState.COMPLETED
finalized = CompletedOperationMetric(
op_type=self.op_type,
uuid=self.uuid,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a uuid per operation? what is it used for?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. In an older version of this code, I used a map to track active operations, with the uuid as the look-up key. But I changed it to use contextvars, which made everything a lot cleaner. So I think this can be removed now

Expand All @@ -376,7 +387,8 @@ def end_with_success(self):
"""
Called to mark the end of the operation with a successful status.

Assumes operation is not already in COMPLETED state.
StartState: CREATED | ACTIVE_ATTEMPT | BETWEEN_ATTEMPTS
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only valid state transition for the success case is from BETWEEN_ATTEMPTS -> COMPLETE? I think the state can only transition from CREATED -> COMPLETED if the rpc timed out before the attempt even started? and can only transition from ACTIVE_ATTEMPT -> COMPLETED if the server errored out or rpc timed out? which won't be the case here? So I think we should add a check to make sure the previous state was BETWEEN_ATTEMPTS?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • I think it only goes to BETWEEN_ATTEMPTS when an attempt fails. So if a request is successful, it can go straight from ACTIVE_ATTEMPT to COMPLETE
  • For CREATED->COMPLETE, I don't think we'd expect that to happen often, but it IIRC, it should be possible to run into an exception or something in the handwritten code before the actual rpc starts. Maybe we'd just want to drop the metric in that case, since there isn't much to record?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I see i was confused by end_attempt_with_status transitions to BETWEEN_ATTEMPT state. But when status is ok this is transisioned from ACTIVE STATE and immediately to COMPLETE.

EndState: COMPLETED

Causes on_operation_completed to be called for each registered handler.
"""
Expand Down
53 changes: 18 additions & 35 deletions tests/unit/data/_metrics/test_data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ def test_ctor_defaults(self, mock_monotonic_ns):
assert metric.cluster_id is None
assert metric.zone is None
assert len(metric.completed_attempts) == 0
assert metric.was_completed is False
assert len(metric.handlers) == 0
assert metric.is_streaming is False
assert metric.flow_throttling_time_ns == 0
assert metric.state == State.CREATED

def test_ctor_explicit(self):
"""
Expand All @@ -55,7 +55,7 @@ def test_ctor_explicit(self):
expected_cluster_id = "cluster"
expected_zone = "zone"
expected_completed_attempts = [mock.Mock()]
expected_was_completed = True
expected_state = State.COMPLETED
expected_handlers = [mock.Mock()]
expected_is_streaming = True
expected_flow_throttling = 12
Expand All @@ -65,8 +65,8 @@ def test_ctor_explicit(self):
active_attempt=expected_active_attempt,
cluster_id=expected_cluster_id,
zone=expected_zone,
state=expected_state,
completed_attempts=expected_completed_attempts,
was_completed=expected_was_completed,
handlers=expected_handlers,
is_streaming=expected_is_streaming,
flow_throttling_time_ns=expected_flow_throttling,
Expand All @@ -77,7 +77,7 @@ def test_ctor_explicit(self):
assert metric.cluster_id == expected_cluster_id
assert metric.zone == expected_zone
assert metric.completed_attempts == expected_completed_attempts
assert metric.was_completed == expected_was_completed
assert metric.state == expected_state
assert metric.handlers == expected_handlers
assert metric.is_streaming == expected_is_streaming
assert metric.flow_throttling_time_ns == expected_flow_throttling
Expand All @@ -99,27 +99,18 @@ def test_state_machine_w_methods(self):
metric.end_with_success()
assert metric.state == State.COMPLETED

def test_state_machine_w_state(self):
def test_state_machine(self):
"""
Exercise state machine by directly manupulating state variables

relevant variables are: active_attempt, completed_attempts, was_completed
Exercise state machine by moving through states
"""
metric = self._make_one(mock.Mock())
for was_completed_value in [False, True]:
metric.was_completed = was_completed_value
for active_operation_value in [None, mock.Mock()]:
metric.active_attempt = active_operation_value
for completed_attempts_value in [[], [mock.Mock()]]:
metric.completed_attempts = completed_attempts_value
if was_completed_value:
assert metric.state == State.COMPLETED
elif active_operation_value is not None:
assert metric.state == State.ACTIVE_ATTEMPT
elif completed_attempts_value:
assert metric.state == State.BETWEEN_ATTEMPTS
else:
assert metric.state == State.CREATED
assert metric.state == State.CREATED
metric.start_attempt()
assert metric.state == State.ACTIVE_ATTEMPT
metric.end_attempt_with_status(0)
assert metric.state == State.BETWEEN_ATTEMPTS
metric.end_with_success()
assert metric.state == State.COMPLETED

@pytest.mark.parametrize(
"method,args,valid_states,error_method_name",
Expand Down Expand Up @@ -161,13 +152,7 @@ def test_error_invalid_states(self, method, args, valid_states, error_method_nam
for state in invalid_states:
with mock.patch.object(cls, "_handle_error") as mock_handle_error:
mock_handle_error.return_value = None
metric = self._make_one(mock.Mock())
if state == State.ACTIVE_ATTEMPT:
metric.active_attempt = mock.Mock()
elif state == State.BETWEEN_ATTEMPTS:
metric.completed_attempts.append(mock.Mock())
elif state == State.COMPLETED:
metric.was_completed = True
metric = self._make_one(mock.Mock(), state=state)
return_obj = getattr(metric, method)(*args)
assert return_obj is None
assert mock_handle_error.call_count == 1
Expand Down Expand Up @@ -286,7 +271,7 @@ def test_add_response_metadata_cbt_header(
cls = type(self._make_one(mock.Mock()))
with mock.patch.object(cls, "_handle_error") as mock_handle_error:
metric = self._make_one(
mock.Mock(), cluster_id=start_cluster, zone=start_zone
mock.Mock(), cluster_id=start_cluster, zone=start_zone, state=State.ACTIVE_ATTEMPT
)
metric.active_attempt = mock.Mock()
metric.active_attempt.gfe_latency_ns = None
Expand Down Expand Up @@ -323,7 +308,7 @@ def test_add_response_metadata_cbt_header_w_error(self, metadata_field):

cls = type(self._make_one(mock.Mock()))
with mock.patch.object(cls, "_handle_error") as mock_handle_error:
metric = self._make_one(mock.Mock())
metric = self._make_one(mock.Mock(), state=State.ACTIVE_ATTEMPT)
metric.cluster_id = None
metric.zone = None
metric.active_attempt = mock.Mock()
Expand Down Expand Up @@ -371,7 +356,7 @@ def test_add_response_metadata_server_timing_header(

cls = type(self._make_one(mock.Mock()))
with mock.patch.object(cls, "_handle_error") as mock_handle_error:
metric = self._make_one(mock.Mock())
metric = self._make_one(mock.Mock(), state=State.ACTIVE_ATTEMPT)
metric.active_attempt = mock.Mock()
metric.active_attempt.gfe_latency_ns = None
metadata = grpc.aio.Metadata()
Expand Down Expand Up @@ -477,7 +462,7 @@ def test_end_with_status(self, mock_monotonic_ns):

handlers = [mock.Mock(), mock.Mock()]
metric = self._make_one(
expected_type, handlers=handlers, start_time_ns=expected_start_time
expected_type, handlers=handlers, start_time_ns=expected_start_time, state=State.ACTIVE_ATTEMPT
)
metric.cluster_id = expected_cluster
metric.zone = expected_zone
Expand All @@ -492,7 +477,6 @@ def test_end_with_status(self, mock_monotonic_ns):
metric.end_with_status(expected_status)
# test that ActiveOperation was updated to terminal state
assert metric.state == State.COMPLETED
assert metric.was_completed is True
assert metric.active_attempt is None
assert len(metric.completed_attempts) == 1
# check that finalized operation was passed to handlers
Expand Down Expand Up @@ -588,7 +572,6 @@ def test_end_on_empty_operation(self):
metric = self._make_one(mock.Mock(), handlers=handlers)
metric.end_with_success()
assert metric.state == State.COMPLETED
assert metric.was_completed is True
final_op = handlers[0].on_operation_complete.call_args[0][0]
assert final_op.final_status == StatusCode.OK
assert final_op.completed_attempts == []
Expand Down