Skip to content

Make QueueManager message fetching interrupt-driven#159

Open
stidsborg wants to merge 4 commits into
mainfrom
interrupt-driven-queuemanager
Open

Make QueueManager message fetching interrupt-driven#159
stidsborg wants to merge 4 commits into
mainfrom
interrupt-driven-queuemanager

Conversation

@stidsborg
Copy link
Copy Markdown
Owner

Summary

Moves QueueManager from eager per-Subscribe store pulling to an interrupt-driven model.

  • Subscribe no longer fetches from the store. Previously every Subscribe call did await FetchAndNotify() (a store read) on the hot path. Now the initial store read happens once in Initialize(), and subsequent reads are driven by Interrupt()FetchMessagesOnce().
  • Subscribe drains buffered messages in-memory. It calls DeliverMessages() after registering the subscription, so a message that was fetched before the flow started waiting (e.g. already in the store, or returned in a multi-message batch) is still delivered immediately instead of waiting out the max-wait timeout and suspending.
  • Consolidated the delay logic. The duplicated Task.Delay(...).ContinueWith(...) across the timeout/no-timeout branches is now a single Task.Delay, picking the sooner of the max-wait and the timeout.
  • Renamed ExpireSubscriptionExpireOrSuspendSubscription to reflect its dual behavior (expire on timeout vs. suspend on max-wait).

Testing

  • dotnet test ./Core/Cleipnir.ResilientFunctions.Tests — full in-memory suite: 514/514 passed
  • Messaging filter: 55/55 passed, including FunctionCompletesAfterAwaitedMessageIsReceived, FunctionIsSuspendedWhenAwaitedMessageDoesNotAlreadyExist, InterruptSuspendedFlows, and the multi-message pull tests.
  • Run with --blame-hang-timeout 60s; no hangs.

🤖 Generated with Claude Code

stidsborg added 4 commits May 31, 2026 09:53
Drop the eager FetchAndNotify() from the Subscribe hot path. The initial
store read now happens once in Initialize(); subsequent reads are driven
by Interrupt()/FetchMessagesOnce(). Subscribe instead drains
already-buffered messages in-memory via DeliverMessages() so a message
that arrived before the wait is still delivered.

Also consolidate the duplicated delay/continuation in Subscribe into a
single Task.Delay, and rename ExpireSubscription to
ExpireOrSuspendSubscription to reflect its dual behavior.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant