From f1f84c9406732c4083c8fb5f46aa3dfa44ecdb35 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 27 Mar 2026 11:05:28 +0000 Subject: [PATCH 1/8] feat: add cost-based rate limiting support to HttpRequestRegexMatcher Add optional 'cost' field to HttpRequestRegexMatcher that allows specifying per-request cost/weight in rate limiting policies. This enables cost-based rate limiting where different endpoints consume different amounts from a shared budget (e.g., Amplitude's Dashboard REST API). Changes: - YAML schema: add 'cost' field (int or string) to HttpRequestRegexMatcher - Python model: add 'cost' field to HttpRequestRegexMatcher model - call_rate.py: store cost on matcher, add get_cost() to BaseCallRatePolicy, update APIBudget._do_acquire() to use cost as weight instead of hardcoded 1 - model_to_component_factory.py: wire up cost field with interpolation support - Tests: 8 new tests for cost-based rate limiting behavior Backward compatible: cost defaults to None (treated as 1). Co-Authored-By: Daryna Ishchenko --- .../declarative_component_schema.yaml | 10 ++ .../models/declarative_component_schema.py | 5 + .../parsers/model_to_component_factory.py | 7 ++ airbyte_cdk/sources/streams/call_rate.py | 29 +++++- unit_tests/sources/streams/test_call_rate.py | 95 +++++++++++++++++++ 5 files changed, 144 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 8c87508cd..bd2616a07 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1832,6 +1832,16 @@ definitions: description: The headers to match. type: object additionalProperties: true + cost: + title: Cost + description: > + The cost of a request matching this matcher in the API's rate limit cost model. + When set, this value is passed as the weight when acquiring a call from the rate limiter, + enabling cost-based rate limiting where different endpoints consume different amounts + from a shared budget. If not set, each request counts as 1. + anyOf: + - type: integer + - type: string additionalProperties: true DefaultErrorHandler: title: Default Error Handler diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 93e6865d8..18e7b4bbc 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -486,6 +486,11 @@ class Config: headers: Optional[Dict[str, Any]] = Field( None, description="The headers to match.", title="Headers" ) + cost: Optional[Union[int, str]] = Field( + None, + description="The cost of a request matching this matcher in the API's rate limit cost model. When set, this value is passed as the weight when acquiring a call from the rate limiter, enabling cost-based rate limiting where different endpoints consume different amounts from a shared budget. If not set, each request counts as 1.", + title="Cost", + ) class DpathExtractor(BaseModel): diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 560dd4056..d50eb91de 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -4387,12 +4387,19 @@ def create_rate(self, model: RateModel, config: Config, **kwargs: Any) -> Rate: def create_http_request_matcher( self, model: HttpRequestRegexMatcherModel, config: Config, **kwargs: Any ) -> HttpRequestRegexMatcher: + cost = model.cost + if cost is not None: + if isinstance(cost, str): + cost = int(InterpolatedString.create(cost, parameters={}).eval(config)) + else: + cost = int(cost) return HttpRequestRegexMatcher( method=model.method, url_base=model.url_base, url_path_pattern=model.url_path_pattern, params=model.params, headers=model.headers, + cost=cost, ) def set_api_budget(self, component_definition: ComponentDefinition, config: Config) -> None: diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index 14f823e45..0a1384be4 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -166,6 +166,7 @@ def __init__( url_path_pattern: Optional[str] = None, params: Optional[Mapping[str, Any]] = None, headers: Optional[Mapping[str, Any]] = None, + cost: Optional[int] = None, ): """ :param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively. @@ -173,7 +174,11 @@ def __init__( :param url_path_pattern: A regex pattern that will be applied to the path portion of the URL. :param params: Dictionary of query parameters that must be present in the request. :param headers: Dictionary of headers that must be present (header keys are compared case-insensitively). + :param cost: The cost (weight) of a request matching this matcher. If set, this value is used + as the weight when acquiring a call from the rate limiter, enabling cost-based rate limiting. + If not set, each request counts as 1. """ + self._cost = cost self._method = method.upper() if method else None # Normalize the url_base if provided: remove trailing slash. @@ -242,11 +247,16 @@ def __call__(self, request: Any) -> bool: return True + @property + def cost(self) -> Optional[int]: + """The cost (weight) of a request matching this matcher, or None if not set.""" + return self._cost + def __str__(self) -> str: regex = self._url_path_pattern.pattern if self._url_path_pattern else None return ( f"HttpRequestRegexMatcher(method={self._method}, url_base={self._url_base}, " - f"url_path_pattern={regex}, params={self._params}, headers={self._headers})" + f"url_path_pattern={regex}, params={self._params}, headers={self._headers}, cost={self._cost})" ) @@ -265,6 +275,20 @@ def matches(self, request: Any) -> bool: return True return any(matcher(request) for matcher in self._matchers) + def get_cost(self, request: Any) -> int: + """Get the cost (weight) for a request based on the first matching matcher. + + If a matcher has a cost configured, that cost is used as the weight. + Otherwise, defaults to 1. + + :param request: a request object + :return: the cost/weight for this request + """ + for matcher in self._matchers: + if matcher(request) and isinstance(matcher, HttpRequestRegexMatcher) and matcher.cost is not None: + return matcher.cost + return 1 + class UnlimitedCallRatePolicy(BaseCallRatePolicy): """ @@ -596,7 +620,8 @@ def _do_acquire( # sometimes we spend all budget before a second attempt, so we have a few more attempts for attempt in range(1, self._maximum_attempts_to_acquire): try: - policy.try_acquire(request, weight=1) + weight = policy.get_cost(request) if isinstance(policy, BaseCallRatePolicy) else 1 + policy.try_acquire(request, weight=weight) return except CallRateLimitHit as exc: last_exception = exc diff --git a/unit_tests/sources/streams/test_call_rate.py b/unit_tests/sources/streams/test_call_rate.py index b99905870..a81100dd7 100644 --- a/unit_tests/sources/streams/test_call_rate.py +++ b/unit_tests/sources/streams/test_call_rate.py @@ -140,10 +140,13 @@ def test_http_request_matching(mocker): users_policy.matches.side_effect = HttpRequestMatcher( url="http://domain/api/users", method="GET" ) + users_policy.get_cost.return_value = 1 groups_policy.matches.side_effect = HttpRequestMatcher( url="http://domain/api/groups", method="POST" ) + groups_policy.get_cost.return_value = 1 root_policy.matches.side_effect = HttpRequestMatcher(method="GET") + root_policy.get_cost.return_value = 1 api_budget = APIBudget( policies=[ users_policy, @@ -360,6 +363,98 @@ def test_with_cache(self, mocker, requests_mock): assert MovingWindowCallRatePolicy.try_acquire.call_count == 1 +class TestCostBasedRateLimiting: + """Tests for cost-based rate limiting where different endpoints consume different amounts from a shared budget.""" + + def test_matcher_cost_default_none(self): + """HttpRequestRegexMatcher cost defaults to None when not specified.""" + matcher = HttpRequestRegexMatcher(url_path_pattern=r"/api/test") + assert matcher.cost is None + + def test_matcher_cost_is_stored(self): + """HttpRequestRegexMatcher stores the cost value when provided.""" + matcher = HttpRequestRegexMatcher(url_path_pattern=r"/api/test", cost=60) + assert matcher.cost == 60 + + def test_policy_get_cost_returns_matcher_cost(self): + """BaseCallRatePolicy.get_cost returns cost from the matching matcher.""" + policy = MovingWindowCallRatePolicy( + matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/expensive", cost=120)], + rates=[Rate(1000, timedelta(hours=1))], + ) + req = Request("GET", "https://example.com/api/expensive") + assert policy.get_cost(req) == 120 + + def test_policy_get_cost_defaults_to_1(self): + """BaseCallRatePolicy.get_cost returns 1 when no matcher has a cost set.""" + policy = MovingWindowCallRatePolicy( + matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/default")], + rates=[Rate(1000, timedelta(hours=1))], + ) + req = Request("GET", "https://example.com/api/default") + assert policy.get_cost(req) == 1 + + def test_policy_get_cost_no_matching_matcher(self): + """BaseCallRatePolicy.get_cost returns 1 when no matcher matches the request.""" + policy = MovingWindowCallRatePolicy( + matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/other", cost=50)], + rates=[Rate(1000, timedelta(hours=1))], + ) + req = Request("GET", "https://example.com/api/unmatched") + assert policy.get_cost(req) == 1 + + def test_api_budget_uses_cost_as_weight(self): + """APIBudget._do_acquire passes the matcher's cost as weight to try_acquire.""" + policy = MovingWindowCallRatePolicy( + matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/heavy", cost=10)], + rates=[Rate(100, timedelta(hours=1))], + ) + budget = APIBudget(policies=[policy]) + + # Make requests — each costs 10 from the budget of 100 + for i in range(10): + budget.acquire_call(Request("GET", "https://example.com/api/heavy"), block=False) + + # The 11th request should exceed the budget (10 * 10 = 100, one more = 110 > 100) + with pytest.raises(CallRateLimitHit): + budget.acquire_call(Request("GET", "https://example.com/api/heavy"), block=False) + + def test_cost_1_backward_compatible(self): + """When cost is not set, behavior is identical to the old hardcoded weight=1.""" + policy = MovingWindowCallRatePolicy( + matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/normal")], + rates=[Rate(5, timedelta(hours=1))], + ) + budget = APIBudget(policies=[policy]) + + for i in range(5): + budget.acquire_call(Request("GET", "https://example.com/api/normal"), block=False) + + with pytest.raises(CallRateLimitHit): + budget.acquire_call(Request("GET", "https://example.com/api/normal"), block=False) + + def test_shared_budget_different_costs(self): + """Multiple matchers with different costs sharing one policy correctly consume the shared budget.""" + # Shared policy matches both endpoints via regex + policy = MovingWindowCallRatePolicy( + matchers=[ + HttpRequestRegexMatcher(url_path_pattern=r"/api/cheap", cost=1), + HttpRequestRegexMatcher(url_path_pattern=r"/api/expensive", cost=10), + ], + rates=[Rate(20, timedelta(hours=1))], + ) + budget = APIBudget(policies=[policy]) + + # Make 1 expensive request (costs 10) and 10 cheap requests (cost 1 each) = total 20 + budget.acquire_call(Request("GET", "https://example.com/api/expensive"), block=False) + for i in range(10): + budget.acquire_call(Request("GET", "https://example.com/api/cheap"), block=False) + + # Budget is now at 20/20 — any further request should fail + with pytest.raises(CallRateLimitHit): + budget.acquire_call(Request("GET", "https://example.com/api/cheap"), block=False) + + class TestHttpRequestRegexMatcher: """ Tests for the new regex-based logic: From 1b8450d8f0e53844a48a3eb7a3544726c673875b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 27 Mar 2026 11:07:33 +0000 Subject: [PATCH 2/8] style: fix ruff formatting in call_rate.py Co-Authored-By: Daryna Ishchenko --- airbyte_cdk/sources/streams/call_rate.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index 0a1384be4..893c9cabf 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -285,7 +285,11 @@ def get_cost(self, request: Any) -> int: :return: the cost/weight for this request """ for matcher in self._matchers: - if matcher(request) and isinstance(matcher, HttpRequestRegexMatcher) and matcher.cost is not None: + if ( + matcher(request) + and isinstance(matcher, HttpRequestRegexMatcher) + and matcher.cost is not None + ): return matcher.cost return 1 From 07c11b0ace114ef3f4de57375fe514aa7b57a68d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 27 Mar 2026 11:17:13 +0000 Subject: [PATCH 3/8] refactor: rename 'cost' to 'weight' for consistent naming with try_acquire(weight=...) Co-Authored-By: Daryna Ishchenko --- .../declarative_component_schema.yaml | 11 ++- .../models/declarative_component_schema.py | 6 +- .../parsers/model_to_component_factory.py | 12 ++-- airbyte_cdk/sources/streams/call_rate.py | 31 +++++---- unit_tests/sources/streams/test_call_rate.py | 68 +++++++++---------- 5 files changed, 64 insertions(+), 64 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index bd2616a07..833a00427 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1832,13 +1832,12 @@ definitions: description: The headers to match. type: object additionalProperties: true - cost: - title: Cost + weight: + title: Weight description: > - The cost of a request matching this matcher in the API's rate limit cost model. - When set, this value is passed as the weight when acquiring a call from the rate limiter, - enabling cost-based rate limiting where different endpoints consume different amounts - from a shared budget. If not set, each request counts as 1. + The weight of a request matching this matcher when acquiring a call from the rate limiter. + Different endpoints can consume different amounts from a shared budget by specifying + different weights. If not set, each request counts as 1. anyOf: - type: integer - type: string diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 18e7b4bbc..df904473d 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -486,10 +486,10 @@ class Config: headers: Optional[Dict[str, Any]] = Field( None, description="The headers to match.", title="Headers" ) - cost: Optional[Union[int, str]] = Field( + weight: Optional[Union[int, str]] = Field( None, - description="The cost of a request matching this matcher in the API's rate limit cost model. When set, this value is passed as the weight when acquiring a call from the rate limiter, enabling cost-based rate limiting where different endpoints consume different amounts from a shared budget. If not set, each request counts as 1.", - title="Cost", + description="The weight of a request matching this matcher when acquiring a call from the rate limiter. Different endpoints can consume different amounts from a shared budget by specifying different weights. If not set, each request counts as 1.", + title="Weight", ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index d50eb91de..7c6ebdbfe 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -4387,19 +4387,19 @@ def create_rate(self, model: RateModel, config: Config, **kwargs: Any) -> Rate: def create_http_request_matcher( self, model: HttpRequestRegexMatcherModel, config: Config, **kwargs: Any ) -> HttpRequestRegexMatcher: - cost = model.cost - if cost is not None: - if isinstance(cost, str): - cost = int(InterpolatedString.create(cost, parameters={}).eval(config)) + weight = model.weight + if weight is not None: + if isinstance(weight, str): + weight = int(InterpolatedString.create(weight, parameters={}).eval(config)) else: - cost = int(cost) + weight = int(weight) return HttpRequestRegexMatcher( method=model.method, url_base=model.url_base, url_path_pattern=model.url_path_pattern, params=model.params, headers=model.headers, - cost=cost, + weight=weight, ) def set_api_budget(self, component_definition: ComponentDefinition, config: Config) -> None: diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index 893c9cabf..9da3531b2 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -166,7 +166,7 @@ def __init__( url_path_pattern: Optional[str] = None, params: Optional[Mapping[str, Any]] = None, headers: Optional[Mapping[str, Any]] = None, - cost: Optional[int] = None, + weight: Optional[int] = None, ): """ :param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively. @@ -174,11 +174,12 @@ def __init__( :param url_path_pattern: A regex pattern that will be applied to the path portion of the URL. :param params: Dictionary of query parameters that must be present in the request. :param headers: Dictionary of headers that must be present (header keys are compared case-insensitively). - :param cost: The cost (weight) of a request matching this matcher. If set, this value is used - as the weight when acquiring a call from the rate limiter, enabling cost-based rate limiting. + :param weight: The weight of a request matching this matcher. If set, this value is used + when acquiring a call from the rate limiter, enabling cost-based rate limiting + where different endpoints consume different amounts from a shared budget. If not set, each request counts as 1. """ - self._cost = cost + self._weight = weight self._method = method.upper() if method else None # Normalize the url_base if provided: remove trailing slash. @@ -248,15 +249,15 @@ def __call__(self, request: Any) -> bool: return True @property - def cost(self) -> Optional[int]: - """The cost (weight) of a request matching this matcher, or None if not set.""" - return self._cost + def weight(self) -> Optional[int]: + """The weight of a request matching this matcher, or None if not set.""" + return self._weight def __str__(self) -> str: regex = self._url_path_pattern.pattern if self._url_path_pattern else None return ( f"HttpRequestRegexMatcher(method={self._method}, url_base={self._url_base}, " - f"url_path_pattern={regex}, params={self._params}, headers={self._headers}, cost={self._cost})" + f"url_path_pattern={regex}, params={self._params}, headers={self._headers}, weight={self._weight})" ) @@ -275,22 +276,22 @@ def matches(self, request: Any) -> bool: return True return any(matcher(request) for matcher in self._matchers) - def get_cost(self, request: Any) -> int: - """Get the cost (weight) for a request based on the first matching matcher. + def get_weight(self, request: Any) -> int: + """Get the weight for a request based on the first matching matcher. - If a matcher has a cost configured, that cost is used as the weight. + If a matcher has a weight configured, that weight is used. Otherwise, defaults to 1. :param request: a request object - :return: the cost/weight for this request + :return: the weight for this request """ for matcher in self._matchers: if ( matcher(request) and isinstance(matcher, HttpRequestRegexMatcher) - and matcher.cost is not None + and matcher.weight is not None ): - return matcher.cost + return matcher.weight return 1 @@ -624,7 +625,7 @@ def _do_acquire( # sometimes we spend all budget before a second attempt, so we have a few more attempts for attempt in range(1, self._maximum_attempts_to_acquire): try: - weight = policy.get_cost(request) if isinstance(policy, BaseCallRatePolicy) else 1 + weight = policy.get_weight(request) if isinstance(policy, BaseCallRatePolicy) else 1 policy.try_acquire(request, weight=weight) return except CallRateLimitHit as exc: diff --git a/unit_tests/sources/streams/test_call_rate.py b/unit_tests/sources/streams/test_call_rate.py index a81100dd7..ab68b8e3e 100644 --- a/unit_tests/sources/streams/test_call_rate.py +++ b/unit_tests/sources/streams/test_call_rate.py @@ -140,13 +140,13 @@ def test_http_request_matching(mocker): users_policy.matches.side_effect = HttpRequestMatcher( url="http://domain/api/users", method="GET" ) - users_policy.get_cost.return_value = 1 + users_policy.get_weight.return_value = 1 groups_policy.matches.side_effect = HttpRequestMatcher( url="http://domain/api/groups", method="POST" ) - groups_policy.get_cost.return_value = 1 + groups_policy.get_weight.return_value = 1 root_policy.matches.side_effect = HttpRequestMatcher(method="GET") - root_policy.get_cost.return_value = 1 + root_policy.get_weight.return_value = 1 api_budget = APIBudget( policies=[ users_policy, @@ -363,55 +363,55 @@ def test_with_cache(self, mocker, requests_mock): assert MovingWindowCallRatePolicy.try_acquire.call_count == 1 -class TestCostBasedRateLimiting: - """Tests for cost-based rate limiting where different endpoints consume different amounts from a shared budget.""" +class TestWeightBasedRateLimiting: + """Tests for weight-based rate limiting where different endpoints consume different amounts from a shared budget.""" - def test_matcher_cost_default_none(self): - """HttpRequestRegexMatcher cost defaults to None when not specified.""" + def test_matcher_weight_default_none(self): + """HttpRequestRegexMatcher weight defaults to None when not specified.""" matcher = HttpRequestRegexMatcher(url_path_pattern=r"/api/test") - assert matcher.cost is None + assert matcher.weight is None - def test_matcher_cost_is_stored(self): - """HttpRequestRegexMatcher stores the cost value when provided.""" - matcher = HttpRequestRegexMatcher(url_path_pattern=r"/api/test", cost=60) - assert matcher.cost == 60 + def test_matcher_weight_is_stored(self): + """HttpRequestRegexMatcher stores the weight value when provided.""" + matcher = HttpRequestRegexMatcher(url_path_pattern=r"/api/test", weight=60) + assert matcher.weight == 60 - def test_policy_get_cost_returns_matcher_cost(self): - """BaseCallRatePolicy.get_cost returns cost from the matching matcher.""" + def test_policy_get_weight_returns_matcher_weight(self): + """BaseCallRatePolicy.get_weight returns weight from the matching matcher.""" policy = MovingWindowCallRatePolicy( - matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/expensive", cost=120)], + matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/expensive", weight=120)], rates=[Rate(1000, timedelta(hours=1))], ) req = Request("GET", "https://example.com/api/expensive") - assert policy.get_cost(req) == 120 + assert policy.get_weight(req) == 120 - def test_policy_get_cost_defaults_to_1(self): - """BaseCallRatePolicy.get_cost returns 1 when no matcher has a cost set.""" + def test_policy_get_weight_defaults_to_1(self): + """BaseCallRatePolicy.get_weight returns 1 when no matcher has a weight set.""" policy = MovingWindowCallRatePolicy( matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/default")], rates=[Rate(1000, timedelta(hours=1))], ) req = Request("GET", "https://example.com/api/default") - assert policy.get_cost(req) == 1 + assert policy.get_weight(req) == 1 - def test_policy_get_cost_no_matching_matcher(self): - """BaseCallRatePolicy.get_cost returns 1 when no matcher matches the request.""" + def test_policy_get_weight_no_matching_matcher(self): + """BaseCallRatePolicy.get_weight returns 1 when no matcher matches the request.""" policy = MovingWindowCallRatePolicy( - matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/other", cost=50)], + matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/other", weight=50)], rates=[Rate(1000, timedelta(hours=1))], ) req = Request("GET", "https://example.com/api/unmatched") - assert policy.get_cost(req) == 1 + assert policy.get_weight(req) == 1 - def test_api_budget_uses_cost_as_weight(self): - """APIBudget._do_acquire passes the matcher's cost as weight to try_acquire.""" + def test_api_budget_uses_weight(self): + """APIBudget._do_acquire passes the matcher's weight to try_acquire.""" policy = MovingWindowCallRatePolicy( - matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/heavy", cost=10)], + matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/heavy", weight=10)], rates=[Rate(100, timedelta(hours=1))], ) budget = APIBudget(policies=[policy]) - # Make requests — each costs 10 from the budget of 100 + # Make requests — each weighs 10 from the budget of 100 for i in range(10): budget.acquire_call(Request("GET", "https://example.com/api/heavy"), block=False) @@ -419,8 +419,8 @@ def test_api_budget_uses_cost_as_weight(self): with pytest.raises(CallRateLimitHit): budget.acquire_call(Request("GET", "https://example.com/api/heavy"), block=False) - def test_cost_1_backward_compatible(self): - """When cost is not set, behavior is identical to the old hardcoded weight=1.""" + def test_weight_1_backward_compatible(self): + """When weight is not set, behavior is identical to the old hardcoded weight=1.""" policy = MovingWindowCallRatePolicy( matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/normal")], rates=[Rate(5, timedelta(hours=1))], @@ -433,19 +433,19 @@ def test_cost_1_backward_compatible(self): with pytest.raises(CallRateLimitHit): budget.acquire_call(Request("GET", "https://example.com/api/normal"), block=False) - def test_shared_budget_different_costs(self): - """Multiple matchers with different costs sharing one policy correctly consume the shared budget.""" + def test_shared_budget_different_weights(self): + """Multiple matchers with different weights sharing one policy correctly consume the shared budget.""" # Shared policy matches both endpoints via regex policy = MovingWindowCallRatePolicy( matchers=[ - HttpRequestRegexMatcher(url_path_pattern=r"/api/cheap", cost=1), - HttpRequestRegexMatcher(url_path_pattern=r"/api/expensive", cost=10), + HttpRequestRegexMatcher(url_path_pattern=r"/api/cheap", weight=1), + HttpRequestRegexMatcher(url_path_pattern=r"/api/expensive", weight=10), ], rates=[Rate(20, timedelta(hours=1))], ) budget = APIBudget(policies=[policy]) - # Make 1 expensive request (costs 10) and 10 cheap requests (cost 1 each) = total 20 + # Make 1 expensive request (weight 10) and 10 cheap requests (weight 1 each) = total 20 budget.acquire_call(Request("GET", "https://example.com/api/expensive"), block=False) for i in range(10): budget.acquire_call(Request("GET", "https://example.com/api/cheap"), block=False) From 9bdf9db3e0c58945dc76d82b0ddeda05c0d74e0b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 27 Mar 2026 12:27:44 +0000 Subject: [PATCH 4/8] fix: short-circuit get_weight() at first matching matcher Address CodeRabbit review: get_weight() now stops at the first matcher that matches the request, consistent with matches() behavior. If the first matching matcher has a weight, use it; otherwise return 1. Co-Authored-By: Daryna Ishchenko --- airbyte_cdk/sources/streams/call_rate.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index 9da3531b2..932879b64 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -286,12 +286,10 @@ def get_weight(self, request: Any) -> int: :return: the weight for this request """ for matcher in self._matchers: - if ( - matcher(request) - and isinstance(matcher, HttpRequestRegexMatcher) - and matcher.weight is not None - ): - return matcher.weight + if matcher(request): + if isinstance(matcher, HttpRequestRegexMatcher) and matcher.weight is not None: + return matcher.weight + return 1 return 1 From fea3ca19453c1dac3ce590a32c7b07fb197038e7 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 27 Mar 2026 12:33:20 +0000 Subject: [PATCH 5/8] feat: add weight validation (must be >= 1) to HttpRequestRegexMatcher Reject non-positive weight values in the constructor to prevent weight=0 (free requests) or negative weights (budget restoration) from bypassing rate limits. Adds two tests for validation. Co-Authored-By: Daryna Ishchenko --- airbyte_cdk/sources/streams/call_rate.py | 2 ++ unit_tests/sources/streams/test_call_rate.py | 10 ++++++++++ 2 files changed, 12 insertions(+) diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index 932879b64..fa109240c 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -179,6 +179,8 @@ def __init__( where different endpoints consume different amounts from a shared budget. If not set, each request counts as 1. """ + if weight is not None and weight < 1: + raise ValueError(f"weight must be >= 1, got {weight}") self._weight = weight self._method = method.upper() if method else None diff --git a/unit_tests/sources/streams/test_call_rate.py b/unit_tests/sources/streams/test_call_rate.py index ab68b8e3e..ef1d3d0a6 100644 --- a/unit_tests/sources/streams/test_call_rate.py +++ b/unit_tests/sources/streams/test_call_rate.py @@ -376,6 +376,16 @@ def test_matcher_weight_is_stored(self): matcher = HttpRequestRegexMatcher(url_path_pattern=r"/api/test", weight=60) assert matcher.weight == 60 + def test_matcher_rejects_zero_weight(self): + """HttpRequestRegexMatcher raises ValueError for weight=0.""" + with pytest.raises(ValueError, match="weight must be >= 1"): + HttpRequestRegexMatcher(url_path_pattern=r"/api/test", weight=0) + + def test_matcher_rejects_negative_weight(self): + """HttpRequestRegexMatcher raises ValueError for negative weight.""" + with pytest.raises(ValueError, match="weight must be >= 1"): + HttpRequestRegexMatcher(url_path_pattern=r"/api/test", weight=-5) + def test_policy_get_weight_returns_matcher_weight(self): """BaseCallRatePolicy.get_weight returns weight from the matching matcher.""" policy = MovingWindowCallRatePolicy( From 4926b4091fd3f7796c4d6e0b96f04e966f645575 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 27 Mar 2026 12:41:52 +0000 Subject: [PATCH 6/8] feat: add weight validation in create_http_request_matcher factory Validates weight >= 1 after interpolation/casting in the factory, catching invalid weights from manifest config before they reach the HttpRequestRegexMatcher constructor. Co-Authored-By: Daryna Ishchenko --- .../sources/declarative/parsers/model_to_component_factory.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 7c6ebdbfe..239e5bd51 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -4393,6 +4393,8 @@ def create_http_request_matcher( weight = int(InterpolatedString.create(weight, parameters={}).eval(config)) else: weight = int(weight) + if weight < 1: + raise ValueError(f"weight must be >= 1, got {weight}") return HttpRequestRegexMatcher( method=model.method, url_base=model.url_base, From d8686b1a87d9e598c5622d30ec2612475cb76d37 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 27 Mar 2026 15:29:02 +0000 Subject: [PATCH 7/8] feat: add weight guard to MovingWindowCallRatePolicy.try_acquire() Co-Authored-By: Daryna Ishchenko --- airbyte_cdk/sources/streams/call_rate.py | 5 +++++ unit_tests/sources/streams/test_call_rate.py | 12 ++++++++++++ 2 files changed, 17 insertions(+) diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index fa109240c..77a2004e0 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -447,6 +447,11 @@ def __init__(self, rates: list[Rate], matchers: list[RequestMatcher]): super().__init__(matchers=matchers) def try_acquire(self, request: Any, weight: int) -> None: + lowest_limit = min(rate.limit for rate in self._bucket.rates) + if weight > lowest_limit: + raise ValueError( + f"Weight can not exceed the lowest configured rate limit ({lowest_limit})" + ) if not self.matches(request): raise ValueError("Request does not match the policy") diff --git a/unit_tests/sources/streams/test_call_rate.py b/unit_tests/sources/streams/test_call_rate.py index ef1d3d0a6..a423fe573 100644 --- a/unit_tests/sources/streams/test_call_rate.py +++ b/unit_tests/sources/streams/test_call_rate.py @@ -464,6 +464,18 @@ def test_shared_budget_different_weights(self): with pytest.raises(CallRateLimitHit): budget.acquire_call(Request("GET", "https://example.com/api/cheap"), block=False) + def test_moving_window_rejects_weight_exceeding_limit(self): + """MovingWindowCallRatePolicy raises ValueError when weight exceeds the lowest configured rate limit.""" + policy = MovingWindowCallRatePolicy( + matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/heavy", weight=50)], + rates=[Rate(10, timedelta(hours=1)), Rate(100, timedelta(days=1))], + ) + req = Request("GET", "https://example.com/api/heavy") + with pytest.raises( + ValueError, match="Weight can not exceed the lowest configured rate limit" + ): + policy.try_acquire(req, weight=50) + class TestHttpRequestRegexMatcher: """ From 4fd4174afc5ea6e52094b24272e329dcbb3ace24 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 27 Mar 2026 16:13:17 +0000 Subject: [PATCH 8/8] fix: move matches() check before weight guard in MovingWindowCallRatePolicy Co-Authored-By: Daryna Ishchenko --- airbyte_cdk/sources/streams/call_rate.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index 77a2004e0..4a06db3b2 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -447,13 +447,13 @@ def __init__(self, rates: list[Rate], matchers: list[RequestMatcher]): super().__init__(matchers=matchers) def try_acquire(self, request: Any, weight: int) -> None: + if not self.matches(request): + raise ValueError("Request does not match the policy") lowest_limit = min(rate.limit for rate in self._bucket.rates) if weight > lowest_limit: raise ValueError( f"Weight can not exceed the lowest configured rate limit ({lowest_limit})" ) - if not self.matches(request): - raise ValueError("Request does not match the policy") try: self._limiter.try_acquire(request, weight=weight)