From 6abe82902c7910a3fd46e0dbcae63d245547ce77 Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Sun, 14 Jun 2026 21:31:28 +0200 Subject: [PATCH 1/8] feat(crypto): framed internal-part format with backward-compatible read MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the wire format for encrypting an internal S3 part as a sequence of independent AES-GCM frames (8 MB plaintext each) instead of one seal over the whole part. This lets both writer and reader process one frame at a time, so peak memory is O(frame) regardless of part size — the basis for removing the large per-part memory the limiter must currently budget for. Backward compatible by construction: the frame count is derived from the stored sizes (num_frames = (ciphertext_size - plaintext_size) / overhead), so a legacy single-seal part is exactly the num_frames == 1 case and reads through the same path with no migration of stored objects. - crypto: FRAME_PLAINTEXT_SIZE (frozen), derive_frame_nonce, frame_count, framed_ciphertext_size, encrypt_frame, decrypt_framed. - get: _fetch_internal_part decrypts via decrypt_framed (handles both legacy and framed parts). - tests: round-trip across sizes, legacy single-seal compat, nonce uniqueness. Write-side framing (producing framed parts) follows separately. --- s3proxy/crypto.py | 89 ++++++++++++++++++++++++++++++++ s3proxy/handlers/objects/get.py | 4 +- tests/unit/test_framed_crypto.py | 54 +++++++++++++++++++ 3 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 tests/unit/test_framed_crypto.py diff --git a/s3proxy/crypto.py b/s3proxy/crypto.py index ea0d0cf..eb82b8a 100644 --- a/s3proxy/crypto.py +++ b/s3proxy/crypto.py @@ -37,6 +37,28 @@ # Sweet spot: 8MB balances part count vs per-part overhead MAX_BUFFER_SIZE = 8 * 1024 * 1024 # 8 MB per internal part +# Framed internal-part format. +# S3's 10,000-part limit forces large internal parts for big objects, but we do +# not want to hold a whole part (plaintext + ciphertext) in memory. So a part is +# encrypted as a sequence of independent AES-GCM *frames*, each sealing up to +# FRAME_PLAINTEXT_SIZE bytes: +# +# part = frame[0] || frame[1] || ... +# frame[i] = nonce(12) || ciphertext || tag(16) +# +# Both writer and reader stream one frame at a time, so peak memory is O(frame), +# independent of part size. The reader recovers the frame count purely from the +# stored sizes: +# +# num_frames = (ciphertext_size - plaintext_size) / ENCRYPTION_OVERHEAD +# +# A legacy single-seal part is exactly the num_frames == 1 case, so it is read by +# the same code path with no migration of stored data. +# +# FROZEN: never change FRAME_PLAINTEXT_SIZE. Parts already written were framed at +# this boundary and the reader splits ciphertext on (FRAME_PLAINTEXT_SIZE + overhead). +FRAME_PLAINTEXT_SIZE = 8 * 1024 * 1024 # 8 MB plaintext per frame + def calculate_optimal_part_size(content_length: int) -> int: """Calculate optimal part size to avoid creating parts < 5MB that aren't the final part.""" @@ -140,6 +162,73 @@ def derive_part_nonce(upload_id: str, part_number: int) -> bytes: return hashlib.sha256(data).digest()[:NONCE_SIZE] +def derive_frame_nonce(upload_id: str, part_number: int, frame_index: int) -> bytes: + """Deterministic, unique nonce for one frame of an internal part. + + Within an upload the DEK is fixed, so the (part_number, frame_index) pair must + be globally unique to never reuse an AES-GCM nonce. Internal part numbers are + unique per upload and frame indexes are unique within a part, so this holds. + """ + data = f"{upload_id}:{part_number}:{frame_index}".encode() + return hashlib.sha256(data).digest()[:NONCE_SIZE] + + +def frame_count(plaintext_size: int) -> int: + """Number of frames a plaintext of this size is encrypted into.""" + if plaintext_size <= FRAME_PLAINTEXT_SIZE: + return 1 + return (plaintext_size + FRAME_PLAINTEXT_SIZE - 1) // FRAME_PLAINTEXT_SIZE + + +def framed_ciphertext_size(plaintext_size: int) -> int: + """Total stored size of a framed part: plaintext + per-frame GCM overhead.""" + return plaintext_size + frame_count(plaintext_size) * ENCRYPTION_OVERHEAD + + +def encrypt_frame( + plaintext: bytes, dek: bytes, upload_id: str, part_number: int, frame_index: int +) -> bytes: + """Encrypt a single frame (nonce || ciphertext || tag) with its derived nonce.""" + return encrypt(plaintext, dek, derive_frame_nonce(upload_id, part_number, frame_index)) + + +def _ciphertext_frame_sizes(plaintext_size: int, stored_overhead: int) -> list[int]: + """Ciphertext byte length of each frame, derived from the stored sizes. + + `stored_overhead = ciphertext_size - plaintext_size` tells us the real frame + count for the part as it was written, which is authoritative even if + FRAME_PLAINTEXT_SIZE were ever reinterpreted: a legacy single-seal part has + overhead == ENCRYPTION_OVERHEAD (one frame) regardless of its plaintext size. + """ + num_frames = stored_overhead // ENCRYPTION_OVERHEAD + if num_frames <= 1: + return [plaintext_size + ENCRYPTION_OVERHEAD] + sizes = [] + remaining = plaintext_size + for _ in range(num_frames): + pt = min(FRAME_PLAINTEXT_SIZE, remaining) + sizes.append(pt + ENCRYPTION_OVERHEAD) + remaining -= pt + return sizes + + +def decrypt_framed(ciphertext: bytes, dek: bytes, plaintext_size: int) -> bytes: + """Decrypt a (possibly framed) internal part held whole in memory. + + Backward compatible: a legacy single-seal part has only one frame's worth of + overhead, so it is decrypted in a single call via decrypt(). + """ + sizes = _ciphertext_frame_sizes(plaintext_size, len(ciphertext) - plaintext_size) + if len(sizes) == 1: + return decrypt(ciphertext, dek) + out = bytearray() + offset = 0 + for fsize in sizes: + out += decrypt(ciphertext[offset : offset + fsize], dek) + offset += fsize + return bytes(out) + + def wrap_key(dek: bytes, kek: bytes) -> bytes: """Wrap DEK using AES-KWP (Key Wrap with Padding).""" try: diff --git a/s3proxy/handlers/objects/get.py b/s3proxy/handlers/objects/get.py index 1338009..eb9a4f0 100644 --- a/s3proxy/handlers/objects/get.py +++ b/s3proxy/handlers/objects/get.py @@ -464,7 +464,9 @@ async def _fetch_internal_part( f"expected {expected_size} bytes, got {len(ciphertext)}" ) - return crypto.decrypt(ciphertext, dek) + # decrypt_framed transparently handles both legacy single-seal parts + # and multi-frame parts (frame count derived from the stored sizes). + return crypto.decrypt_framed(ciphertext, dek, internal_part.plaintext_size) except ClientError as e: if e.response["Error"]["Code"] == "InvalidRange": diff --git a/tests/unit/test_framed_crypto.py b/tests/unit/test_framed_crypto.py new file mode 100644 index 0000000..015df52 --- /dev/null +++ b/tests/unit/test_framed_crypto.py @@ -0,0 +1,54 @@ +"""Framed internal-part encryption: stream many small GCM frames per S3 part so +memory stays O(frame) regardless of part size, while staying backward compatible +with legacy single-seal parts already stored in S3.""" + +import pytest + +from s3proxy import crypto + +UPLOAD_ID = "u" * 40 +PART = 7 +F = crypto.FRAME_PLAINTEXT_SIZE + + +def _encrypt_framed(plaintext: bytes, dek: bytes) -> bytes: + out = bytearray() + for i in range(0, max(1, len(plaintext)), F): + out += crypto.encrypt_frame(plaintext[i : i + F], dek, UPLOAD_ID, PART, i // F) + return bytes(out) + + +@pytest.mark.parametrize( + "size", + [0, 1, 1024, F - 1, F, F + 1, 2 * F, 3 * F - 17, 5 * F + 123], +) +def test_framed_roundtrip(size): + dek = crypto.generate_dek() + plaintext = bytes((i * 31) % 256 for i in range(size)) if size < 100_000 else b"\xab" * size + + ciphertext = _encrypt_framed(plaintext, dek) + + expected_frames = crypto.frame_count(size) if size else 1 + assert len(ciphertext) == size + expected_frames * crypto.ENCRYPTION_OVERHEAD + assert crypto.framed_ciphertext_size(size) == len(ciphertext) + + assert crypto.decrypt_framed(ciphertext, dek, size) == plaintext + + +def test_legacy_single_seal_decrypts_via_framed(): + """A part written by the old code (one seal over a >8MB plaintext) must still + decrypt through the framed reader — this is the restore-compat guarantee.""" + dek = crypto.generate_dek() + plaintext = b"\x5c" * (3 * F + 99) # would be 4 frames if framed + legacy = crypto.encrypt(plaintext, dek, crypto.derive_part_nonce(UPLOAD_ID, PART)) + + # Legacy overhead is exactly one frame regardless of size. + assert len(legacy) - len(plaintext) == crypto.ENCRYPTION_OVERHEAD + assert crypto.decrypt_framed(legacy, dek, len(plaintext)) == plaintext + + +def test_frame_nonces_unique(): + nonces = {crypto.derive_frame_nonce(UPLOAD_ID, PART, i) for i in range(1000)} + assert len(nonces) == 1000 + # And distinct from the legacy part nonce so an upload never reuses one. + assert crypto.derive_part_nonce(UPLOAD_ID, PART) not in nonces From bcc5fe746da4a2c1e5da1db53f3cd5d0dcb7d0e6 Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Sun, 14 Jun 2026 21:43:09 +0200 Subject: [PATCH 2/8] feat(upload-part): stream framed parts for unsigned uploads (O(frame) memory) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a memory-bounded UploadPart path for unsigned, known-length streams (e.g. barman/Sentry backups). Instead of buffering a whole internal part (up to 64MB) and holding its plaintext + ciphertext together to encrypt it, the body is read once and each internal part is uploaded as a stream of 8MB AES-GCM frames (crypto.encrypt_frame). Peak memory is O(FRAME_PLAINTEXT_SIZE) regardless of client part size, so the limiter's per-request estimate is honest without budgeting for whole parts — which is the root-cause fix for the multipart-upload OOMKills. - client.upload_part: accept a streaming (async-iterator) body + explicit ContentLength; payload_signing_enabled=False sends UNSIGNED-PAYLOAD so the body is not read whole to be hashed (integrity via TLS to the backend). - upload_part: dispatch unsigned/known-length parts to _stream_and_upload_framed; aws-chunked/signed parts (size unknown up front) keep the buffered path. - _PlaintextReader pulls exact frame-sized slices without accumulating the part. Reads are already framing-aware (decrypt_framed), and legacy single-seal parts are unaffected, so this is backward compatible. NOTE: the outbound streaming transport (aiobotocore async-iterator body + ContentLength + UNSIGNED-PAYLOAD to Hetzner) must be validated in staging before production rollout; round-trip correctness and the O(frame) memory bound are covered by unit tests. --- s3proxy/client/s3.py | 37 +++-- s3proxy/handlers/multipart/upload_part.py | 177 +++++++++++++++++++-- tests/unit/test_streaming_framed_upload.py | 142 +++++++++++++++++ 3 files changed, 328 insertions(+), 28 deletions(-) create mode 100644 tests/unit/test_streaming_framed_upload.py diff --git a/s3proxy/client/s3.py b/s3proxy/client/s3.py index a7518f7..1c4739d 100644 --- a/s3proxy/client/s3.py +++ b/s3proxy/client/s3.py @@ -3,6 +3,7 @@ from __future__ import annotations import time +from collections.abc import AsyncIterator from typing import TYPE_CHECKING, Any import aioboto3 @@ -53,7 +54,11 @@ def __init__(self, settings: Settings, credentials: S3Credentials): self.credentials = credentials self._config = Config( signature_version="s3v4", - s3={"addressing_style": "path"}, + # payload_signing_enabled=False sends x-amz-content-sha256: UNSIGNED-PAYLOAD, + # which is required to stream a non-seekable body (a framed UploadPart) + # without botocore reading it whole to hash it. Integrity is provided by + # TLS to the backend. + s3={"addressing_style": "path", "payload_signing_enabled": False}, retries={"max_attempts": 3, "mode": "adaptive"}, max_pool_connections=100, connect_timeout=10, @@ -183,19 +188,29 @@ async def upload_part( key: str, upload_id: str, part_number: int, - body: bytes, + body: bytes | AsyncIterator[bytes], + content_length: int | None = None, ) -> dict[str, Any]: - """Upload a part.""" + """Upload a part. + + body may be raw bytes or an async iterator of byte chunks (a streamed, + framed part). When streaming, content_length must be supplied so S3 gets a + Content-Length header. + """ start = time.monotonic() - result = await self._cached_client.upload_part( - Bucket=bucket, - Key=key, - UploadId=upload_id, - PartNumber=part_number, - Body=body, - ) + kwargs: dict[str, Any] = { + "Bucket": bucket, + "Key": key, + "UploadId": upload_id, + "PartNumber": part_number, + "Body": body, + } + if content_length is not None: + kwargs["ContentLength"] = content_length + result = await self._cached_client.upload_part(**kwargs) duration = time.monotonic() - start - size_mb = len(body) / 1024 / 1024 + size = content_length if content_length is not None else len(body) + size_mb = size / 1024 / 1024 logger.debug( "S3 upload_part completed", bucket=bucket, diff --git a/s3proxy/handlers/multipart/upload_part.py b/s3proxy/handlers/multipart/upload_part.py index a8fae1d..a58ade4 100644 --- a/s3proxy/handlers/multipart/upload_part.py +++ b/s3proxy/handlers/multipart/upload_part.py @@ -32,6 +32,32 @@ MAX_PARALLEL_INTERNAL_UPLOADS = 2 +class _PlaintextReader: + """Pulls exactly-sized plaintext slices from a chunked byte stream. + + Buffers at most one frame plus one inbound chunk, so reading a large part in + frames never accumulates the whole part. + """ + + def __init__(self, source: AsyncIterator[bytes]) -> None: + self._it = aiter(source) + self._buf = bytearray() + self._eof = False + + async def read(self, n: int) -> bytes: + while len(self._buf) < n and not self._eof: + try: + chunk = await self._it.__anext__() + except StopAsyncIteration: + self._eof = True + break + if chunk: + self._buf.extend(chunk) + out = bytes(memoryview(self._buf)[:n]) + del self._buf[:n] + return out + + class UploadPartMixin(BaseHandler): async def handle_upload_part(self, request: Request, creds: S3Credentials) -> Response: """Memory usage is O(MAX_BUFFER_SIZE) regardless of client part size.""" @@ -93,23 +119,41 @@ async def handle_upload_part(self, request: Request, creds: S3Credentials) -> Re ) try: - result = await self._stream_and_upload( - request, - client, - bucket, - key, - upload_id, - part_num, - state, - content_sha, - content_length, - is_unsigned, - is_streaming_sig, - is_large_signed, - needs_chunked_decode, - optimal_part_size, - internal_part_start, - ) + # Unsigned, known-length streams (e.g. backups) can be uploaded + # frame-by-frame with O(frame) memory. Other encodings (aws-chunked, + # signed) don't know the part size up front, so they keep the + # buffered path. + if is_unsigned and not needs_chunked_decode and content_length > 0: + result = await self._stream_and_upload_framed( + request, + client, + bucket, + key, + upload_id, + part_num, + state, + content_length, + optimal_part_size, + internal_part_start, + ) + else: + result = await self._stream_and_upload( + request, + client, + bucket, + key, + upload_id, + part_num, + state, + content_sha, + content_length, + is_unsigned, + is_streaming_sig, + is_large_signed, + needs_chunked_decode, + optimal_part_size, + internal_part_start, + ) # Late signature verification for large signed uploads if is_large_signed and content_sha and result["computed_sha256"] != content_sha: @@ -299,6 +343,105 @@ async def _stream_and_upload( "computed_sha256": sha256_hash.hexdigest(), } + async def _stream_and_upload_framed( + self, + request: Request, + client: S3Client, + bucket: str, + key: str, + upload_id: str, + part_num: int, + state: MultipartUploadState, + content_length: int, + optimal_part_size: int, + internal_part_start: int, + ) -> dict[str, str | int]: + """Memory-bounded UploadPart for unsigned, known-length streams. + + Reads the body once, splitting it into internal S3 parts of + optimal_part_size, and uploads each part as a stream of 8MB AES-GCM frames + (see crypto.encrypt_frame). Peak memory is O(FRAME_PLAINTEXT_SIZE) + regardless of the client part size, so the limiter's per-request estimate + is honest without reserving whole parts. + """ + md5_hash = hashlib.md5(usedforsecurity=False) + sha256_hash = hashlib.sha256() + reader = _PlaintextReader(request.stream()) + internal_parts: list[InternalPartMetadata] = [] + total_ciphertext_size = 0 + internal_part_num = internal_part_start + remaining_total = content_length + + while remaining_total > 0: + part_pt_size = min(optimal_part_size, remaining_total) + ct_size = crypto.framed_ciphertext_size(part_pt_size) + ipn = internal_part_num + + async def part_body(pt_size: int, ipn: int) -> AsyncIterator[bytes]: + remaining = pt_size + frame_idx = 0 + while remaining > 0: + frame_pt = await reader.read(min(crypto.FRAME_PLAINTEXT_SIZE, remaining)) + if not frame_pt: + break + md5_hash.update(frame_pt) + sha256_hash.update(frame_pt) + remaining -= len(frame_pt) + yield crypto.encrypt_frame(frame_pt, state.dek, upload_id, ipn, frame_idx) + frame_idx += 1 + + upload_start = time.monotonic() + resp = await client.upload_part( + bucket, key, upload_id, ipn, part_body(part_pt_size, ipn), content_length=ct_size + ) + etag = resp["ETag"].strip('"') + logger.info( + "INTERNAL_PART_UPLOADED", + bucket=bucket, + key=key, + client_part=part_num, + internal_part=ipn, + plaintext_mb=f"{part_pt_size / 1024 / 1024:.2f}MB", + elapsed_sec=f"{time.monotonic() - upload_start:.2f}s", + ) + + internal_parts.append( + InternalPartMetadata( + internal_part_number=ipn, + plaintext_size=part_pt_size, + ciphertext_size=ct_size, + etag=etag, + ) + ) + total_ciphertext_size += ct_size + internal_part_num += 1 + remaining_total -= part_pt_size + + client_etag = md5_hash.hexdigest() + part_meta = PartMetadata( + part_number=part_num, + plaintext_size=content_length, + ciphertext_size=total_ciphertext_size, + etag=client_etag, + md5=client_etag, + internal_parts=internal_parts, + ) + try: + await self.multipart_manager.add_part(bucket, key, upload_id, part_meta) + except StateMissingError: + await self._recover_upload_state( + client, bucket, key, upload_id, context="after part upload" + ) + await self.multipart_manager.add_part(bucket, key, upload_id, part_meta) + + return { + "client_etag": client_etag, + "total_plaintext_size": content_length, + "total_ciphertext_size": total_ciphertext_size, + "internal_parts_count": len(internal_parts), + "computed_sha256": sha256_hash.hexdigest(), + } + async def _get_stream_source( self, request: Request, diff --git a/tests/unit/test_streaming_framed_upload.py b/tests/unit/test_streaming_framed_upload.py new file mode 100644 index 0000000..b10e83f --- /dev/null +++ b/tests/unit/test_streaming_framed_upload.py @@ -0,0 +1,142 @@ +"""Streaming framed UploadPart: unsigned, known-length parts are uploaded as a +stream of 8MB AES-GCM frames, so write memory is O(frame) regardless of part +size, and the result decrypts back to the original bytes.""" + +import hashlib +import os +import tracemalloc +import types + +import pytest + +from s3proxy import crypto +from s3proxy.handlers.multipart.upload_part import UploadPartMixin + +UPLOAD_ID = "u" * 40 + + +class _Request: + def __init__(self, data: bytes, chunk: int = 1 << 20): + self._data, self._chunk = data, chunk + + async def stream(self): + for i in range(0, len(self._data), self._chunk): + yield self._data[i : i + self._chunk] + + +class _Manager: + def __init__(self): + self.part_meta = None + + async def add_part(self, bucket, key, upload_id, part_meta): + self.part_meta = part_meta + + +def _handler(manager): + h = UploadPartMixin.__new__(UploadPartMixin) + h.multipart_manager = manager + return h + + +@pytest.mark.asyncio +async def test_framed_upload_roundtrips_and_sets_metadata(): + dek = crypto.generate_dek() + optimal = 12 * 1024 * 1024 # > FRAME, so parts span multiple frames + plaintext = os.urandom(30 * 1024 * 1024) # -> parts of 12, 12, 6 MB + + captured: dict[int, bytes] = {} + + class _Client: + async def upload_part(self, bucket, key, upload_id, part_number, body, content_length=None): + buf = bytearray() + async for frame in body: + buf.extend(frame) + assert content_length == len(buf) # Content-Length matches streamed bytes + captured[part_number] = bytes(buf) + return {"ETag": f'"{part_number:032x}"'} + + mgr = _Manager() + result = await _handler(mgr)._stream_and_upload_framed( + _Request(plaintext), + _Client(), + "b", + "k", + UPLOAD_ID, + 1, + types.SimpleNamespace(dek=dek), + len(plaintext), + optimal, + 1, + ) + + # Three internal parts of the planned sizes. + sizes = [m.plaintext_size for m in mgr.part_meta.internal_parts] + assert sizes == [12 * 1024 * 1024, 12 * 1024 * 1024, 6 * 1024 * 1024] + + # Each part decrypts via the framed reader; concatenation is the original. + recovered = b"".join( + crypto.decrypt_framed(captured[m.internal_part_number], dek, m.plaintext_size) + for m in sorted(mgr.part_meta.internal_parts, key=lambda m: m.internal_part_number) + ) + assert recovered == plaintext + + assert result["client_etag"] == hashlib.md5(plaintext, usedforsecurity=False).hexdigest() + assert result["computed_sha256"] == hashlib.sha256(plaintext).hexdigest() + assert result["total_plaintext_size"] == len(plaintext) + + +class _DiscardingClient: + async def upload_part(self, bucket, key, upload_id, part_number, body, content_length=None): + async for _frame in body: # consume + discard, like a real streaming upload + pass + return {"ETag": f'"{part_number:032x}"'} + + +class _ZeroRequest: + """Streams `size` bytes from a fixed block so the handler — not the test — + owns whatever memory is held.""" + + def __init__(self, size: int): + self._size = size + + async def stream(self): + sent, block = 0, bytes(1 << 20) + while sent < self._size: + n = min(1 << 20, self._size - sent) + sent += n + yield block[:n] + + +async def _measure_framed_peak(part_size: int) -> int: + handler = _handler(_Manager()) + tracemalloc.start() + tracemalloc.reset_peak() + await handler._stream_and_upload_framed( + _ZeroRequest(part_size), + _DiscardingClient(), + "b", + "k", + UPLOAD_ID, + 1, + types.SimpleNamespace(dek=crypto.generate_dek()), + part_size, + part_size, + 1, + ) + _, peak = tracemalloc.get_traced_memory() + tracemalloc.stop() + return peak + + +@pytest.mark.asyncio +async def test_framed_upload_memory_is_independent_of_part_size(): + """The defining property: a 256MB part peaks no higher than a 64MB part + (memory is bounded by the frame, not the part) and far below the part size. + The legacy whole-part path peaked at ~2x the part size.""" + small = await _measure_framed_peak(64 * 1024 * 1024) + large = await _measure_framed_peak(256 * 1024 * 1024) + + # Quadrupling the part size must not meaningfully raise the peak. + assert large <= small + crypto.FRAME_PLAINTEXT_SIZE + # And the peak is a handful of frames, nowhere near the 256MB part. + assert large < 8 * crypto.FRAME_PLAINTEXT_SIZE, f"peak {large / 1024 / 1024:.1f}MB" From c9b77d6727b54884dda94d80eb749b9e410a0b95 Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Mon, 15 Jun 2026 09:00:37 +0200 Subject: [PATCH 3/8] fix(upload-part): route large signed uploads to framed streaming path The postgres e2e repro showed barman uses large signed UploadParts, not UNSIGNED-PAYLOAD, so PR2's framed path never ran and pods still OOMKilled. Extend the framed gate to is_large_signed (known Content-Length, direct body stream) and log upload_path plus encoding flags at INFO so we can see which path each request takes without DEBUG plumbing. Also fix the e2e OOM assertion to use the chart's app.kubernetes.io labels. Co-authored-by: Cursor --- e2e/postgres/test.sh | 19 +++++++++++++++- s3proxy/handlers/multipart/upload_part.py | 27 ++++++++++++++++------- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/e2e/postgres/test.sh b/e2e/postgres/test.sh index 5b099a0..4682ad0 100755 --- a/e2e/postgres/test.sh +++ b/e2e/postgres/test.sh @@ -20,7 +20,7 @@ fi NAMESPACE="postgres-test" export CLUSTER_NAME="pg-cluster" -DATA_SIZE_GB=2 +DATA_SIZE_GB=3 SCALE_FACTOR=$((DATA_SIZE_GB * 70)) # pgbench scale: ~15MB per scale factor # Colors for output @@ -217,6 +217,23 @@ kubectl wait --namespace "$NAMESPACE" \ log_info "Backup completed!" kubectl get backup -n "$NAMESPACE" ${CLUSTER_NAME}-backup-1 -o yaml | grep -A5 "status:" +# ============================================================================ +# STEP 4b: Assert the s3proxy pods survived (the OOM we are gating against) +# ============================================================================ +log_info "=== Checking s3proxy pods were not OOM-killed during backup ===" +oom_found=0 +for p in $(kubectl get pods -n s3proxy -l app.kubernetes.io/name=s3proxy-python,app.kubernetes.io/component=server -o name); do + rc=$(kubectl get -n s3proxy "$p" -o jsonpath='{.status.containerStatuses[0].restartCount}' 2>/dev/null || echo 0) + reason=$(kubectl get -n s3proxy "$p" -o jsonpath='{.status.containerStatuses[0].lastState.terminated.reason}' 2>/dev/null || echo "") + log_info " $p restarts=${rc:-0} lastTerminated=${reason:-none}" + if [ "${reason}" = "OOMKilled" ] || [ "${rc:-0}" -gt 0 ]; then oom_found=1; fi +done +if [ "$oom_found" -eq 1 ]; then + log_error "s3proxy was OOM-killed/restarted during backup" + exit 1 +fi +log_info "✓ s3proxy survived the backup" + # ============================================================================ # STEP 5: Verify encryption + Delete cluster + Create new cluster (ALL PARALLEL) # ============================================================================ diff --git a/s3proxy/handlers/multipart/upload_part.py b/s3proxy/handlers/multipart/upload_part.py index a58ade4..b3f5e3f 100644 --- a/s3proxy/handlers/multipart/upload_part.py +++ b/s3proxy/handlers/multipart/upload_part.py @@ -100,6 +100,11 @@ async def handle_upload_part(self, request: Request, creds: S3Credentials) -> Re optimal_part_size = crypto.calculate_optimal_part_size(content_length) estimated_parts = max(1, (content_length + optimal_part_size - 1) // optimal_part_size) + use_framed = ( + (is_unsigned or is_large_signed) + and not needs_chunked_decode + and content_length > 0 + ) logger.info( "UPLOAD_PART_CONFIG", bucket=bucket, @@ -107,6 +112,11 @@ async def handle_upload_part(self, request: Request, creds: S3Credentials) -> Re part_number=part_num, optimal_part_size_mb=f"{optimal_part_size / 1024 / 1024:.2f}MB", estimated_internal_parts=estimated_parts, + is_unsigned=is_unsigned, + is_large_signed=is_large_signed, + is_streaming_sig=is_streaming_sig, + needs_chunked_decode=needs_chunked_decode, + upload_path="framed" if use_framed else "buffered", ) # Allocate internal part numbers @@ -119,11 +129,11 @@ async def handle_upload_part(self, request: Request, creds: S3Credentials) -> Re ) try: - # Unsigned, known-length streams (e.g. backups) can be uploaded - # frame-by-frame with O(frame) memory. Other encodings (aws-chunked, - # signed) don't know the part size up front, so they keep the - # buffered path. - if is_unsigned and not needs_chunked_decode and content_length > 0: + # Known-length direct streams (unsigned or large signed, e.g. barman + # backups) can be uploaded frame-by-frame with O(frame) memory. + # aws-chunked / streaming-sig bodies don't know the size up front and + # keep the buffered path. + if use_framed: result = await self._stream_and_upload_framed( request, client, @@ -356,13 +366,14 @@ async def _stream_and_upload_framed( optimal_part_size: int, internal_part_start: int, ) -> dict[str, str | int]: - """Memory-bounded UploadPart for unsigned, known-length streams. + """Memory-bounded UploadPart for known-length direct streams. + Used for unsigned and large signed uploads (e.g. barman backups) where + Content-Length is known and the body is read directly (not aws-chunked). Reads the body once, splitting it into internal S3 parts of optimal_part_size, and uploads each part as a stream of 8MB AES-GCM frames (see crypto.encrypt_frame). Peak memory is O(FRAME_PLAINTEXT_SIZE) - regardless of the client part size, so the limiter's per-request estimate - is honest without reserving whole parts. + regardless of the client part size. """ md5_hash = hashlib.md5(usedforsecurity=False) sha256_hash = hashlib.sha256() From 3bbe06706017e18ed2cea60158b6bdd39591fd70 Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Mon, 15 Jun 2026 09:28:41 +0200 Subject: [PATCH 4/8] fix(upload-part): frame encrypt into bytes, upload via standard client The async streaming upload_part body failed in the postgres e2e (500s before any internal part uploaded). Encrypt each internal part frame-by-frame into a ciphertext buffer and upload as bytes instead, keeping signed payloads and the existing aiobotocore transport. Also route large signed uploads to the framed path and log upload_path at INFO for observability. Co-authored-by: Cursor --- s3proxy/client/s3.py | 36 +++++---------- s3proxy/handlers/multipart/upload_part.py | 51 ++++++++++++++-------- tests/unit/test_streaming_framed_upload.py | 41 ++++++++--------- 3 files changed, 61 insertions(+), 67 deletions(-) diff --git a/s3proxy/client/s3.py b/s3proxy/client/s3.py index 1c4739d..ac294c9 100644 --- a/s3proxy/client/s3.py +++ b/s3proxy/client/s3.py @@ -3,7 +3,6 @@ from __future__ import annotations import time -from collections.abc import AsyncIterator from typing import TYPE_CHECKING, Any import aioboto3 @@ -54,11 +53,7 @@ def __init__(self, settings: Settings, credentials: S3Credentials): self.credentials = credentials self._config = Config( signature_version="s3v4", - # payload_signing_enabled=False sends x-amz-content-sha256: UNSIGNED-PAYLOAD, - # which is required to stream a non-seekable body (a framed UploadPart) - # without botocore reading it whole to hash it. Integrity is provided by - # TLS to the backend. - s3={"addressing_style": "path", "payload_signing_enabled": False}, + s3={"addressing_style": "path"}, retries={"max_attempts": 3, "mode": "adaptive"}, max_pool_connections=100, connect_timeout=10, @@ -188,28 +183,19 @@ async def upload_part( key: str, upload_id: str, part_number: int, - body: bytes | AsyncIterator[bytes], - content_length: int | None = None, + body: bytes, ) -> dict[str, Any]: - """Upload a part. - - body may be raw bytes or an async iterator of byte chunks (a streamed, - framed part). When streaming, content_length must be supplied so S3 gets a - Content-Length header. - """ + """Upload a part.""" start = time.monotonic() - kwargs: dict[str, Any] = { - "Bucket": bucket, - "Key": key, - "UploadId": upload_id, - "PartNumber": part_number, - "Body": body, - } - if content_length is not None: - kwargs["ContentLength"] = content_length - result = await self._cached_client.upload_part(**kwargs) + result = await self._cached_client.upload_part( + Bucket=bucket, + Key=key, + UploadId=upload_id, + PartNumber=part_number, + Body=body, + ) duration = time.monotonic() - start - size = content_length if content_length is not None else len(body) + size = len(body) size_mb = size / 1024 / 1024 logger.debug( "S3 upload_part completed", diff --git a/s3proxy/handlers/multipart/upload_part.py b/s3proxy/handlers/multipart/upload_part.py index b3f5e3f..3fdb9aa 100644 --- a/s3proxy/handlers/multipart/upload_part.py +++ b/s3proxy/handlers/multipart/upload_part.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import gc import hashlib import time from collections import deque @@ -371,9 +372,10 @@ async def _stream_and_upload_framed( Used for unsigned and large signed uploads (e.g. barman backups) where Content-Length is known and the body is read directly (not aws-chunked). Reads the body once, splitting it into internal S3 parts of - optimal_part_size, and uploads each part as a stream of 8MB AES-GCM frames - (see crypto.encrypt_frame). Peak memory is O(FRAME_PLAINTEXT_SIZE) - regardless of the client part size. + optimal_part_size. Each internal part is encrypted frame-by-frame + (see crypto.encrypt_frame) into a ciphertext buffer, then uploaded as + bytes. Peak memory is O(part ciphertext + FRAME_PLAINTEXT_SIZE) — one + frame of plaintext at a time, not plaintext + ciphertext together. """ md5_hash = hashlib.md5(usedforsecurity=False) sha256_hash = hashlib.sha256() @@ -388,23 +390,27 @@ async def _stream_and_upload_framed( ct_size = crypto.framed_ciphertext_size(part_pt_size) ipn = internal_part_num - async def part_body(pt_size: int, ipn: int) -> AsyncIterator[bytes]: - remaining = pt_size - frame_idx = 0 - while remaining > 0: - frame_pt = await reader.read(min(crypto.FRAME_PLAINTEXT_SIZE, remaining)) - if not frame_pt: - break - md5_hash.update(frame_pt) - sha256_hash.update(frame_pt) - remaining -= len(frame_pt) - yield crypto.encrypt_frame(frame_pt, state.dek, upload_id, ipn, frame_idx) - frame_idx += 1 + ciphertext = bytearray() + remaining = part_pt_size + frame_idx = 0 + while remaining > 0: + frame_pt = await reader.read(min(crypto.FRAME_PLAINTEXT_SIZE, remaining)) + if not frame_pt: + break + md5_hash.update(frame_pt) + sha256_hash.update(frame_pt) + remaining -= len(frame_pt) + ciphertext.extend( + crypto.encrypt_frame(frame_pt, state.dek, upload_id, ipn, frame_idx) + ) + frame_idx += 1 + + part_ciphertext = bytes(ciphertext) + del ciphertext + gc.collect() upload_start = time.monotonic() - resp = await client.upload_part( - bucket, key, upload_id, ipn, part_body(part_pt_size, ipn), content_length=ct_size - ) + resp = await client.upload_part(bucket, key, upload_id, ipn, part_ciphertext) etag = resp["ETag"].strip('"') logger.info( "INTERNAL_PART_UPLOADED", @@ -601,5 +607,12 @@ def _handle_client_error( def _handle_generic_error( self, e: Exception, bucket: str, key: str, part_num: int, upload_id: str ) -> NoReturn: - logger.error("UPLOAD_PART_ERROR", bucket=bucket, key=key, part_num=part_num) + logger.error( + "UPLOAD_PART_ERROR", + bucket=bucket, + key=key, + part_num=part_num, + error=str(e), + error_type=type(e).__name__, + ) raise_for_exception(e) diff --git a/tests/unit/test_streaming_framed_upload.py b/tests/unit/test_streaming_framed_upload.py index b10e83f..7059ff2 100644 --- a/tests/unit/test_streaming_framed_upload.py +++ b/tests/unit/test_streaming_framed_upload.py @@ -1,6 +1,6 @@ -"""Streaming framed UploadPart: unsigned, known-length parts are uploaded as a -stream of 8MB AES-GCM frames, so write memory is O(frame) regardless of part -size, and the result decrypts back to the original bytes.""" +"""Framed UploadPart: known-length parts are encrypted frame-by-frame into a +ciphertext buffer and uploaded as bytes. Write memory is O(part ciphertext + +frame), not O(2× part), and the result decrypts back to the original bytes.""" import hashlib import os @@ -47,12 +47,8 @@ async def test_framed_upload_roundtrips_and_sets_metadata(): captured: dict[int, bytes] = {} class _Client: - async def upload_part(self, bucket, key, upload_id, part_number, body, content_length=None): - buf = bytearray() - async for frame in body: - buf.extend(frame) - assert content_length == len(buf) # Content-Length matches streamed bytes - captured[part_number] = bytes(buf) + async def upload_part(self, bucket, key, upload_id, part_number, body): + captured[part_number] = body return {"ETag": f'"{part_number:032x}"'} mgr = _Manager() @@ -86,9 +82,8 @@ async def upload_part(self, bucket, key, upload_id, part_number, body, content_l class _DiscardingClient: - async def upload_part(self, bucket, key, upload_id, part_number, body, content_length=None): - async for _frame in body: # consume + discard, like a real streaming upload - pass + async def upload_part(self, bucket, key, upload_id, part_number, body): + del body return {"ETag": f'"{part_number:032x}"'} @@ -107,20 +102,20 @@ async def stream(self): yield block[:n] -async def _measure_framed_peak(part_size: int) -> int: +async def _measure_framed_peak(client_part_size: int) -> int: handler = _handler(_Manager()) tracemalloc.start() tracemalloc.reset_peak() await handler._stream_and_upload_framed( - _ZeroRequest(part_size), + _ZeroRequest(client_part_size), _DiscardingClient(), "b", "k", UPLOAD_ID, 1, types.SimpleNamespace(dek=crypto.generate_dek()), - part_size, - part_size, + client_part_size, + crypto.PART_SIZE, # same internal part size barman uses for 512MB parts 1, ) _, peak = tracemalloc.get_traced_memory() @@ -130,13 +125,13 @@ async def _measure_framed_peak(part_size: int) -> int: @pytest.mark.asyncio async def test_framed_upload_memory_is_independent_of_part_size(): - """The defining property: a 256MB part peaks no higher than a 64MB part - (memory is bounded by the frame, not the part) and far below the part size. - The legacy whole-part path peaked at ~2x the part size.""" + """A 256MB client part peaks no higher than a 64MB one: ciphertext is built + one internal part at a time, not scaled by total client part size.""" small = await _measure_framed_peak(64 * 1024 * 1024) large = await _measure_framed_peak(256 * 1024 * 1024) - # Quadrupling the part size must not meaningfully raise the peak. - assert large <= small + crypto.FRAME_PLAINTEXT_SIZE - # And the peak is a handful of frames, nowhere near the 256MB part. - assert large < 8 * crypto.FRAME_PLAINTEXT_SIZE, f"peak {large / 1024 / 1024:.1f}MB" + # Must stay below the old buffered path (~2× part_size ≈ 257MB per request). + assert small < 220 * 1024 * 1024, f"small peak {small / 1024 / 1024:.1f}MB" + assert large < 220 * 1024 * 1024, f"large peak {large / 1024 / 1024:.1f}MB" + # Larger client part must not scale memory linearly with part count. + assert large <= small * 1.5 + crypto.FRAME_PLAINTEXT_SIZE From 0d0ca4f4ee358056d322eaae14fb5e2a837c3820 Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Mon, 15 Jun 2026 09:44:59 +0200 Subject: [PATCH 5/8] test(e2e/postgres): mirror prod barman upload settings for OOM repro MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Configure barman with jobs=4 and --min-chunk-size=512MB (matching production) and disable base-backup compression so the ~3GB dataset spans multiple 512MB chunks. Together this exercises concurrent large multipart UploadParts through the proxy — the path that caused OOMKills. Co-authored-by: Cursor --- e2e/postgres/templates/postgres-cluster.yaml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/e2e/postgres/templates/postgres-cluster.yaml b/e2e/postgres/templates/postgres-cluster.yaml index 8434ce6..7378463 100644 --- a/e2e/postgres/templates/postgres-cluster.yaml +++ b/e2e/postgres/templates/postgres-cluster.yaml @@ -36,5 +36,12 @@ spec: wal: compression: gzip data: - compression: gzip + # Compression intentionally OFF: pgbench data is highly compressible, so + # gzip would shrink the backup below one 512MB chunk and we'd never get + # concurrent large uploads. Uncompressed, the dataset spans several 512MB + # chunks which (with jobs=4) reproduces the multipart-upload OOM. + # 4 parallel upload jobs pushing 512MB chunks mirrors production. + jobs: 4 + additionalCommandArgs: + - "--min-chunk-size=512MB" retentionPolicy: "7d" From ac7edc424603eae2fadbfe75d9bd4b82243f3acc Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Mon, 15 Jun 2026 11:57:00 +0200 Subject: [PATCH 6/8] fix(get): fetch and decrypt framed internal parts at frame granularity Frame-level GET keeps restore reads within the 64MB pod memory budget when Barman pulls large backup objects, and adds regression tests for the path. Also fix Python 3.14 except-tuple syntax across the proxy. Co-authored-by: Cursor --- e2e/postgres/templates/postgres-cluster.yaml | 1 - s3proxy/concurrency.py | 2 +- s3proxy/crypto.py | 5 ++ s3proxy/dashboard/collectors.py | 2 +- s3proxy/dashboard/stats_store.py | 8 +-- s3proxy/handlers/objects/get.py | 68 +++++++++++++------- s3proxy/request_handler.py | 2 +- s3proxy/utils.py | 2 +- tests/integration/test_get_prefetch.py | 67 +++++++++++++++++++ tests/unit/test_framed_crypto.py | 8 +++ 10 files changed, 134 insertions(+), 31 deletions(-) diff --git a/e2e/postgres/templates/postgres-cluster.yaml b/e2e/postgres/templates/postgres-cluster.yaml index 7378463..6f6adfe 100644 --- a/e2e/postgres/templates/postgres-cluster.yaml +++ b/e2e/postgres/templates/postgres-cluster.yaml @@ -40,7 +40,6 @@ spec: # gzip would shrink the backup below one 512MB chunk and we'd never get # concurrent large uploads. Uncompressed, the dataset spans several 512MB # chunks which (with jobs=4) reproduces the multipart-upload OOM. - # 4 parallel upload jobs pushing 512MB chunks mirrors production. jobs: 4 additionalCommandArgs: - "--min-chunk-size=512MB" diff --git a/s3proxy/concurrency.py b/s3proxy/concurrency.py index 4a4504f..18cbd59 100644 --- a/s3proxy/concurrency.py +++ b/s3proxy/concurrency.py @@ -35,7 +35,7 @@ def _create_malloc_release() -> Callable[[], int] | None: libc.malloc_trim.argtypes = [ctypes.c_size_t] libc.malloc_trim.restype = ctypes.c_int return lambda: libc.malloc_trim(0) - except OSError, AttributeError: + except (OSError, AttributeError): return None diff --git a/s3proxy/crypto.py b/s3proxy/crypto.py index eb82b8a..b25d393 100644 --- a/s3proxy/crypto.py +++ b/s3proxy/crypto.py @@ -192,6 +192,11 @@ def encrypt_frame( return encrypt(plaintext, dek, derive_frame_nonce(upload_id, part_number, frame_index)) +def ciphertext_frame_byte_sizes(plaintext_size: int, ciphertext_size: int) -> list[int]: + """Ciphertext byte length of each frame in a (possibly framed) internal part.""" + return _ciphertext_frame_sizes(plaintext_size, ciphertext_size - plaintext_size) + + def _ciphertext_frame_sizes(plaintext_size: int, stored_overhead: int) -> list[int]: """Ciphertext byte length of each frame, derived from the stored sizes. diff --git a/s3proxy/dashboard/collectors.py b/s3proxy/dashboard/collectors.py index afad1e4..79f968e 100644 --- a/s3proxy/dashboard/collectors.py +++ b/s3proxy/dashboard/collectors.py @@ -134,7 +134,7 @@ def _latency_percentiles(cumulative: dict[str, float] | None = None) -> dict[str continue try: buckets.append((float(le), float(count))) - except ValueError, TypeError: + except (ValueError, TypeError): continue if total < 1 and buckets: total = max(c for _, c in buckets) diff --git a/s3proxy/dashboard/stats_store.py b/s3proxy/dashboard/stats_store.py index d581b5e..8f21391 100644 --- a/s3proxy/dashboard/stats_store.py +++ b/s3proxy/dashboard/stats_store.py @@ -561,7 +561,7 @@ async def series(self, metric: str, range_key: str) -> tuple[list[float], list[f for k, v in raw.items(): try: points.append((int(k), float(v))) - except ValueError, TypeError: + except (ValueError, TypeError): continue return bucket_series(points, window, bucket) @@ -642,7 +642,7 @@ def _hfloat(h: dict, field: bytes) -> float: return 0.0 try: return float(v) - except ValueError, TypeError: + except (ValueError, TypeError): return 0.0 @@ -652,7 +652,7 @@ def _decode_float_map(h: dict) -> dict[str, float]: key = k.decode() if isinstance(k, bytes) else str(k) try: out[key] = float(v) - except ValueError, TypeError: + except (ValueError, TypeError): continue return out @@ -665,7 +665,7 @@ def _loads_sample(raw: bytes) -> RequestSample | None: try: d = orjson.loads(raw) return RequestSample(**d) - except ValueError, TypeError: + except (ValueError, TypeError): return None diff --git a/s3proxy/handlers/objects/get.py b/s3proxy/handlers/objects/get.py index eb9a4f0..cf00dc1 100644 --- a/s3proxy/handlers/objects/get.py +++ b/s3proxy/handlers/objects/get.py @@ -336,7 +336,9 @@ async def _stream_internal_parts( # absolute ciphertext bounds and the plaintext slice to emit. The prefetch # below only ever looks ahead within this filtered list, so it never # fetches a part the range would skip. - needed: list[tuple[Any, int, int, int, int]] = [] + # Frame-level fetches keep memory O(frame) so 64MB internal parts stay + # within a 64MB pod budget (whole-part decrypt would reserve ~2× part). + needed: list[tuple[int, int, int, int, int, int]] = [] ct_offset = ct_start pt_offset = 0 for ip in sorted(part_meta.internal_parts, key=lambda p: p.internal_part_number): @@ -347,26 +349,49 @@ async def _stream_internal_parts( continue if pt_offset > off_end: # entirely after the range break - ct_end = ct_offset + ip.ciphertext_size - 1 - self._validate_ciphertext_range( - bucket, key, part_num, ip.internal_part_number, ct_end, actual_size - ) - slice_start = max(0, off_start - pt_offset) - slice_end = min(ip.plaintext_size, off_end - pt_offset + 1) - needed.append((ip, ct_offset, ct_end, slice_start, slice_end)) + + frame_sizes = crypto.ciphertext_frame_byte_sizes(ip.plaintext_size, ip.ciphertext_size) + frame_pt_offset = 0 + frame_ct_offset = 0 + for fsize in frame_sizes: + fpt_size = fsize - crypto.ENCRYPTION_OVERHEAD + frame_global_start = pt_offset + frame_pt_offset + frame_global_end = frame_global_start + fpt_size - 1 + if frame_global_end < off_start: + frame_pt_offset += fpt_size + frame_ct_offset += fsize + continue + if frame_global_start > off_end: + break + + abs_ct_start = ct_offset + frame_ct_offset + abs_ct_end = abs_ct_start + fsize - 1 + self._validate_ciphertext_range( + bucket, key, part_num, ip.internal_part_number, abs_ct_end, actual_size + ) + slice_start = max(0, off_start - frame_global_start) + slice_end = min(fpt_size, off_end - frame_global_start + 1) + needed.append( + (ip.internal_part_number, abs_ct_start, abs_ct_end, fsize, slice_start, slice_end) + ) + frame_pt_offset += fpt_size + frame_ct_offset += fsize + ct_offset += ip.ciphertext_size pt_offset += ip.plaintext_size - def fetch(item: tuple[Any, int, int, int, int]) -> Awaitable[bytes]: - ip, c_start, c_end, _, _ = item - return self._fetch_internal_part(client, bucket, key, part_num, ip, c_start, c_end, dek) + def fetch(item: tuple[int, int, int, int, int, int]) -> Awaitable[bytes]: + ipn, c_start, c_end, fsize, _, _ = item + return self._fetch_and_decrypt_frame( + client, bucket, key, part_num, ipn, c_start, c_end, fsize, dek + ) # aclosing() guarantees the prefetch generator's finally (which cancels an # in-flight lookahead and releases its memory reservation) runs when this # stream is torn down — e.g. on client disconnect. async with contextlib.aclosing(self._stream_parts_with_prefetch(needed, fetch)) as stream: async for item, chunk in stream: - _, _, _, slice_start, slice_end = item + *_, slice_start, slice_end = item yield chunk[slice_start:slice_end] async def _stream_parts_with_prefetch( @@ -426,18 +451,19 @@ def _validate_ciphertext_range( f"expects byte {ct_end} but object size is {actual_size}" ) - async def _fetch_internal_part( + async def _fetch_and_decrypt_frame( self, client: S3Client, bucket: str, key: str, part_num: int, - internal_part, + internal_part_num: int, ct_start: int, ct_end: int, + frame_ciphertext_size: int, dek: bytes, ) -> bytes: - expected_size = ct_end - ct_start + 1 + expected_size = frame_ciphertext_size additional = max(0, expected_size * 2 - MAX_BUFFER_SIZE) extra_reserved = 0 try: @@ -454,19 +480,17 @@ async def _fetch_internal_part( bucket=bucket, key=key, part_number=part_num, - internal_part_number=internal_part.internal_part_number, + internal_part_number=internal_part_num, expected_size=expected_size, actual_size=len(ciphertext), ) raise S3Error.internal_error( f"Metadata corruption: part {part_num} " - f"internal part {internal_part.internal_part_number} " + f"internal part {internal_part_num} " f"expected {expected_size} bytes, got {len(ciphertext)}" ) - # decrypt_framed transparently handles both legacy single-seal parts - # and multi-frame parts (frame count derived from the stored sizes). - return crypto.decrypt_framed(ciphertext, dek, internal_part.plaintext_size) + return crypto.decrypt(ciphertext, dek) except ClientError as e: if e.response["Error"]["Code"] == "InvalidRange": @@ -475,12 +499,12 @@ async def _fetch_internal_part( bucket=bucket, key=key, part_number=part_num, - internal_part_number=internal_part.internal_part_number, + internal_part_number=internal_part_num, requested_range=f"{ct_start}-{ct_end}", ) raise S3Error.internal_error( f"Metadata corruption: part {part_num} " - f"internal part {internal_part.internal_part_number} " + f"internal part {internal_part_num} " f"range {ct_start}-{ct_end} invalid" ) from e raise diff --git a/s3proxy/request_handler.py b/s3proxy/request_handler.py index c71fbe5..afe273d 100644 --- a/s3proxy/request_handler.py +++ b/s3proxy/request_handler.py @@ -229,7 +229,7 @@ async def _handle_proxy_request_impl( dispatcher = RequestDispatcher(handler) try: return await dispatcher.dispatch(request, verified_creds) - except HTTPException, S3Error: + except (HTTPException, S3Error): raise except UnknownKidError as e: logger.warning("Cannot decrypt object: key not configured", kid=e.kid) diff --git a/s3proxy/utils.py b/s3proxy/utils.py index c758f6a..12be211 100644 --- a/s3proxy/utils.py +++ b/s3proxy/utils.py @@ -17,7 +17,7 @@ def parse_http_date(date_str: str | None) -> datetime | None: return None try: return parsedate_to_datetime(date_str) - except ValueError, TypeError: + except (ValueError, TypeError): return None diff --git a/tests/integration/test_get_prefetch.py b/tests/integration/test_get_prefetch.py index 809ba7c..f5a57d5 100644 --- a/tests/integration/test_get_prefetch.py +++ b/tests/integration/test_get_prefetch.py @@ -231,3 +231,70 @@ async def test_multipart_get_no_leak_on_client_disconnect(settings, mock_s3, kek await it.aclose() # client goes away mid-stream assert concurrency.get_active_memory() == 0 # lookahead reservation released + + +async def _make_framed_encrypted_multipart(mock_s3, kek, upload_id, part_sizes): + """Like _make_encrypted_multipart but with multi-frame internal parts (barman path).""" + dek = crypto.generate_dek() + wrapped = crypto.wrap_key(dek, kek) + internal_parts, ct_parts, total_ct = [], [], 0 + plaintext = bytearray() + for n, sz in enumerate(part_sizes, start=1): + chunk = bytes([n % 256]) * sz + plaintext.extend(chunk) + ct = bytearray() + for frame_idx in range(0, max(1, sz), crypto.FRAME_PLAINTEXT_SIZE): + frame_pt = chunk[frame_idx : frame_idx + crypto.FRAME_PLAINTEXT_SIZE] + ct += crypto.encrypt_frame(frame_pt, dek, upload_id, n, frame_idx // crypto.FRAME_PLAINTEXT_SIZE) + ct = bytes(ct) + ct_parts.append(ct) + internal_parts.append( + InternalPartMetadata( + internal_part_number=n, + plaintext_size=sz, + ciphertext_size=len(ct), + etag=f"e{n}", + ) + ) + total_ct += len(ct) + part_meta = PartMetadata( + part_number=1, + plaintext_size=len(plaintext), + ciphertext_size=total_ct, + etag="synthetic", + md5="md5", + internal_parts=internal_parts, + ) + meta = MultipartMetadata( + version=2, + part_count=1, + total_plaintext_size=len(plaintext), + parts=[part_meta], + wrapped_dek=wrapped, + kid="AKIAIOSFODNN7EXAMPLE", + ) + await mock_s3.create_bucket("b") + await mock_s3.put_object("b", "k", b"".join(ct_parts)) + await save_multipart_metadata(mock_s3, "b", "k", meta) + return bytes(plaintext) + + +FRAMED_PART = crypto.PART_SIZE # 64MB — would OOM on whole-part decrypt at 64MB budget + + +async def test_framed_64mb_internal_parts_get_under_memory_limit(settings, mock_s3, kek): + """Restore path: framed 64MB internal parts must GET at O(frame) memory.""" + concurrency.set_memory_limit(64) + concurrency.reset_state() + handler = _handler(settings) + handler._client = MagicMock(return_value=mock_s3) + upload_id = "framed-restore-upload" + expected = await _make_framed_encrypted_multipart( + mock_s3, kek, upload_id, [FRAMED_PART, FRAMED_PART // 2] + ) + + resp = await handler.handle_get_object(_get_request(), MagicMock()) + chunks = [c async for c in resp.body_iterator] + + assert b"".join(chunks) == expected + assert concurrency.get_active_memory() == 0 diff --git a/tests/unit/test_framed_crypto.py b/tests/unit/test_framed_crypto.py index 015df52..3171814 100644 --- a/tests/unit/test_framed_crypto.py +++ b/tests/unit/test_framed_crypto.py @@ -47,6 +47,14 @@ def test_legacy_single_seal_decrypts_via_framed(): assert crypto.decrypt_framed(legacy, dek, len(plaintext)) == plaintext +def test_ciphertext_frame_byte_sizes_matches_framed(): + for size in [1, crypto.FRAME_PLAINTEXT_SIZE, crypto.FRAME_PLAINTEXT_SIZE + 1, 3 * crypto.FRAME_PLAINTEXT_SIZE]: + ct = crypto.framed_ciphertext_size(size) + sizes = crypto.ciphertext_frame_byte_sizes(size, ct) + assert sum(sizes) == ct + assert all(s >= crypto.ENCRYPTION_OVERHEAD for s in sizes) + + def test_frame_nonces_unique(): nonces = {crypto.derive_frame_nonce(UPLOAD_ID, PART, i) for i in range(1000)} assert len(nonces) == 1000 From 22848afe3e7e542fb964d660e5825d2c48ec181f Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Mon, 15 Jun 2026 13:12:51 +0200 Subject: [PATCH 7/8] fix(e2e/postgres): wait for end WAL before restore and fix ruff E501 Restore was flaky when CNPG marked backup complete before the final WAL segment reached S3; wait for endWal and delete the source cluster only after restore succeeds. Wrap long lines for ruff. Co-authored-by: Cursor --- e2e/postgres/test.sh | 79 ++++++++++++++++++++------ s3proxy/handlers/objects/get.py | 9 ++- tests/integration/test_get_prefetch.py | 3 +- tests/unit/test_framed_crypto.py | 8 ++- 4 files changed, 78 insertions(+), 21 deletions(-) diff --git a/e2e/postgres/test.sh b/e2e/postgres/test.sh index 4682ad0..23349e2 100755 --- a/e2e/postgres/test.sh +++ b/e2e/postgres/test.sh @@ -33,6 +33,44 @@ log_info() { echo -e "${GREEN}[INFO]${NC} $1"; } log_warn() { echo -e "${YELLOW}[WARN]${NC} $1"; } log_error() { echo -e "${RED}[ERROR]${NC} $1"; } +S3_ENDPOINT="http://s3proxy-python-frontproxy.s3proxy:80" +S3_WAL_CHECK_POD="s3-wal-check" + +ensure_s3_wal_checker() { + if kubectl get pod -n "$NAMESPACE" "$S3_WAL_CHECK_POD" >/dev/null 2>&1; then + kubectl wait -n "$NAMESPACE" --for=condition=Ready "pod/${S3_WAL_CHECK_POD}" --timeout=120s + return + fi + kubectl run "$S3_WAL_CHECK_POD" --restart=Never -n "$NAMESPACE" \ + --image=amazon/aws-cli:2.15.0 \ + --overrides='{"spec":{"containers":[{"name":"'"$S3_WAL_CHECK_POD"'","image":"amazon/aws-cli:2.15.0","command":["sleep","3600"],"envFrom":[{"secretRef":{"name":"s3-credentials"}}]}]}}' + kubectl wait -n "$NAMESPACE" --for=condition=Ready "pod/${S3_WAL_CHECK_POD}" --timeout=120s +} + +wait_for_end_wal_in_s3() { + local end_wal="$1" + local timeline_prefix="${end_wal:0:16}" + local wal_object="pg-cluster/wals/${timeline_prefix}/${end_wal}.gz" + + log_info "Waiting for backup end WAL in S3: ${wal_object}" + ensure_s3_wal_checker + + local deadline=$((SECONDS + 600)) + while [ "$SECONDS" -lt "$deadline" ]; do + if kubectl exec -n "$NAMESPACE" "$S3_WAL_CHECK_POD" -- sh -c " + export AWS_ACCESS_KEY_ID=\$ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY=\$ACCESS_SECRET_KEY + aws --endpoint-url ${S3_ENDPOINT} s3 ls s3://postgres-backups/${wal_object} >/dev/null 2>&1 + "; then + log_info "✓ End WAL archived: ${end_wal}.gz" + return 0 + fi + sleep 5 + done + + log_error "Timeout waiting for end WAL ${end_wal}.gz in S3" + return 1 +} + cleanup() { log_info "Cleaning up..." kubectl delete namespace "$NAMESPACE" --ignore-not-found --wait=false || true @@ -235,34 +273,32 @@ fi log_info "✓ s3proxy survived the backup" # ============================================================================ -# STEP 5: Verify encryption + Delete cluster + Create new cluster (ALL PARALLEL) +# STEP 4c: Wait for backup end WAL to reach S3 (restore needs it) +# CNPG may mark backup completed before the archiver uploads the final segment. # ============================================================================ -log_info "=== Step 5: Parallel - verify encryption, delete old, create new ===" +END_WAL=$(kubectl get backup -n "$NAMESPACE" "${CLUSTER_NAME}-backup-1" -o jsonpath='{.status.endWal}') +if [ -z "$END_WAL" ]; then + log_error "Backup status.endWal is empty" + exit 1 +fi +log_info "=== Step 4c: Waiting for end WAL ${END_WAL} in S3 ===" +wait_for_end_wal_in_s3 "$END_WAL" + +# ============================================================================ +# STEP 5: Verify encryption, restore, then delete source cluster +# Keep the source cluster alive until end WAL is archived and restore succeeds. +# ============================================================================ +log_info "=== Step 5: Verify encryption and restore from backup ===" -# 1. Start encryption verification in background verify_encryption "postgres-backups" "" "$NAMESPACE" ".gz|.tar|.backup|.data" & VERIFY_PID=$! -# 2. Delete old cluster in background -( - kubectl delete cluster -n "$NAMESPACE" ${CLUSTER_NAME} --wait - kubectl wait --namespace "$NAMESPACE" \ - --for=delete pod -l cnpg.io/cluster=${CLUSTER_NAME} \ - --timeout=300s || true - log_info "✓ Old cluster deleted" -) & -DELETE_PID=$! - -# 3. Create new cluster immediately (different name, can coexist) -log_info "Creating restored cluster (parallel with deletion)..." +log_info "Creating restored cluster..." envsubst < "${SCRIPT_DIR}/templates/postgres-cluster-restore.yaml" | kubectl apply -n "$NAMESPACE" -f - -# Wait for all parallel operations wait $VERIFY_PID || { log_error "Encryption verification failed"; exit 1; } log_info "✓ Encryption verified" -wait $DELETE_PID || { log_error "Old cluster deletion failed"; exit 1; } - log_info "Waiting for restored cluster to be ready..." kubectl wait --namespace "$NAMESPACE" \ --for=condition=Ready cluster/${CLUSTER_NAME}-restored \ @@ -270,6 +306,13 @@ kubectl wait --namespace "$NAMESPACE" \ log_info "Restored cluster is ready!" +log_info "Deleting source cluster (no longer needed)..." +kubectl delete cluster -n "$NAMESPACE" "${CLUSTER_NAME}" --wait +kubectl wait --namespace "$NAMESPACE" \ + --for=delete pod -l "cnpg.io/cluster=${CLUSTER_NAME}" \ + --timeout=300s || true +log_info "✓ Old cluster deleted" + # ============================================================================ # STEP 6: Validate restored data # ============================================================================ diff --git a/s3proxy/handlers/objects/get.py b/s3proxy/handlers/objects/get.py index cf00dc1..b3d038c 100644 --- a/s3proxy/handlers/objects/get.py +++ b/s3proxy/handlers/objects/get.py @@ -372,7 +372,14 @@ async def _stream_internal_parts( slice_start = max(0, off_start - frame_global_start) slice_end = min(fpt_size, off_end - frame_global_start + 1) needed.append( - (ip.internal_part_number, abs_ct_start, abs_ct_end, fsize, slice_start, slice_end) + ( + ip.internal_part_number, + abs_ct_start, + abs_ct_end, + fsize, + slice_start, + slice_end, + ) ) frame_pt_offset += fpt_size frame_ct_offset += fsize diff --git a/tests/integration/test_get_prefetch.py b/tests/integration/test_get_prefetch.py index f5a57d5..58a079e 100644 --- a/tests/integration/test_get_prefetch.py +++ b/tests/integration/test_get_prefetch.py @@ -245,7 +245,8 @@ async def _make_framed_encrypted_multipart(mock_s3, kek, upload_id, part_sizes): ct = bytearray() for frame_idx in range(0, max(1, sz), crypto.FRAME_PLAINTEXT_SIZE): frame_pt = chunk[frame_idx : frame_idx + crypto.FRAME_PLAINTEXT_SIZE] - ct += crypto.encrypt_frame(frame_pt, dek, upload_id, n, frame_idx // crypto.FRAME_PLAINTEXT_SIZE) + frame_num = frame_idx // crypto.FRAME_PLAINTEXT_SIZE + ct += crypto.encrypt_frame(frame_pt, dek, upload_id, n, frame_num) ct = bytes(ct) ct_parts.append(ct) internal_parts.append( diff --git a/tests/unit/test_framed_crypto.py b/tests/unit/test_framed_crypto.py index 3171814..74aca4b 100644 --- a/tests/unit/test_framed_crypto.py +++ b/tests/unit/test_framed_crypto.py @@ -48,7 +48,13 @@ def test_legacy_single_seal_decrypts_via_framed(): def test_ciphertext_frame_byte_sizes_matches_framed(): - for size in [1, crypto.FRAME_PLAINTEXT_SIZE, crypto.FRAME_PLAINTEXT_SIZE + 1, 3 * crypto.FRAME_PLAINTEXT_SIZE]: + sizes = [ + 1, + crypto.FRAME_PLAINTEXT_SIZE, + crypto.FRAME_PLAINTEXT_SIZE + 1, + 3 * crypto.FRAME_PLAINTEXT_SIZE, + ] + for size in sizes: ct = crypto.framed_ciphertext_size(size) sizes = crypto.ciphertext_frame_byte_sizes(size, ct) assert sum(sizes) == ct From 1f286aa4e2354494c6ae8807ccf0bf3333fa4fe5 Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Mon, 15 Jun 2026 13:21:01 +0200 Subject: [PATCH 8/8] style: apply ruff format to fix CI format check Co-authored-by: Cursor --- s3proxy/concurrency.py | 2 +- s3proxy/dashboard/collectors.py | 2 +- s3proxy/dashboard/stats_store.py | 8 ++++---- s3proxy/handlers/multipart/upload_part.py | 4 +--- s3proxy/request_handler.py | 2 +- s3proxy/utils.py | 2 +- 6 files changed, 9 insertions(+), 11 deletions(-) diff --git a/s3proxy/concurrency.py b/s3proxy/concurrency.py index 18cbd59..4a4504f 100644 --- a/s3proxy/concurrency.py +++ b/s3proxy/concurrency.py @@ -35,7 +35,7 @@ def _create_malloc_release() -> Callable[[], int] | None: libc.malloc_trim.argtypes = [ctypes.c_size_t] libc.malloc_trim.restype = ctypes.c_int return lambda: libc.malloc_trim(0) - except (OSError, AttributeError): + except OSError, AttributeError: return None diff --git a/s3proxy/dashboard/collectors.py b/s3proxy/dashboard/collectors.py index 79f968e..afad1e4 100644 --- a/s3proxy/dashboard/collectors.py +++ b/s3proxy/dashboard/collectors.py @@ -134,7 +134,7 @@ def _latency_percentiles(cumulative: dict[str, float] | None = None) -> dict[str continue try: buckets.append((float(le), float(count))) - except (ValueError, TypeError): + except ValueError, TypeError: continue if total < 1 and buckets: total = max(c for _, c in buckets) diff --git a/s3proxy/dashboard/stats_store.py b/s3proxy/dashboard/stats_store.py index 8f21391..d581b5e 100644 --- a/s3proxy/dashboard/stats_store.py +++ b/s3proxy/dashboard/stats_store.py @@ -561,7 +561,7 @@ async def series(self, metric: str, range_key: str) -> tuple[list[float], list[f for k, v in raw.items(): try: points.append((int(k), float(v))) - except (ValueError, TypeError): + except ValueError, TypeError: continue return bucket_series(points, window, bucket) @@ -642,7 +642,7 @@ def _hfloat(h: dict, field: bytes) -> float: return 0.0 try: return float(v) - except (ValueError, TypeError): + except ValueError, TypeError: return 0.0 @@ -652,7 +652,7 @@ def _decode_float_map(h: dict) -> dict[str, float]: key = k.decode() if isinstance(k, bytes) else str(k) try: out[key] = float(v) - except (ValueError, TypeError): + except ValueError, TypeError: continue return out @@ -665,7 +665,7 @@ def _loads_sample(raw: bytes) -> RequestSample | None: try: d = orjson.loads(raw) return RequestSample(**d) - except (ValueError, TypeError): + except ValueError, TypeError: return None diff --git a/s3proxy/handlers/multipart/upload_part.py b/s3proxy/handlers/multipart/upload_part.py index 3fdb9aa..080e4d0 100644 --- a/s3proxy/handlers/multipart/upload_part.py +++ b/s3proxy/handlers/multipart/upload_part.py @@ -102,9 +102,7 @@ async def handle_upload_part(self, request: Request, creds: S3Credentials) -> Re estimated_parts = max(1, (content_length + optimal_part_size - 1) // optimal_part_size) use_framed = ( - (is_unsigned or is_large_signed) - and not needs_chunked_decode - and content_length > 0 + (is_unsigned or is_large_signed) and not needs_chunked_decode and content_length > 0 ) logger.info( "UPLOAD_PART_CONFIG", diff --git a/s3proxy/request_handler.py b/s3proxy/request_handler.py index afe273d..c71fbe5 100644 --- a/s3proxy/request_handler.py +++ b/s3proxy/request_handler.py @@ -229,7 +229,7 @@ async def _handle_proxy_request_impl( dispatcher = RequestDispatcher(handler) try: return await dispatcher.dispatch(request, verified_creds) - except (HTTPException, S3Error): + except HTTPException, S3Error: raise except UnknownKidError as e: logger.warning("Cannot decrypt object: key not configured", kid=e.kid) diff --git a/s3proxy/utils.py b/s3proxy/utils.py index 12be211..c758f6a 100644 --- a/s3proxy/utils.py +++ b/s3proxy/utils.py @@ -17,7 +17,7 @@ def parse_http_date(date_str: str | None) -> datetime | None: return None try: return parsedate_to_datetime(date_str) - except (ValueError, TypeError): + except ValueError, TypeError: return None