diff --git a/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py b/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py index 45716768f..f951614f1 100644 --- a/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py +++ b/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py @@ -61,7 +61,7 @@ 429: ErrorResolution( response_action=ResponseAction.RATE_LIMITED, failure_type=FailureType.transient_error, - error_message="HTTP Status Code: 429. Error: Too many requests.", + error_message="Rate limit exceeded (HTTP status code 429). Try decreasing the number of workers to stay within API rate limits.", ), 500: ErrorResolution( response_action=ResponseAction.RETRY, diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 3a0a62739..abed9cf0f 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -293,10 +293,19 @@ def _send_with_retry( return response except BaseBackoffException as e: - self._logger.error(f"Retries exhausted with backoff exception.", exc_info=True) + if isinstance(e, RateLimitBackoffException): + self._logger.error("Rate limit retries exhausted.", exc_info=True) + raise AirbyteTracedException( + internal_message=f"Rate limit retries exhausted. Exception: {e}", + message="Rate limit exceeded and retries exhausted. Try decreasing the number of workers to stay within API rate limits.", + failure_type=e.failure_type or FailureType.transient_error, + exception=e, + stream_descriptor=StreamDescriptor(name=self._name), + ) + self._logger.error("Retries exhausted with backoff exception.", exc_info=True) raise AirbyteTracedException( internal_message=f"Exhausted available request attempts. Exception: {e}", - message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}", + message=f"Exhausted available request attempts. Exception: {e}", failure_type=e.failure_type or FailureType.system_error, exception=e, stream_descriptor=StreamDescriptor(name=self._name), diff --git a/airbyte_cdk/sources/streams/http/rate_limiting.py b/airbyte_cdk/sources/streams/http/rate_limiting.py index 926a7ad56..9d6b0a15d 100644 --- a/airbyte_cdk/sources/streams/http/rate_limiting.py +++ b/airbyte_cdk/sources/streams/http/rate_limiting.py @@ -146,7 +146,9 @@ def log_retry_attempt(details: Mapping[str, Any]) -> None: f"Status code: {exc.response.status_code!r}, Response Content: {exc.response.content!r}" ) logger.info( - f"Caught retryable error '{str(exc)}' after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..." + f"Rate limit hit after {details['tries']} tries. Waiting {details['wait']} seconds then retrying. " + f"Try decreasing the number of workers to stay within API rate limits. " + f"Last error: {str(exc)}" ) return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function diff --git a/unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py b/unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py index 87e522d4a..b271135c0 100644 --- a/unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py +++ b/unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py @@ -59,7 +59,7 @@ ErrorResolution( response_action=ResponseAction.RETRY, failure_type=FailureType.transient_error, - error_message="HTTP Status Code: 429. Error: Too many requests.", + error_message="Rate limit exceeded (HTTP status code 429). Try decreasing the number of workers to stay within API rate limits.", ), id="test_http_code_matches_retry_action", ), diff --git a/unit_tests/sources/streams/http/test_http.py b/unit_tests/sources/streams/http/test_http.py index 7512c3722..865f571e5 100644 --- a/unit_tests/sources/streams/http/test_http.py +++ b/unit_tests/sources/streams/http/test_http.py @@ -222,7 +222,8 @@ def get_error_handler(self) -> Optional[ErrorHandler]: send_mock = mocker.patch.object(requests.Session, "send", return_value=req) with pytest.raises( - AirbyteTracedException, match="Exception: HTTP Status Code: 429. Error: Too many requests." + AirbyteTracedException, + match="Rate limit exceeded \\(HTTP status code 429\\). Try decreasing the number of workers to stay within API rate limits.", ): list(stream.read_records(SyncMode.full_refresh)) if retries <= 0: @@ -316,7 +317,7 @@ def test_raise_on_http_errors_off_429(mocker): mocker.patch.object(requests.Session, "send", return_value=req) with pytest.raises( AirbyteTracedException, - match="Exhausted available request attempts. Please see logs for more details. Exception: HTTP Status Code: 429. Error: Too many requests.", + match="Exhausted available request attempts. Exception: Rate limit exceeded \\(HTTP status code 429\\). Try decreasing the number of workers to stay within API rate limits.", ): stream.exit_on_rate_limit = True list(stream.read_records(SyncMode.full_refresh))