From 13c6f47627a0c77c1031673dbdde3e6e486bf1e9 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 22:26:34 +0000 Subject: [PATCH 1/4] fix(cdk): improve rate limiting error messages to suggest reducing concurrency or workers Co-Authored-By: sophie.cui@airbyte.io --- .../http/error_handlers/default_error_mapping.py | 2 +- airbyte_cdk/sources/streams/http/http_client.py | 13 +++++++++++-- airbyte_cdk/sources/streams/http/rate_limiting.py | 3 ++- .../error_handlers/test_http_response_filter.py | 2 +- unit_tests/sources/streams/http/test_http.py | 4 ++-- 5 files changed, 17 insertions(+), 7 deletions(-) diff --git a/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py b/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py index 45716768f..62741747e 100644 --- a/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py +++ b/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py @@ -61,7 +61,7 @@ 429: ErrorResolution( response_action=ResponseAction.RATE_LIMITED, failure_type=FailureType.transient_error, - error_message="HTTP Status Code: 429. Error: Too many requests.", + error_message="Rate limit exceeded. Try decreasing concurrency or the number of workers to stay within API rate limits.", ), 500: ErrorResolution( response_action=ResponseAction.RETRY, diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 3a0a62739..f6cb00d16 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -293,10 +293,19 @@ def _send_with_retry( return response except BaseBackoffException as e: - self._logger.error(f"Retries exhausted with backoff exception.", exc_info=True) + if isinstance(e, RateLimitBackoffException): + self._logger.error("Rate limit retries exhausted.", exc_info=True) + raise AirbyteTracedException( + internal_message=f"Rate limit retries exhausted. Exception: {e}", + message="Rate limit exceeded and retries exhausted. Try decreasing concurrency or the number of workers to stay within API rate limits.", + failure_type=e.failure_type or FailureType.transient_error, + exception=e, + stream_descriptor=StreamDescriptor(name=self._name), + ) + self._logger.error("Retries exhausted with backoff exception.", exc_info=True) raise AirbyteTracedException( internal_message=f"Exhausted available request attempts. Exception: {e}", - message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}", + message=f"Exhausted available request attempts. Exception: {e}", failure_type=e.failure_type or FailureType.system_error, exception=e, stream_descriptor=StreamDescriptor(name=self._name), diff --git a/airbyte_cdk/sources/streams/http/rate_limiting.py b/airbyte_cdk/sources/streams/http/rate_limiting.py index 926a7ad56..d46a11615 100644 --- a/airbyte_cdk/sources/streams/http/rate_limiting.py +++ b/airbyte_cdk/sources/streams/http/rate_limiting.py @@ -146,7 +146,8 @@ def log_retry_attempt(details: Mapping[str, Any]) -> None: f"Status code: {exc.response.status_code!r}, Response Content: {exc.response.content!r}" ) logger.info( - f"Caught retryable error '{str(exc)}' after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..." + f"Rate limit hit after {details['tries']} tries. Waiting {details['wait']} seconds then retrying. " + f"Try decreasing concurrency or the number of workers to stay within API rate limits." ) return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function diff --git a/unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py b/unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py index 87e522d4a..00e0a2381 100644 --- a/unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py +++ b/unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py @@ -59,7 +59,7 @@ ErrorResolution( response_action=ResponseAction.RETRY, failure_type=FailureType.transient_error, - error_message="HTTP Status Code: 429. Error: Too many requests.", + error_message="Rate limit exceeded. Try decreasing concurrency or the number of workers to stay within API rate limits.", ), id="test_http_code_matches_retry_action", ), diff --git a/unit_tests/sources/streams/http/test_http.py b/unit_tests/sources/streams/http/test_http.py index 7512c3722..0255a9e11 100644 --- a/unit_tests/sources/streams/http/test_http.py +++ b/unit_tests/sources/streams/http/test_http.py @@ -222,7 +222,7 @@ def get_error_handler(self) -> Optional[ErrorHandler]: send_mock = mocker.patch.object(requests.Session, "send", return_value=req) with pytest.raises( - AirbyteTracedException, match="Exception: HTTP Status Code: 429. Error: Too many requests." + AirbyteTracedException, match="Rate limit exceeded. Try decreasing concurrency or the number of workers to stay within API rate limits." ): list(stream.read_records(SyncMode.full_refresh)) if retries <= 0: @@ -316,7 +316,7 @@ def test_raise_on_http_errors_off_429(mocker): mocker.patch.object(requests.Session, "send", return_value=req) with pytest.raises( AirbyteTracedException, - match="Exhausted available request attempts. Please see logs for more details. Exception: HTTP Status Code: 429. Error: Too many requests.", + match="Rate limit exceeded. Try decreasing concurrency or the number of workers to stay within API rate limits.", ): stream.exit_on_rate_limit = True list(stream.read_records(SyncMode.full_refresh)) From cb4514e8bcd3627c52cbc3c866f16048698cfb18 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 22:28:31 +0000 Subject: [PATCH 2/4] style: apply ruff format to test_http.py Co-Authored-By: sophie.cui@airbyte.io --- unit_tests/sources/streams/http/test_http.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/unit_tests/sources/streams/http/test_http.py b/unit_tests/sources/streams/http/test_http.py index 0255a9e11..32b2a5907 100644 --- a/unit_tests/sources/streams/http/test_http.py +++ b/unit_tests/sources/streams/http/test_http.py @@ -222,7 +222,8 @@ def get_error_handler(self) -> Optional[ErrorHandler]: send_mock = mocker.patch.object(requests.Session, "send", return_value=req) with pytest.raises( - AirbyteTracedException, match="Rate limit exceeded. Try decreasing concurrency or the number of workers to stay within API rate limits." + AirbyteTracedException, + match="Rate limit exceeded. Try decreasing concurrency or the number of workers to stay within API rate limits.", ): list(stream.read_records(SyncMode.full_refresh)) if retries <= 0: From d12c6eecb5949027bf460526d8f925e8ac22c6be Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 22:38:27 +0000 Subject: [PATCH 3/4] fix: update rate limit messages to reference num_workers and keep HTTP status code Co-Authored-By: sophie.cui@airbyte.io --- .../streams/http/error_handlers/default_error_mapping.py | 2 +- airbyte_cdk/sources/streams/http/http_client.py | 2 +- airbyte_cdk/sources/streams/http/rate_limiting.py | 2 +- .../requesters/error_handlers/test_http_response_filter.py | 2 +- unit_tests/sources/streams/http/test_http.py | 4 ++-- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py b/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py index 62741747e..f951614f1 100644 --- a/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py +++ b/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py @@ -61,7 +61,7 @@ 429: ErrorResolution( response_action=ResponseAction.RATE_LIMITED, failure_type=FailureType.transient_error, - error_message="Rate limit exceeded. Try decreasing concurrency or the number of workers to stay within API rate limits.", + error_message="Rate limit exceeded (HTTP status code 429). Try decreasing the number of workers to stay within API rate limits.", ), 500: ErrorResolution( response_action=ResponseAction.RETRY, diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index f6cb00d16..abed9cf0f 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -297,7 +297,7 @@ def _send_with_retry( self._logger.error("Rate limit retries exhausted.", exc_info=True) raise AirbyteTracedException( internal_message=f"Rate limit retries exhausted. Exception: {e}", - message="Rate limit exceeded and retries exhausted. Try decreasing concurrency or the number of workers to stay within API rate limits.", + message="Rate limit exceeded and retries exhausted. Try decreasing the number of workers to stay within API rate limits.", failure_type=e.failure_type or FailureType.transient_error, exception=e, stream_descriptor=StreamDescriptor(name=self._name), diff --git a/airbyte_cdk/sources/streams/http/rate_limiting.py b/airbyte_cdk/sources/streams/http/rate_limiting.py index d46a11615..87496a235 100644 --- a/airbyte_cdk/sources/streams/http/rate_limiting.py +++ b/airbyte_cdk/sources/streams/http/rate_limiting.py @@ -147,7 +147,7 @@ def log_retry_attempt(details: Mapping[str, Any]) -> None: ) logger.info( f"Rate limit hit after {details['tries']} tries. Waiting {details['wait']} seconds then retrying. " - f"Try decreasing concurrency or the number of workers to stay within API rate limits." + f"Try decreasing the number of workers to stay within API rate limits." ) return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function diff --git a/unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py b/unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py index 00e0a2381..b271135c0 100644 --- a/unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py +++ b/unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py @@ -59,7 +59,7 @@ ErrorResolution( response_action=ResponseAction.RETRY, failure_type=FailureType.transient_error, - error_message="Rate limit exceeded. Try decreasing concurrency or the number of workers to stay within API rate limits.", + error_message="Rate limit exceeded (HTTP status code 429). Try decreasing the number of workers to stay within API rate limits.", ), id="test_http_code_matches_retry_action", ), diff --git a/unit_tests/sources/streams/http/test_http.py b/unit_tests/sources/streams/http/test_http.py index 32b2a5907..5f16232a0 100644 --- a/unit_tests/sources/streams/http/test_http.py +++ b/unit_tests/sources/streams/http/test_http.py @@ -223,7 +223,7 @@ def get_error_handler(self) -> Optional[ErrorHandler]: with pytest.raises( AirbyteTracedException, - match="Rate limit exceeded. Try decreasing concurrency or the number of workers to stay within API rate limits.", + match="Rate limit exceeded \\(HTTP status code 429\\). Try decreasing the number of workers to stay within API rate limits.", ): list(stream.read_records(SyncMode.full_refresh)) if retries <= 0: @@ -317,7 +317,7 @@ def test_raise_on_http_errors_off_429(mocker): mocker.patch.object(requests.Session, "send", return_value=req) with pytest.raises( AirbyteTracedException, - match="Rate limit exceeded. Try decreasing concurrency or the number of workers to stay within API rate limits.", + match="Rate limit exceeded \\(HTTP status code 429\\). Try decreasing the number of workers to stay within API rate limits.", ): stream.exit_on_rate_limit = True list(stream.read_records(SyncMode.full_refresh)) From 7301186d342ad8091e0fe0aef91499f0cd73285a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 11 Mar 2026 16:41:36 +0000 Subject: [PATCH 4/4] fix: restore exception context in rate limit log and improve test coverage per review Co-Authored-By: sophie.cui@airbyte.io --- airbyte_cdk/sources/streams/http/rate_limiting.py | 3 ++- unit_tests/sources/streams/http/test_http.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/streams/http/rate_limiting.py b/airbyte_cdk/sources/streams/http/rate_limiting.py index 87496a235..9d6b0a15d 100644 --- a/airbyte_cdk/sources/streams/http/rate_limiting.py +++ b/airbyte_cdk/sources/streams/http/rate_limiting.py @@ -147,7 +147,8 @@ def log_retry_attempt(details: Mapping[str, Any]) -> None: ) logger.info( f"Rate limit hit after {details['tries']} tries. Waiting {details['wait']} seconds then retrying. " - f"Try decreasing the number of workers to stay within API rate limits." + f"Try decreasing the number of workers to stay within API rate limits. " + f"Last error: {str(exc)}" ) return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function diff --git a/unit_tests/sources/streams/http/test_http.py b/unit_tests/sources/streams/http/test_http.py index 5f16232a0..865f571e5 100644 --- a/unit_tests/sources/streams/http/test_http.py +++ b/unit_tests/sources/streams/http/test_http.py @@ -317,7 +317,7 @@ def test_raise_on_http_errors_off_429(mocker): mocker.patch.object(requests.Session, "send", return_value=req) with pytest.raises( AirbyteTracedException, - match="Rate limit exceeded \\(HTTP status code 429\\). Try decreasing the number of workers to stay within API rate limits.", + match="Exhausted available request attempts. Exception: Rate limit exceeded \\(HTTP status code 429\\). Try decreasing the number of workers to stay within API rate limits.", ): stream.exit_on_rate_limit = True list(stream.read_records(SyncMode.full_refresh))