Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ definitions:
anyOf:
- type: number
title: Number of seconds
minimum: 0
- type: string
title: Interpolated Value
interpolation_context:
Expand All @@ -490,6 +491,13 @@ definitions:
- 30
- 30.5
- "{{ config['backoff_time'] }}"
jitter_range_in_seconds:
title: Jitter Range
description: Optional additive jitter range in seconds. When set, the backoff time is uniformly distributed between backoff_time_in_seconds and backoff_time_in_seconds + (jitter_range_in_seconds * 2), so jitter only increases the base backoff.
type: number
minimum: 0
examples:
- 15
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -2033,6 +2041,13 @@ definitions:
- 5
- 5.5
- "10"
jitter_range_in_seconds:
title: Jitter Range
description: Optional additive jitter range in seconds. When set, the backoff time is uniformly distributed between computed_backoff and computed_backoff + (jitter_range_in_seconds * 2), so jitter only increases the computed backoff.
type: number
minimum: 0
examples:
- 2
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ class ConstantBackoffStrategy(BaseModel):
examples=[30, 30.5, "{{ config['backoff_time'] }}"],
title="Backoff Time",
)
jitter_range_in_seconds: Optional[float] = Field(
None,
description="Optional additive jitter range in seconds. When set, the backoff time is uniformly distributed between backoff_time_in_seconds and backoff_time_in_seconds + (jitter_range_in_seconds * 2), so jitter only increases the base backoff.",
examples=[15],
ge=0,
title="Jitter Range",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down Expand Up @@ -512,6 +519,13 @@ class ExponentialBackoffStrategy(BaseModel):
examples=[5, 5.5, "10"],
title="Factor",
)
jitter_range_in_seconds: Optional[float] = Field(
None,
description="Optional additive jitter range in seconds. When set, the backoff time is uniformly distributed between computed_backoff and computed_backoff + (jitter_range_in_seconds * 2), so jitter only increases the computed backoff.",
examples=[2],
ge=0,
title="Jitter Range",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down Expand Up @@ -3077,6 +3091,7 @@ class AsyncRetriever(BaseModel):
failed_retry_wait_time_in_seconds: Optional[Union[int, str]] = Field(
None,
description="Time in seconds to wait before retrying a failed async job. Only applies to jobs that ran on the API side and reported a FAILED status (e.g. report generation failed due to a cooldown). Creation failures (HTTP errors when starting a job, such as 429s) and TIMED_OUT jobs are retried immediately and are not affected by this setting. When set, the orchestrator defers retry of real failed jobs until the wait time has elapsed, without blocking other jobs.",
ge=1,
)
download_target_requester: Optional[Union[HttpRequester, CustomRequester]] = Field(
None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1752,12 +1752,19 @@ def create_concurrent_cursor_from_perpartition_cursor(
def create_constant_backoff_strategy(
model: ConstantBackoffStrategyModel, config: Config, **kwargs: Any
) -> ConstantBackoffStrategy:
ModelToComponentFactory._validate_jitter_range(model.jitter_range_in_seconds)
return ConstantBackoffStrategy(
backoff_time_in_seconds=model.backoff_time_in_seconds,
jitter_range_in_seconds=model.jitter_range_in_seconds,
config=config,
parameters=model.parameters or {},
)

@staticmethod
def _validate_jitter_range(jitter_range_in_seconds: Optional[float]) -> None:
if jitter_range_in_seconds is not None and jitter_range_in_seconds < 0:
raise ValueError("jitter_range_in_seconds must be greater than or equal to 0")

def create_cursor_pagination(
self, model: CursorPaginationModel, config: Config, decoder: Decoder, **kwargs: Any
) -> CursorPaginationStrategy:
Expand Down Expand Up @@ -2432,8 +2439,12 @@ def create_response_to_file_extractor(
def create_exponential_backoff_strategy(
model: ExponentialBackoffStrategyModel, config: Config
) -> ExponentialBackoffStrategy:
ModelToComponentFactory._validate_jitter_range(model.jitter_range_in_seconds)
return ExponentialBackoffStrategy(
factor=model.factor or 5, parameters=model.parameters or {}, config=config
factor=model.factor or 5,
jitter_range_in_seconds=model.jitter_range_in_seconds,
parameters=model.parameters or {},
config=config,
)

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import random
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Optional, Union
from typing import Any, Mapping, Optional, Union, cast

import requests

Expand All @@ -24,6 +25,7 @@ class ConstantBackoffStrategy(BackoffStrategy):
backoff_time_in_seconds: Union[float, InterpolatedString, str]
parameters: InitVar[Mapping[str, Any]]
config: Config
jitter_range_in_seconds: Optional[float] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if not isinstance(self.backoff_time_in_seconds, InterpolatedString):
Expand All @@ -42,4 +44,10 @@ def backoff_time(
response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
attempt_count: int,
) -> Optional[float]:
return self.backoff_time_in_seconds.eval(self.config) # type: ignore # backoff_time_in_seconds is always cast to an interpolated string
backoff_time = float(
cast(InterpolatedString, self.backoff_time_in_seconds).eval(self.config)
)
if self.jitter_range_in_seconds is None:
return backoff_time

return random.uniform(backoff_time, backoff_time + (self.jitter_range_in_seconds * 2))
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import random
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Optional, Union

Expand All @@ -24,6 +25,7 @@ class ExponentialBackoffStrategy(BackoffStrategy):
parameters: InitVar[Mapping[str, Any]]
config: Config
factor: Union[float, InterpolatedString, str] = 5
jitter_range_in_seconds: Optional[float] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if not isinstance(self.factor, InterpolatedString):
Expand All @@ -35,11 +37,15 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

@property
def _retry_factor(self) -> float:
return self._factor.eval(self.config) # type: ignore # factor is always cast to an interpolated string
return float(self._factor.eval(self.config))

def backoff_time(
self,
response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
attempt_count: int,
) -> Optional[float]:
return self._retry_factor * 2**attempt_count # type: ignore # factor is always cast to an interpolated string
backoff_time = float(self._retry_factor * 2**attempt_count)
if self.jitter_range_in_seconds is None:
return backoff_time

return random.uniform(backoff_time, backoff_time + (self.jitter_range_in_seconds * 2))
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,15 @@
from airbyte_cdk.sources.declarative.models import (
SubstreamPartitionRouter as SubstreamPartitionRouterModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ConstantBackoffStrategy as ConstantBackoffStrategyModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomRequester as CustomRequesterModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
OffsetIncrement as OffsetIncrementModel,
)
Expand Down Expand Up @@ -1754,6 +1760,123 @@ def test_create_requester(test_name, error_handler, expected_backoff_strategy_ty
)


@pytest.mark.parametrize(
"backoff_strategy_yaml, expected_backoff_strategy_type, expected_jitter_range",
[
pytest.param(
"""
error_handler:
backoff_strategies:
- type: "ConstantBackoffStrategy"
backoff_time_in_seconds: 60
jitter_range_in_seconds: 7
""",
ConstantBackoffStrategy,
7,
id="constant_backoff_strategy",
),
pytest.param(
"""
error_handler:
backoff_strategies:
- type: "ExponentialBackoffStrategy"
factor: 5
jitter_range_in_seconds: 15
""",
ExponentialBackoffStrategy,
15,
id="exponential_backoff_strategy",
),
],
)
def test_create_requester_with_backoff_jitter(
backoff_strategy_yaml, expected_backoff_strategy_type, expected_jitter_range
):
content = f"""
requester:
type: HttpRequester
path: "/v3/marketing/lists"
url_base: "https://api.sendgrid.com"
{backoff_strategy_yaml}
"""
parsed_manifest = YamlDeclarativeSource._parse(content)
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
requester_manifest = transformer.propagate_types_and_parameters(
"", resolved_manifest["requester"], {}
)

requester = factory.create_component(
model_type=HttpRequesterModel,
component_definition=requester_manifest,
config=input_config,
name="name",
decoder=None,
)

assert isinstance(requester.error_handler, DefaultErrorHandler)
assert len(requester.error_handler.backoff_strategies) == 1
backoff_strategy = requester.error_handler.backoff_strategies[0]
assert isinstance(backoff_strategy, expected_backoff_strategy_type)
assert backoff_strategy.jitter_range_in_seconds == expected_jitter_range


@pytest.mark.parametrize(
"backoff_strategy_model, backoff_strategy_arguments",
[
pytest.param(
ConstantBackoffStrategyModel,
{
"type": "ConstantBackoffStrategy",
"backoff_time_in_seconds": 60,
},
id="constant_backoff_strategy",
),
pytest.param(
ExponentialBackoffStrategyModel,
{
"type": "ExponentialBackoffStrategy",
"factor": 5,
},
id="exponential_backoff_strategy",
),
],
)
def test_backoff_jitter_schema_validation(backoff_strategy_model, backoff_strategy_arguments):
backoff_strategy_model(**backoff_strategy_arguments, jitter_range_in_seconds=0)

with pytest.raises(ValidationError, match="jitter_range_in_seconds"):
backoff_strategy_model(
**backoff_strategy_arguments,
jitter_range_in_seconds="{{ config['backoff_jitter'] }}",
)

with pytest.raises(ValidationError, match="jitter_range_in_seconds"):
backoff_strategy_model(**backoff_strategy_arguments, jitter_range_in_seconds=-1)


@pytest.mark.parametrize(
"backoff_strategy_model",
[
pytest.param(
ConstantBackoffStrategyModel(
type="ConstantBackoffStrategy", backoff_time_in_seconds=60
),
id="constant_backoff_strategy",
),
pytest.param(
ExponentialBackoffStrategyModel(type="ExponentialBackoffStrategy", factor=5),
id="exponential_backoff_strategy",
),
],
)
def test_create_backoff_strategy_with_negative_jitter_raises_error(backoff_strategy_model):
# Verify factory validation catches negative jitter even if Pydantic validation is bypassed.
backoff_strategy_model.__dict__["jitter_range_in_seconds"] = -1

with pytest.raises(ValueError, match="jitter_range_in_seconds"):
factory._create_component_from_model(backoff_strategy_model, config=input_config)


def test_create_request_with_legacy_session_authenticator():
content = """
requester:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,67 @@


@pytest.mark.parametrize(
"test_name, attempt_count, backofftime, expected_backoff_time",
"test_name, attempt_count, backofftime, jitter_range, expected_backoff_time",
[
("test_constant_backoff_first_attempt", 1, BACKOFF_TIME, BACKOFF_TIME),
("test_constant_backoff_first_attempt_float", 1, 6.7, 6.7),
("test_constant_backoff_attempt_round_float", 1.0, 6.7, 6.7),
("test_constant_backoff_attempt_round_float", 1.5, 6.7, 6.7),
("test_constant_backoff_first_attempt_round_float", 1, 10.0, BACKOFF_TIME),
("test_constant_backoff_second_attempt_round_float", 2, 10.0, BACKOFF_TIME),
("test_constant_backoff_first_attempt", 1, BACKOFF_TIME, None, BACKOFF_TIME),
("test_constant_backoff_first_attempt_float", 1, 6.7, None, 6.7),
("test_constant_backoff_attempt_round_float", 1.0, 6.7, None, 6.7),
("test_constant_backoff_attempt_round_float", 1.5, 6.7, None, 6.7),
("test_constant_backoff_first_attempt_round_float", 1, 10.0, None, BACKOFF_TIME),
("test_constant_backoff_second_attempt_round_float", 2, 10.0, None, BACKOFF_TIME),
("test_constant_backoff_zero_jitter", 2, BACKOFF_TIME, 0, BACKOFF_TIME),
(
"test_constant_backoff_from_parameters",
1,
"{{ parameters['backoff'] }}",
None,
PARAMETERS_BACKOFF_TIME,
),
("test_constant_backoff_from_config", 1, "{{ config['backoff'] }}", CONFIG_BACKOFF_TIME),
(
"test_constant_backoff_from_config",
1,
"{{ config['backoff'] }}",
None,
CONFIG_BACKOFF_TIME,
),
],
)
def test_constant_backoff(test_name, attempt_count, backofftime, expected_backoff_time):
def test_constant_backoff(
test_name, attempt_count, backofftime, jitter_range, expected_backoff_time
):
response_mock = MagicMock()
backoff_strategy = ConstantBackoffStrategy(
parameters={"backoff": PARAMETERS_BACKOFF_TIME},
backoff_time_in_seconds=backofftime,
config={"backoff": CONFIG_BACKOFF_TIME},
jitter_range_in_seconds=jitter_range,
config={"backoff": CONFIG_BACKOFF_TIME, "jitter": 0},
)
backoff = backoff_strategy.backoff_time(response_mock, attempt_count=attempt_count)
assert backoff == expected_backoff_time


@pytest.mark.parametrize(
"backofftime, jitter_range, expected_lower_bound, expected_upper_bound",
[
pytest.param(60, 15, 60, 90, id="base_backoff_floor"),
pytest.param(10, 30, 10, 70, id="large_jitter"),
pytest.param(0, 5, 0, 10, id="zero_base"),
],
)
def test_constant_backoff_with_jitter_bounds(
backofftime, jitter_range, expected_lower_bound, expected_upper_bound
):
response_mock = MagicMock()
backoff_strategy = ConstantBackoffStrategy(
parameters={},
backoff_time_in_seconds=backofftime,
jitter_range_in_seconds=jitter_range,
config={},
)

backoff_times = [
backoff_strategy.backoff_time(response_mock, attempt_count=1) for _ in range(2000)
]

assert all(expected_lower_bound <= backoff <= expected_upper_bound for backoff in backoff_times)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
assert len(set(backoff_times)) > 1
Loading
Loading