fix(backend): bound storage_executor fan-out with per-request semaphores (#7387) #7388
beastoin wants to merge 13 commits
…asedHardware#7387)
Add module-level semaphores and sliding window to prevent unbounded task submission that causes storage queue spikes to 844 at 96 workers.
- _STORAGE_CHUNK_SEM(32): global cap on concurrent chunk downloads
- _PRECACHE_FILE_SEM(4): global cap on concurrent audio file precache
- _CHUNK_WINDOW_SIZE=8: per-call sliding window with FIRST_COMPLETED
- Semaphore acquired before submit, released in done callback (exception-safe)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…edHardware#7387) Both _precache_all_parallel and _cache_uncached_parallel now acquire the global file-level semaphore before submitting to storage_executor. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…asedHardware#7387) process_memory does LLM invoke + DB writes, not storage I/O. Moved to llm_executor with BoundedSemaphore(4) to avoid monopolizing the pool. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…sedHardware#7387) Avoids parent-child deadlock on storage_executor when download_audio_chunks_and_merge submits child tasks to the same pool. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…e#7387) 22 source-level tests verifying semaphore placement, sliding window pattern, and pool assignment for all 4 fan-out sites. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…are#7387) Replace storage_executor assertions with llm_executor + BoundedSemaphore checks to match the new pool assignment. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…dware#7387) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…asedHardware#7387) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…process_executor (BasedHardware#7387) _send_summary_notification does paywall checks, DB reads, LLM calls, and webhook delivery — not storage I/O. Moved to postprocess_executor with batch-of-8 to prevent unbounded fan-out. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… tests (BasedHardware#7387) 3 behavioral tests: window caps inflight, semaphore released on exception, global semaphore limits cross-request. 2 notification source-level tests. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Greptile Summary
This PR introduces per-request semaphore gating to bound unbounded fan-out patterns on
Confidence Score: 3/5
Merging is safe for immediate load relief, but two structural issues survive that could resurface under sustained high traffic.
The precache path in storage.py still submits _cache_single to storage_executor where it internally blocks waiting for child storage_executor tasks, the exact pattern explicitly fixed for speaker_identification.py.
Separately, _precache_all_parallel and _cache_uncached_parallel in sync.py can exhaust all 24 postprocess_executor threads by blocking them on _PRECACHE_FILE_SEM.acquire() when global capacity is held by concurrent storage work, stalling unrelated background tasks.
backend/utils/other/storage.py (precache parent-child pattern) and backend/routers/sync.py (postprocess thread exhaustion) deserve a closer look before merging to production at high traffic volumes.
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[Precache Request] --> B[postprocess_executor\n_precache_all_parallel]
B --> C{_PRECACHE_FILE_SEM\nBoundedSemaphore 4}
C -->|slot acquired| D[storage_executor\n_cache_single]
D --> E[download_audio_chunks_and_merge\nrunning on storage_executor thread]
E --> F{_STORAGE_CHUNK_SEM\nBoundedSemaphore 32}
F -->|slot acquired| G[storage_executor\ndownload_single_chunk child]
G -->|done callback| F
D -->|done callback| C
I[speaker_identification] --> J[sync_executor\ndownload_audio_chunks_and_merge]
J --> F
K[KG Rebuild] --> L[llm_executor\nprocess_memory]
L -->|_KG_REBUILD_SEM 4| M[LLM + DB work]
style C fill:#f90,color:#000
style F fill:#f90,color:#000
style E fill:#fcc,color:#000
futures = []
for af in audio_files:
    _PRECACHE_FILE_SEM.acquire()
    try:
        f = storage_executor.submit(_cache_single, af)
        f.add_done_callback(lambda _: _PRECACHE_FILE_SEM.release())
        futures.append(f)
    except Exception:
        _PRECACHE_FILE_SEM.release()
        raise
Parent-child deadlock still present on storage_executor for precache
_cache_single is submitted to storage_executor. Internally it calls get_or_create_merged_audio → download_audio_chunks_and_merge, which now uses the sliding window that submits additional child tasks to the same storage_executor pool and blocks via wait() until they complete. This is the identical parent-waits-for-child-on-same-pool pattern that was explicitly fixed for speaker_identification.py (by moving the parent to sync_executor). The _PRECACHE_FILE_SEM(4) cap (max 4 parent slots + 32 child slots = 36 of 96 workers) keeps the risk low, but under sustained traffic with other storage work queued, the remaining 60 free slots could be consumed, stalling all 4 parents indefinitely. The fix applied to speaker_identification — submitting the parent to a different pool (sync_executor) — should be applied here too.
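The remedy the comment asks for, running the blocking parent on a different pool than its children, can be sketched as follows. This is an illustration under assumptions rather than the project's code: `parent_executor` is a hypothetical stand-in for whichever pool the parent is moved to (the PR used sync_executor for speaker identification), the pool sizes are deliberately tiny, and the function bodies are placeholders.

```python
from concurrent.futures import ThreadPoolExecutor, wait

# Deliberately tiny pools so the hazard is easy to reason about.
storage_executor = ThreadPoolExecutor(max_workers=2)  # runs child chunk downloads only
parent_executor = ThreadPoolExecutor(max_workers=2)   # hypothetical pool for blocking parents


def download_single_chunk(chunk_id):
    # Placeholder for the real storage read.
    return f"chunk-{chunk_id}"


def cache_single(file_id, chunk_ids):
    # Parent: fans out to storage_executor and blocks until the children finish.
    futures = [storage_executor.submit(download_single_chunk, c) for c in chunk_ids]
    wait(futures)
    return file_id, [f.result() for f in futures]


def precache(files):
    # Parents run on parent_executor, so they never occupy the workers
    # that their own children need in order to make progress.
    parents = [
        parent_executor.submit(cache_single, file_id, chunk_ids)
        for file_id, chunk_ids in files.items()
    ]
    return dict(p.result() for p in parents)
```

If `cache_single` were instead submitted to the two-worker `storage_executor`, two concurrent parents would hold both workers while waiting on children that could never be scheduled. Separating the pools removes that dependency regardless of how large the pools are.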
from utils.executors import db_executor, llm_executor

logger = logging.getLogger(__name__)

_KG_REBUILD_SEM = threading.BoundedSemaphore(4)

from langchain_core.output_parsers import PydanticOutputParser
The _KG_REBUILD_SEM module-level assignment is interleaved between two import blocks. PEP 8 and most linters (E402) require all imports to appear before any executable statements at module level. Move it after the full import block to avoid triggering linter errors.
from utils.executors import db_executor, llm_executor
logger = logging.getLogger(__name__)
_KG_REBUILD_SEM = threading.BoundedSemaphore(4)
from langchain_core.output_parsers import PydanticOutputParser

from utils.executors import db_executor, llm_executor
from langchain_core.output_parsers import PydanticOutputParser
@beastoin Not approved yet: Please push the follow-up and rerun by AI for @beastoin
…sedHardware#7387) Prevents one failed user summary from blocking remaining batches. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ecutor (BasedHardware#7387) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…asedHardware#7387) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
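The batched notification fan-out with error isolation mentioned in the commits above could be sketched like this. It assumes the fan-out is driven from asyncio via `asyncio.gather` (suggested by the `return_exceptions=True` wording in the PR description, but not confirmed by the source). `send_summary` and `notify_all` are hypothetical names, and the failure rule inside `send_summary` exists only to exercise the error path.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

postprocess_executor = ThreadPoolExecutor(max_workers=4)  # stand-in; size is an assumption
_BATCH_SIZE = 8


def send_summary(user_id):
    # Hypothetical per-user job; every fifth user fails to show isolation.
    if user_id % 5 == 0:
        raise RuntimeError(f"delivery failed for user {user_id}")
    return f"sent:{user_id}"


async def notify_all(user_ids, batch_size=_BATCH_SIZE):
    loop = asyncio.get_running_loop()
    sent, failed = [], []
    # Submit one batch at a time so at most batch_size jobs are queued per call.
    for start in range(0, len(user_ids), batch_size):
        batch = user_ids[start:start + batch_size]
        results = await asyncio.gather(
            *(loop.run_in_executor(postprocess_executor, send_summary, uid) for uid in batch),
            return_exceptions=True,  # a failure becomes a result instead of aborting the batch
        )
        for uid, res in zip(batch, results):
            (failed if isinstance(res, Exception) else sent).append(uid)
    return sent, failed
```

With `return_exceptions=True`, one failing user is recorded and the loop moves on to the next batch. Without it, the first exception would propagate out of `gather` and skip every later batch.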
The stale assertion issues flagged in the previous review have all been fixed:
All 134 tests pass. Re-review iteration 3 approved with by AI for @beastoin
Summary
Fixes #7387: bounds all unbounded fan-out patterns in storage_executor submissions that cause queue spikes to 844+ at 96 workers during sustained traffic (~3,300 req/min across 3 instances).
Changes
- storage.py, sliding window for chunk downloads: Replaced unbounded dict-comprehension submit with FIRST_COMPLETED sliding window (_CHUNK_WINDOW_SIZE=8) gated by _STORAGE_CHUNK_SEM(32) global semaphore. Handles both individual chunks and batch blobs as one job stream.
- storage.py, precache file semaphore: Added _PRECACHE_FILE_SEM(4) to gate concurrent file precache submissions in precache_conversation_audio._precache_all().
- sync.py, precache semaphore in sync endpoints: Both _precache_all_parallel and _cache_uncached_parallel now use _PRECACHE_FILE_SEM from storage to bound their fan-out.
- knowledge_graph.py, wrong pool fix: Moved rebuild_knowledge_graph from storage_executor to llm_executor (correct pool for LLM+DB work) with _KG_REBUILD_SEM(4) bounded semaphore.
- speaker_identification.py, parent-child deadlock fix: Changed parent call to download_audio_chunks_and_merge from storage_executor to sync_executor to prevent coordinator-child deadlock.
- notifications.py, wrong pool fix: Moved notification fan-out from storage_executor to postprocess_executor with batched processing (_BATCH_SIZE=8) and error isolation via return_exceptions=True.
Semaphore safety pattern (all sites)
Test plan
- test_storage_fanout_limits.py (source-level + behavioral)
- test_clean_sweep_migrations.py: KG and notification assertions
- test_async_http_infrastructure.py: webhook wiring test
- test.sh
- beast omi dev boot-check: clean imports
Expected production impact
Changed-path coverage checklist
- storage.py: _STORAGE_CHUNK_SEM + sliding window
- storage.py: _PRECACHE_FILE_SEM + _precache_all
- sync.py: _precache_all_parallel + _cache_uncached_parallel
- knowledge_graph.py: llm_executor + _KG_REBUILD_SEM
- speaker_identification.py: sync_executor parent call
- notifications.py: postprocess_executor + batching
🤖 Generated with Claude Code