fix OOM in fast networks#7

Open
bennyz wants to merge 2 commits into jumpstarter-dev:main from bennyz:limit-buffer-memory
Conversation


@bennyz bennyz commented Feb 14, 2026

Currently the download buffer is bounded by item count. This is fine when chunks are 16KB, but if we get 256KB chunks, for example, we go over the 128MB limit.

This patch uses a byte-bounded channel to enforce the byte limit even when we are under the item-count limit.

Summary by CodeRabbit

Release Notes

  • New Features

    • Implemented a byte-bounded buffering mechanism for streaming, downloads, tar extraction, and decompression pipelines to enforce fixed memory limits with item-count caps.
  • Tests

    • Added integration tests validating backpressure, throughput, oversized-chunk behavior, and memory-bounded transfer scenarios.


coderabbitai bot commented Feb 14, 2026

📝 Walkthrough

Introduces a byte-bounded channel that enforces backpressure by total buffered bytes via a SizedItem trait, ByteBoundedSender/ByteBoundedReceiver, and a factory; replaces several mpsc channels in download/decompression pipelines and adds integration tests and stream-reader adaptations.

Changes

  • Byte-Bounded Channel Core (src/fls/byte_channel.rs)
    Adds the SizedItem trait and an impl for bytes::Bytes; implements ByteBoundedSender and ByteBoundedReceiver using an Arc<Semaphore> to track byte permits; provides the byte_bounded_channel constructor and comprehensive unit tests for byte-level backpressure and edge cases.
  • Module Declaration (src/fls/mod.rs)
    Exports the new byte_channel module (pub mod byte_channel).
  • Download Buffer Integration (src/fls/from_url.rs)
    Replaces the previous capacity-based in-memory buffer with byte_bounded_channel using a fixed max_buffer_bytes and item limit; updates logging and buffer creation to use byte-bounded semantics.
  • OCI Pipeline Refactor (src/fls/oci/from_oci.rs)
    Replaces mpsc channels with ByteBoundedSender<Bytes> / ByteBoundedReceiver<Bytes> across tar extraction, decompression, and raw-disk download flows; updates pipeline components, coordination structs, and channel wiring to use byte-bounded buffering.
  • Stream Reader Abstraction (src/fls/stream_utils.rs)
    Introduces a ReceiverVariant enum to unify mpsc::Receiver<Bytes> and ByteBoundedReceiver<Bytes>; adds ChannelReader::new_byte_bounded, adapts ChannelReader::new to wrap receivers in the variant, and forwards blocking_recv through the variant.
  • Byte-Bounded Channel Tests (tests/byte_bounded_memory.rs)
    Adds integration tests covering backpressure with large chunks, high-throughput small chunks, mixed chunk sizes, oversized-chunk handling, and chunk-size independence; uses tokio tests and timed assertions.
  • Test Utilities (tests/common/mod.rs)
    Adds #[allow(dead_code)] attributes to the create_test_data and compress_gz helper functions (lint suppression only).

Sequence Diagram(s)

sequenceDiagram
    %% Participants
    participant Producer as Producer
    participant Sender as ByteBoundedSender
    participant Sem as Semaphore
    participant MPSC as "mpsc::Sender/Receiver"
    participant Receiver as ByteBoundedReceiver
    participant Consumer as Consumer

    Producer->>Sender: send(item)
    Sender->>Sender: compute permits_needed = min(item.byte_size, max_bytes)
    Sender->>Sem: acquire(permits_needed)
    Sem-->>Sender: permits acquired (or block)
    Sender->>MPSC: send(item)
    MPSC-->>Sender: OK
    Sender-->>Producer: OK

    Consumer->>Receiver: recv()
    Receiver->>MPSC: recv()
    MPSC-->>Receiver: item
    Receiver->>Receiver: compute permits_to_release = min(item.byte_size, max_bytes)
    Receiver->>Sem: release(permits_to_release)
    Sem-->>Receiver: permits returned
    Receiver-->>Consumer: item

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I hopped a packet, counted bytes in tow,
Permits I grabbed so backpressure could grow,
From sender to receiver the semaphore sings,
Streams hum steady as each chunk takes wings,
thump-thump, carrot-powered CI! 🥕

🚥 Pre-merge checks | ✅ 4
✅ Passed checks (4 passed)
  • Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title directly relates to the main change: introducing byte-bounded channels to enforce memory limits and prevent OOM on fast networks with large chunk sizes.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 100.00%, which meets the required threshold of 80.00%.
  • Merge Conflict Detection: ✅ Passed. No merge conflicts detected when merging into main.


No actionable comments were generated in the recent review. 🎉

🧹 Recent nitpick comments
src/fls/byte_channel.rs (1)

35-50: Permits are leaked if inner.send() fails on Line 49.

After permit.forget() on Line 47, if self.inner.send(item).await returns Err (receiver dropped), those permits are permanently lost from the semaphore. In practice this is benign because a failed send means the channel is dead and no further sends will occur. However, for correctness and defensiveness, consider acquiring permits, sending, and only then forgetting — or recovering permits on send failure.

♻️ Proposed fix to recover permits on send failure
     pub async fn send(&self, item: T) -> Result<(), mpsc::error::SendError<T>> {
         let permits_needed = item.byte_size().min(self.max_bytes);
 
         let permits_needed_u32 = permits_needed as u32;
 
-        // acquire_many_owned returns OwnedSemaphorePermit which we intentionally
-        // forget — the receiver side adds permits back after consuming the item.
-        let permit = self
+        let permit = self
             .semaphore
             .acquire_many(permits_needed_u32)
             .await
             .expect("semaphore closed unexpectedly");
-        permit.forget();
 
-        self.inner.send(item).await
+        match self.inner.send(item).await {
+            Ok(()) => {
+                permit.forget();
+                Ok(())
+            }
+            Err(e) => {
+                // Drop permit naturally, returning permits to semaphore
+                drop(permit);
+                Err(e)
+            }
+        }
     }


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@src/fls/byte_channel.rs`:
- Around line 35-48: The cast in send() (permits_needed as u32) can overflow
when max_bytes/byte_size() exceed u32::MAX; add a guard in the channel
constructor (byte_bounded_channel) to ensure the provided max_bytes is <=
u32::MAX and fail early (return an Err or panic with a clear message) so send()
can safely cast to u32; also update send() to use a checked conversion (e.g.,
try_into/try_from) or rely on the validated invariant to make the cast explicit.
Ensure you reference/validate the same max_bytes used by byte_bounded_channel
and mention the failure path so callers cannot create a channel larger than
u32::MAX.

In `@tests/byte_bounded_memory.rs`:
- Around line 34-37: The test currently detects backpressure by comparing
send_duration to a hardcoded wall-clock threshold (the send_duration variable
and its comparisons against Duration::from_millis(...)) which is flaky in CI;
change both places that compare send_duration to a duration to a structural
check instead — e.g., use a non-blocking try_send (or inspect a
semaphore/channel available_permits / is_full) before performing the send and
increment backpressure_counter only when try_send fails or when you must acquire
a permit (i.e., the send would block), or if you cannot change to try_send then
relax the thresholds substantially; update the checks around
backpressure_counter and remove reliance on elapsed time so tests are robust.
- Line 2: The file currently declares an unused module import "mod common" which
causes a compiler warning; either remove the unused declaration or actually use
the utilities from that module (e.g., call create_test_data, compress_gz, or
compress_xz) in tests such as those in byte_bounded_memory.rs; locate the "mod
common" line and delete it if you don't need common helpers, or import and
invoke the specific functions (create_test_data, compress_gz, compress_xz) where
appropriate to justify keeping the module.
🧹 Nitpick comments (3)
src/fls/byte_channel.rs (1)

61-77: Permit accounting for zero-byte items.

If a SizedItem has byte_size() == 0, it acquires 0 permits and passes through without consuming any byte budget. This is correct but means zero-byte items are bounded only by max_items, not by max_bytes. If this is intentional, consider documenting it; if not, consider acquiring at least 1 permit for non-empty items.

For bytes::Bytes, len() == 0 is a valid state (e.g., Bytes::new()), so this is a realistic scenario in edge cases.

src/fls/mod.rs (1)

4-4: Consider pub(crate) if the module is not part of the public API.

byte_channel is exposed as pub mod alongside automotive and oci. If it's only needed by integration tests, you could use #[cfg(test)] pub mod byte_channel or keep it pub(crate) and re-export only what tests need. However, if integration tests under tests/ require access, pub is the simplest path.

src/fls/oci/from_oci.rs (1)

2290-2296: Buffer capacity calculation duplicated across call sites.

The formula ((buffer_size_mb * 1024) / 16).max(1000) appears here and is effectively the same as the buffer_capacity calculation in flash_from_oci (lines 1699-1701). Consider extracting this into a shared helper to keep the heuristic in one place.

Sketch
fn default_buffer_item_capacity(buffer_size_mb: usize) -> usize {
    ((buffer_size_mb * 1024) / 16).max(1000)
}

Currently the download buffer is bounded by item count. This is fine when chunks are 16KB, but if we get 256KB chunks, for example, we go over the 128MB limit.

This patch uses a byte-bounded channel to enforce the byte limit even when we are under the item-count limit.

Signed-off-by: Benny Zlotnik <bzlotnik@redhat.com>
Assisted-by: claude-opus-4.5
Signed-off-by: Benny Zlotnik <bzlotnik@redhat.com>
@bennyz bennyz force-pushed the limit-buffer-memory branch from e41c71d to 764efde Compare February 14, 2026 09:25
@bennyz bennyz requested a review from mangelajo February 14, 2026 10:01