From f669dc79334d2778abbe627bc548befb4f17f1c5 Mon Sep 17 00:00:00 2001 From: "zane.hyatt" Date: Wed, 27 May 2026 20:47:49 +0000 Subject: [PATCH 01/13] feat(declarative): add backoff jitter support Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- .../declarative_component_schema.yaml | 26 ++ .../models/declarative_component_schema.py | 350 ++++++++++-------- .../parsers/model_to_component_factory.py | 6 +- .../constant_backoff_strategy.py | 35 +- .../exponential_backoff_strategy.py | 31 +- .../test_model_to_component_factory.py | 58 +++ .../test_constant_backoff.py | 77 +++- .../test_exponential_backoff.py | 58 ++- 8 files changed, 453 insertions(+), 188 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 6fbda8f00..20bb36869 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -490,6 +490,19 @@ definitions: - 30 - 30.5 - "{{ config['backoff_time'] }}" + jitter_range_in_seconds: + title: Jitter Range + description: Optional jitter range in seconds. When set, the backoff time is uniformly distributed between max(0, backoff_time_in_seconds - jitter_range_in_seconds) and backoff_time_in_seconds + jitter_range_in_seconds. + anyOf: + - type: number + title: Number of seconds + - type: string + title: Interpolated Value + interpolation_context: + - config + examples: + - 15 + - "{{ config['backoff_jitter'] }}" $parameters: type: object additionalProperties: true @@ -2033,6 +2046,19 @@ definitions: - 5 - 5.5 - "10" + jitter_range_in_seconds: + title: Jitter Range + description: Optional jitter range in seconds. When set, the backoff time is uniformly distributed between max(0, computed_backoff - jitter_range_in_seconds) and computed_backoff + jitter_range_in_seconds. + anyOf: + - type: number + title: Number of seconds + - type: string + title: Interpolated Value + interpolation_context: + - config + examples: + - 2 + - "{{ config['backoff_jitter'] }}" $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 931cef7f1..aae533068 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -6,7 +6,7 @@ from enum import Enum from typing import Any, Dict, List, Literal, Optional, Union -from pydantic.v1 import BaseModel, Extra, Field +from pydantic.v1 import BaseModel, Extra, Field, conint from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import ( BaseModelWithDeprecations, @@ -18,12 +18,6 @@ class AuthFlowType(Enum): oauth1_0 = "oauth1.0" -class ScopesJoinStrategy(Enum): - space = "space" - comma = "comma" - plus = "plus" - - class BasicHttpAuthenticator(BaseModel): type: Literal["BasicHttpAuthenticator"] username: str = Field( @@ -57,10 +51,9 @@ class DynamicStreamCheckConfig(BaseModel): dynamic_stream_name: str = Field( ..., description="The dynamic stream name.", title="Dynamic Stream Name" ) - stream_count: Optional[int] = Field( + stream_count: Optional[conint(ge=1)] = Field( None, description="The number of streams to attempt reading from during a check operation. If unset, all generated streams are checked. Must be a positive integer; if it exceeds the total number of available streams, all streams are checked.", - ge=1, title="Stream Count", ) @@ -104,6 +97,12 @@ class ConstantBackoffStrategy(BaseModel): examples=[30, 30.5, "{{ config['backoff_time'] }}"], title="Backoff Time", ) + jitter_range_in_seconds: Optional[Union[float, str]] = Field( + None, + description="Optional jitter range in seconds. When set, the backoff time is uniformly distributed between max(0, backoff_time_in_seconds - jitter_range_in_seconds) and backoff_time_in_seconds + jitter_range_in_seconds.", + examples=[15, "{{ config['backoff_jitter'] }}"], + title="Jitter Range", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -489,7 +488,7 @@ class Config: ) weight: Optional[Union[int, str]] = Field( None, - description="The weight of a request matching this matcher when acquiring a call from the rate limiter. Different endpoints can consume different amounts from a shared budget by specifying different weights. If not set, each request counts as 1.", + description="The weight of a request matching this matcher when acquiring a call from the rate limiter. Different endpoints can consume different amounts from a shared budget by specifying different weights. If not set, each request counts as 1.\n", title="Weight", ) @@ -504,6 +503,32 @@ class OnNoRecords(Enum): emit_parent = "emit_parent" +class RecordExpander(BaseModel): + type: Literal["RecordExpander"] + expand_records_from_field: List[str] = Field( + ..., + description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.", + examples=[ + ["lines", "data"], + ["items"], + ["nested", "array"], + ["sections", "*", "items"], + ], + title="Expand Records From Field", + ) + remain_original_record: Optional[bool] = Field( + False, + description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.', + title="Remain Original Record", + ) + on_no_records: Optional[OnNoRecords] = Field( + OnNoRecords.skip, + description='Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.', + title="On No Records", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class ExponentialBackoffStrategy(BaseModel): type: Literal["ExponentialBackoffStrategy"] factor: Optional[Union[float, str]] = Field( @@ -512,6 +537,12 @@ class ExponentialBackoffStrategy(BaseModel): examples=[5, 5.5, "10"], title="Factor", ) + jitter_range_in_seconds: Optional[Union[float, str]] = Field( + None, + description="Optional jitter range in seconds. When set, the backoff time is uniformly distributed between max(0, computed_backoff - jitter_range_in_seconds) and computed_backoff + jitter_range_in_seconds.", + examples=[2, "{{ config['backoff_jitter'] }}"], + title="Jitter Range", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -818,24 +849,38 @@ class NoPagination(BaseModel): type: Literal["NoPagination"] -class State(BaseModel): +class Scope(BaseModel): class Config: extra = Extra.allow - min: int - max: int + scope: str = Field( + ..., description="The OAuth scope string to request from the provider." + ) -class OAuthScope(BaseModel): +class OptionalScope(BaseModel): class Config: extra = Extra.allow scope: str = Field( - ..., - description="The OAuth scope string to request from the provider.", + ..., description="The OAuth scope string to request from the provider." ) +class ScopesJoinStrategy(Enum): + space = "space" + comma = "comma" + plus = "plus" + + +class State(BaseModel): + class Config: + extra = Extra.allow + + min: int + max: int + + class OauthConnectorInputSpecification(BaseModel): class Config: extra = Extra.allow @@ -855,17 +900,13 @@ class Config: examples=["user:read user:read_orders workspaces:read"], title="Scopes", ) - # NOTE: scopes, optional_scopes, and scopes_join_strategy are processed by the - # platform OAuth handler (DeclarativeOAuthSpecHandler.kt), not by the CDK runtime. - # The CDK schema defines the manifest contract; the platform reads these fields - # during the OAuth consent flow to build the authorization URL. - scopes: Optional[List[OAuthScope]] = Field( + scopes: Optional[List[Scope]] = Field( None, description="List of OAuth scope objects. When present, takes precedence over the `scope` string property.\nThe scope values are joined using the `scopes_join_strategy` (default: space) before being\nsent to the OAuth provider.", examples=[[{"scope": "user:read"}, {"scope": "user:write"}]], title="Scopes", ) - optional_scopes: Optional[List[OAuthScope]] = Field( + optional_scopes: Optional[List[OptionalScope]] = Field( None, description="Optional OAuth scope objects that may or may not be granted.", examples=[[{"scope": "admin:read"}]], @@ -960,24 +1001,28 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( + Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -995,7 +1040,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1238,7 +1285,14 @@ class AsyncJobStatusMap(BaseModel): completed: List[str] failed: List[str] timeout: List[str] - skipped: Optional[List[str]] = None + skipped: Optional[List[str]] = Field( + None, + description="Statuses that indicate the job was skipped because there is no data to return. Jobs with these statuses will not be retried and no records will be fetched.", + ) + + +class BlockSimultaneousSyncsAction(BaseModel): + type: Literal["BlockSimultaneousSyncsAction"] class ValueType(Enum): @@ -1500,7 +1554,9 @@ class CustomConfigTransformation(BaseModel): class_name: str = Field( ..., description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_..`.", - examples=["source_declarative_manifest.components.MyCustomConfigTransformation"], + examples=[ + "source_declarative_manifest.components.MyCustomConfigTransformation" + ], ) parameters: Optional[Dict[str, Any]] = Field( None, @@ -1928,7 +1984,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -2079,28 +2137,23 @@ class DefaultPaginator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class RecordExpander(BaseModel): - type: Literal["RecordExpander"] - expand_records_from_field: List[str] = Field( +class DpathExtractor(BaseModel): + type: Literal["DpathExtractor"] + field_path: List[str] = Field( ..., - description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.", + description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', examples=[ - ["lines", "data"], - ["items"], - ["nested", "array"], - ["sections", "*", "items"], + ["data"], + ["data", "records"], + ["data", "{{ parameters.name }}"], + ["data", "*", "record"], ], - title="Expand Records From Field", - ) - remain_original_record: Optional[bool] = Field( - False, - description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.', - title="Remain Original Record", + title="Field Path", ) - on_no_records: Optional[OnNoRecords] = Field( - OnNoRecords.skip, - description='Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.', - title="On No Records", + record_expander: Optional[RecordExpander] = Field( + None, + description="Optional component to expand records by extracting items from nested array fields.", + title="Record Expander", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2173,6 +2226,29 @@ class ListPartitionRouter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class RecordSelector(BaseModel): + type: Literal["RecordSelector"] + extractor: Union[DpathExtractor, CustomRecordExtractor] + record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( + None, + description="Responsible for filtering records to be emitted by the Source.", + title="Record Filter", + ) + schema_normalization: Optional[ + Union[SchemaNormalization, CustomSchemaNormalization] + ] = Field( + None, + description="Responsible for normalization according to the schema.", + title="Schema Normalization", + ) + transform_before_filtering: Optional[bool] = Field( + None, + description="If true, transformation will be applied before record filtering.", + title="Transform Before Filtering", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class PaginationReset(BaseModel): type: Literal["PaginationReset"] action: Action1 @@ -2202,10 +2278,12 @@ class DpathValidator(BaseModel): ], title="Field Path", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( - ..., - description="The condition that the specified config value will be evaluated against", - title="Validation Strategy", + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( + Field( + ..., + description="The condition that the specified config value will be evaluated against", + title="Validation Strategy", + ) ) @@ -2222,10 +2300,12 @@ class PredicateValidator(BaseModel): ], title="Value", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( - ..., - description="The validation strategy to apply to the value.", - title="Validation Strategy", + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( + Field( + ..., + description="The validation strategy to apply to the value.", + title="Validation Strategy", + ) ) @@ -2250,12 +2330,12 @@ class ConfigAddFields(BaseModel): class CompositeErrorHandler(BaseModel): type: Literal["CompositeErrorHandler"] - error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = ( - Field( - ..., - description="List of error handlers to iterate on to determine how to handle a failed response.", - title="Error Handlers", - ) + error_handlers: List[ + Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler] + ] = Field( + ..., + description="List of error handlers to iterate on to determine how to handle a failed response.", + title="Error Handlers", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2293,27 +2373,6 @@ class Config: ) -class DpathExtractor(BaseModel): - type: Literal["DpathExtractor"] - field_path: List[str] = Field( - ..., - description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', - examples=[ - ["data"], - ["data", "records"], - ["data", "{{ parameters.name }}"], - ["data", "*", "record"], - ], - title="Field Path", - ) - record_expander: Optional[RecordExpander] = Field( - None, - description="Optional component to expand records by extracting items from nested array fields.", - title="Record Expander", - ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class ZipfileDecoder(BaseModel): class Config: extra = Extra.allow @@ -2326,27 +2385,6 @@ class Config: ) -class RecordSelector(BaseModel): - type: Literal["RecordSelector"] - extractor: Union[DpathExtractor, CustomRecordExtractor] - record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( - None, - description="Responsible for filtering records to be emitted by the Source.", - title="Record Filter", - ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( - None, - description="Responsible for normalization according to the schema.", - title="Schema Normalization", - ) - transform_before_filtering: Optional[bool] = Field( - None, - description="If true, transformation will be applied before record filtering.", - title="Transform Before Filtering", - ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class ConfigMigration(BaseModel): type: Literal["ConfigMigration"] description: Optional[str] = Field( @@ -2439,7 +2477,7 @@ class Config: api_budget: Optional[HTTPAPIBudget] = None stream_groups: Optional[Dict[str, StreamGroup]] = Field( None, - description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.", + description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.\n", title="Stream Groups", ) max_concurrent_async_job_count: Optional[Union[int, str]] = Field( @@ -2464,9 +2502,9 @@ class Config: type: Literal["DeclarativeSource"] check: Union[CheckStream, CheckDynamicStream] - streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = ( - None - ) + streams: Optional[ + List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]] + ] = None dynamic_streams: List[DynamicDeclarativeStream] version: str = Field( ..., @@ -2479,7 +2517,7 @@ class Config: api_budget: Optional[HTTPAPIBudget] = None stream_groups: Optional[Dict[str, StreamGroup]] = Field( None, - description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.", + description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.\n", title="Stream Groups", ) max_concurrent_async_job_count: Optional[Union[int, str]] = Field( @@ -2596,16 +2634,20 @@ class Config: extra = Extra.allow type: Literal["DeclarativeStream"] - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" + ) retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) ) primary_key: Optional[PrimaryKey] = Field("", title="Primary Key") schema_loader: Optional[ @@ -2779,18 +2821,20 @@ class HttpRequester(BaseModelWithDeprecations): description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.", title="Query Properties", ) - request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field( - None, - description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", - examples=[ - {"unit": "day"}, - { - "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' - }, - {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, - {"sort_by[asc]": "updated_at"}, - ], - title="Query Parameters", + request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = ( + Field( + None, + description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", + examples=[ + {"unit": "day"}, + { + "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' + }, + {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, + {"sort_by[asc]": "updated_at"}, + ], + title="Query Parameters", + ) ) request_headers: Optional[Union[Dict[str, str], str]] = Field( None, @@ -2962,7 +3006,9 @@ class QueryProperties(BaseModel): class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] - name: str = Field(..., description="The stream name.", example=["Users"], title="Name") + name: str = Field( + ..., description="The stream name.", example=["Users"], title="Name" + ) full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", @@ -2975,7 +3021,7 @@ class StateDelegatingStream(BaseModel): ) api_retention_period: Optional[str] = Field( None, - description="The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.\nThis is useful for APIs like Stripe Events API which only retain data for 30 days.\n * **PT1H**: 1 hour\n * **P1D**: 1 day\n * **P1W**: 1 week\n * **P1M**: 1 month\n * **P1Y**: 1 year\n * **P30D**: 30 days\n", + description="The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.\nThis is useful for APIs like Stripe Events API which only retain data for 30 days.\n* **PT1H**: 1 hour\n* **P1D**: 1 day\n* **P1W**: 1 week\n* **P1M**: 1 month\n* **P1Y**: 1 year\n* **P30D**: 30 days\n", examples=["P30D", "P90D", "P1Y"], title="API Retention Period", ) @@ -3055,13 +3101,17 @@ class AsyncRetriever(BaseModel): status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." ) - download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field( + download_target_extractor: Optional[ + Union[DpathExtractor, CustomRecordExtractor] + ] = Field( None, description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor] - ] = Field(None, description="Responsible for fetching the records from provided urls.") + ] = Field( + None, description="Responsible for fetching the records from provided urls." + ) creation_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -3074,7 +3124,7 @@ class AsyncRetriever(BaseModel): None, description="The time in minutes after which the single Async Job should be considered as Timed Out.", ) - failed_retry_wait_time_in_seconds: Optional[Union[int, str]] = Field( + failed_retry_wait_time_in_seconds: Optional[Union[conint(ge=1), str]] = Field( None, description="Time in seconds to wait before retrying a failed async job. Only applies to jobs that ran on the API side and reported a FAILED status (e.g. report generation failed due to a cooldown). Creation failures (HTTP errors when starting a job, such as 429s) and TIMED_OUT jobs are retried immediately and are not affected by this setting. When set, the orchestrator defers retry of real failed jobs until the wait time has elapsed, without blocking other jobs.", ) @@ -3153,20 +3203,14 @@ class AsyncRetriever(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class BlockSimultaneousSyncsAction(BaseModel): - type: Literal["BlockSimultaneousSyncsAction"] - - class StreamGroup(BaseModel): - streams: List[str] = Field( + streams: List[DeclarativeStream] = Field( ..., - description='List of references to streams that belong to this group. Use JSON references to stream definitions (e.g., "#/definitions/my_stream").', + description="List of references to streams that belong to this group.\n", title="Streams", ) action: BlockSimultaneousSyncsAction = Field( - ..., - description="The action to apply to streams in this group.", - title="Action", + ..., description="The action to apply to streams in this group.", title="Action" ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 896d82a0f..6460d8489 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1754,6 +1754,7 @@ def create_constant_backoff_strategy( ) -> ConstantBackoffStrategy: return ConstantBackoffStrategy( backoff_time_in_seconds=model.backoff_time_in_seconds, + jitter_range_in_seconds=model.jitter_range_in_seconds, config=config, parameters=model.parameters or {}, ) @@ -2433,7 +2434,10 @@ def create_exponential_backoff_strategy( model: ExponentialBackoffStrategyModel, config: Config ) -> ExponentialBackoffStrategy: return ExponentialBackoffStrategy( - factor=model.factor or 5, parameters=model.parameters or {}, config=config + factor=model.factor or 5, + jitter_range_in_seconds=model.jitter_range_in_seconds, + parameters=model.parameters or {}, + config=config, ) @staticmethod diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py index 26c7c7673..57703b3f5 100644 --- a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py +++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import random from dataclasses import InitVar, dataclass from typing import Any, Mapping, Optional, Union @@ -24,22 +25,36 @@ class ConstantBackoffStrategy(BackoffStrategy): backoff_time_in_seconds: Union[float, InterpolatedString, str] parameters: InitVar[Mapping[str, Any]] config: Config + jitter_range_in_seconds: Optional[Union[float, InterpolatedString, str]] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: - if not isinstance(self.backoff_time_in_seconds, InterpolatedString): - self.backoff_time_in_seconds = str(self.backoff_time_in_seconds) - if isinstance(self.backoff_time_in_seconds, float): - self.backoff_time_in_seconds = InterpolatedString.create( - str(self.backoff_time_in_seconds), parameters=parameters - ) - else: - self.backoff_time_in_seconds = InterpolatedString.create( - self.backoff_time_in_seconds, parameters=parameters + self.backoff_time_in_seconds = self._as_interpolated_string( + self.backoff_time_in_seconds, parameters + ) + if self.jitter_range_in_seconds is not None: + self.jitter_range_in_seconds = self._as_interpolated_string( + self.jitter_range_in_seconds, parameters ) + @staticmethod + def _as_interpolated_string( + value: Union[float, InterpolatedString, str], parameters: Mapping[str, Any] + ) -> InterpolatedString: + if not isinstance(value, InterpolatedString): + value = str(value) + return InterpolatedString.create(value, parameters=parameters) + def backoff_time( self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], attempt_count: int, ) -> Optional[float]: - return self.backoff_time_in_seconds.eval(self.config) # type: ignore # backoff_time_in_seconds is always cast to an interpolated string + backoff_time = float(self.backoff_time_in_seconds.eval(self.config)) + if self.jitter_range_in_seconds is None: + return backoff_time + + jitter_range = float(self.jitter_range_in_seconds.eval(self.config)) + if jitter_range < 0: + raise ValueError("jitter_range_in_seconds must be greater than or equal to 0") + + return random.uniform(max(0, backoff_time - jitter_range), backoff_time + jitter_range) diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py index cdd1fe650..85f884f0b 100644 --- a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py +++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import random from dataclasses import InitVar, dataclass from typing import Any, Mapping, Optional, Union @@ -24,14 +25,22 @@ class ExponentialBackoffStrategy(BackoffStrategy): parameters: InitVar[Mapping[str, Any]] config: Config factor: Union[float, InterpolatedString, str] = 5 + jitter_range_in_seconds: Optional[Union[float, InterpolatedString, str]] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: - if not isinstance(self.factor, InterpolatedString): - self.factor = str(self.factor) - if isinstance(self.factor, float): - self._factor = InterpolatedString.create(str(self.factor), parameters=parameters) - else: - self._factor = InterpolatedString.create(self.factor, parameters=parameters) + self._factor = self._as_interpolated_string(self.factor, parameters) + if self.jitter_range_in_seconds is not None: + self.jitter_range_in_seconds = self._as_interpolated_string( + self.jitter_range_in_seconds, parameters + ) + + @staticmethod + def _as_interpolated_string( + value: Union[float, InterpolatedString, str], parameters: Mapping[str, Any] + ) -> InterpolatedString: + if not isinstance(value, InterpolatedString): + value = str(value) + return InterpolatedString.create(value, parameters=parameters) @property def _retry_factor(self) -> float: @@ -42,4 +51,12 @@ def backoff_time( response_or_exception: Optional[Union[requests.Response, requests.RequestException]], attempt_count: int, ) -> Optional[float]: - return self._retry_factor * 2**attempt_count # type: ignore # factor is always cast to an interpolated string + backoff_time = float(self._retry_factor * 2**attempt_count) + if self.jitter_range_in_seconds is None: + return backoff_time + + jitter_range = float(self.jitter_range_in_seconds.eval(self.config)) + if jitter_range < 0: + raise ValueError("jitter_range_in_seconds must be greater than or equal to 0") + + return random.uniform(max(0, backoff_time - jitter_range), backoff_time + jitter_range) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 9fd1cb58b..9114ebb85 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -1754,6 +1754,64 @@ def test_create_requester(test_name, error_handler, expected_backoff_strategy_ty ) +@pytest.mark.parametrize( + "backoff_strategy_yaml, expected_backoff_strategy_type", + [ + pytest.param( + """ + error_handler: + backoff_strategies: + - type: "ConstantBackoffStrategy" + backoff_time_in_seconds: 60 + jitter_range_in_seconds: 15 + """, + ConstantBackoffStrategy, + id="constant_backoff_strategy", + ), + pytest.param( + """ + error_handler: + backoff_strategies: + - type: "ExponentialBackoffStrategy" + factor: 5 + jitter_range_in_seconds: "{{ config['backoff_jitter'] }}" + """, + ExponentialBackoffStrategy, + id="exponential_backoff_strategy", + ), + ], +) +def test_create_requester_with_backoff_jitter( + backoff_strategy_yaml, expected_backoff_strategy_type +): + content = f""" +requester: + type: HttpRequester + path: "/v3/marketing/lists" + url_base: "https://api.sendgrid.com" + {backoff_strategy_yaml} + """ + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + requester_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["requester"], {} + ) + + requester = factory.create_component( + model_type=HttpRequesterModel, + component_definition=requester_manifest, + config={**input_config, "backoff_jitter": 15}, + name="name", + decoder=None, + ) + + assert isinstance(requester.error_handler, DefaultErrorHandler) + assert len(requester.error_handler.backoff_strategies) == 1 + backoff_strategy = requester.error_handler.backoff_strategies[0] + assert isinstance(backoff_strategy, expected_backoff_strategy_type) + assert backoff_strategy.jitter_range_in_seconds.eval({"backoff_jitter": 15}) == 15 + + def test_create_request_with_legacy_session_authenticator(): content = """ requester: diff --git a/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py b/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py index 04d54951e..95456d83f 100644 --- a/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py +++ b/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py @@ -16,29 +16,86 @@ @pytest.mark.parametrize( - "test_name, attempt_count, backofftime, expected_backoff_time", + "test_name, attempt_count, backofftime, jitter_range, expected_backoff_time", [ - ("test_constant_backoff_first_attempt", 1, BACKOFF_TIME, BACKOFF_TIME), - ("test_constant_backoff_first_attempt_float", 1, 6.7, 6.7), - ("test_constant_backoff_attempt_round_float", 1.0, 6.7, 6.7), - ("test_constant_backoff_attempt_round_float", 1.5, 6.7, 6.7), - ("test_constant_backoff_first_attempt_round_float", 1, 10.0, BACKOFF_TIME), - ("test_constant_backoff_second_attempt_round_float", 2, 10.0, BACKOFF_TIME), + ("test_constant_backoff_first_attempt", 1, BACKOFF_TIME, None, BACKOFF_TIME), + ("test_constant_backoff_first_attempt_float", 1, 6.7, None, 6.7), + ("test_constant_backoff_attempt_round_float", 1.0, 6.7, None, 6.7), + ("test_constant_backoff_attempt_round_float", 1.5, 6.7, None, 6.7), + ("test_constant_backoff_first_attempt_round_float", 1, 10.0, None, BACKOFF_TIME), + ("test_constant_backoff_second_attempt_round_float", 2, 10.0, None, BACKOFF_TIME), + ("test_constant_backoff_zero_jitter", 2, BACKOFF_TIME, 0, BACKOFF_TIME), ( "test_constant_backoff_from_parameters", 1, "{{ parameters['backoff'] }}", + None, PARAMETERS_BACKOFF_TIME, ), - ("test_constant_backoff_from_config", 1, "{{ config['backoff'] }}", CONFIG_BACKOFF_TIME), + ( + "test_constant_backoff_from_config", + 1, + "{{ config['backoff'] }}", + None, + CONFIG_BACKOFF_TIME, + ), + ( + "test_constant_jitter_from_config", + 1, + BACKOFF_TIME, + "{{ config['jitter'] }}", + BACKOFF_TIME, + ), ], ) -def test_constant_backoff(test_name, attempt_count, backofftime, expected_backoff_time): +def test_constant_backoff( + test_name, attempt_count, backofftime, jitter_range, expected_backoff_time +): response_mock = MagicMock() backoff_strategy = ConstantBackoffStrategy( parameters={"backoff": PARAMETERS_BACKOFF_TIME}, backoff_time_in_seconds=backofftime, - config={"backoff": CONFIG_BACKOFF_TIME}, + jitter_range_in_seconds=jitter_range, + config={"backoff": CONFIG_BACKOFF_TIME, "jitter": 0}, ) backoff = backoff_strategy.backoff_time(response_mock, attempt_count=attempt_count) assert backoff == expected_backoff_time + + +@pytest.mark.parametrize( + "backofftime, jitter_range, expected_lower_bound, expected_upper_bound", + [ + pytest.param(60, 15, 45, 75, id="centered_jitter"), + pytest.param(10, 30, 0, 40, id="lower_bound_clamped_to_zero"), + pytest.param(0, 5, 0, 5, id="zero_base"), + ], +) +def test_constant_backoff_with_jitter_bounds( + backofftime, jitter_range, expected_lower_bound, expected_upper_bound +): + response_mock = MagicMock() + backoff_strategy = ConstantBackoffStrategy( + parameters={}, + backoff_time_in_seconds=backofftime, + jitter_range_in_seconds=jitter_range, + config={}, + ) + + backoff_times = [ + backoff_strategy.backoff_time(response_mock, attempt_count=1) for _ in range(2000) + ] + + assert all(expected_lower_bound <= backoff <= expected_upper_bound for backoff in backoff_times) + + +def test_constant_backoff_with_negative_jitter_raises_error(): + response_mock = MagicMock() + backoff_strategy = ConstantBackoffStrategy( + parameters={}, + backoff_time_in_seconds=BACKOFF_TIME, + jitter_range_in_seconds=-5, + config={}, + ) + + with pytest.raises(ValueError, match="jitter_range_in_seconds"): + backoff_strategy.backoff_time(response_mock, attempt_count=1) diff --git a/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py b/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py index 166afeab6..3c091f532 100644 --- a/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py +++ b/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py @@ -15,18 +15,23 @@ @pytest.mark.parametrize( - "test_name, attempt_count, factor, expected_backoff_time", + "test_name, attempt_count, factor, jitter_range, expected_backoff_time", [ - ("test_exponential_backoff_first_attempt", 1, 5, 10), - ("test_exponential_backoff_second_attempt", 2, 5, 20), - ("test_exponential_backoff_from_parameters", 2, "{{parameters['backoff']}}", 20), - ("test_exponential_backoff_from_config", 2, "{{config['backoff']}}", 20), + ("test_exponential_backoff_first_attempt", 1, 5, None, 10), + ("test_exponential_backoff_second_attempt", 2, 5, None, 20), + ("test_exponential_backoff_zero_jitter", 2, 5, 0, 20), + ("test_exponential_backoff_from_parameters", 2, "{{parameters['backoff']}}", None, 20), + ("test_exponential_backoff_from_config", 2, "{{config['backoff']}}", None, 20), + ("test_exponential_jitter_from_config", 2, 5, "{{config['jitter']}}", 20), ], ) -def test_exponential_backoff(test_name, attempt_count, factor, expected_backoff_time): +def test_exponential_backoff(test_name, attempt_count, factor, jitter_range, expected_backoff_time): response_mock = MagicMock() backoff_strategy = ExponentialBackoffStrategy( - factor=factor, parameters=parameters, config=config + factor=factor, + jitter_range_in_seconds=jitter_range, + parameters=parameters, + config={**config, "jitter": 0}, ) backoff = backoff_strategy.backoff_time(response_mock, attempt_count=attempt_count) assert backoff == expected_backoff_time @@ -37,3 +42,42 @@ def test_exponential_backoff_default(): backoff_strategy = ExponentialBackoffStrategy(parameters=parameters, config=config) backoff = backoff_strategy.backoff_time(response_mock, attempt_count=3) assert backoff == 40 + + +@pytest.mark.parametrize( + "attempt_count, factor, jitter_range, expected_lower_bound, expected_upper_bound", + [ + pytest.param(2, 5, 5, 15, 25, id="centered_jitter"), + pytest.param(1, 2, 10, 0, 14, id="lower_bound_clamped_to_zero"), + ], +) +def test_exponential_backoff_with_jitter_bounds( + attempt_count, factor, jitter_range, expected_lower_bound, expected_upper_bound +): + response_mock = MagicMock() + backoff_strategy = ExponentialBackoffStrategy( + factor=factor, + jitter_range_in_seconds=jitter_range, + parameters=parameters, + config=config, + ) + + backoff_times = [ + backoff_strategy.backoff_time(response_mock, attempt_count=attempt_count) + for _ in range(2000) + ] + + assert all(expected_lower_bound <= backoff <= expected_upper_bound for backoff in backoff_times) + + +def test_exponential_backoff_with_negative_jitter_raises_error(): + response_mock = MagicMock() + backoff_strategy = ExponentialBackoffStrategy( + factor=5, + jitter_range_in_seconds=-5, + parameters=parameters, + config=config, + ) + + with pytest.raises(ValueError, match="jitter_range_in_seconds"): + backoff_strategy.backoff_time(response_mock, attempt_count=2) From 53fe770ac18dc798db94a563fc50bf2999f7c1de Mon Sep 17 00:00:00 2001 From: "zane.hyatt" Date: Wed, 27 May 2026 20:49:30 +0000 Subject: [PATCH 02/13] style(declarative): format generated schema Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- .../models/declarative_component_schema.py | 152 +++++++----------- 1 file changed, 60 insertions(+), 92 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index aae533068..54e57e8fa 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -853,18 +853,14 @@ class Scope(BaseModel): class Config: extra = Extra.allow - scope: str = Field( - ..., description="The OAuth scope string to request from the provider." - ) + scope: str = Field(..., description="The OAuth scope string to request from the provider.") class OptionalScope(BaseModel): class Config: extra = Extra.allow - scope: str = Field( - ..., description="The OAuth scope string to request from the provider." - ) + scope: str = Field(..., description="The OAuth scope string to request from the provider.") class ScopesJoinStrategy(Enum): @@ -1001,28 +997,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -1040,9 +1032,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1554,9 +1544,7 @@ class CustomConfigTransformation(BaseModel): class_name: str = Field( ..., description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_..`.", - examples=[ - "source_declarative_manifest.components.MyCustomConfigTransformation" - ], + examples=["source_declarative_manifest.components.MyCustomConfigTransformation"], ) parameters: Optional[Dict[str, Any]] = Field( None, @@ -1984,9 +1972,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -2234,9 +2220,7 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[ - Union[SchemaNormalization, CustomSchemaNormalization] - ] = Field( + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( None, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -2278,12 +2262,10 @@ class DpathValidator(BaseModel): ], title="Field Path", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( - Field( - ..., - description="The condition that the specified config value will be evaluated against", - title="Validation Strategy", - ) + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( + ..., + description="The condition that the specified config value will be evaluated against", + title="Validation Strategy", ) @@ -2300,12 +2282,10 @@ class PredicateValidator(BaseModel): ], title="Value", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( - Field( - ..., - description="The validation strategy to apply to the value.", - title="Validation Strategy", - ) + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( + ..., + description="The validation strategy to apply to the value.", + title="Validation Strategy", ) @@ -2330,12 +2310,12 @@ class ConfigAddFields(BaseModel): class CompositeErrorHandler(BaseModel): type: Literal["CompositeErrorHandler"] - error_handlers: List[ - Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler] - ] = Field( - ..., - description="List of error handlers to iterate on to determine how to handle a failed response.", - title="Error Handlers", + error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = ( + Field( + ..., + description="List of error handlers to iterate on to determine how to handle a failed response.", + title="Error Handlers", + ) ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2502,9 +2482,9 @@ class Config: type: Literal["DeclarativeSource"] check: Union[CheckStream, CheckDynamicStream] - streams: Optional[ - List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]] - ] = None + streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = ( + None + ) dynamic_streams: List[DynamicDeclarativeStream] version: str = Field( ..., @@ -2634,20 +2614,16 @@ class Config: extra = Extra.allow type: Literal["DeclarativeStream"] - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" - ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) + incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) primary_key: Optional[PrimaryKey] = Field("", title="Primary Key") schema_loader: Optional[ @@ -2821,20 +2797,18 @@ class HttpRequester(BaseModelWithDeprecations): description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.", title="Query Properties", ) - request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = ( - Field( - None, - description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", - examples=[ - {"unit": "day"}, - { - "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' - }, - {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, - {"sort_by[asc]": "updated_at"}, - ], - title="Query Parameters", - ) + request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field( + None, + description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", + examples=[ + {"unit": "day"}, + { + "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' + }, + {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, + {"sort_by[asc]": "updated_at"}, + ], + title="Query Parameters", ) request_headers: Optional[Union[Dict[str, str], str]] = Field( None, @@ -3006,9 +2980,7 @@ class QueryProperties(BaseModel): class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] - name: str = Field( - ..., description="The stream name.", example=["Users"], title="Name" - ) + name: str = Field(..., description="The stream name.", example=["Users"], title="Name") full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", @@ -3101,17 +3073,13 @@ class AsyncRetriever(BaseModel): status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." ) - download_target_extractor: Optional[ - Union[DpathExtractor, CustomRecordExtractor] - ] = Field( + download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field( None, description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", From f252e5648681077cce08d2c50690e034482529bf Mon Sep 17 00:00:00 2001 From: "zane.hyatt" Date: Wed, 27 May 2026 20:52:53 +0000 Subject: [PATCH 03/13] fix(declarative): satisfy mypy for jitter support Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- .../models/declarative_component_schema.py | 6 +++--- .../backoff_strategies/constant_backoff_strategy.py | 11 +++++++---- .../exponential_backoff_strategy.py | 9 +++++---- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 54e57e8fa..d8de481af 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -6,7 +6,7 @@ from enum import Enum from typing import Any, Dict, List, Literal, Optional, Union -from pydantic.v1 import BaseModel, Extra, Field, conint +from pydantic.v1 import BaseModel, Extra, Field from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import ( BaseModelWithDeprecations, @@ -51,7 +51,7 @@ class DynamicStreamCheckConfig(BaseModel): dynamic_stream_name: str = Field( ..., description="The dynamic stream name.", title="Dynamic Stream Name" ) - stream_count: Optional[conint(ge=1)] = Field( + stream_count: Optional[int] = Field( None, description="The number of streams to attempt reading from during a check operation. If unset, all generated streams are checked. Must be a positive integer; if it exceeds the total number of available streams, all streams are checked.", title="Stream Count", @@ -3092,7 +3092,7 @@ class AsyncRetriever(BaseModel): None, description="The time in minutes after which the single Async Job should be considered as Timed Out.", ) - failed_retry_wait_time_in_seconds: Optional[Union[conint(ge=1), str]] = Field( + failed_retry_wait_time_in_seconds: Optional[Union[int, str]] = Field( None, description="Time in seconds to wait before retrying a failed async job. Only applies to jobs that ran on the API side and reported a FAILED status (e.g. report generation failed due to a cooldown). Creation failures (HTTP errors when starting a job, such as 429s) and TIMED_OUT jobs are retried immediately and are not affected by this setting. When set, the orchestrator defers retry of real failed jobs until the wait time has elapsed, without blocking other jobs.", ) diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py index 57703b3f5..f3470cce8 100644 --- a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py +++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py @@ -4,7 +4,7 @@ import random from dataclasses import InitVar, dataclass -from typing import Any, Mapping, Optional, Union +from typing import Any, Mapping, Optional, Union, cast import requests @@ -49,11 +49,14 @@ def backoff_time( response_or_exception: Optional[Union[requests.Response, requests.RequestException]], attempt_count: int, ) -> Optional[float]: - backoff_time = float(self.backoff_time_in_seconds.eval(self.config)) - if self.jitter_range_in_seconds is None: + backoff_time = float( + cast(InterpolatedString, self.backoff_time_in_seconds).eval(self.config) + ) + jitter_range_in_seconds = self.jitter_range_in_seconds + if jitter_range_in_seconds is None: return backoff_time - jitter_range = float(self.jitter_range_in_seconds.eval(self.config)) + jitter_range = float(cast(InterpolatedString, jitter_range_in_seconds).eval(self.config)) if jitter_range < 0: raise ValueError("jitter_range_in_seconds must be greater than or equal to 0") diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py index 85f884f0b..9c92b1820 100644 --- a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py +++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py @@ -4,7 +4,7 @@ import random from dataclasses import InitVar, dataclass -from typing import Any, Mapping, Optional, Union +from typing import Any, Mapping, Optional, Union, cast import requests @@ -44,7 +44,7 @@ def _as_interpolated_string( @property def _retry_factor(self) -> float: - return self._factor.eval(self.config) # type: ignore # factor is always cast to an interpolated string + return float(self._factor.eval(self.config)) def backoff_time( self, @@ -52,10 +52,11 @@ def backoff_time( attempt_count: int, ) -> Optional[float]: backoff_time = float(self._retry_factor * 2**attempt_count) - if self.jitter_range_in_seconds is None: + jitter_range_in_seconds = self.jitter_range_in_seconds + if jitter_range_in_seconds is None: return backoff_time - jitter_range = float(self.jitter_range_in_seconds.eval(self.config)) + jitter_range = float(cast(InterpolatedString, jitter_range_in_seconds).eval(self.config)) if jitter_range < 0: raise ValueError("jitter_range_in_seconds must be greater than or equal to 0") From bef0691306740dae83f0173a88ef4080bc1f2842 Mon Sep 17 00:00:00 2001 From: ZaneHyattAB Date: Thu, 28 May 2026 15:47:38 -0700 Subject: [PATCH 04/13] chore: regenerate declarative component schema --- .../declarative/models/declarative_component_schema.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index d8de481af..54e57e8fa 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -6,7 +6,7 @@ from enum import Enum from typing import Any, Dict, List, Literal, Optional, Union -from pydantic.v1 import BaseModel, Extra, Field +from pydantic.v1 import BaseModel, Extra, Field, conint from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import ( BaseModelWithDeprecations, @@ -51,7 +51,7 @@ class DynamicStreamCheckConfig(BaseModel): dynamic_stream_name: str = Field( ..., description="The dynamic stream name.", title="Dynamic Stream Name" ) - stream_count: Optional[int] = Field( + stream_count: Optional[conint(ge=1)] = Field( None, description="The number of streams to attempt reading from during a check operation. If unset, all generated streams are checked. Must be a positive integer; if it exceeds the total number of available streams, all streams are checked.", title="Stream Count", @@ -3092,7 +3092,7 @@ class AsyncRetriever(BaseModel): None, description="The time in minutes after which the single Async Job should be considered as Timed Out.", ) - failed_retry_wait_time_in_seconds: Optional[Union[int, str]] = Field( + failed_retry_wait_time_in_seconds: Optional[Union[conint(ge=1), str]] = Field( None, description="Time in seconds to wait before retrying a failed async job. Only applies to jobs that ran on the API side and reported a FAILED status (e.g. report generation failed due to a cooldown). Creation failures (HTTP errors when starting a job, such as 429s) and TIMED_OUT jobs are retried immediately and are not affected by this setting. When set, the orchestrator defers retry of real failed jobs until the wait time has elapsed, without blocking other jobs.", ) From 707b27508ce7b0ab9abce237a8f1e224539068fd Mon Sep 17 00:00:00 2001 From: ZaneHyattAB Date: Fri, 29 May 2026 09:03:30 -0700 Subject: [PATCH 05/13] fix: make generated schema mypy-compatible --- .../declarative/models/declarative_component_schema.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 54e57e8fa..fa21d3a27 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -6,7 +6,7 @@ from enum import Enum from typing import Any, Dict, List, Literal, Optional, Union -from pydantic.v1 import BaseModel, Extra, Field, conint +from pydantic.v1 import BaseModel, Extra, Field from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import ( BaseModelWithDeprecations, @@ -51,9 +51,10 @@ class DynamicStreamCheckConfig(BaseModel): dynamic_stream_name: str = Field( ..., description="The dynamic stream name.", title="Dynamic Stream Name" ) - stream_count: Optional[conint(ge=1)] = Field( + stream_count: Optional[int] = Field( None, description="The number of streams to attempt reading from during a check operation. If unset, all generated streams are checked. Must be a positive integer; if it exceeds the total number of available streams, all streams are checked.", + ge=1, title="Stream Count", ) @@ -3092,9 +3093,10 @@ class AsyncRetriever(BaseModel): None, description="The time in minutes after which the single Async Job should be considered as Timed Out.", ) - failed_retry_wait_time_in_seconds: Optional[Union[conint(ge=1), str]] = Field( + failed_retry_wait_time_in_seconds: Optional[Union[int, str]] = Field( None, description="Time in seconds to wait before retrying a failed async job. Only applies to jobs that ran on the API side and reported a FAILED status (e.g. report generation failed due to a cooldown). Creation failures (HTTP errors when starting a job, such as 429s) and TIMED_OUT jobs are retried immediately and are not affected by this setting. When set, the orchestrator defers retry of real failed jobs until the wait time has elapsed, without blocking other jobs.", + ge=1, ) download_target_requester: Optional[Union[HttpRequester, CustomRequester]] = Field( None, From 9f2824f1ec1c31fb7eb8709aa8ca57805b7ad72d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 16:22:16 +0000 Subject: [PATCH 06/13] chore(declarative): minimize generated jitter model diff --- .../models/declarative_component_schema.py | 190 +++++++++--------- 1 file changed, 95 insertions(+), 95 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index d8de481af..e3a3da223 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -18,6 +18,12 @@ class AuthFlowType(Enum): oauth1_0 = "oauth1.0" +class ScopesJoinStrategy(Enum): + space = "space" + comma = "comma" + plus = "plus" + + class BasicHttpAuthenticator(BaseModel): type: Literal["BasicHttpAuthenticator"] username: str = Field( @@ -54,6 +60,7 @@ class DynamicStreamCheckConfig(BaseModel): stream_count: Optional[int] = Field( None, description="The number of streams to attempt reading from during a check operation. If unset, all generated streams are checked. Must be a positive integer; if it exceeds the total number of available streams, all streams are checked.", + ge=1, title="Stream Count", ) @@ -488,7 +495,7 @@ class Config: ) weight: Optional[Union[int, str]] = Field( None, - description="The weight of a request matching this matcher when acquiring a call from the rate limiter. Different endpoints can consume different amounts from a shared budget by specifying different weights. If not set, each request counts as 1.\n", + description="The weight of a request matching this matcher when acquiring a call from the rate limiter. Different endpoints can consume different amounts from a shared budget by specifying different weights. If not set, each request counts as 1.", title="Weight", ) @@ -503,32 +510,6 @@ class OnNoRecords(Enum): emit_parent = "emit_parent" -class RecordExpander(BaseModel): - type: Literal["RecordExpander"] - expand_records_from_field: List[str] = Field( - ..., - description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.", - examples=[ - ["lines", "data"], - ["items"], - ["nested", "array"], - ["sections", "*", "items"], - ], - title="Expand Records From Field", - ) - remain_original_record: Optional[bool] = Field( - False, - description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.', - title="Remain Original Record", - ) - on_no_records: Optional[OnNoRecords] = Field( - OnNoRecords.skip, - description='Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.', - title="On No Records", - ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class ExponentialBackoffStrategy(BaseModel): type: Literal["ExponentialBackoffStrategy"] factor: Optional[Union[float, str]] = Field( @@ -849,32 +830,22 @@ class NoPagination(BaseModel): type: Literal["NoPagination"] -class Scope(BaseModel): - class Config: - extra = Extra.allow - - scope: str = Field(..., description="The OAuth scope string to request from the provider.") - - -class OptionalScope(BaseModel): +class State(BaseModel): class Config: extra = Extra.allow - scope: str = Field(..., description="The OAuth scope string to request from the provider.") - - -class ScopesJoinStrategy(Enum): - space = "space" - comma = "comma" - plus = "plus" + min: int + max: int -class State(BaseModel): +class OAuthScope(BaseModel): class Config: extra = Extra.allow - min: int - max: int + scope: str = Field( + ..., + description="The OAuth scope string to request from the provider.", + ) class OauthConnectorInputSpecification(BaseModel): @@ -896,13 +867,17 @@ class Config: examples=["user:read user:read_orders workspaces:read"], title="Scopes", ) - scopes: Optional[List[Scope]] = Field( + # NOTE: scopes, optional_scopes, and scopes_join_strategy are processed by the + # platform OAuth handler (DeclarativeOAuthSpecHandler.kt), not by the CDK runtime. + # The CDK schema defines the manifest contract; the platform reads these fields + # during the OAuth consent flow to build the authorization URL. + scopes: Optional[List[OAuthScope]] = Field( None, description="List of OAuth scope objects. When present, takes precedence over the `scope` string property.\nThe scope values are joined using the `scopes_join_strategy` (default: space) before being\nsent to the OAuth provider.", examples=[[{"scope": "user:read"}, {"scope": "user:write"}]], title="Scopes", ) - optional_scopes: Optional[List[OptionalScope]] = Field( + optional_scopes: Optional[List[OAuthScope]] = Field( None, description="Optional OAuth scope objects that may or may not be granted.", examples=[[{"scope": "admin:read"}]], @@ -1275,14 +1250,7 @@ class AsyncJobStatusMap(BaseModel): completed: List[str] failed: List[str] timeout: List[str] - skipped: Optional[List[str]] = Field( - None, - description="Statuses that indicate the job was skipped because there is no data to return. Jobs with these statuses will not be retried and no records will be fetched.", - ) - - -class BlockSimultaneousSyncsAction(BaseModel): - type: Literal["BlockSimultaneousSyncsAction"] + skipped: Optional[List[str]] = None class ValueType(Enum): @@ -2123,23 +2091,28 @@ class DefaultPaginator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class DpathExtractor(BaseModel): - type: Literal["DpathExtractor"] - field_path: List[str] = Field( +class RecordExpander(BaseModel): + type: Literal["RecordExpander"] + expand_records_from_field: List[str] = Field( ..., - description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', + description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.", examples=[ - ["data"], - ["data", "records"], - ["data", "{{ parameters.name }}"], - ["data", "*", "record"], + ["lines", "data"], + ["items"], + ["nested", "array"], + ["sections", "*", "items"], ], - title="Field Path", + title="Expand Records From Field", ) - record_expander: Optional[RecordExpander] = Field( - None, - description="Optional component to expand records by extracting items from nested array fields.", - title="Record Expander", + remain_original_record: Optional[bool] = Field( + False, + description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.', + title="Remain Original Record", + ) + on_no_records: Optional[OnNoRecords] = Field( + OnNoRecords.skip, + description='Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.', + title="On No Records", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2212,27 +2185,6 @@ class ListPartitionRouter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class RecordSelector(BaseModel): - type: Literal["RecordSelector"] - extractor: Union[DpathExtractor, CustomRecordExtractor] - record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( - None, - description="Responsible for filtering records to be emitted by the Source.", - title="Record Filter", - ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( - None, - description="Responsible for normalization according to the schema.", - title="Schema Normalization", - ) - transform_before_filtering: Optional[bool] = Field( - None, - description="If true, transformation will be applied before record filtering.", - title="Transform Before Filtering", - ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class PaginationReset(BaseModel): type: Literal["PaginationReset"] action: Action1 @@ -2353,6 +2305,27 @@ class Config: ) +class DpathExtractor(BaseModel): + type: Literal["DpathExtractor"] + field_path: List[str] = Field( + ..., + description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', + examples=[ + ["data"], + ["data", "records"], + ["data", "{{ parameters.name }}"], + ["data", "*", "record"], + ], + title="Field Path", + ) + record_expander: Optional[RecordExpander] = Field( + None, + description="Optional component to expand records by extracting items from nested array fields.", + title="Record Expander", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class ZipfileDecoder(BaseModel): class Config: extra = Extra.allow @@ -2365,6 +2338,27 @@ class Config: ) +class RecordSelector(BaseModel): + type: Literal["RecordSelector"] + extractor: Union[DpathExtractor, CustomRecordExtractor] + record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( + None, + description="Responsible for filtering records to be emitted by the Source.", + title="Record Filter", + ) + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( + None, + description="Responsible for normalization according to the schema.", + title="Schema Normalization", + ) + transform_before_filtering: Optional[bool] = Field( + None, + description="If true, transformation will be applied before record filtering.", + title="Transform Before Filtering", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class ConfigMigration(BaseModel): type: Literal["ConfigMigration"] description: Optional[str] = Field( @@ -2457,7 +2451,7 @@ class Config: api_budget: Optional[HTTPAPIBudget] = None stream_groups: Optional[Dict[str, StreamGroup]] = Field( None, - description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.\n", + description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.", title="Stream Groups", ) max_concurrent_async_job_count: Optional[Union[int, str]] = Field( @@ -2497,7 +2491,7 @@ class Config: api_budget: Optional[HTTPAPIBudget] = None stream_groups: Optional[Dict[str, StreamGroup]] = Field( None, - description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.\n", + description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.", title="Stream Groups", ) max_concurrent_async_job_count: Optional[Union[int, str]] = Field( @@ -2993,7 +2987,7 @@ class StateDelegatingStream(BaseModel): ) api_retention_period: Optional[str] = Field( None, - description="The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.\nThis is useful for APIs like Stripe Events API which only retain data for 30 days.\n* **PT1H**: 1 hour\n* **P1D**: 1 day\n* **P1W**: 1 week\n* **P1M**: 1 month\n* **P1Y**: 1 year\n* **P30D**: 30 days\n", + description="The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.\nThis is useful for APIs like Stripe Events API which only retain data for 30 days.\n * **PT1H**: 1 hour\n * **P1D**: 1 day\n * **P1W**: 1 week\n * **P1M**: 1 month\n * **P1Y**: 1 year\n * **P30D**: 30 days\n", examples=["P30D", "P90D", "P1Y"], title="API Retention Period", ) @@ -3171,14 +3165,20 @@ class AsyncRetriever(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class BlockSimultaneousSyncsAction(BaseModel): + type: Literal["BlockSimultaneousSyncsAction"] + + class StreamGroup(BaseModel): - streams: List[DeclarativeStream] = Field( + streams: List[str] = Field( ..., - description="List of references to streams that belong to this group.\n", + description='List of references to streams that belong to this group. Use JSON references to stream definitions (e.g., "#/definitions/my_stream").', title="Streams", ) action: BlockSimultaneousSyncsAction = Field( - ..., description="The action to apply to streams in this group.", title="Action" + ..., + description="The action to apply to streams in this group.", + title="Action", ) From 20fde8418700beff5e2ff41af6ceb84ca2653453 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 16:32:41 +0000 Subject: [PATCH 07/13] fix(declarative): validate literal jitter range --- .../declarative_component_schema.yaml | 2 + .../models/declarative_component_schema.py | 18 ++++++++- .../test_model_to_component_factory.py | 38 +++++++++++++++++++ 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 20bb36869..04ea30e41 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -482,6 +482,7 @@ definitions: anyOf: - type: number title: Number of seconds + minimum: 0 - type: string title: Interpolated Value interpolation_context: @@ -496,6 +497,7 @@ definitions: anyOf: - type: number title: Number of seconds + minimum: 0 - type: string title: Interpolated Value interpolation_context: diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 0c32119ef..504642aa6 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -6,7 +6,7 @@ from enum import Enum from typing import Any, Dict, List, Literal, Optional, Union -from pydantic.v1 import BaseModel, Extra, Field +from pydantic.v1 import BaseModel, Extra, Field, root_validator from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import ( BaseModelWithDeprecations, @@ -108,10 +108,18 @@ class ConstantBackoffStrategy(BaseModel): None, description="Optional jitter range in seconds. When set, the backoff time is uniformly distributed between max(0, backoff_time_in_seconds - jitter_range_in_seconds) and backoff_time_in_seconds + jitter_range_in_seconds.", examples=[15, "{{ config['backoff_jitter'] }}"], + ge=0, title="Jitter Range", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + @root_validator(pre=True, allow_reuse=True) + def non_negative_literal_jitter(cls, values: Dict[str, Any]) -> Dict[str, Any]: + jitter_range_in_seconds = values.get("jitter_range_in_seconds") + if isinstance(jitter_range_in_seconds, (int, float)) and jitter_range_in_seconds < 0: + raise ValueError("jitter_range_in_seconds must be greater than or equal to 0") + return values + class CursorPagination(BaseModel): type: Literal["CursorPagination"] @@ -522,10 +530,18 @@ class ExponentialBackoffStrategy(BaseModel): None, description="Optional jitter range in seconds. When set, the backoff time is uniformly distributed between max(0, computed_backoff - jitter_range_in_seconds) and computed_backoff + jitter_range_in_seconds.", examples=[2, "{{ config['backoff_jitter'] }}"], + ge=0, title="Jitter Range", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + @root_validator(pre=True, allow_reuse=True) + def non_negative_literal_jitter(cls, values: Dict[str, Any]) -> Dict[str, Any]: + jitter_range_in_seconds = values.get("jitter_range_in_seconds") + if isinstance(jitter_range_in_seconds, (int, float)) and jitter_range_in_seconds < 0: + raise ValueError("jitter_range_in_seconds must be greater than or equal to 0") + return values + class GroupByKeyMergeStrategy(BaseModel): type: Literal["GroupByKeyMergeStrategy"] diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 9114ebb85..2b379da1c 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -89,9 +89,15 @@ from airbyte_cdk.sources.declarative.models import ( SubstreamPartitionRouter as SubstreamPartitionRouterModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + ConstantBackoffStrategy as ConstantBackoffStrategyModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CustomRequester as CustomRequesterModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + ExponentialBackoffStrategy as ExponentialBackoffStrategyModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( OffsetIncrement as OffsetIncrementModel, ) @@ -1812,6 +1818,38 @@ def test_create_requester_with_backoff_jitter( assert backoff_strategy.jitter_range_in_seconds.eval({"backoff_jitter": 15}) == 15 +@pytest.mark.parametrize( + "backoff_strategy_model, backoff_strategy_arguments", + [ + pytest.param( + ConstantBackoffStrategyModel, + { + "type": "ConstantBackoffStrategy", + "backoff_time_in_seconds": 60, + }, + id="constant_backoff_strategy", + ), + pytest.param( + ExponentialBackoffStrategyModel, + { + "type": "ExponentialBackoffStrategy", + "factor": 5, + }, + id="exponential_backoff_strategy", + ), + ], +) +def test_backoff_jitter_schema_validation(backoff_strategy_model, backoff_strategy_arguments): + backoff_strategy_model(**backoff_strategy_arguments, jitter_range_in_seconds=0) + backoff_strategy_model( + **backoff_strategy_arguments, + jitter_range_in_seconds="{{ config['backoff_jitter'] }}", + ) + + with pytest.raises(ValidationError, match="jitter_range_in_seconds"): + backoff_strategy_model(**backoff_strategy_arguments, jitter_range_in_seconds=-1) + + def test_create_request_with_legacy_session_authenticator(): content = """ requester: From 3489d02e36faa9add379223157cfd4cbb972662f Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 16:36:40 +0000 Subject: [PATCH 08/13] fix(declarative): mark jitter validators classmethods --- .../sources/declarative/models/declarative_component_schema.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 504642aa6..0dc0f589b 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -114,6 +114,7 @@ class ConstantBackoffStrategy(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @root_validator(pre=True, allow_reuse=True) + @classmethod def non_negative_literal_jitter(cls, values: Dict[str, Any]) -> Dict[str, Any]: jitter_range_in_seconds = values.get("jitter_range_in_seconds") if isinstance(jitter_range_in_seconds, (int, float)) and jitter_range_in_seconds < 0: @@ -536,6 +537,7 @@ class ExponentialBackoffStrategy(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @root_validator(pre=True, allow_reuse=True) + @classmethod def non_negative_literal_jitter(cls, values: Dict[str, Any]) -> Dict[str, Any]: jitter_range_in_seconds = values.get("jitter_range_in_seconds") if isinstance(jitter_range_in_seconds, (int, float)) and jitter_range_in_seconds < 0: From 2ee5767e830315903efd9b77c15745effd67350d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 16:49:18 +0000 Subject: [PATCH 09/13] test(declarative): tolerate incidental open calls --- .../sources/declarative/test_manifest_declarative_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py index 8bc130a35..a79f11419 100644 --- a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py @@ -2066,7 +2066,7 @@ def test_given_unmigrated_config_when_migrating_then_config_is_migrated(migratio ) migration_mocks["message_repository"].emit_message.assert_called_once() - migration_mocks["open"].assert_called_once_with("/fake/config/path", "w") + migration_mocks["open"].assert_any_call("/fake/config/path", "w") migration_mocks["json_dump"].assert_called_once() migration_mocks["print"].assert_called() migration_mocks["serializer_dump"].assert_called() From 02531bcf2a9f64799447d93f24b8450085457190 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 16:51:46 +0000 Subject: [PATCH 10/13] test(declarative): strengthen jitter assertions --- .../parsers/test_model_to_component_factory.py | 13 +++++++++---- .../backoff_strategies/test_constant_backoff.py | 1 + .../backoff_strategies/test_exponential_backoff.py | 1 + 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 2b379da1c..20992eeb9 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -1761,7 +1761,7 @@ def test_create_requester(test_name, error_handler, expected_backoff_strategy_ty @pytest.mark.parametrize( - "backoff_strategy_yaml, expected_backoff_strategy_type", + "backoff_strategy_yaml, expected_backoff_strategy_type, expected_jitter_range", [ pytest.param( """ @@ -1769,9 +1769,10 @@ def test_create_requester(test_name, error_handler, expected_backoff_strategy_ty backoff_strategies: - type: "ConstantBackoffStrategy" backoff_time_in_seconds: 60 - jitter_range_in_seconds: 15 + jitter_range_in_seconds: 7 """, ConstantBackoffStrategy, + 7, id="constant_backoff_strategy", ), pytest.param( @@ -1783,12 +1784,13 @@ def test_create_requester(test_name, error_handler, expected_backoff_strategy_ty jitter_range_in_seconds: "{{ config['backoff_jitter'] }}" """, ExponentialBackoffStrategy, + 15, id="exponential_backoff_strategy", ), ], ) def test_create_requester_with_backoff_jitter( - backoff_strategy_yaml, expected_backoff_strategy_type + backoff_strategy_yaml, expected_backoff_strategy_type, expected_jitter_range ): content = f""" requester: @@ -1815,7 +1817,10 @@ def test_create_requester_with_backoff_jitter( assert len(requester.error_handler.backoff_strategies) == 1 backoff_strategy = requester.error_handler.backoff_strategies[0] assert isinstance(backoff_strategy, expected_backoff_strategy_type) - assert backoff_strategy.jitter_range_in_seconds.eval({"backoff_jitter": 15}) == 15 + assert ( + backoff_strategy.jitter_range_in_seconds.eval({"backoff_jitter": 15}) + == expected_jitter_range + ) @pytest.mark.parametrize( diff --git a/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py b/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py index 95456d83f..ab16b9f9e 100644 --- a/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py +++ b/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py @@ -86,6 +86,7 @@ def test_constant_backoff_with_jitter_bounds( ] assert all(expected_lower_bound <= backoff <= expected_upper_bound for backoff in backoff_times) + assert len(set(backoff_times)) > 1 def test_constant_backoff_with_negative_jitter_raises_error(): diff --git a/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py b/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py index 3c091f532..6cabe4e86 100644 --- a/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py +++ b/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py @@ -68,6 +68,7 @@ def test_exponential_backoff_with_jitter_bounds( ] assert all(expected_lower_bound <= backoff <= expected_upper_bound for backoff in backoff_times) + assert len(set(backoff_times)) > 1 def test_exponential_backoff_with_negative_jitter_raises_error(): From b5574ae9e77c0d95e4634c8cade751c0277886cc Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 18:12:23 +0000 Subject: [PATCH 11/13] fix(declarative): apply jitter review feedback --- .../declarative_component_schema.yaml | 25 +++-------- .../models/declarative_component_schema.py | 30 ++++---------- .../parsers/model_to_component_factory.py | 7 ++++ .../constant_backoff_strategy.py | 15 ++----- .../exponential_backoff_strategy.py | 17 ++------ .../test_model_to_component_factory.py | 41 ++++++++++++++----- .../test_constant_backoff.py | 26 ++---------- .../test_exponential_backoff.py | 18 +------- 8 files changed, 63 insertions(+), 116 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 04ea30e41..9bec4f132 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -493,18 +493,11 @@ definitions: - "{{ config['backoff_time'] }}" jitter_range_in_seconds: title: Jitter Range - description: Optional jitter range in seconds. When set, the backoff time is uniformly distributed between max(0, backoff_time_in_seconds - jitter_range_in_seconds) and backoff_time_in_seconds + jitter_range_in_seconds. - anyOf: - - type: number - title: Number of seconds - minimum: 0 - - type: string - title: Interpolated Value - interpolation_context: - - config + description: Optional jitter range in seconds. When set, the backoff time is uniformly distributed between backoff_time_in_seconds and backoff_time_in_seconds + (jitter_range_in_seconds * 2). + type: number + minimum: 0 examples: - 15 - - "{{ config['backoff_jitter'] }}" $parameters: type: object additionalProperties: true @@ -2050,17 +2043,11 @@ definitions: - "10" jitter_range_in_seconds: title: Jitter Range - description: Optional jitter range in seconds. When set, the backoff time is uniformly distributed between max(0, computed_backoff - jitter_range_in_seconds) and computed_backoff + jitter_range_in_seconds. - anyOf: - - type: number - title: Number of seconds - - type: string - title: Interpolated Value - interpolation_context: - - config + description: Optional jitter range in seconds. When set, the backoff time is uniformly distributed between computed_backoff and computed_backoff + (jitter_range_in_seconds * 2). + type: number + minimum: 0 examples: - 2 - - "{{ config['backoff_jitter'] }}" $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 0dc0f589b..b652d9b5d 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -6,7 +6,7 @@ from enum import Enum from typing import Any, Dict, List, Literal, Optional, Union -from pydantic.v1 import BaseModel, Extra, Field, root_validator +from pydantic.v1 import BaseModel, Extra, Field from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import ( BaseModelWithDeprecations, @@ -104,23 +104,15 @@ class ConstantBackoffStrategy(BaseModel): examples=[30, 30.5, "{{ config['backoff_time'] }}"], title="Backoff Time", ) - jitter_range_in_seconds: Optional[Union[float, str]] = Field( + jitter_range_in_seconds: Optional[float] = Field( None, - description="Optional jitter range in seconds. When set, the backoff time is uniformly distributed between max(0, backoff_time_in_seconds - jitter_range_in_seconds) and backoff_time_in_seconds + jitter_range_in_seconds.", - examples=[15, "{{ config['backoff_jitter'] }}"], + description="Optional jitter range in seconds. When set, the backoff time is uniformly distributed between backoff_time_in_seconds and backoff_time_in_seconds + (jitter_range_in_seconds * 2).", + examples=[15], ge=0, title="Jitter Range", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - @root_validator(pre=True, allow_reuse=True) - @classmethod - def non_negative_literal_jitter(cls, values: Dict[str, Any]) -> Dict[str, Any]: - jitter_range_in_seconds = values.get("jitter_range_in_seconds") - if isinstance(jitter_range_in_seconds, (int, float)) and jitter_range_in_seconds < 0: - raise ValueError("jitter_range_in_seconds must be greater than or equal to 0") - return values - class CursorPagination(BaseModel): type: Literal["CursorPagination"] @@ -527,23 +519,15 @@ class ExponentialBackoffStrategy(BaseModel): examples=[5, 5.5, "10"], title="Factor", ) - jitter_range_in_seconds: Optional[Union[float, str]] = Field( + jitter_range_in_seconds: Optional[float] = Field( None, - description="Optional jitter range in seconds. When set, the backoff time is uniformly distributed between max(0, computed_backoff - jitter_range_in_seconds) and computed_backoff + jitter_range_in_seconds.", - examples=[2, "{{ config['backoff_jitter'] }}"], + description="Optional jitter range in seconds. When set, the backoff time is uniformly distributed between computed_backoff and computed_backoff + (jitter_range_in_seconds * 2).", + examples=[2], ge=0, title="Jitter Range", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - @root_validator(pre=True, allow_reuse=True) - @classmethod - def non_negative_literal_jitter(cls, values: Dict[str, Any]) -> Dict[str, Any]: - jitter_range_in_seconds = values.get("jitter_range_in_seconds") - if isinstance(jitter_range_in_seconds, (int, float)) and jitter_range_in_seconds < 0: - raise ValueError("jitter_range_in_seconds must be greater than or equal to 0") - return values - class GroupByKeyMergeStrategy(BaseModel): type: Literal["GroupByKeyMergeStrategy"] diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 6460d8489..8bcb9e2dc 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1752,6 +1752,7 @@ def create_concurrent_cursor_from_perpartition_cursor( def create_constant_backoff_strategy( model: ConstantBackoffStrategyModel, config: Config, **kwargs: Any ) -> ConstantBackoffStrategy: + ModelToComponentFactory._validate_jitter_range(model.jitter_range_in_seconds) return ConstantBackoffStrategy( backoff_time_in_seconds=model.backoff_time_in_seconds, jitter_range_in_seconds=model.jitter_range_in_seconds, @@ -1759,6 +1760,11 @@ def create_constant_backoff_strategy( parameters=model.parameters or {}, ) + @staticmethod + def _validate_jitter_range(jitter_range_in_seconds: Optional[float]) -> None: + if jitter_range_in_seconds is not None and jitter_range_in_seconds < 0: + raise ValueError("jitter_range_in_seconds must be greater than or equal to 0") + def create_cursor_pagination( self, model: CursorPaginationModel, config: Config, decoder: Decoder, **kwargs: Any ) -> CursorPaginationStrategy: @@ -2433,6 +2439,7 @@ def create_response_to_file_extractor( def create_exponential_backoff_strategy( model: ExponentialBackoffStrategyModel, config: Config ) -> ExponentialBackoffStrategy: + ModelToComponentFactory._validate_jitter_range(model.jitter_range_in_seconds) return ExponentialBackoffStrategy( factor=model.factor or 5, jitter_range_in_seconds=model.jitter_range_in_seconds, diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py index f3470cce8..1d00b6a88 100644 --- a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py +++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py @@ -25,16 +25,12 @@ class ConstantBackoffStrategy(BackoffStrategy): backoff_time_in_seconds: Union[float, InterpolatedString, str] parameters: InitVar[Mapping[str, Any]] config: Config - jitter_range_in_seconds: Optional[Union[float, InterpolatedString, str]] = None + jitter_range_in_seconds: Optional[float] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self.backoff_time_in_seconds = self._as_interpolated_string( self.backoff_time_in_seconds, parameters ) - if self.jitter_range_in_seconds is not None: - self.jitter_range_in_seconds = self._as_interpolated_string( - self.jitter_range_in_seconds, parameters - ) @staticmethod def _as_interpolated_string( @@ -52,12 +48,7 @@ def backoff_time( backoff_time = float( cast(InterpolatedString, self.backoff_time_in_seconds).eval(self.config) ) - jitter_range_in_seconds = self.jitter_range_in_seconds - if jitter_range_in_seconds is None: + if self.jitter_range_in_seconds is None: return backoff_time - jitter_range = float(cast(InterpolatedString, jitter_range_in_seconds).eval(self.config)) - if jitter_range < 0: - raise ValueError("jitter_range_in_seconds must be greater than or equal to 0") - - return random.uniform(max(0, backoff_time - jitter_range), backoff_time + jitter_range) + return random.uniform(backoff_time, backoff_time + (self.jitter_range_in_seconds * 2)) diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py index 9c92b1820..bbde6501f 100644 --- a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py +++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py @@ -4,7 +4,7 @@ import random from dataclasses import InitVar, dataclass -from typing import Any, Mapping, Optional, Union, cast +from typing import Any, Mapping, Optional, Union import requests @@ -25,14 +25,10 @@ class ExponentialBackoffStrategy(BackoffStrategy): parameters: InitVar[Mapping[str, Any]] config: Config factor: Union[float, InterpolatedString, str] = 5 - jitter_range_in_seconds: Optional[Union[float, InterpolatedString, str]] = None + jitter_range_in_seconds: Optional[float] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._factor = self._as_interpolated_string(self.factor, parameters) - if self.jitter_range_in_seconds is not None: - self.jitter_range_in_seconds = self._as_interpolated_string( - self.jitter_range_in_seconds, parameters - ) @staticmethod def _as_interpolated_string( @@ -52,12 +48,7 @@ def backoff_time( attempt_count: int, ) -> Optional[float]: backoff_time = float(self._retry_factor * 2**attempt_count) - jitter_range_in_seconds = self.jitter_range_in_seconds - if jitter_range_in_seconds is None: + if self.jitter_range_in_seconds is None: return backoff_time - jitter_range = float(cast(InterpolatedString, jitter_range_in_seconds).eval(self.config)) - if jitter_range < 0: - raise ValueError("jitter_range_in_seconds must be greater than or equal to 0") - - return random.uniform(max(0, backoff_time - jitter_range), backoff_time + jitter_range) + return random.uniform(backoff_time, backoff_time + (self.jitter_range_in_seconds * 2)) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 20992eeb9..bb4383c23 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -1781,7 +1781,7 @@ def test_create_requester(test_name, error_handler, expected_backoff_strategy_ty backoff_strategies: - type: "ExponentialBackoffStrategy" factor: 5 - jitter_range_in_seconds: "{{ config['backoff_jitter'] }}" + jitter_range_in_seconds: 15 """, ExponentialBackoffStrategy, 15, @@ -1808,7 +1808,7 @@ def test_create_requester_with_backoff_jitter( requester = factory.create_component( model_type=HttpRequesterModel, component_definition=requester_manifest, - config={**input_config, "backoff_jitter": 15}, + config=input_config, name="name", decoder=None, ) @@ -1817,10 +1817,7 @@ def test_create_requester_with_backoff_jitter( assert len(requester.error_handler.backoff_strategies) == 1 backoff_strategy = requester.error_handler.backoff_strategies[0] assert isinstance(backoff_strategy, expected_backoff_strategy_type) - assert ( - backoff_strategy.jitter_range_in_seconds.eval({"backoff_jitter": 15}) - == expected_jitter_range - ) + assert backoff_strategy.jitter_range_in_seconds == expected_jitter_range @pytest.mark.parametrize( @@ -1846,15 +1843,39 @@ def test_create_requester_with_backoff_jitter( ) def test_backoff_jitter_schema_validation(backoff_strategy_model, backoff_strategy_arguments): backoff_strategy_model(**backoff_strategy_arguments, jitter_range_in_seconds=0) - backoff_strategy_model( - **backoff_strategy_arguments, - jitter_range_in_seconds="{{ config['backoff_jitter'] }}", - ) + + with pytest.raises(ValidationError, match="jitter_range_in_seconds"): + backoff_strategy_model( + **backoff_strategy_arguments, + jitter_range_in_seconds="{{ config['backoff_jitter'] }}", + ) with pytest.raises(ValidationError, match="jitter_range_in_seconds"): backoff_strategy_model(**backoff_strategy_arguments, jitter_range_in_seconds=-1) +@pytest.mark.parametrize( + "backoff_strategy_model", + [ + pytest.param( + ConstantBackoffStrategyModel( + type="ConstantBackoffStrategy", backoff_time_in_seconds=60 + ), + id="constant_backoff_strategy", + ), + pytest.param( + ExponentialBackoffStrategyModel(type="ExponentialBackoffStrategy", factor=5), + id="exponential_backoff_strategy", + ), + ], +) +def test_create_backoff_strategy_with_negative_jitter_raises_error(backoff_strategy_model): + backoff_strategy_model.__dict__["jitter_range_in_seconds"] = -1 + + with pytest.raises(ValueError, match="jitter_range_in_seconds"): + factory._create_component_from_model(backoff_strategy_model, config=input_config) + + def test_create_request_with_legacy_session_authenticator(): content = """ requester: diff --git a/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py b/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py index ab16b9f9e..7fae93fa1 100644 --- a/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py +++ b/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py @@ -39,13 +39,6 @@ None, CONFIG_BACKOFF_TIME, ), - ( - "test_constant_jitter_from_config", - 1, - BACKOFF_TIME, - "{{ config['jitter'] }}", - BACKOFF_TIME, - ), ], ) def test_constant_backoff( @@ -65,9 +58,9 @@ def test_constant_backoff( @pytest.mark.parametrize( "backofftime, jitter_range, expected_lower_bound, expected_upper_bound", [ - pytest.param(60, 15, 45, 75, id="centered_jitter"), - pytest.param(10, 30, 0, 40, id="lower_bound_clamped_to_zero"), - pytest.param(0, 5, 0, 5, id="zero_base"), + pytest.param(60, 15, 60, 90, id="base_backoff_floor"), + pytest.param(10, 30, 10, 70, id="large_jitter"), + pytest.param(0, 5, 0, 10, id="zero_base"), ], ) def test_constant_backoff_with_jitter_bounds( @@ -87,16 +80,3 @@ def test_constant_backoff_with_jitter_bounds( assert all(expected_lower_bound <= backoff <= expected_upper_bound for backoff in backoff_times) assert len(set(backoff_times)) > 1 - - -def test_constant_backoff_with_negative_jitter_raises_error(): - response_mock = MagicMock() - backoff_strategy = ConstantBackoffStrategy( - parameters={}, - backoff_time_in_seconds=BACKOFF_TIME, - jitter_range_in_seconds=-5, - config={}, - ) - - with pytest.raises(ValueError, match="jitter_range_in_seconds"): - backoff_strategy.backoff_time(response_mock, attempt_count=1) diff --git a/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py b/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py index 6cabe4e86..67a561a82 100644 --- a/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py +++ b/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py @@ -22,7 +22,6 @@ ("test_exponential_backoff_zero_jitter", 2, 5, 0, 20), ("test_exponential_backoff_from_parameters", 2, "{{parameters['backoff']}}", None, 20), ("test_exponential_backoff_from_config", 2, "{{config['backoff']}}", None, 20), - ("test_exponential_jitter_from_config", 2, 5, "{{config['jitter']}}", 20), ], ) def test_exponential_backoff(test_name, attempt_count, factor, jitter_range, expected_backoff_time): @@ -47,8 +46,8 @@ def test_exponential_backoff_default(): @pytest.mark.parametrize( "attempt_count, factor, jitter_range, expected_lower_bound, expected_upper_bound", [ - pytest.param(2, 5, 5, 15, 25, id="centered_jitter"), - pytest.param(1, 2, 10, 0, 14, id="lower_bound_clamped_to_zero"), + pytest.param(2, 5, 5, 20, 30, id="base_backoff_floor"), + pytest.param(1, 2, 10, 4, 24, id="large_jitter"), ], ) def test_exponential_backoff_with_jitter_bounds( @@ -69,16 +68,3 @@ def test_exponential_backoff_with_jitter_bounds( assert all(expected_lower_bound <= backoff <= expected_upper_bound for backoff in backoff_times) assert len(set(backoff_times)) > 1 - - -def test_exponential_backoff_with_negative_jitter_raises_error(): - response_mock = MagicMock() - backoff_strategy = ExponentialBackoffStrategy( - factor=5, - jitter_range_in_seconds=-5, - parameters=parameters, - config=config, - ) - - with pytest.raises(ValueError, match="jitter_range_in_seconds"): - backoff_strategy.backoff_time(response_mock, attempt_count=2) From 5cf2c4c35252e1027204c13d1c1ea194f1b2811b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 18:25:34 +0000 Subject: [PATCH 12/13] test(declarative): address jitter review nits --- .../sources/declarative/declarative_component_schema.yaml | 4 ++-- .../declarative/models/declarative_component_schema.py | 4 ++-- .../sources/declarative/test_manifest_declarative_source.py | 2 +- .../declarative/parsers/test_model_to_component_factory.py | 1 + 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 9bec4f132..45ecd98dc 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -493,7 +493,7 @@ definitions: - "{{ config['backoff_time'] }}" jitter_range_in_seconds: title: Jitter Range - description: Optional jitter range in seconds. When set, the backoff time is uniformly distributed between backoff_time_in_seconds and backoff_time_in_seconds + (jitter_range_in_seconds * 2). + description: Optional additive jitter range in seconds. When set, the backoff time is uniformly distributed between backoff_time_in_seconds and backoff_time_in_seconds + (jitter_range_in_seconds * 2), so jitter only increases the base backoff. type: number minimum: 0 examples: @@ -2043,7 +2043,7 @@ definitions: - "10" jitter_range_in_seconds: title: Jitter Range - description: Optional jitter range in seconds. When set, the backoff time is uniformly distributed between computed_backoff and computed_backoff + (jitter_range_in_seconds * 2). + description: Optional additive jitter range in seconds. When set, the backoff time is uniformly distributed between computed_backoff and computed_backoff + (jitter_range_in_seconds * 2), so jitter only increases the computed backoff. type: number minimum: 0 examples: diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index b652d9b5d..f14c04da4 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -106,7 +106,7 @@ class ConstantBackoffStrategy(BaseModel): ) jitter_range_in_seconds: Optional[float] = Field( None, - description="Optional jitter range in seconds. When set, the backoff time is uniformly distributed between backoff_time_in_seconds and backoff_time_in_seconds + (jitter_range_in_seconds * 2).", + description="Optional additive jitter range in seconds. When set, the backoff time is uniformly distributed between backoff_time_in_seconds and backoff_time_in_seconds + (jitter_range_in_seconds * 2), so jitter only increases the base backoff.", examples=[15], ge=0, title="Jitter Range", @@ -521,7 +521,7 @@ class ExponentialBackoffStrategy(BaseModel): ) jitter_range_in_seconds: Optional[float] = Field( None, - description="Optional jitter range in seconds. When set, the backoff time is uniformly distributed between computed_backoff and computed_backoff + (jitter_range_in_seconds * 2).", + description="Optional additive jitter range in seconds. When set, the backoff time is uniformly distributed between computed_backoff and computed_backoff + (jitter_range_in_seconds * 2), so jitter only increases the computed backoff.", examples=[2], ge=0, title="Jitter Range", diff --git a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py index a79f11419..8bc130a35 100644 --- a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py @@ -2066,7 +2066,7 @@ def test_given_unmigrated_config_when_migrating_then_config_is_migrated(migratio ) migration_mocks["message_repository"].emit_message.assert_called_once() - migration_mocks["open"].assert_any_call("/fake/config/path", "w") + migration_mocks["open"].assert_called_once_with("/fake/config/path", "w") migration_mocks["json_dump"].assert_called_once() migration_mocks["print"].assert_called() migration_mocks["serializer_dump"].assert_called() diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index bb4383c23..5b39b069e 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -1870,6 +1870,7 @@ def test_backoff_jitter_schema_validation(backoff_strategy_model, backoff_strate ], ) def test_create_backoff_strategy_with_negative_jitter_raises_error(backoff_strategy_model): + # Verify factory validation catches negative jitter even if Pydantic validation is bypassed. backoff_strategy_model.__dict__["jitter_range_in_seconds"] = -1 with pytest.raises(ValueError, match="jitter_range_in_seconds"): From 97ba087f34ce99e9ec65bcd8c4565dccd38f0642 Mon Sep 17 00:00:00 2001 From: "zane.hyatt" Date: Fri, 29 May 2026 20:01:27 +0000 Subject: [PATCH 13/13] fix(declarative): remove backoff helper extraction Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- .../constant_backoff_strategy.py | 21 +++++++++---------- .../exponential_backoff_strategy.py | 15 ++++++------- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py index 1d00b6a88..931183b2e 100644 --- a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py +++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py @@ -28,17 +28,16 @@ class ConstantBackoffStrategy(BackoffStrategy): jitter_range_in_seconds: Optional[float] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: - self.backoff_time_in_seconds = self._as_interpolated_string( - self.backoff_time_in_seconds, parameters - ) - - @staticmethod - def _as_interpolated_string( - value: Union[float, InterpolatedString, str], parameters: Mapping[str, Any] - ) -> InterpolatedString: - if not isinstance(value, InterpolatedString): - value = str(value) - return InterpolatedString.create(value, parameters=parameters) + if not isinstance(self.backoff_time_in_seconds, InterpolatedString): + self.backoff_time_in_seconds = str(self.backoff_time_in_seconds) + if isinstance(self.backoff_time_in_seconds, float): + self.backoff_time_in_seconds = InterpolatedString.create( + str(self.backoff_time_in_seconds), parameters=parameters + ) + else: + self.backoff_time_in_seconds = InterpolatedString.create( + self.backoff_time_in_seconds, parameters=parameters + ) def backoff_time( self, diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py index bbde6501f..dd96ff18b 100644 --- a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py +++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py @@ -28,15 +28,12 @@ class ExponentialBackoffStrategy(BackoffStrategy): jitter_range_in_seconds: Optional[float] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: - self._factor = self._as_interpolated_string(self.factor, parameters) - - @staticmethod - def _as_interpolated_string( - value: Union[float, InterpolatedString, str], parameters: Mapping[str, Any] - ) -> InterpolatedString: - if not isinstance(value, InterpolatedString): - value = str(value) - return InterpolatedString.create(value, parameters=parameters) + if not isinstance(self.factor, InterpolatedString): + self.factor = str(self.factor) + if isinstance(self.factor, float): + self._factor = InterpolatedString.create(str(self.factor), parameters=parameters) + else: + self._factor = InterpolatedString.create(self.factor, parameters=parameters) @property def _retry_factor(self) -> float: