feat(cdk): enable fail-fast shutdown on memory threshold with dual-condition check#962
feat(cdk): enable fail-fast shutdown on memory threshold with dual-condition check#962devin-ai-integration[bot] wants to merge 9 commits intomainfrom
Conversation
…ndition check Add critical memory threshold (95% cgroup + 80% process RSS) that raises AirbyteTracedException with FailureType.system_error when both conditions are met. This gives connectors a clean error message instead of an opaque OOM kill. Key design decisions: - Dual-condition: only raises when BOTH cgroup >= 95% AND process RSS >= 80% of container limit. This avoids false positives from reclaimable kernel page cache (e.g., SQLite file cache). - Process RSS read from /proc/self/status (no new dependencies). - Kill switch: AIRBYTE_MEMORY_FAIL_FAST env var (default: true, set to 'false' to disable). - Failure-path contract: entrypoint read loop wrapped in try/finally to flush queued state messages before exception propagates. Resolves airbytehq/airbyte-internal-issues#15982 Co-Authored-By: bot_apk <apk@cognition.ai>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksTesting This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1774478445-memory-failfast#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1774478445-memory-failfastPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
PyTest Results (Fast)3 943 tests +9 3 932 ✅ +9 7m 47s ⏱️ +48s Results for commit 6c2cd9c. ± Comparison against base commit acafc75. This pull request removes 2 and adds 11 tests. Note that renamed tests count towards both.♻️ This comment has been updated with latest results. |
PyTest Results (Full)3 946 tests +9 3 934 ✅ +9 10m 57s ⏱️ +8s Results for commit 6c2cd9c. ± Comparison against base commit acafc75. This pull request removes 2 and adds 11 tests. Note that renamed tests count towards both.♻️ This comment has been updated with latest results. |
|
I think the second signal should be Why:
Concrete changes I'd suggest:
That would make the implementation line up more closely with the design described in the PR. |
…8%, log+skip on missing RssAnon Address review feedback: - Parse RssAnon from /proc/self/status instead of VmRSS to avoid inflation from file-backed/shared resident pages - Raise _CRITICAL_THRESHOLD from 95% to 98% for more conservative fail-fast behavior - When RssAnon is unavailable, log a warning and skip fail-fast instead of falling back to cgroup-only raising (stays truly dual-condition) - Add test proving metric choice matters (VmRSS high but RssAnon low) - Update all docstrings/comments to say 'anonymous resident memory' Co-Authored-By: bot_apk <apk@cognition.ai>
|
Great points — all addressed in 748212f:
|
There was a problem hiding this comment.
Pull request overview
Adds a dual-signal “fail-fast” memory shutdown path to the CDK so connectors can fail with a clear AirbyteTracedException(FailureType.system_error) under near-OOM conditions, instead of being OOM-killed.
Changes:
- Extend
MemoryMonitorto optionally raise when cgroup usage is critical and processRssAnonis high (with env-var + constructor gating). - Add unit tests for
RssAnonparsing, dual-condition behavior, and feature-flag behavior. - Wrap the
read()loop to attempt flushing queued messages even when a fail-fast exception interrupts a sync.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
airbyte_cdk/utils/memory_monitor.py |
Implements dual-condition fail-fast using cgroup + /proc/self/status RssAnon, plus the AIRBYTE_MEMORY_FAIL_FAST switch. |
unit_tests/utils/test_memory_monitor.py |
Adds coverage for RssAnon parsing and the new fail-fast decision logic + toggles. |
airbyte_cdk/entrypoint.py |
Adds a try/finally around the read loop to flush queued messages on interruption. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Address Copilot review feedback: - Restructure entrypoint.py to catch the fail-fast exception around check_memory_usage(), flush queued messages, then re-raise — avoids yielding inside a finally block which can trigger RuntimeError on GeneratorExit. - Use clear=True in patch.dict for test_fail_fast_enabled_by_default to ensure AIRBYTE_MEMORY_FAIL_FAST is truly unset in test env. Co-Authored-By: bot_apk <apk@cognition.ai>
Per review feedback, increase _MEMORY_THRESHOLD from 0.90 to 0.95 to reduce noise from the warning log. Update all related docstrings, comments, and test assertions accordingly. Co-Authored-By: bot_apk <apk@cognition.ai>
|
Thanks for making the One follow-up: I would set Now that the logic is intentionally very conservative on the cgroup side ( If the design goal is "only fail fast when there is very strong evidence of imminent process-driven OOM",
Absent fleet data, I would rather start at |
Per review feedback, increase _ANON_RSS_THRESHOLD from 0.80 to 0.90 to align with the conservative 98% cgroup threshold. Start stricter and relax later once fleet data is available. Co-Authored-By: bot_apk <apk@cognition.ai>
|
Patrick Nilan (@pnilan) Agreed — Current thresholds summary:
|
Co-Authored-By: bot_apk <apk@cognition.ai>
|
/prerelease
|
|
I think the exception scope in At try:
self._memory_monitor.check_memory_usage()
except Exception:
for queued_message in self._emit_queued_messages(self.source):
yield self.handle_record_counts(queued_message, stream_message_counter)
raiseThe intended exception from the new fail-fast path is Why this matters:
I think the safer and more self-documenting version is: try:
self._memory_monitor.check_memory_usage()
except AirbyteTracedException:
# Flush queued messages (state checkpoints, logs) before propagating
# the memory fail-fast exception, so the platform receives the last
# committed state for the next sync.
for queued_message in self._emit_queued_messages(self.source):
yield self.handle_record_counts(queued_message, stream_message_counter)
raiseThat keeps the new flush-and-reraise behavior tightly scoped to the intentional fail-fast mechanism introduced in this PR. If the broader behavior is intentional, I’d suggest documenting that explicitly in a comment, because I’d also recommend a targeted test for this path:
That would lock in the exact behavior this PR is trying to guarantee. |
…-before-raise test Co-Authored-By: bot_apk <apk@cognition.ai>
|
Good call — narrowed the catch to Also added
|
|
I think this change also needs a targeted regression test for the new exception path in Right now, the PR adds special behavior at
That behavior is important because it is the mechanism that preserves the latest queued state checkpoint before the sync fails fast. I do not see a test in this PR that locks that contract in. The existing read tests in
But they do not cover the new case where:
I think the regression test should prove all of the following in one flow:
Suggested shape:
That would protect the exact behavior this PR is introducing and would catch future regressions in the flush-before-reraise path. |
|
I also think the Right now the implementation presents fail-fast as runtime-configurable:
If the production environment cannot set that env var, then this is not a real operational control. In practice that means:
In that case, I think the cleaner implementation is to remove the env-var path entirely and make the behavior explicit in code. Concretely, I would suggest:
That leaves the team with a simpler and more honest contract: either fail-fast is part of the product behavior, or it is not. If a kill switch is still desired, it should be wired to a control surface the runtime actually supports rather than an env var that operators cannot set. |
…nt and step-by-step generator iteration Co-Authored-By: bot_apk <apk@cognition.ai>
|
Agreed — rewrote the test in e649a70 to match your suggested shape exactly. It now calls
This locks in the full flush-before-reraise contract including the record count population. |
|
Good question. The env var was included in the original issue spec as a kill switch for safe rollout, but you raise a valid point — if the runtime can't actually set per-connector env vars, it's dead code that overstates operational flexibility. I'd like your guidance before removing it:
Happy to remove the env var path, the |
|
Answering your question directly: the runtime environment for these syncs will not support Given that, I do think the env-var path should be removed from this PR rather than kept as a kill switch. Concretely, please:
My concern is not just code cleanliness. If the runtime cannot actually provide this env var, then leaving it in the implementation creates a control surface that looks operationally real but is not usable in practice. That is worse than having no kill switch at all, because it overstates rollback flexibility. If the platform wants a real rollout lever later, it should be introduced through a control surface the runtime actually supports. I would not block this PR on designing that alternative. For this change, I would simplify the implementation and remove the dead env-var path. |
…ail-fast unconditional Co-Authored-By: bot_apk <apk@cognition.ai>
|
Done in 6c2cd9c. Removed the env-var kill switch entirely:
Agree that if a real rollout lever is needed later, it should be wired to a control surface the runtime actually supports. |
Summary
Adds fail-fast memory shutdown to the CDK's
MemoryMonitor. When container memory pressure is critical, the monitor now raisesAirbyteTracedExceptionwithFailureType.system_errorinstead of letting the process hit an opaque OOM-kill.Dual-condition logic: The exception only fires when both:
RssAnon) ≥ 90% of the container limit (read from/proc/self/status— no new dependencies)RssAnonis used instead ofVmRSSbecauseVmRSS = RssAnon + RssFile + RssShmem— total RSS can be inflated by mmap'd file-backed pages (e.g. SQLite) and shared memory that don't represent real process heap pressure.RssAnonis the closest proxy for private, non-reclaimable Python-heap memory. When cgroup is high butRssAnonis low, an INFO log is emitted instead of raising. WhenRssAnonis unavailable (e.g. older kernel, non-Linux), the monitor logs a warning and skips fail-fast rather than falling back to cgroup-only raising — keeping the implementation truly dual-condition.Logging threshold raised to 95%: The existing warning log threshold was bumped from 90% → 95% to reduce noise. Warnings now only fire when the container is already close to the critical zone.
Unconditional fail-fast (no kill switch): Fail-fast is always active — there is no env var or constructor parameter to disable it. If a runtime control surface is needed later, it should be wired to something the platform actually supports rather than an env var the runtime cannot set.
Failure-path contract (entrypoint.py): When
check_memory_usage()raisesAirbyteTracedException, the exception is caught inline, queued state messages are flushed viayield, and the exception is re-raised. The catch is scoped specifically toAirbyteTracedException(not broadException) so that unrelated monitor bugs do not participate in the checkpoint-flush path. Normal-completion flush happens after the read loop.Resolves https://github.com/airbytehq/airbyte-internal-issues/issues/15982:
Updates since last revision
AIRBYTE_MEMORY_FAIL_FASTenv var kill switch entirely (commit6c2cd9cc): Deleted the_ENV_FAIL_FASTconstant,import os,fail_fastconstructor parameter,self._fail_fastattribute, and all env-var resolution logic. Fail-fast is now unconditional when the dual-condition is met.test_fail_fast_disabled_via_constructor,test_fail_fast_disabled_via_env_var,test_fail_fast_enabled_by_default,test_fail_fast_constructor_overrides_env_var.Review & Testing Checklist for Human
RssAnondual-condition is conservative enough for the fleet.RssAnonkernel availability:RssAnonwas added in Linux 4.5. If any production containers run an older kernel, the field will be absent and fail-fast will be silently skipped (log-only). Confirm this graceful degradation is acceptable.test_memory_failfast_flushes_queued_state_before_raisingtest validates the contract with mocks (includingsourceStats.recordCount). Verify in a real container that the platform actually receives the flushed state checkpoint when the exception fires.AirbyteTracedExceptionis caught for the flush-before-reraise path. Ifcheck_memory_usage()ever raises a different exception type (e.g. an unexpected bug), it will propagate without flushing queued state. Confirm this is the desired behavior.Suggested test plan: Deploy a connector with artificially low memory limits (e.g., 256 MB container running a large sync) and verify: (a) the
AirbyteTracedExceptionappears in logs with the expected message referencingRssAnon, (b) the last state checkpoint is received by the platform, (c) re-sync resumes from the flushed checkpoint.Notes
/proc/self/status(RssAnonfield in kB) to avoid adding apsutildependency.RssAnonwas chosen overVmRSSto exclude file-backed and shared resident pages.RssAnonparsing (including a test provingVmRSS-only content returnsNone), dual-condition raise/no-raise paths (including a test with highVmRSSbut lowRssAnon),RssAnon-unavailable graceful skip, and the entrypoint flush-before-raise contract withsourceStats.recordCountverification.Link to Devin session: https://app.devin.ai/sessions/be2f3d2a1505478891a9859e41ca1cb9