Skip to content

fix(backend): bound storage_executor fan-out with per-request semaphores (#7387)#7388

Open
beastoin wants to merge 13 commits into
BasedHardware:mainfrom
beastoin:fix/storage-fanout-limits-7387
Open

fix(backend): bound storage_executor fan-out with per-request semaphores (#7387)#7388
beastoin wants to merge 13 commits into
BasedHardware:mainfrom
beastoin:fix/storage-fanout-limits-7387

Conversation

@beastoin
Copy link
Copy Markdown
Collaborator

@beastoin beastoin commented May 19, 2026

Summary

Fixes #7387 — bounds all unbounded fan-out patterns in storage_executor submissions that cause queue spikes to 844+ at 96 workers during sustained traffic (~3,300 req/min across 3 instances).

Changes

  1. storage.py — Sliding window for chunk downloads: Replaced unbounded dict-comprehension submit with FIRST_COMPLETED sliding window (_CHUNK_WINDOW_SIZE=8) gated by _STORAGE_CHUNK_SEM(32) global semaphore. Handles both individual chunks and batch blobs as one job stream.

  2. storage.py — Precache file semaphore: Added _PRECACHE_FILE_SEM(4) to gate concurrent file precache submissions in precache_conversation_audio._precache_all().

  3. sync.py — Precache semaphore in sync endpoints: Both _precache_all_parallel and _cache_uncached_parallel now use _PRECACHE_FILE_SEM from storage to bound their fan-out.

  4. knowledge_graph.py — Wrong pool fix: Moved rebuild_knowledge_graph from storage_executor to llm_executor (correct pool for LLM+DB work) with _KG_REBUILD_SEM(4) bounded semaphore.

  5. speaker_identification.py — Parent-child deadlock fix: Changed parent call to download_audio_chunks_and_merge from storage_executor to sync_executor to prevent coordinator-child deadlock.

  6. notifications.py — Wrong pool fix: Moved notification fan-out from storage_executor to postprocess_executor with batched processing (_BATCH_SIZE=8) and error isolation via return_exceptions=True.

Semaphore safety pattern (all sites)

_SEM.acquire()
try:
    f = executor.submit(work)
    f.add_done_callback(lambda _: _SEM.release())
except Exception:
    _SEM.release()
    raise

Test plan

  • 27 new tests in test_storage_fanout_limits.py (source-level + behavioral)
  • Updated test_clean_sweep_migrations.py — KG and notification assertions
  • Updated test_async_http_infrastructure.py — webhook wiring test
  • 134 total tests pass across all 3 test files
  • New test file added to test.sh
  • beast omi dev boot-check — clean imports
  • L1: Backend starts, all API endpoints respond correctly
  • L2: Backend + Pusher both start and serve requests, no executor errors

Expected production impact

  • Max storage_executor queue depth: <200 (down from 844+)
  • No fan-out site can submit more than 8 concurrent tasks per call
  • Global chunk semaphore caps cross-request chunk downloads at 32
  • Precache file semaphore caps concurrent file downloads at 4
  • KG rebuild and notification work no longer competes for storage_executor slots

Changed-path coverage checklist

Path ID Changed path Happy-path test Non-happy-path test L1 result L2 result
P1 storage.py:_STORAGE_CHUNK_SEM + sliding window Backend starts, conversations 200 Sync precache 404 for fake ID PASS PASS
P2 storage.py:_PRECACHE_FILE_SEM + _precache_all Module loads, no errors Semaphore release in except block (source test) PASS PASS
P3 sync.py:_precache_all_parallel + _cache_uncached_parallel Sync router processes audio files Endpoint returns proper error PASS PASS
P4 knowledge_graph.py:llm_executor + _KG_REBUILD_SEM Module imports clean Semaphore release in except block (source test) PASS PASS
P5 speaker_identification.py:sync_executor parent call Module loads cleanly Source-level: no storage_executor in context PASS PASS
P6 notifications.py:postprocess_executor + batching Daily summaries 200 return_exceptions isolates failures PASS PASS

🤖 Generated with Claude Code

beastoin and others added 10 commits May 19, 2026 11:43
…asedHardware#7387)

Add module-level semaphores and sliding window to prevent unbounded
task submission that causes storage queue spikes to 844 at 96 workers.

- _STORAGE_CHUNK_SEM(32): global cap on concurrent chunk downloads
- _PRECACHE_FILE_SEM(4): global cap on concurrent audio file precache
- _CHUNK_WINDOW_SIZE=8: per-call sliding window with FIRST_COMPLETED
- Semaphore acquired before submit, released in done callback (exception-safe)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…edHardware#7387)

Both _precache_all_parallel and _cache_uncached_parallel now acquire
the global file-level semaphore before submitting to storage_executor.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…asedHardware#7387)

process_memory does LLM invoke + DB writes, not storage I/O. Moved to
llm_executor with BoundedSemaphore(4) to avoid monopolizing the pool.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…sedHardware#7387)

Avoids parent-child deadlock on storage_executor when
download_audio_chunks_and_merge submits child tasks to the same pool.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…e#7387)

22 source-level tests verifying semaphore placement, sliding window
pattern, and pool assignment for all 4 fan-out sites.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…are#7387)

Replace storage_executor assertions with llm_executor + BoundedSemaphore
checks to match the new pool assignment.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…dware#7387)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…process_executor (BasedHardware#7387)

_send_summary_notification does paywall checks, DB reads, LLM calls, and
webhook delivery — not storage I/O. Moved to postprocess_executor with
batch-of-8 to prevent unbounded fan-out.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… tests (BasedHardware#7387)

3 behavioral tests: window caps inflight, semaphore released on exception,
global semaphore limits cross-request. 2 notification source-level tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 19, 2026

Greptile Summary

This PR introduces per-request semaphore gating to bound unbounded fan-out patterns on storage_executor that were causing queue spikes up to 844 tasks at peak traffic. It also migrates KG rebuild to llm_executor and fixes a confirmed parent-child deadlock in speaker_identification.py by moving the parent call to sync_executor.

  • storage.py: Replaces the bulk-submit dict comprehension in download_audio_chunks_and_merge with an 8-wide FIRST_COMPLETED sliding window gated by _STORAGE_CHUNK_SEM(32), and adds _PRECACHE_FILE_SEM(4) around all three precache fan-out sites.
  • knowledge_graph.py: Moves batch rebuild from storage_executor to llm_executor with _KG_REBUILD_SEM(4) to prevent LLM/DB work from monopolizing storage workers.
  • speaker_identification.py: Moves download_audio_chunks_and_merge parent call to sync_executor, eliminating the confirmed storage pool parent-child deadlock.

Confidence Score: 3/5

Merging is safe for immediate load relief, but two structural issues survive that could resurface under sustained high traffic.

The precache path in storage.py still submits _cache_single to storage_executor where it internally blocks waiting for child storage_executor tasks, the exact pattern explicitly fixed for speaker_identification.py. Separately, _precache_all_parallel and _cache_uncached_parallel in sync.py can exhaust all 24 postprocess_executor threads by blocking them on _PRECACHE_FILE_SEM.acquire() when global capacity is held by concurrent storage work, stalling unrelated background tasks.

backend/utils/other/storage.py (precache parent-child pattern) and backend/routers/sync.py (postprocess thread exhaustion) deserve a closer look before merging to production at high traffic volumes.

Important Files Changed

Filename Overview
backend/utils/other/storage.py Adds sliding window with global semaphore for chunk downloads and file-level semaphore for precache; residual parent-child deadlock risk in _cache_single submitted to storage_executor.
backend/routers/sync.py Imports and applies _PRECACHE_FILE_SEM at both precache fan-out sites; blocking semaphore acquire inside postprocess_executor threads risks exhausting the pool under concurrent load.
backend/utils/llm/knowledge_graph.py Migrates KG rebuild from storage_executor to llm_executor with BoundedSemaphore(4); _KG_REBUILD_SEM declaration is interleaved mid-import block (PEP 8 E402).
backend/utils/speaker_identification.py Moves download_audio_chunks_and_merge parent call from storage_executor to sync_executor, correctly eliminating the parent-child deadlock on the same pool.
backend/tests/unit/test_storage_fanout_limits.py New source-level test suite verifying semaphore constants, sliding window pattern, and pool assignments across storage, sync, knowledge_graph, and speaker_identification.
backend/tests/unit/test_clean_sweep_migrations.py Updates KG migration tests to assert llm_executor and BoundedSemaphore usage; removes stale storage_executor assertions.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[Precache Request] --> B[postprocess_executor\n_precache_all_parallel]
    B --> C{_PRECACHE_FILE_SEM\nBoundedSemaphore 4}
    C -->|slot acquired| D[storage_executor\n_cache_single]
    D --> E[download_audio_chunks_and_merge\nrunning on storage_executor thread]
    E --> F{_STORAGE_CHUNK_SEM\nBoundedSemaphore 32}
    F -->|slot acquired| G[storage_executor\ndownload_single_chunk child]
    G -->|done callback| F
    D -->|done callback| C
    I[speaker_identification] --> J[sync_executor\ndownload_audio_chunks_and_merge]
    J --> F
    K[KG Rebuild] --> L[llm_executor\nprocess_memory]
    L -->|_KG_REBUILD_SEM 4| M[LLM + DB work]
    style C fill:#f90,color:#000
    style F fill:#f90,color:#000
    style E fill:#fcc,color:#000
Loading

Comments Outside Diff (1)

  1. backend/routers/sync.py, line 207-226 (link)

    P1 _precache_all_parallel can exhaust postprocess_executor threads

    This function runs in a postprocess_executor thread. It blocks on _PRECACHE_FILE_SEM.acquire() when the global 4-slot capacity is consumed by other concurrent requests. If many concurrent precache requests arrive simultaneously — each dispatched to a postprocess_executor thread — all 24 postprocess_executor workers can end up blocked on the semaphore, stalling every other background task (persona updates, post-processing) that depends on the same pool. A non-blocking approach (e.g., using asyncio.Semaphore from the async handler and coordinating via asyncio.gather, or submitting a coordinator task only when a semaphore slot is immediately available) would prevent this thread exhaustion scenario. The same concern applies to _cache_uncached_parallel below.

Reviews (1): Last reviewed commit: "test(backend): add behavioral sliding-wi..." | Re-trigger Greptile

Comment on lines +1031 to +1040
futures = []
for af in audio_files:
_PRECACHE_FILE_SEM.acquire()
try:
f = storage_executor.submit(_cache_single, af)
f.add_done_callback(lambda _: _PRECACHE_FILE_SEM.release())
futures.append(f)
except Exception:
_PRECACHE_FILE_SEM.release()
raise
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Parent-child deadlock still present on storage_executor for precache

_cache_single is submitted to storage_executor. Internally it calls get_or_create_merged_audiodownload_audio_chunks_and_merge, which now uses the sliding window that submits additional child tasks to the same storage_executor pool and blocks via wait() until they complete. This is the identical parent-waits-for-child-on-same-pool pattern that was explicitly fixed for speaker_identification.py (by moving the parent to sync_executor). The _PRECACHE_FILE_SEM(4) cap (max 4 parent slots + 32 child slots = 36 of 96 workers) keeps the risk low, but under sustained traffic with other storage work queued, the remaining 60 free slots could be consumed, stalling all 4 parents indefinitely. The fix applied to speaker_identification — submitting the parent to a different pool (sync_executor) — should be applied here too.

Comment on lines +8 to 14
from utils.executors import db_executor, llm_executor

logger = logging.getLogger(__name__)

_KG_REBUILD_SEM = threading.BoundedSemaphore(4)

from langchain_core.output_parsers import PydanticOutputParser
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 The _KG_REBUILD_SEM module-level assignment is interleaved between two import blocks. PEP 8 and most linters (E402) require all imports to appear before any executable statements at module level. Move it after the full import block to avoid triggering linter errors.

Suggested change
from utils.executors import db_executor, llm_executor
logger = logging.getLogger(__name__)
_KG_REBUILD_SEM = threading.BoundedSemaphore(4)
from langchain_core.output_parsers import PydanticOutputParser
from utils.executors import db_executor, llm_executor
from langchain_core.output_parsers import PydanticOutputParser

@beastoin
Copy link
Copy Markdown
Collaborator Author

@beastoin Not approved yet: backend/test.sh still fails because tests/unit/test_clean_sweep_migrations.py:360 asserts utils/other/notifications.py contains storage_executor, but this PR intentionally moved daily-summary notification work to postprocess_executor; please update that stale assertion so the required backend suite passes. Also please make _send_bulk_summary_notification collect/log per-user failures within each batch, for example with return_exceptions=True, so one bad user in an early batch does not prevent later batches from being submitted.

Please push the follow-up and rerun backend/test.sh.


by AI for @beastoin

beastoin and others added 3 commits May 19, 2026 11:52
…sedHardware#7387)

Prevents one failed user summary from blocking remaining batches.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ecutor (BasedHardware#7387)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@beastoin
Copy link
Copy Markdown
Collaborator Author

The stale assertion issues flagged in the previous review have all been fixed:

  1. test_clean_sweep_migrations.py:360 — updated from storage_executor to postprocess_executor (commit 13c1ab2)
  2. test_async_http_infrastructure.py webhook wiring test — updated to check postprocess_executor (commit e264282)
  3. Added return_exceptions=True to bulk summary batching (commit 8e8035d)

All 134 tests pass. Re-review iteration 3 approved with PR_APPROVED_LGTM.


by AI for @beastoin

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Backend storage_executor queue spikes from unbounded fan-out submissions

1 participant