Skip to content

Commit ec00ea3

Browse files
committed
test(grpc): add Rust integration tests and Python tests for retry/callback path (DX-0155)
## Purpose DX-0151 through DX-0154 added pushback parsing, smear jitter, and the PyO3 throttle callback to the Rust gRPC retry path. This task validates that the pieces compose correctly end-to-end. ## Solution - Added `rust/tests/retry_integration.rs` with four integration tests that exercise `retry_on_transient` directly via pure-Rust closures: - `callback_fires_per_retry_attempt`: 3 retryable errors then success; asserts the callback fires once per failure. - `pushback_smear_produces_delays_within_range`: injects `grpc-retry-pushback-ms` trailer and measures that the actual sleep lands in [pushback, pushback*1.5 + slack]. - `callback_exception_does_not_break_retry`: callback with silently-discarded error (mirrors transport.rs Python exception-ignoring pattern); retry succeeds. - `host_string_received_by_callback`: asserts callback receives the host string from RetryConfig. - Changed `crate-type = ["cdylib"]` to `["cdylib", "rlib"]` so the Cargo integration-test harness can link against the library crate. - Made `mod retry` public in `lib.rs` to expose its API to the `tests/` directory. - Refactored `test_retry_callback.py` to import GrpcChannel once at module level with `allow_module_level=True` skip guard (eliminates duplicate imports). - Added per-module mypy override to suppress `warn_unused_ignores` for `test_retry_callback.py` (the single `# type: ignore[import-not-found]` is intentional for environments where the extension is not yet built).
1 parent 469d25a commit ec00ea3

5 files changed

Lines changed: 182 additions & 17 deletions

File tree

pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ include = ["LICENSE"]
7575
strict = true
7676
python_version = "3.10"
7777

78+
[[tool.mypy.overrides]]
79+
module = "tests.unit.grpc.test_retry_callback"
80+
warn_unused_ignores = false
81+
7882
[tool.ruff]
7983
line-length = 100
8084
target-version = "py310"

rust/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ edition = "2021"
55

66
[lib]
77
name = "pinecone_grpc"
8-
crate-type = ["cdylib"]
8+
crate-type = ["cdylib", "rlib"]
99

1010
[dependencies]
1111
pyo3 = { version = "0.24", features = ["extension-module", "abi3-py310"] }

rust/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use pyo3::prelude::*;
22

3-
mod retry;
3+
pub mod retry;
44
mod transport;
55

66
/// Generated protobuf types for the Pinecone data plane.

rust/tests/retry_integration.rs

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
use std::sync::atomic::{AtomicU32, Ordering};
2+
use std::sync::Arc;
3+
use std::time::{Duration, Instant};
4+
5+
use pinecone_grpc::retry::{retry_on_transient, RetryConfig, ThrottleCallback};
6+
use tonic::Status;
7+
8+
#[tokio::test]
9+
async fn callback_fires_per_retry_attempt() {
10+
let call_count = Arc::new(AtomicU32::new(0));
11+
let count = call_count.clone();
12+
let cb_call_count = Arc::new(AtomicU32::new(0));
13+
let cb_count = cb_call_count.clone();
14+
15+
let on_throttle: ThrottleCallback = Arc::new(move |_h: String| {
16+
cb_count.fetch_add(1, Ordering::SeqCst);
17+
});
18+
19+
let config = RetryConfig {
20+
max_retries: 3,
21+
initial_backoff: Duration::from_millis(1),
22+
max_backoff: Duration::from_millis(5),
23+
on_throttle: Some(on_throttle),
24+
..RetryConfig::default()
25+
};
26+
27+
let result = retry_on_transient(&config, || {
28+
let count = count.clone();
29+
async move {
30+
let n = count.fetch_add(1, Ordering::SeqCst);
31+
if n < 3 {
32+
Err(Status::resource_exhausted("throttled"))
33+
} else {
34+
Ok::<(), Status>(())
35+
}
36+
}
37+
})
38+
.await;
39+
40+
assert!(result.is_ok());
41+
// 3 retryable failures before success → callback fires 3 times (once per failure)
42+
assert_eq!(cb_call_count.load(Ordering::SeqCst), 3);
43+
}
44+
45+
#[tokio::test]
46+
async fn pushback_smear_produces_delays_within_range() {
47+
let pushback_ms: u64 = 20;
48+
49+
let config = RetryConfig {
50+
max_retries: 1,
51+
initial_backoff: Duration::from_millis(1),
52+
max_backoff: Duration::from_millis(200),
53+
..RetryConfig::default()
54+
};
55+
56+
let start = Instant::now();
57+
let _ = retry_on_transient(&config, || async {
58+
let mut s = Status::resource_exhausted("throttled");
59+
s.metadata_mut().insert(
60+
"grpc-retry-pushback-ms",
61+
pushback_ms.to_string().parse().unwrap(),
62+
);
63+
Err::<(), Status>(s)
64+
})
65+
.await;
66+
let elapsed = start.elapsed();
67+
68+
// smear_pushback(20ms, 200ms) returns uniform(20ms, 30ms), well under cap.
69+
// Lower bound: must wait at least pushback ms.
70+
// Upper bound: pushback + pushback/2 + generous CI slack (200ms).
71+
assert!(
72+
elapsed >= Duration::from_millis(pushback_ms),
73+
"elapsed {:?} should be >= pushback {}ms",
74+
elapsed,
75+
pushback_ms
76+
);
77+
assert!(
78+
elapsed < Duration::from_millis(pushback_ms + pushback_ms / 2 + 200),
79+
"elapsed {:?} exceeded expected ceiling {}ms",
80+
elapsed,
81+
pushback_ms + pushback_ms / 2 + 200
82+
);
83+
}
84+
85+
#[tokio::test]
86+
async fn callback_exception_does_not_break_retry() {
87+
// Verify that a callback which handles its own failure silently (matching the
88+
// transport.rs pattern for Python exceptions:
89+
// `if let Err(_) = py_cb.call1(py, (h,)) { /* log and ignore */ }`)
90+
// does not prevent retry from succeeding.
91+
let call_count = Arc::new(AtomicU32::new(0));
92+
let count = call_count.clone();
93+
let cb_call_count = Arc::new(AtomicU32::new(0));
94+
let cb_count = cb_call_count.clone();
95+
96+
let on_throttle: ThrottleCallback = Arc::new(move |_h: String| {
97+
cb_count.fetch_add(1, Ordering::SeqCst);
98+
// Simulate a callback that raises (e.g. Python ValueError) — error is discarded,
99+
// as transport.rs does with `if let Err(e) = py_cb.call1(py, (h,)) {}`.
100+
let _discarded: Result<(), &str> = Err("ValueError: simulated throttle error");
101+
});
102+
103+
let config = RetryConfig {
104+
max_retries: 2,
105+
initial_backoff: Duration::from_millis(1),
106+
max_backoff: Duration::from_millis(5),
107+
on_throttle: Some(on_throttle),
108+
host: "test-index.svc.pinecone.io".into(),
109+
..RetryConfig::default()
110+
};
111+
112+
let result = retry_on_transient(&config, || {
113+
let count = count.clone();
114+
async move {
115+
let n = count.fetch_add(1, Ordering::SeqCst);
116+
if n < 2 {
117+
Err(Status::resource_exhausted("throttled"))
118+
} else {
119+
Ok::<(), Status>(())
120+
}
121+
}
122+
})
123+
.await;
124+
125+
assert!(
126+
result.is_ok(),
127+
"retry should succeed despite callback raising"
128+
);
129+
assert_eq!(
130+
cb_call_count.load(Ordering::SeqCst),
131+
2,
132+
"callback should fire on each retryable error"
133+
);
134+
}
135+
136+
#[tokio::test]
137+
async fn host_string_received_by_callback() {
138+
let expected_host = "my-index-abc123.svc.pinecone.io";
139+
let received_hosts: Arc<std::sync::Mutex<Vec<String>>> =
140+
Arc::new(std::sync::Mutex::new(Vec::new()));
141+
let hosts_clone = received_hosts.clone();
142+
143+
let on_throttle: ThrottleCallback = Arc::new(move |h: String| {
144+
hosts_clone.lock().unwrap().push(h);
145+
});
146+
147+
let config = RetryConfig {
148+
max_retries: 2,
149+
initial_backoff: Duration::from_millis(1),
150+
max_backoff: Duration::from_millis(5),
151+
on_throttle: Some(on_throttle),
152+
host: expected_host.to_string(),
153+
..RetryConfig::default()
154+
};
155+
156+
let _ = retry_on_transient(&config, || async {
157+
Err::<(), Status>(Status::resource_exhausted("throttled"))
158+
})
159+
.await;
160+
161+
let hosts = received_hosts.lock().unwrap();
162+
assert!(
163+
!hosts.is_empty(),
164+
"callback should have been invoked at least once"
165+
);
166+
assert!(
167+
hosts.iter().all(|h| h == expected_host),
168+
"callback received unexpected host strings: {:?}",
169+
hosts
170+
);
171+
}

tests/unit/grpc/test_retry_callback.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@
44

55
import pytest
66

7+
try:
8+
from pinecone._grpc import GrpcChannel # type: ignore[import-not-found]
9+
except ImportError:
10+
pytest.skip("Rust extension not built", allow_module_level=True)
11+
712

813
class TestGrpcThrottleCallback:
914
def test_callback_accepts_host_string(self) -> None:
1015
"""GrpcChannel constructor accepts on_throttle callback as kwarg."""
11-
try:
12-
from pinecone._grpc import GrpcChannel # type: ignore[import-not-found]
13-
except ImportError:
14-
pytest.skip("Rust extension not built")
15-
1616
received: list[str] = []
1717
ch = GrpcChannel(
1818
endpoint="https://test-index-abc.svc.pinecone.io:443",
@@ -25,11 +25,6 @@ def test_callback_accepts_host_string(self) -> None:
2525

2626
def test_callback_none_is_default(self) -> None:
2727
"""Constructor accepts None or absence of on_throttle without error."""
28-
try:
29-
from pinecone._grpc import GrpcChannel # type: ignore[import-not-found]
30-
except ImportError:
31-
pytest.skip("Rust extension not built")
32-
3328
ch = GrpcChannel(
3429
endpoint="https://test-index-abc.svc.pinecone.io:443",
3530
api_key="test-key",
@@ -40,11 +35,6 @@ def test_callback_none_is_default(self) -> None:
4035

4136
def test_callback_omitted_same_as_none(self) -> None:
4237
"""Omitting on_throttle is identical to passing on_throttle=None."""
43-
try:
44-
from pinecone._grpc import GrpcChannel # type: ignore[import-not-found]
45-
except ImportError:
46-
pytest.skip("Rust extension not built")
47-
4838
ch_explicit = GrpcChannel(
4939
endpoint="https://test-index-abc.svc.pinecone.io:443",
5040
api_key="test-key",

0 commit comments

Comments
 (0)