feat: Make transport channel capacity configurable#1040
feat: Make transport channel capacity configurable#1040mvanhorn wants to merge 8 commits intogetsentry:masterfrom
Conversation
Add `transport_channel_capacity` field to `ClientOptions` that controls the bounded sync channel size used by the transport thread. Defaults to 30 (preserving current behavior). Users in high-throughput scenarios can increase this to reduce the chance of dropped envelopes. Closes getsentry#994 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #1040 +/- ##
==========================================
+ Coverage 73.81% 74.03% +0.22%
==========================================
Files 64 67 +3
Lines 7538 7988 +450
==========================================
+ Hits 5564 5914 +350
- Misses 1974 2074 +100 |
|
Hi @mvanhorn, thanks for the contribution! Before I proceed to a full review, please address the CI failures and the AI review agent comments. If any of the AI review comments are inaccurate, please comment on them and mark as resolved. Thanks! |
- Add transport_channel_capacity to manual Debug implementation - Clamp channel capacity to minimum of 1 to prevent rendezvous channel from silently dropping envelopes via try_send - Run cargo fmt to fix lint CI failure
|
Addressed in 9c8f904:
|
szokeasaurusrex
left a comment
There was a problem hiding this comment.
Thanks again for the contribution. I think we should revise the public API shape to avoid public API breakages before merging this change.
The current implementation introduces public API breakage by:
- adding a new public field to
ClientOptions - changing the signatures of the public transport thread constructors
I would like to avoid those breakages here and instead expose this as additive API:
- add additive
with_capacity(...)constructors on the public transport thread types, i.e.StdTransportThread::with_capacity(send, channel_capacity)andTokioTransportThread::with_capacity(send, channel_capacity) - keep the existing
new(...)signatures unchanged, but make those constructors delegate towith_capacity(..., 30) - add transport-specific constructors such as
ReqwestHttpTransport::with_channel_capacity(options, channel_capacity)(and similarly forcurlandureq), which would use those newwith_capacity(...)thread constructors internally - document use through
ClientOptions.transport
That still gives users a way to override the transport queue capacity via the existing TransportFactory mechanism, without changing ClientOptions yet.
For example, initializing the SDK with a custom capacity could look like this:
let opts = ClientOptions {
transport: Some(Arc::new(move |opts| {
Arc::new(ReqwestHttpTransport::with_channel_capacity(opts, 256))
})),
..Default::default()
};If you are open to it, please refactor the PR in that direction. If not, let me know and I can take it over as a follow-up.
…el_capacity Per review: avoid public API breakages in ClientOptions and the transport constructors. Replace the 'transport_channel_capacity: usize' field and the changed TransportThread/transport constructor signatures with purely additive APIs: - TransportThread::new(send) restored to original signature, delegates to with_capacity(send, 30). TransportThread::with_capacity(send, capacity) is the new entry point for custom capacity. - Same pattern for tokio_thread::TransportThread. - ReqwestHttpTransport/CurlHttpTransport/UreqHttpTransport gain with_channel_capacity(options, capacity). Existing new/with_client/ with_agent keep the default capacity of 30. - Remove transport_channel_capacity from ClientOptions (field, Default, and Debug impl). Users that need to override capacity configure it via ClientOptions.transport with a factory returning the desired transport, e.g. Arc::new(move |opts| Arc::new(ReqwestHttpTransport::with_channel_capacity(opts, 256))).
|
Thanks for the direction @szokeasaurusrex. Refactored to the additive shape you described in fbff3ea:
Your example works unchanged: let opts = ClientOptions {
transport: Some(Arc::new(move |opts| {
Arc::new(ReqwestHttpTransport::with_channel_capacity(opts, 256))
})),
..Default::default()
};
|
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit fbff3ea. Configure here.
|
@szokeasaurusrex - the refactor in fbff3ea matches the additive-API shape you requested. I've also replied to and resolved the two stale bot comments from Apr 15, which were analyzing against the original design. Verified locally: |
|
@mvanhorn I have this PR on my list of things to review. I am busy with another project at the moment but will try to review this within the next week or so |
|
Thanks @szokeasaurusrex - no rush, whenever you get a chance. |
szokeasaurusrex
left a comment
There was a problem hiding this comment.
Thanks for your patience for my review. I added some small comments; this looks pretty good overall though!
| /// `send` blocks. `channel_capacity` is clamped to a minimum of 1 to | ||
| /// avoid a rendezvous channel, which would silently drop envelopes under | ||
| /// `try_send`. | ||
| pub fn with_capacity<SendFn, SendFuture>(mut send: SendFn, channel_capacity: usize) -> Self |
There was a problem hiding this comment.
m: Let's make this pub(crate) for now to limit public API surface. If folks want to have this as a public API in the future, we can expose it at that time.
| pub fn with_capacity<SendFn, SendFuture>(mut send: SendFn, channel_capacity: usize) -> Self | |
| pub(crate) fn with_capacity<SendFn, SendFuture>(mut send: SendFn, channel_capacity: usize) -> Self |
| /// `send` blocks. `channel_capacity` is clamped to a minimum of 1 to | ||
| /// avoid a rendezvous channel, which would silently drop envelopes under | ||
| /// `try_send`. | ||
| pub fn with_capacity<SendFn>(mut send: SendFn, channel_capacity: usize) -> Self |
There was a problem hiding this comment.
m: Let's make this pub(crate) for now to limit public API surface. If folks want to have this as a public API in the future, we can expose it at that time.
| pub fn with_capacity<SendFn>(mut send: SendFn, channel_capacity: usize) -> Self | |
| pub(crate) fn with_capacity<SendFn>(mut send: SendFn, channel_capacity: usize) -> Self |
| /// Creates a new Transport. | ||
| pub fn new(options: &ClientOptions) -> Self { | ||
| Self::new_internal(options, None) | ||
| Self::new_internal(options, None, 30) |
There was a problem hiding this comment.
m: As we use the number 30 as the default channel capacity in all the transports, we should extract it to a constant that we reuse in all of them.
…TY const Per @szokeasaurusrex review: with_capacity narrowed to pub(crate) on both TransportThread types; the 30 literal extracted to a shared pub(crate) const reused across curl, reqwest, and ureq transports. Includes: Authorized scope (mod.rs) + correct extension (ureq.rs has the same literal). Mechanical change verified by build + fmt; no runtime effect needs test coverage.
|
Addressed in 6d1c9ff:
Verified |
szokeasaurusrex
left a comment
There was a problem hiding this comment.
Hey, thanks for addressing those! I just have one more thought about the clamping, then I think this is good to merge
| where | ||
| SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static, | ||
| { | ||
| let (sender, receiver) = sync_channel(channel_capacity.max(1)); |
There was a problem hiding this comment.
I think we should honor 0 here instead of clamping it. This is an advanced, opt-in transport API, and sync_channel(0) has defined rendezvous/no-buffer semantics. A capacity of 1 can still drop most events under bursts; it is not a general safeguard, only a different buffering policy. If someone explicitly passes 0, treating that as “no queued buffering” seems reasonable.
I tested this end-to-end with the clamp removed and with_channel_capacity(..., 0): sending 10 rapid events accepted 1 and dropped 9. That matches the channel semantics: zero capacity does not necessarily drop everything; it accepts an envelope when try_send happens while the transport thread is already waiting on the receiver. If we keep support for 0, the doc comment should describe that behavior rather than saying it would silently drop envelopes generally.
| // NOTE: returning RateLimiter here, otherwise we are in borrow hell | ||
| SendFuture: std::future::Future<Output = RateLimiter>, | ||
| { | ||
| let (sender, receiver) = sync_channel(channel_capacity.max(1)); |
There was a problem hiding this comment.
Same concept applies here; I tested both transport thread variants with the clamp removed, and both accepted 1 of 10 rapid events with capacity 0 rather than dropping everything.
…(1) clamp Per @szokeasaurusrex's review: this is an advanced opt-in transport API, and sync_channel(0) has defined rendezvous semantics. With try_send, capacity 0 accepts an envelope when the transport thread is currently waiting on the receiver and drops it otherwise, which is a valid no-buffer back-pressure policy rather than "drop everything." Update the doc comment in thread.rs and tokio_thread.rs to describe that behavior accurately.
|
Done in c819ce9 - dropped the |
| pub(crate) fn with_capacity<SendFn>(mut send: SendFn, channel_capacity: usize) -> Self | ||
| where | ||
| SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static, | ||
| { | ||
| let (sender, receiver) = sync_channel(channel_capacity); | ||
| let shutdown = Arc::new(AtomicBool::new(false)); | ||
| let shutdown_worker = shutdown.clone(); | ||
| let handle = thread::Builder::new() |
There was a problem hiding this comment.
Bug: With channel_capacity=0, flush() and drop() use a blocking send() for control tasks, which can cause the caller to hang if the worker thread is busy.
Severity: MEDIUM
Suggested Fix
Consider using try_send() for control tasks like Task::Flush and Task::Shutdown, similar to how envelopes are handled. This would align with the documented behavior and prevent blocking in flush() and drop() when the channel capacity is zero and the worker is busy.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.
Location: sentry/src/transports/thread.rs#L46-L53
Potential issue: When the transport thread is configured with `channel_capacity=0`, it
creates a rendezvous channel. However, the `flush()` and `drop()` methods use a blocking
`send()` to dispatch control tasks (`Task::Flush`, `Task::Shutdown`). If the worker
thread is occupied with a long-running operation, such as sending an envelope over HTTP,
it cannot receive new tasks. Consequently, the call to `flush()` will block until its
timeout is reached, and more critically, `drop()` will block the calling thread (e.g.,
during application shutdown) until the long-running operation completes. This can lead
to significant delays or hangs during shutdown for users who opt into this advanced
configuration. The documentation is also misleading, as it implies non-blocking behavior
for all sends.
Also affects:
sentry/src/transports/tokio_thread.rs:48~55

Summary
Adds transport-level
with_channel_capacityconstructors so users can tune the bounded channel size used by the transport thread. The default remains 30, preserving existing behavior.The original design added a
transport_channel_capacityfield toClientOptions. Per @szokeasaurusrex's review, refactored to additive transport-level methods soClientOptionsstays minimal and the feature is opt-in at the transport boundary.Why this matters
In high-throughput scenarios (many transactions with single spans each), the hardcoded capacity of 30 can saturate quickly, leading to dropped envelopes. Identified in #923, tracked in #994. Making it configurable lets users trade memory for reliability based on their workload.
Changes
sentry/src/transports/thread.rs:TransportThread::new(send)keeps its original signature. NewTransportThread::with_capacity(send, channel_capacity)accepts a custom capacity, clamped to a minimum of 1 to avoid rendezvous channels.sentry/src/transports/tokio_thread.rs: Same pattern for the async transport variant.sentry/src/transports/curl.rs: AddedCurlHttpTransport::with_channel_capacity(options, channel_capacity).new(options)unchanged.sentry/src/transports/reqwest.rs: AddedReqwestHttpTransport::with_channel_capacity(options, channel_capacity).new(options)/with_client(options, client)unchanged.sentry/src/transports/ureq.rs: AddedUreqHttpTransport::with_channel_capacity(options, channel_capacity).new(options)/with_agent(options, agent)unchanged.Usage
Testing
cargo check --workspace --all-featurespassescargo fmt --allcleancargo clippy --workspace --all-featurescleanCloses #994
This contribution was developed with AI assistance (Claude Code).