From 209ee797ef1a196334ca270df12c2e8098bfd1e5 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Thu, 26 Feb 2026 00:37:52 +0000 Subject: [PATCH 1/7] Keep track of synthetic apify-default-dataset-item events --- src/apify/_actor.py | 13 ++- src/apify/_charging.py | 77 +++++++++++-- .../storage_clients/_apify/_dataset_client.py | 31 +++--- .../_file_system/_dataset_client.py | 49 +++++++++ .../_file_system/_storage_client.py | 18 ++++ .../storage_clients/_ppe_dataset_mixin.py | 31 ++++++ tests/e2e/test_actor_charge.py | 102 ++++++++++++++++++ tests/unit/actor/test_actor_charge.py | 43 ++++++++ tests/unit/actor/test_charging_manager.py | 72 +++++++++++++ 9 files changed, 409 insertions(+), 27 deletions(-) create mode 100644 src/apify/storage_clients/_file_system/_dataset_client.py create mode 100644 src/apify/storage_clients/_ppe_dataset_mixin.py diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 76f4475b..c63b819b 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -628,6 +628,9 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non data = data if isinstance(data, list) else [data] + if charged_event_name and charged_event_name.startswith('apify-'): + raise ValueError(f'Cannot charge for synthetic event "{charged_event_name}" manually') + # No charging, just push the data without locking. if charged_event_name is None: dataset = await self.open_dataset() @@ -637,13 +640,12 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non # If charging is requested, acquire the charge lock to prevent race conditions between concurrent # push_data calls. We need to hold the lock for the entire push_data + charge sequence. async with self._charge_lock: - max_charged_count = self.get_charging_manager().calculate_max_event_charge_count_within_limit( - charged_event_name + pushed_items_count = self.get_charging_manager().calculate_push_data_limit( + items_count=len(data), + event_name=charged_event_name, + is_default_dataset=True, ) - # Push as many items as we can charge for. - pushed_items_count = min(max_charged_count, len(data)) if max_charged_count is not None else len(data) - dataset = await self.open_dataset() if pushed_items_count < len(data): @@ -651,6 +653,7 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non elif pushed_items_count > 0: await dataset.push_data(data) + # Only charge explicit events; synthetic events will be processed within the client. return await self.get_charging_manager().charge( event_name=charged_event_name, count=pushed_items_count, diff --git a/src/apify/_charging.py b/src/apify/_charging.py index 7e2bfeab..fd4b6163 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -1,6 +1,7 @@ from __future__ import annotations import math +from contextvars import ContextVar from dataclasses import dataclass from datetime import datetime, timezone from decimal import Decimal @@ -31,6 +32,14 @@ run_validator = TypeAdapter[ActorRun | None](ActorRun | None) +DEFAULT_DATASET_ITEM_EVENT = 'apify-default-dataset-item' + +# Context variable to hold the current `ChargingManager` instance, if any. This allows PPE-aware dataset clients to +# access the charging manager without needing to pass it explicitly. +charging_manager_ctx: ContextVar[ChargingManagerImplementation | None] = ContextVar( + 'charging_manager_ctx', default=None +) + @docs_group('Charging') class ChargingManager(Protocol): @@ -81,6 +90,28 @@ def get_charged_event_count(self, event_name: str) -> int: def get_max_total_charge_usd(self) -> Decimal: """Get the configured maximum total charge for this Actor run.""" + def calculate_push_data_limit( + self, + items_count: int, + event_name: str, + *, + is_default_dataset: bool, + ) -> int: + """Calculate how many items can be pushed and charged within the current budget. + + Accounts for both the explicit event and the synthetic `DEFAULT_DATASET_ITEM_EVENT` event, + so that the combined cost per item does not exceed the remaining budget. + + Args: + items_count: The number of items to be pushed. + event_name: The explicit event name to charge for each item. + is_default_dataset: Whether the data is pushed to the default dataset. + If True, the synthetic event cost is included in the combined price. + + Returns: + Max number of items that can be pushed within the budget. + """ + @docs_group('Charging') @dataclass(frozen=True) @@ -190,6 +221,11 @@ async def __aenter__(self) -> None: self._charging_log_dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME) + # if the Actor runs with the pay-per-event pricing model, set the context variable so that PPE-aware dataset + # clients can access the charging manager and charge for synthetic events. + if self._pricing_model == 'PAY_PER_EVENT': + charging_manager_ctx.set(self) + async def __aexit__( self, exc_type: type[BaseException] | None, @@ -199,6 +235,7 @@ async def __aexit__( if not self.active: raise RuntimeError('Exiting an uninitialized ChargingManager') + charging_manager_ctx.set(None) self.active = False @ensure_context @@ -258,7 +295,11 @@ def calculate_chargeable() -> dict[str, int | None]: if self._actor_run_id is None: raise RuntimeError('Actor run ID not configured') - if event_name in self._pricing_info: + if event_name.startswith('apify-'): + # Synthetic events (e.g. apify-default-dataset-item) are tracked internally only, + # the platform handles them automatically based on dataset writes. + pass + elif event_name in self._pricing_info: await self._client.run(self._actor_run_id).charge(event_name, charged_count) else: logger.warning(f"Attempting to charge for an unknown event '{event_name}'") @@ -300,14 +341,7 @@ def calculate_total_charged_amount(self) -> Decimal: @ensure_context def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int | None: - pricing_info = self._pricing_info.get(event_name) - - if pricing_info is not None: - price = pricing_info.price - elif not self._is_at_home: - price = Decimal(1) # Use a nonzero price for local development so that the maximum budget can be reached - else: - price = Decimal() + price = self._get_event_price(event_name) if not price: return None @@ -337,6 +371,25 @@ def get_charged_event_count(self, event_name: str) -> int: def get_max_total_charge_usd(self) -> Decimal: return self._max_total_charge_usd + @ensure_context + def calculate_push_data_limit( + self, + items_count: int, + event_name: str, + *, + is_default_dataset: bool, + ) -> int: + explicit_price = self._get_event_price(event_name) + synthetic_price = self._get_event_price(DEFAULT_DATASET_ITEM_EVENT) if is_default_dataset else Decimal(0) + combined_price = explicit_price + synthetic_price + + if not combined_price: + return items_count + + result = (self._max_total_charge_usd - self.calculate_total_charged_amount()) / combined_price + max_count = max(0, math.floor(result)) if result.is_finite() else items_count + return min(items_count, max_count) + async def _fetch_pricing_info(self) -> _FetchedPricingInfoDict: """Fetch pricing information from environment variables or API.""" # Check if pricing info is available via environment variables @@ -370,6 +423,12 @@ async def _fetch_pricing_info(self) -> _FetchedPricingInfoDict: max_total_charge_usd=self._configuration.max_total_charge_usd or Decimal('inf'), ) + def _get_event_price(self, event_name: str) -> Decimal: + pricing_info = self._pricing_info.get(event_name) + if pricing_info is not None: + return pricing_info.price + return Decimal(0) if self._is_at_home else Decimal(1) + @dataclass class ChargingStateItem: diff --git a/src/apify/storage_clients/_apify/_dataset_client.py b/src/apify/storage_clients/_apify/_dataset_client.py index a918bddd..5aa4c3d8 100644 --- a/src/apify/storage_clients/_apify/_dataset_client.py +++ b/src/apify/storage_clients/_apify/_dataset_client.py @@ -13,6 +13,7 @@ from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata from ._api_client_creation import create_storage_api_client +from apify.storage_clients._ppe_dataset_mixin import _DatasetClientPPEMixin if TYPE_CHECKING: from collections.abc import AsyncIterator @@ -25,7 +26,7 @@ logger = getLogger(__name__) -class ApifyDatasetClient(DatasetClient): +class ApifyDatasetClient(DatasetClient, _DatasetClientPPEMixin): """An Apify platform implementation of the dataset client.""" _MAX_PAYLOAD_SIZE = ByteSize.from_mb(9) @@ -48,6 +49,8 @@ def __init__( Preferably use the `ApifyDatasetClient.open` class method to create a new instance. """ + super().__init__() + self._api_client = api_client """The Apify dataset client for API operations.""" @@ -108,12 +111,16 @@ async def open( id=id, ) - return cls( + dataset_client = cls( api_client=api_client, api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 lock=asyncio.Lock(), ) + dataset_client.is_default_dataset = (await dataset_client.get_metadata()).id == configuration.default_dataset_id + + return dataset_client + @override async def purge(self) -> None: raise NotImplementedError( @@ -128,21 +135,19 @@ async def drop(self) -> None: @override async def push_data(self, data: list[Any] | dict[str, Any]) -> None: - async def payloads_generator() -> AsyncIterator[str]: - for index, item in enumerate(data): + async def payloads_generator(items: list[Any]) -> AsyncIterator[str]: + for index, item in enumerate(items): yield await self._check_and_serialize(item, index) async with self._lock: - # Handle lists - if isinstance(data, list): - # Invoke client in series to preserve the order of data - async for items in self._chunk_by_size(payloads_generator()): - await self._api_client.push_items(items=items) + items = data if isinstance(data, list) else [data] + limit = await self._calculate_limit_for_push(len(items)) + items = items[:limit] - # Handle singular items - else: - items = await self._check_and_serialize(data) - await self._api_client.push_items(items=items) + async for chunk in self._chunk_by_size(payloads_generator(items)): + await self._api_client.push_items(items=chunk) + + await self._charge_for_items(count_items=limit) @override async def get_data( diff --git a/src/apify/storage_clients/_file_system/_dataset_client.py b/src/apify/storage_clients/_file_system/_dataset_client.py new file mode 100644 index 00000000..e87534e0 --- /dev/null +++ b/src/apify/storage_clients/_file_system/_dataset_client.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from typing_extensions import Self, override + +from crawlee.storage_clients._file_system import FileSystemDatasetClient + +from apify._configuration import Configuration as ApifyConfiguration +from apify.storage_clients._ppe_dataset_mixin import _DatasetClientPPEMixin + +if TYPE_CHECKING: + from crawlee.configuration import Configuration + + +class ApifyFileSystemDatasetClient(FileSystemDatasetClient, _DatasetClientPPEMixin): + def __init__(self, *args: Any, **kwargs: Any) -> None: + FileSystemDatasetClient.__init__(self, *args, **kwargs) + _DatasetClientPPEMixin.__init__(self) + + @override + @classmethod + async def open( + cls, + *, + id: str | None, + name: str | None, + alias: str | None, + configuration: Configuration | ApifyConfiguration, + ) -> Self: + + dataset_client = await super().open( + id=id, + name=name, + alias=alias, + configuration=configuration, + ) + + if isinstance(configuration, ApifyConfiguration) and all(v is None for v in (id, name, alias)): + dataset_client.is_default_dataset = True + + return dataset_client + + @override + async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None: + items = data if isinstance(data, list) else [data] + limit = await self._calculate_limit_for_push(len(items)) + await super().push_data(items[:limit]) + await self._charge_for_items(limit) diff --git a/src/apify/storage_clients/_file_system/_storage_client.py b/src/apify/storage_clients/_file_system/_storage_client.py index 2b7134c7..f6cb66cc 100644 --- a/src/apify/storage_clients/_file_system/_storage_client.py +++ b/src/apify/storage_clients/_file_system/_storage_client.py @@ -7,6 +7,7 @@ from crawlee.configuration import Configuration from crawlee.storage_clients import FileSystemStorageClient +from ._dataset_client import ApifyFileSystemDatasetClient from ._key_value_store_client import ApifyFileSystemKeyValueStoreClient if TYPE_CHECKING: @@ -48,3 +49,20 @@ async def create_kvs_client( ) await self._purge_if_needed(client, configuration) return client + + @override + async def create_dataset_client( + self, + *, + id: str | None = None, + name: str | None = None, + alias: str | None = None, + configuration: Configuration | None = None, + ) -> ApifyFileSystemDatasetClient: + configuration = configuration or Configuration.get_global_configuration() + return await ApifyFileSystemDatasetClient.open( + id=id, + name=name, + alias=alias, + configuration=configuration, + ) diff --git a/src/apify/storage_clients/_ppe_dataset_mixin.py b/src/apify/storage_clients/_ppe_dataset_mixin.py new file mode 100644 index 00000000..973bfcf8 --- /dev/null +++ b/src/apify/storage_clients/_ppe_dataset_mixin.py @@ -0,0 +1,31 @@ +from apify._charging import DEFAULT_DATASET_ITEM_EVENT, charging_manager_ctx + + +class _DatasetClientPPEMixin: + """A mixin for dataset clients to add support for PPE pricing model and tracking synthetic events.""" + + def __init__(self) -> None: + self._is_default_dataset: bool = False + + @property + def is_default_dataset(self) -> bool: + return self._is_default_dataset + + @is_default_dataset.setter + def is_default_dataset(self, value: bool) -> None: + self._is_default_dataset = value + + async def _calculate_limit_for_push(self, items_count: int) -> int: + if self.is_default_dataset and (charging_manager := charging_manager_ctx.get()): + max_charged_count = charging_manager.calculate_max_event_charge_count_within_limit( + event_name=DEFAULT_DATASET_ITEM_EVENT + ) + return min(max_charged_count, items_count) if max_charged_count is not None else items_count + return items_count + + async def _charge_for_items(self, count_items: int) -> None: + if self.is_default_dataset and (charging_manager := charging_manager_ctx.get()): + await charging_manager.charge( + event_name=DEFAULT_DATASET_ITEM_EVENT, + count=count_items, + ) diff --git a/tests/e2e/test_actor_charge.py b/tests/e2e/test_actor_charge.py index d72062bc..0e2e98a0 100644 --- a/tests/e2e/test_actor_charge.py +++ b/tests/e2e/test_actor_charge.py @@ -20,6 +20,53 @@ from .conftest import MakeActorFunction, RunActorFunction +@pytest_asyncio.fixture(scope='module', loop_scope='module') +async def ppe_push_data_actor_build(make_actor: MakeActorFunction) -> str: + async def main() -> None: + async with Actor: + await Actor.push_data( + [{'id': i} for i in range(5)], + 'push-item', + ) + + actor_client = await make_actor('ppe-push-data', main_func=main) + + await actor_client.update( + pricing_infos=[ + { + 'pricingModel': 'PAY_PER_EVENT', + 'pricingPerEvent': { + 'actorChargeEvents': { + 'push-item': { + 'eventTitle': 'Push item', + 'eventPriceUsd': 0.05, + 'eventDescription': 'One pushed item', + }, + 'apify-default-dataset-item': { + 'eventTitle': 'Default dataset item', + 'eventPriceUsd': 0.05, + 'eventDescription': 'One item written to the default dataset', + }, + }, + }, + }, + ] + ) + + actor = await actor_client.get() + + assert actor is not None + return str(actor['id']) + + +@pytest_asyncio.fixture(scope='function', loop_scope='module') +async def ppe_push_data_actor( + ppe_push_data_actor_build: str, + apify_client_async: ApifyClientAsync, +) -> ActorClientAsync: + return apify_client_async.actor(ppe_push_data_actor_build) + + @pytest_asyncio.fixture(scope='module', loop_scope='module') async def ppe_actor_build(make_actor: MakeActorFunction) -> str: async def main() -> None: @@ -114,3 +161,58 @@ async def test_actor_charge_limit( except AssertionError: if is_last_attempt: raise + + +async def test_actor_push_data_charges_both_events( + ppe_push_data_actor: ActorClientAsync, + run_actor: RunActorFunction, + apify_client_async: ApifyClientAsync, +) -> None: + """Test that push_data charges both the explicit event and the synthetic apify-default-dataset-item event.""" + run = await run_actor(ppe_push_data_actor) + + # Refetch until the platform gets its act together + for is_last_attempt, _ in retry_counter(30): + await asyncio.sleep(1) + updated_run = await apify_client_async.run(run.id).get() + run = ActorRun.model_validate(updated_run) + + try: + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == { + 'push-item': 5, + 'apify-default-dataset-item': 5, + } + break + except AssertionError: + if is_last_attempt: + raise + + +async def test_actor_push_data_combined_budget_limit( + ppe_push_data_actor: ActorClientAsync, + run_actor: RunActorFunction, + apify_client_async: ApifyClientAsync, +) -> None: + """Test that push_data respects combined budget: explicit ($0.05) + synthetic ($0.05) = $0.10/item. + + With max_total_charge_usd=$0.20, only 2 of 5 items fit in the budget. + """ + run = await run_actor(ppe_push_data_actor, max_total_charge_usd=Decimal('0.20')) + + # Refetch until the platform gets its act together + for is_last_attempt, _ in retry_counter(30): + await asyncio.sleep(1) + updated_run = await apify_client_async.run(run.id).get() + run = ActorRun.model_validate(updated_run) + + try: + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == { + 'push-item': 2, + 'apify-default-dataset-item': 2, + } + break + except AssertionError: + if is_last_attempt: + raise diff --git a/tests/unit/actor/test_actor_charge.py b/tests/unit/actor/test_actor_charge.py index c9b14a88..3070aa4f 100644 --- a/tests/unit/actor/test_actor_charge.py +++ b/tests/unit/actor/test_actor_charge.py @@ -143,6 +143,49 @@ async def test_max_event_charge_count_within_limit_tolerates_overdraw() -> None: assert max_count == 0 +async def test_push_data_combined_price_limits_items() -> None: + """Test that push_data limits items when the combined explicit + synthetic event price exceeds the budget.""" + async with setup_mocked_charging( + Configuration(max_total_charge_usd=Decimal('3.00'), test_pay_per_event=True) + ) as setup: + setup.charging_mgr._pricing_info['scrape'] = PricingInfoItem(Decimal('1.00'), 'Scrape') + setup.charging_mgr._pricing_info['apify-default-dataset-item'] = PricingInfoItem( + Decimal('1.00'), 'Default dataset item' + ) + + data = [{'id': i} for i in range(5)] + result = await Actor.push_data(data, 'scrape') + + assert result is not None + assert result.charged_count == 1 + + dataset = await Actor.open_dataset() + items = await dataset.get_data() + assert len(items.items) == 1 + assert items.items[0] == {'id': 0} + + +async def test_push_data_charges_synthetic_event_for_default_dataset() -> None: + """Test that push_data charges both the explicit event and the synthetic apify-default-dataset-item event.""" + async with setup_mocked_charging( + Configuration(max_total_charge_usd=Decimal('10.00'), test_pay_per_event=True) + ) as setup: + setup.charging_mgr._pricing_info['test'] = PricingInfoItem(Decimal('0.10'), 'Test') + setup.charging_mgr._pricing_info['apify-default-dataset-item'] = PricingInfoItem( + Decimal('0.05'), 'Dataset item' + ) + + data = [{'id': i} for i in range(3)] + result = await Actor.push_data(data, 'test') + + assert result is not None + assert result.charged_count == 3 + + # Both explicit and synthetic events should be charged + assert setup.charging_mgr.get_charged_event_count('test') == 3 + assert setup.charging_mgr.get_charged_event_count('apify-default-dataset-item') == 3 + + async def test_charge_with_overdrawn_budget() -> None: configuration = Configuration( max_total_charge_usd=Decimal('0.00025'), diff --git a/tests/unit/actor/test_charging_manager.py b/tests/unit/actor/test_charging_manager.py index 4a1ef480..94f31759 100644 --- a/tests/unit/actor/test_charging_manager.py +++ b/tests/unit/actor/test_charging_manager.py @@ -247,6 +247,78 @@ async def test_get_max_total_charge_usd(mock_client: MagicMock) -> None: assert cm.get_max_total_charge_usd() == Decimal('42.50') +async def test_calculate_push_data_limit_no_ppe(mock_client: MagicMock) -> None: + """Returns items_count when no PPE pricing is configured (prices are zero).""" + config = _make_config(actor_pricing_info=None, charged_event_counts={}) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + result = cm.calculate_push_data_limit(10, 'some-event', is_default_dataset=True) + assert result == 10 + + +async def test_calculate_push_data_limit_within_budget(mock_client: MagicMock) -> None: + """Returns full items_count when combined budget is sufficient for all items.""" + pricing_info = _make_ppe_pricing_info({'click': Decimal('0.01'), 'apify-default-dataset-item': Decimal('0.01')}) + config = _make_config( + test_pay_per_event=True, + actor_pricing_info=pricing_info, + charged_event_counts={}, + max_total_charge_usd=Decimal('10.00'), + ) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + # combined price = 0.02/item, budget = 10.00, max = 500 + result = cm.calculate_push_data_limit(5, 'click', is_default_dataset=True) + assert result == 5 + + +async def test_calculate_push_data_limit_budget_exceeded(mock_client: MagicMock) -> None: + """Returns capped count when combined price (explicit + synthetic) exceeds budget.""" + pricing_info = _make_ppe_pricing_info({'scrape': Decimal('1.00'), 'apify-default-dataset-item': Decimal('1.00')}) + config = _make_config( + test_pay_per_event=True, + actor_pricing_info=pricing_info, + charged_event_counts={}, + max_total_charge_usd=Decimal('3.00'), + ) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + # combined price = 2.00/item, budget = 3.00, max = floor(3/2) = 1 + result = cm.calculate_push_data_limit(5, 'scrape', is_default_dataset=True) + assert result == 1 + + +async def test_calculate_push_data_limit_without_default_dataset(mock_client: MagicMock) -> None: + """When not pushing to the default dataset, only explicit event price is considered.""" + pricing_info = _make_ppe_pricing_info({'scrape': Decimal('1.00'), 'apify-default-dataset-item': Decimal('1.00')}) + config = _make_config( + test_pay_per_event=True, + actor_pricing_info=pricing_info, + charged_event_counts={}, + max_total_charge_usd=Decimal('3.00'), + ) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + # explicit price only = 1.00/item, budget = 3.00, max = floor(3/1) = 3 + result = cm.calculate_push_data_limit(5, 'scrape', is_default_dataset=False) + assert result == 3 + + +async def test_calculate_push_data_limit_exhausted_budget(mock_client: MagicMock) -> None: + """Returns 0 when the budget is fully exhausted before the push.""" + pricing_info = _make_ppe_pricing_info({'scrape': Decimal('1.00')}) + config = _make_config( + test_pay_per_event=True, + actor_pricing_info=pricing_info, + charged_event_counts={'scrape': 3}, + max_total_charge_usd=Decimal('3.00'), + ) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + result = cm.calculate_push_data_limit(5, 'scrape', is_default_dataset=False) + assert result == 0 + + async def test_charge_limit_reached(mock_client: MagicMock) -> None: """Test that event_charge_limit_reached is True when budget is exhausted.""" pricing_info = _make_ppe_pricing_info({'search': Decimal('5.00')}) From d49ca4040e23669a32f86c1b055cdc8b6a7ab064 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Fri, 27 Feb 2026 20:19:34 +0000 Subject: [PATCH 2/7] update name --- src/apify/storage_clients/_apify/_dataset_client.py | 4 ++-- src/apify/storage_clients/_file_system/_dataset_client.py | 6 +++--- src/apify/storage_clients/_ppe_dataset_mixin.py | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/apify/storage_clients/_apify/_dataset_client.py b/src/apify/storage_clients/_apify/_dataset_client.py index 5aa4c3d8..421d8af9 100644 --- a/src/apify/storage_clients/_apify/_dataset_client.py +++ b/src/apify/storage_clients/_apify/_dataset_client.py @@ -13,7 +13,7 @@ from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata from ._api_client_creation import create_storage_api_client -from apify.storage_clients._ppe_dataset_mixin import _DatasetClientPPEMixin +from apify.storage_clients._ppe_dataset_mixin import DatasetClientPpeMixin if TYPE_CHECKING: from collections.abc import AsyncIterator @@ -26,7 +26,7 @@ logger = getLogger(__name__) -class ApifyDatasetClient(DatasetClient, _DatasetClientPPEMixin): +class ApifyDatasetClient(DatasetClient, DatasetClientPpeMixin): """An Apify platform implementation of the dataset client.""" _MAX_PAYLOAD_SIZE = ByteSize.from_mb(9) diff --git a/src/apify/storage_clients/_file_system/_dataset_client.py b/src/apify/storage_clients/_file_system/_dataset_client.py index e87534e0..89449af8 100644 --- a/src/apify/storage_clients/_file_system/_dataset_client.py +++ b/src/apify/storage_clients/_file_system/_dataset_client.py @@ -7,16 +7,16 @@ from crawlee.storage_clients._file_system import FileSystemDatasetClient from apify._configuration import Configuration as ApifyConfiguration -from apify.storage_clients._ppe_dataset_mixin import _DatasetClientPPEMixin +from apify.storage_clients._ppe_dataset_mixin import DatasetClientPpeMixin if TYPE_CHECKING: from crawlee.configuration import Configuration -class ApifyFileSystemDatasetClient(FileSystemDatasetClient, _DatasetClientPPEMixin): +class ApifyFileSystemDatasetClient(FileSystemDatasetClient, DatasetClientPpeMixin): def __init__(self, *args: Any, **kwargs: Any) -> None: FileSystemDatasetClient.__init__(self, *args, **kwargs) - _DatasetClientPPEMixin.__init__(self) + DatasetClientPpeMixin.__init__(self) @override @classmethod diff --git a/src/apify/storage_clients/_ppe_dataset_mixin.py b/src/apify/storage_clients/_ppe_dataset_mixin.py index 973bfcf8..f0b465cc 100644 --- a/src/apify/storage_clients/_ppe_dataset_mixin.py +++ b/src/apify/storage_clients/_ppe_dataset_mixin.py @@ -1,7 +1,7 @@ from apify._charging import DEFAULT_DATASET_ITEM_EVENT, charging_manager_ctx -class _DatasetClientPPEMixin: +class DatasetClientPpeMixin: """A mixin for dataset clients to add support for PPE pricing model and tracking synthetic events.""" def __init__(self) -> None: From b097706969ed26617277d303f8cb7dcc533bd6be Mon Sep 17 00:00:00 2001 From: Max Bohomolov <34358312+Mantisus@users.noreply.github.com> Date: Mon, 2 Mar 2026 22:19:43 +0200 Subject: [PATCH 3/7] Update src/apify/storage_clients/_ppe_dataset_mixin.py Co-authored-by: Vlada Dusek --- src/apify/storage_clients/_ppe_dataset_mixin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/apify/storage_clients/_ppe_dataset_mixin.py b/src/apify/storage_clients/_ppe_dataset_mixin.py index f0b465cc..39308159 100644 --- a/src/apify/storage_clients/_ppe_dataset_mixin.py +++ b/src/apify/storage_clients/_ppe_dataset_mixin.py @@ -5,7 +5,7 @@ class DatasetClientPpeMixin: """A mixin for dataset clients to add support for PPE pricing model and tracking synthetic events.""" def __init__(self) -> None: - self._is_default_dataset: bool = False + self._is_default_dataset = False @property def is_default_dataset(self) -> bool: From 7347d30f5c84c94131d9421d91aeb6f0259f7eb9 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Mon, 2 Mar 2026 23:25:40 +0000 Subject: [PATCH 4/7] add docstring and fix --- src/apify/_actor.py | 14 ++++---- .../storage_clients/_apify/_dataset_client.py | 9 +++-- .../_file_system/_dataset_client.py | 34 ++++++++++++++----- .../_file_system/_storage_client.py | 4 ++- .../storage_clients/_ppe_dataset_mixin.py | 12 ++----- 5 files changed, 44 insertions(+), 29 deletions(-) diff --git a/src/apify/_actor.py b/src/apify/_actor.py index c63b819b..f90959ef 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -631,15 +631,15 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non if charged_event_name and charged_event_name.startswith('apify-'): raise ValueError(f'Cannot charge for synthetic event "{charged_event_name}" manually') - # No charging, just push the data without locking. - if charged_event_name is None: - dataset = await self.open_dataset() - await dataset.push_data(data) - return None - - # If charging is requested, acquire the charge lock to prevent race conditions between concurrent + # Acquire the charge lock to prevent race conditions between concurrent # push_data calls. We need to hold the lock for the entire push_data + charge sequence. async with self._charge_lock: + # No explicit charging requested; synthetic events are handled within dataset.push_data. + if charged_event_name is None: + dataset = await self.open_dataset() + await dataset.push_data(data) + return None + pushed_items_count = self.get_charging_manager().calculate_push_data_limit( items_count=len(data), event_name=charged_event_name, diff --git a/src/apify/storage_clients/_apify/_dataset_client.py b/src/apify/storage_clients/_apify/_dataset_client.py index 421d8af9..16bab34d 100644 --- a/src/apify/storage_clients/_apify/_dataset_client.py +++ b/src/apify/storage_clients/_apify/_dataset_client.py @@ -49,7 +49,8 @@ def __init__( Preferably use the `ApifyDatasetClient.open` class method to create a new instance. """ - super().__init__() + DatasetClient.__init__(self) + DatasetClientPpeMixin.__init__(self) self._api_client = api_client """The Apify dataset client for API operations.""" @@ -117,7 +118,9 @@ async def open( lock=asyncio.Lock(), ) - dataset_client.is_default_dataset = (await dataset_client.get_metadata()).id == configuration.default_dataset_id + dataset_client.is_default_dataset = ( + alias is None and name is None and (id is None or id == configuration.default_dataset_id) + ) return dataset_client @@ -141,7 +144,7 @@ async def payloads_generator(items: list[Any]) -> AsyncIterator[str]: async with self._lock: items = data if isinstance(data, list) else [data] - limit = await self._calculate_limit_for_push(len(items)) + limit = self._calculate_limit_for_push(len(items)) items = items[:limit] async for chunk in self._chunk_by_size(payloads_generator(items)): diff --git a/src/apify/storage_clients/_file_system/_dataset_client.py b/src/apify/storage_clients/_file_system/_dataset_client.py index 89449af8..f8da8aef 100644 --- a/src/apify/storage_clients/_file_system/_dataset_client.py +++ b/src/apify/storage_clients/_file_system/_dataset_client.py @@ -6,7 +6,6 @@ from crawlee.storage_clients._file_system import FileSystemDatasetClient -from apify._configuration import Configuration as ApifyConfiguration from apify.storage_clients._ppe_dataset_mixin import DatasetClientPpeMixin if TYPE_CHECKING: @@ -14,6 +13,13 @@ class ApifyFileSystemDatasetClient(FileSystemDatasetClient, DatasetClientPpeMixin): + """Apify-specific implementation of the `FileSystemDatasetClient`. + + It extends the functionality of `FileSystemDatasetClient` using `DatasetClientPpeMixin` and updates `push_data` to + limit and charge for the synthetic `apify-default-dataset-item` event. This is necessary for consistent behavior + when locally testing the `PAY_PER_EVENT` pricing model. + """ + def __init__(self, *args: Any, **kwargs: Any) -> None: FileSystemDatasetClient.__init__(self, *args, **kwargs) DatasetClientPpeMixin.__init__(self) @@ -26,7 +32,7 @@ async def open( id: str | None, name: str | None, alias: str | None, - configuration: Configuration | ApifyConfiguration, + configuration: Configuration, ) -> Self: dataset_client = await super().open( @@ -36,14 +42,26 @@ async def open( configuration=configuration, ) - if isinstance(configuration, ApifyConfiguration) and all(v is None for v in (id, name, alias)): - dataset_client.is_default_dataset = True + dataset_client.is_default_dataset = all(v is None for v in (id, name, alias)) return dataset_client @override async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None: - items = data if isinstance(data, list) else [data] - limit = await self._calculate_limit_for_push(len(items)) - await super().push_data(items[:limit]) - await self._charge_for_items(limit) + async with self._lock: + items = data if isinstance(data, list) else [data] + limit = self._calculate_limit_for_push(len(items)) + + new_item_count = self._metadata.item_count + for item in items: + new_item_count += 1 + await self._push_item(item, new_item_count) + + # now update metadata under the same lock + await self._update_metadata( + update_accessed_at=True, + update_modified_at=True, + new_item_count=new_item_count, + ) + + await self._charge_for_items(limit) diff --git a/src/apify/storage_clients/_file_system/_storage_client.py b/src/apify/storage_clients/_file_system/_storage_client.py index f6cb66cc..8332f0fd 100644 --- a/src/apify/storage_clients/_file_system/_storage_client.py +++ b/src/apify/storage_clients/_file_system/_storage_client.py @@ -60,9 +60,11 @@ async def create_dataset_client( configuration: Configuration | None = None, ) -> ApifyFileSystemDatasetClient: configuration = configuration or Configuration.get_global_configuration() - return await ApifyFileSystemDatasetClient.open( + client = await ApifyFileSystemDatasetClient.open( id=id, name=name, alias=alias, configuration=configuration, ) + await self._purge_if_needed(client, configuration) + return client diff --git a/src/apify/storage_clients/_ppe_dataset_mixin.py b/src/apify/storage_clients/_ppe_dataset_mixin.py index 39308159..627a5384 100644 --- a/src/apify/storage_clients/_ppe_dataset_mixin.py +++ b/src/apify/storage_clients/_ppe_dataset_mixin.py @@ -5,17 +5,9 @@ class DatasetClientPpeMixin: """A mixin for dataset clients to add support for PPE pricing model and tracking synthetic events.""" def __init__(self) -> None: - self._is_default_dataset = False + self.is_default_dataset = False - @property - def is_default_dataset(self) -> bool: - return self._is_default_dataset - - @is_default_dataset.setter - def is_default_dataset(self, value: bool) -> None: - self._is_default_dataset = value - - async def _calculate_limit_for_push(self, items_count: int) -> int: + def _calculate_limit_for_push(self, items_count: int) -> int: if self.is_default_dataset and (charging_manager := charging_manager_ctx.get()): max_charged_count = charging_manager.calculate_max_event_charge_count_within_limit( event_name=DEFAULT_DATASET_ITEM_EVENT From e4fc4bff4b02fc2d55a2eb1965845cac1a161285 Mon Sep 17 00:00:00 2001 From: Max Bohomolov <34358312+Mantisus@users.noreply.github.com> Date: Tue, 3 Mar 2026 16:18:08 +0200 Subject: [PATCH 5/7] Update src/apify/_charging.py Co-authored-by: Vlada Dusek --- src/apify/_charging.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/apify/_charging.py b/src/apify/_charging.py index bd7c25fd..105ab374 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -36,7 +36,7 @@ # Context variable to hold the current `ChargingManager` instance, if any. This allows PPE-aware dataset clients to # access the charging manager without needing to pass it explicitly. -charging_manager_ctx: ContextVar[ChargingManagerImplementation | None] = ContextVar( +charging_manager_ctx: ContextVar[ChargingManager | None] = ContextVar( 'charging_manager_ctx', default=None ) From 34e4617ebd9bb3ab1d0d7111d6c08cc78f4a1a0a Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Tue, 3 Mar 2026 14:39:28 +0000 Subject: [PATCH 6/7] merge master --- codecov.yaml | 5 ++ src/apify/_actor.py | 82 +++++++++--------------- src/apify/_charging.py | 24 ++++--- src/apify/_utils.py | 41 +++++++++++- tests/e2e/conftest.py | 2 +- tests/e2e/test_actor_lifecycle.py | 10 +-- tests/integration/conftest.py | 2 +- tests/unit/actor/test_actor_lifecycle.py | 32 ++++----- tests/unit/conftest.py | 2 +- website/package.json | 2 +- website/yarn.lock | 10 +-- 11 files changed, 116 insertions(+), 96 deletions(-) diff --git a/codecov.yaml b/codecov.yaml index ccd8530b..7e2c34ea 100644 --- a/codecov.yaml +++ b/codecov.yaml @@ -4,3 +4,8 @@ coverage: default: target: auto threshold: 0.10% # tolerate up to 0.10% decrease + informational: true # CI check reports status but never fails + patch: + default: + target: 50% # error only if patch coverage drops below 50% + informational: true # CI check reports status but never fails diff --git a/src/apify/_actor.py b/src/apify/_actor.py index ca12a847..96a4f3c6 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -32,7 +32,7 @@ from apify._crypto import decrypt_input_secrets, load_private_key from apify._models import ActorRun from apify._proxy_configuration import ProxyConfiguration -from apify._utils import docs_group, docs_name, get_system_info, is_running_in_ipython +from apify._utils import docs_group, docs_name, ensure_context, get_system_info, is_running_in_ipython from apify.events import ApifyEventManager, EventManager, LocalEventManager from apify.log import _configure_logging, logger from apify.storage_clients import ApifyStorageClient, SmartApifyStorageClient @@ -53,6 +53,8 @@ MainReturnType = TypeVar('MainReturnType') +_ensure_context = ensure_context('_active') + @docs_name('Actor') @docs_group('Actor') @@ -139,8 +141,8 @@ def __init__( # Keep track of all used state stores to persist their values on exit self._use_state_stores: set[str | None] = set() - self._is_initialized = False - """Whether any Actor instance is currently initialized.""" + self._active = False + """Whether the Actor instance is currently active (initialized and within context).""" self._is_rebooting = False """Whether the Actor is currently rebooting.""" @@ -161,7 +163,7 @@ async def __aenter__(self) -> Self: This method must be called exactly once per Actor instance. Re-initializing an Actor or having multiple active Actor instances is not standard usage and may lead to warnings or unexpected behavior. """ - if self._is_initialized: + if self._active: raise RuntimeError('The Actor was already initialized!') # Initialize configuration first - it's required for the next steps. @@ -198,7 +200,7 @@ async def __aenter__(self) -> Self: self.log.debug('Charging manager initialized') # Mark initialization as complete and update global state. - self._is_initialized = True + self._active = True if not Actor.is_at_home(): # Make sure that the input related KVS is initialized to ensure that the input aware client is used @@ -225,7 +227,8 @@ async def __aexit__( if self._is_exiting: return - self._raise_if_not_initialized() + if not self._active: + raise RuntimeError('The _ActorType is not active. Use it within the async context.') if exc_value and not is_running_in_ipython(): # In IPython, we don't run `sys.exit()` during Actor exits, @@ -257,7 +260,7 @@ async def finalize() -> None: except TimeoutError: self.log.exception('Actor cleanup timed out') finally: - self._is_initialized = False + self._active = False if self._exit_process: sys.exit(self.exit_code) @@ -513,6 +516,7 @@ def new_client( timeout_secs=int(timeout.total_seconds()) if timeout else None, ) + @_ensure_context async def open_dataset( self, *, @@ -540,7 +544,6 @@ async def open_dataset( Returns: An instance of the `Dataset` class for the given ID or name. """ - self._raise_if_not_initialized() return await Dataset.open( id=id, name=name, @@ -548,6 +551,7 @@ async def open_dataset( storage_client=self._storage_client.get_suitable_storage_client(force_cloud=force_cloud), ) + @_ensure_context async def open_key_value_store( self, *, @@ -574,7 +578,6 @@ async def open_key_value_store( Returns: An instance of the `KeyValueStore` class for the given ID or name. """ - self._raise_if_not_initialized() return await KeyValueStore.open( id=id, name=name, @@ -582,6 +585,7 @@ async def open_key_value_store( storage_client=self._storage_client.get_suitable_storage_client(force_cloud=force_cloud), ) + @_ensure_context async def open_request_queue( self, *, @@ -610,7 +614,6 @@ async def open_request_queue( Returns: An instance of the `RequestQueue` class for the given ID or name. """ - self._raise_if_not_initialized() return await RequestQueue.open( id=id, name=name, @@ -622,6 +625,7 @@ async def open_request_queue( async def push_data(self, data: dict | list[dict]) -> None: ... @overload async def push_data(self, data: dict | list[dict], charged_event_name: str) -> ChargeResult: ... + @_ensure_context async def push_data(self, data: dict | list[dict], charged_event_name: str | None = None) -> ChargeResult | None: """Store an object or a list of objects to the default dataset of the current Actor run. @@ -630,8 +634,6 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non charged_event_name: If provided and if the Actor uses the pay-per-event pricing model, the method will attempt to charge for the event for each pushed item. """ - self._raise_if_not_initialized() - if not data: return None @@ -668,10 +670,9 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non count=pushed_items_count, ) + @_ensure_context async def get_input(self) -> Any: """Get the Actor input value from the default key-value store associated with the current Actor run.""" - self._raise_if_not_initialized() - input_value = await self.get_value(self.configuration.input_key) input_secrets_private_key = self.configuration.input_secrets_private_key_file input_secrets_key_passphrase = self.configuration.input_secrets_private_key_passphrase @@ -684,6 +685,7 @@ async def get_input(self) -> Any: return input_value + @_ensure_context async def get_value(self, key: str, default_value: Any = None) -> Any: """Get a value from the default key-value store associated with the current Actor run. @@ -691,11 +693,10 @@ async def get_value(self, key: str, default_value: Any = None) -> Any: key: The key of the record which to retrieve. default_value: Default value returned in case the record does not exist. """ - self._raise_if_not_initialized() - key_value_store = await self.open_key_value_store() return await key_value_store.get_value(key, default_value) + @_ensure_context async def set_value( self, key: str, @@ -710,16 +711,15 @@ async def set_value( value: The value of the record which to set, or None, if the record should be deleted. content_type: The content type which should be set to the value. """ - self._raise_if_not_initialized() - key_value_store = await self.open_key_value_store() return await key_value_store.set_value(key, value, content_type=content_type) + @_ensure_context def get_charging_manager(self) -> ChargingManager: """Retrieve the charging manager to access granular pricing information.""" - self._raise_if_not_initialized() return self._charging_manager_implementation + @_ensure_context async def charge(self, event_name: str, count: int = 1) -> ChargeResult: """Charge for a specified number of events - sub-operations of the Actor. @@ -729,7 +729,6 @@ async def charge(self, event_name: str, count: int = 1) -> ChargeResult: event_name: Name of the event to be charged for. count: Number of events to charge for. """ - self._raise_if_not_initialized() # Acquire lock to prevent race conditions with concurrent charge/push_data calls. async with self._charge_lock: return await self.get_charging_manager().charge(event_name, count) @@ -757,6 +756,7 @@ def on( @overload def on(self, event_name: Event, listener: EventListener[None]) -> EventListener[Any]: ... + @_ensure_context def on(self, event_name: Event, listener: EventListener[Any]) -> EventListener[Any]: """Add an event listener to the Actor's event manager. @@ -781,8 +781,6 @@ def on(self, event_name: Event, listener: EventListener[Any]) -> EventListener[A event_name: The Actor event to listen for. listener: The function to be called when the event is emitted (can be async). """ - self._raise_if_not_initialized() - self.event_manager.on(event=event_name, listener=listener) return listener @@ -799,6 +797,7 @@ def off(self, event_name: Literal[Event.EXIT], listener: EventListener[EventExit @overload def off(self, event_name: Event, listener: EventListener[None]) -> None: ... + @_ensure_context def off(self, event_name: Event, listener: Callable | None = None) -> None: """Remove a listener, or all listeners, from an Actor event. @@ -807,14 +806,13 @@ def off(self, event_name: Event, listener: Callable | None = None) -> None: listener: The listener which is supposed to be removed. If not passed, all listeners of this event are removed. """ - self._raise_if_not_initialized() - self.event_manager.off(event=event_name, listener=listener) def is_at_home(self) -> bool: """Return `True` when the Actor is running on the Apify platform, and `False` otherwise (e.g. local run).""" return self.configuration.is_at_home + @_ensure_context def get_env(self) -> dict: """Return a dictionary with information parsed from all the `APIFY_XXX` environment variables. @@ -822,8 +820,6 @@ def get_env(self) -> dict: [Actor documentation](https://docs.apify.com/actors/development/environment-variables). If some variables are not defined or are invalid, the corresponding value in the resulting dictionary will be None. """ - self._raise_if_not_initialized() - config = dict[str, Any]() for field_name, field in Configuration.model_fields.items(): if field.deprecated: @@ -844,6 +840,7 @@ def get_env(self) -> dict: env_vars = {env_var.value.lower(): env_var.name.lower() for env_var in [*ActorEnvVars, *ApifyEnvVars]} return {option_name: config[env_var] for env_var, option_name in env_vars.items() if env_var in config} + @_ensure_context async def start( self, actor_id: str, @@ -882,8 +879,6 @@ async def start( Returns: Info about the started Actor run """ - self._raise_if_not_initialized() - client = self.new_client(token=token) if token else self.apify_client if webhooks: @@ -922,6 +917,7 @@ async def start( return ActorRun.model_validate(api_result) + @_ensure_context async def abort( self, run_id: str, @@ -945,8 +941,6 @@ async def abort( Returns: Info about the aborted Actor run. """ - self._raise_if_not_initialized() - client = self.new_client(token=token) if token else self.apify_client if status_message: @@ -956,6 +950,7 @@ async def abort( return ActorRun.model_validate(api_result) + @_ensure_context async def call( self, actor_id: str, @@ -998,8 +993,6 @@ async def call( Returns: Info about the started Actor run. """ - self._raise_if_not_initialized() - client = self.new_client(token=token) if token else self.apify_client if webhooks: @@ -1040,6 +1033,7 @@ async def call( return ActorRun.model_validate(api_result) + @_ensure_context async def call_task( self, task_id: str, @@ -1080,8 +1074,6 @@ async def call_task( Returns: Info about the started Actor run. """ - self._raise_if_not_initialized() - client = self.new_client(token=token) if token else self.apify_client if webhooks: @@ -1111,6 +1103,7 @@ async def call_task( return ActorRun.model_validate(api_result) + @_ensure_context async def metamorph( self, target_actor_id: str, @@ -1135,8 +1128,6 @@ async def metamorph( content_type: The content type of the input. custom_after_sleep: How long to sleep for after the metamorph, to wait for the container to be stopped. """ - self._raise_if_not_initialized() - if not self.is_at_home(): self.log.error('Actor.metamorph() is only supported when running on the Apify platform.') return @@ -1158,6 +1149,7 @@ async def metamorph( if custom_after_sleep: await asyncio.sleep(custom_after_sleep.total_seconds()) + @_ensure_context async def reboot( self, *, @@ -1172,8 +1164,6 @@ async def reboot( event_listeners_timeout: How long should the Actor wait for Actor event listeners to finish before exiting. custom_after_sleep: How long to sleep for after the reboot, to wait for the container to be stopped. """ - self._raise_if_not_initialized() - if not self.is_at_home(): self.log.error('Actor.reboot() is only supported when running on the Apify platform.') return @@ -1213,6 +1203,7 @@ async def reboot( if custom_after_sleep: await asyncio.sleep(custom_after_sleep.total_seconds()) + @_ensure_context async def add_webhook( self, webhook: Webhook, @@ -1240,8 +1231,6 @@ async def add_webhook( Returns: The created webhook. """ - self._raise_if_not_initialized() - if not self.is_at_home(): self.log.error('Actor.add_webhook() is only supported when running on the Apify platform.') return @@ -1260,6 +1249,7 @@ async def add_webhook( idempotency_key=idempotency_key, ) + @_ensure_context async def set_status_message( self, status_message: str, @@ -1275,8 +1265,6 @@ async def set_status_message( Returns: The updated Actor run object. """ - self._raise_if_not_initialized() - if not self.is_at_home(): title = 'Terminal status message' if is_terminal else 'Status message' self.log.info(f'[{title}]: {status_message}') @@ -1292,6 +1280,7 @@ async def set_status_message( return ActorRun.model_validate(api_result) + @_ensure_context async def create_proxy_configuration( self, *, @@ -1324,8 +1313,6 @@ async def create_proxy_configuration( ProxyConfiguration object with the passed configuration, or None, if no proxy should be used based on the configuration. """ - self._raise_if_not_initialized() - if actor_proxy_input is not None: if actor_proxy_input.get('useApifyProxy', False): country_code = country_code or actor_proxy_input.get('apifyProxyCountry') @@ -1349,6 +1336,7 @@ async def create_proxy_configuration( return proxy_configuration + @_ensure_context async def use_state( self, default_value: dict[str, JsonSerializable] | None = None, @@ -1368,8 +1356,6 @@ async def use_state( Returns: The state dictionary with automatic persistence. """ - self._raise_if_not_initialized() - self._use_state_stores.add(kvs_name) kvs = await self.open_key_value_store(name=kvs_name) return await kvs.get_auto_saved_value(key or self._ACTOR_STATE_KEY, default_value) @@ -1379,10 +1365,6 @@ async def _save_actor_state(self) -> None: store = await self.open_key_value_store(name=kvs_name) await store.persist_autosaved_values() - def _raise_if_not_initialized(self) -> None: - if not self._is_initialized: - raise RuntimeError('The Actor was not initialized!') - def _get_default_exit_process(self) -> bool: """Return False for IPython and Scrapy environments, True otherwise.""" if is_running_in_ipython(): diff --git a/src/apify/_charging.py b/src/apify/_charging.py index 105ab374..44baeea0 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -9,8 +9,6 @@ from pydantic import TypeAdapter -from crawlee._utils.context import ensure_context - from apify._models import ( ActorRun, FlatPricePerMonthActorPricingInfo, @@ -19,7 +17,7 @@ PricePerDatasetItemActorPricingInfo, PricingModel, ) -from apify._utils import docs_group +from apify._utils import docs_group, ensure_context from apify.log import logger from apify.storages import Dataset @@ -36,9 +34,9 @@ # Context variable to hold the current `ChargingManager` instance, if any. This allows PPE-aware dataset clients to # access the charging manager without needing to pass it explicitly. -charging_manager_ctx: ContextVar[ChargingManager | None] = ContextVar( - 'charging_manager_ctx', default=None -) +charging_manager_ctx: ContextVar[ChargingManager | None] = ContextVar('charging_manager_ctx', default=None) + +_ensure_context = ensure_context('active') @docs_group('Charging') @@ -238,7 +236,7 @@ async def __aexit__( charging_manager_ctx.set(None) self.active = False - @ensure_context + @_ensure_context async def charge(self, event_name: str, count: int = 1) -> ChargeResult: def calculate_chargeable() -> dict[str, int | None]: """Calculate the maximum number of events of each type that can be charged within the current budget.""" @@ -332,14 +330,14 @@ def calculate_chargeable() -> dict[str, int | None]: chargeable_within_limit=calculate_chargeable(), ) - @ensure_context + @_ensure_context def calculate_total_charged_amount(self) -> Decimal: return sum( (item.total_charged_amount for item in self._charging_state.values()), start=Decimal(), ) - @ensure_context + @_ensure_context def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int | None: price = self._get_event_price(event_name) @@ -349,7 +347,7 @@ def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int result = (self._max_total_charge_usd - self.calculate_total_charged_amount()) / price return max(0, math.floor(result)) if result.is_finite() else None - @ensure_context + @_ensure_context def get_pricing_info(self) -> ActorPricingInfo: return ActorPricingInfo( pricing_model=self._pricing_model, @@ -362,16 +360,16 @@ def get_pricing_info(self) -> ActorPricingInfo: }, ) - @ensure_context + @_ensure_context def get_charged_event_count(self, event_name: str) -> int: item = self._charging_state.get(event_name) return item.charge_count if item is not None else 0 - @ensure_context + @_ensure_context def get_max_total_charge_usd(self) -> Decimal: return self._max_total_charge_usd - @ensure_context + @_ensure_context def calculate_push_data_limit( self, items_count: int, diff --git a/src/apify/_utils.py b/src/apify/_utils.py index 4198bf5d..33cb3d54 100644 --- a/src/apify/_utils.py +++ b/src/apify/_utils.py @@ -1,13 +1,48 @@ from __future__ import annotations import builtins +import inspect import sys +from collections.abc import Callable from enum import Enum +from functools import wraps from importlib import metadata -from typing import TYPE_CHECKING, Any, Literal +from typing import Any, Literal, TypeVar, cast -if TYPE_CHECKING: - from collections.abc import Callable +T = TypeVar('T', bound=Callable[..., Any]) + + +def ensure_context(attribute_name: str) -> Callable[[T], T]: + """Create a decorator that ensures the context manager is initialized before executing the method. + + The decorator checks if the calling instance has the specified attribute and verifies that it is set to `True`. + If the instance is inactive, it raises a `RuntimeError`. Works for both synchronous and asynchronous methods. + + Args: + attribute_name: The name of the boolean attribute to check on the instance. + + Returns: + A decorator that wraps methods with context checking. + """ + + def decorator(method: T) -> T: + @wraps(method) + def sync_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any: + if not getattr(self, attribute_name, False): + raise RuntimeError(f'The {self.__class__.__name__} is not active. Use it within the context.') + + return method(self, *args, **kwargs) + + @wraps(method) + async def async_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any: + if not getattr(self, attribute_name, False): + raise RuntimeError(f'The {self.__class__.__name__} is not active. Use it within the async context.') + + return await method(self, *args, **kwargs) + + return cast('T', async_wrapper if inspect.iscoroutinefunction(method) else sync_wrapper) + + return decorator def get_system_info() -> dict: diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 4ae56fc6..bdcc9883 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -69,7 +69,7 @@ def _prepare_test_env() -> None: if hasattr(apify._actor.Actor, '__wrapped__'): delattr(apify._actor.Actor, '__wrapped__') - apify._actor.Actor._is_initialized = False + apify._actor.Actor._active = False # Set the environment variable for the local storage directory to the temporary path. monkeypatch.setenv(ApifyEnvVars.LOCAL_STORAGE_DIR, str(tmp_path)) diff --git a/tests/e2e/test_actor_lifecycle.py b/tests/e2e/test_actor_lifecycle.py index 47fa06e0..983b8ca3 100644 --- a/tests/e2e/test_actor_lifecycle.py +++ b/tests/e2e/test_actor_lifecycle.py @@ -118,24 +118,24 @@ async def test_actor_sequential_contexts(make_actor: MakeActorFunction, run_acto async def main() -> None: async with Actor as actor: actor._exit_process = False - assert actor._is_initialized is True + assert actor._active is True # Actor after Actor. async with Actor as actor: actor._exit_process = False - assert actor._is_initialized is True + assert actor._active is True # Actor() after Actor. async with Actor(exit_process=False) as actor: - assert actor._is_initialized is True + assert actor._active is True # Actor() after Actor(). async with Actor(exit_process=False) as actor: - assert actor._is_initialized is True + assert actor._active is True # Actor after Actor(). async with Actor as actor: - assert actor._is_initialized is True + assert actor._active is True actor = await make_actor(label='actor-sequential-contexts', main_func=main) run_result = await run_actor(actor) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index f0fd1e0e..30aa077d 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -60,7 +60,7 @@ def _prepare_test_env() -> None: if hasattr(apify._actor.Actor, '__wrapped__'): delattr(apify._actor.Actor, '__wrapped__') - apify._actor.Actor._is_initialized = False + apify._actor.Actor._active = False # Set the environment variable for the local storage directory to the temporary path. monkeypatch.setenv(ApifyEnvVars.LOCAL_STORAGE_DIR, str(tmp_path)) diff --git a/tests/unit/actor/test_actor_lifecycle.py b/tests/unit/actor/test_actor_lifecycle.py index 09338514..03fdd00e 100644 --- a/tests/unit/actor/test_actor_lifecycle.py +++ b/tests/unit/actor/test_actor_lifecycle.py @@ -68,41 +68,41 @@ async def test_actor_init_instance_manual() -> None: """Test that Actor instance can be properly initialized and cleaned up manually.""" actor = Actor() await actor.init() - assert actor._is_initialized is True + assert actor._active is True await actor.exit() - assert actor._is_initialized is False + assert actor._active is False async def test_actor_init_instance_async_with() -> None: """Test that Actor instance can be properly initialized and cleaned up using async context manager.""" actor = Actor() async with actor: - assert actor._is_initialized is True + assert actor._active is True - assert actor._is_initialized is False + assert actor._active is False async def test_actor_init_class_manual() -> None: """Test that Actor class can be properly initialized and cleaned up manually.""" await Actor.init() - assert Actor._is_initialized is True + assert Actor._active is True await Actor.exit() - assert not Actor._is_initialized + assert not Actor._active async def test_actor_init_class_async_with() -> None: """Test that Actor class can be properly initialized and cleaned up using async context manager.""" async with Actor: - assert Actor._is_initialized is True + assert Actor._active is True - assert not Actor._is_initialized + assert not Actor._active async def test_fail_properly_deinitializes_actor(actor: _ActorType) -> None: """Test that fail() method properly deinitializes the Actor.""" - assert actor._is_initialized + assert actor._active await actor.fail() - assert actor._is_initialized is False + assert actor._active is False async def test_actor_handles_exceptions_and_cleans_up_properly() -> None: @@ -111,16 +111,16 @@ async def test_actor_handles_exceptions_and_cleans_up_properly() -> None: with contextlib.suppress(Exception): async with Actor() as actor: - assert actor._is_initialized + assert actor._active raise Exception('Failed') # noqa: TRY002 assert actor is not None - assert actor._is_initialized is False + assert actor._active is False async def test_double_init_raises_runtime_error(actor: _ActorType) -> None: """Test that attempting to initialize an already initialized Actor raises RuntimeError.""" - assert actor._is_initialized + assert actor._active with pytest.raises(RuntimeError): await actor.init() @@ -196,7 +196,7 @@ def on_event(event_type: Event) -> Callable: actor = Actor() async with actor: - assert actor._is_initialized + assert actor._active actor.on(Event.PERSIST_STATE, on_event(Event.PERSIST_STATE)) actor.on(Event.SYSTEM_INFO, on_event(Event.SYSTEM_INFO)) await asyncio.sleep(1) @@ -249,12 +249,12 @@ async def test_actor_sequential_contexts(*, first_with_call: bool, second_with_c mock = AsyncMock() async with Actor(exit_process=False) if first_with_call else Actor as actor: await mock() - assert actor._is_initialized is True + assert actor._active is True # After exiting the context, new Actor instance can be created without conflicts. async with Actor() if second_with_call else Actor as actor: await mock() - assert actor._is_initialized is True + assert actor._active is True # The mock should have been called twice, once in each context. assert mock.call_count == 2 diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 3f792ad8..8d8297c5 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -62,7 +62,7 @@ def prepare_test_env(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> Callabl def _prepare_test_env() -> None: if hasattr(apify._actor.Actor, '__wrapped__'): delattr(apify._actor.Actor, '__wrapped__') - apify._actor.Actor._is_initialized = False + apify._actor.Actor._active = False # Set the environment variable for the local storage directory to the temporary path. monkeypatch.setenv(ApifyEnvVars.LOCAL_STORAGE_DIR, str(tmp_path)) diff --git a/website/package.json b/website/package.json index b561d22c..00a4982f 100644 --- a/website/package.json +++ b/website/package.json @@ -18,7 +18,7 @@ "lint:code:fix": "eslint . --fix" }, "dependencies": { - "@apify/docs-theme": "^1.0.237", + "@apify/docs-theme": "^1.0.239", "@apify/docusaurus-plugin-typedoc-api": "^5.0.0", "@docusaurus/core": "^3.8.1", "@docusaurus/faster": "^3.8.1", diff --git a/website/yarn.lock b/website/yarn.lock index 68705fd8..5baffcd5 100644 --- a/website/yarn.lock +++ b/website/yarn.lock @@ -433,9 +433,9 @@ __metadata: languageName: node linkType: hard -"@apify/docs-theme@npm:^1.0.237": - version: 1.0.237 - resolution: "@apify/docs-theme@npm:1.0.237" +"@apify/docs-theme@npm:^1.0.239": + version: 1.0.239 + resolution: "@apify/docs-theme@npm:1.0.239" dependencies: "@apify/docs-search-modal": "npm:^1.3.3" "@apify/ui-icons": "npm:^1.19.0" @@ -458,7 +458,7 @@ __metadata: clsx: "*" react: "*" react-dom: "*" - checksum: 10c0/7eab2f752ab71a5d36b9ce88188350cfcfdefa61430cb9881ad1eeda934fbf0461dc23e764278951686e57a8146c23c5bf474c05475433f8ca1196aec1fbb537 + checksum: 10c0/cbb328cc515b44ed68dfb06b98b56308c4e6da9bcd4463ae4fce173e48c771f07c560691a29279d1c588cd34b1644bdfd81ae7871179b7397807a88dffabb764 languageName: node linkType: hard @@ -6584,7 +6584,7 @@ __metadata: version: 0.0.0-use.local resolution: "apify-sdk-python@workspace:." dependencies: - "@apify/docs-theme": "npm:^1.0.237" + "@apify/docs-theme": "npm:^1.0.239" "@apify/docusaurus-plugin-typedoc-api": "npm:^5.0.0" "@apify/eslint-config-ts": "npm:^0.4.0" "@apify/tsconfig": "npm:^0.1.0" From 51e28b5227c7070047ec486aaa646a7b95360c77 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Tue, 3 Mar 2026 15:40:16 +0000 Subject: [PATCH 7/7] update cherge_lock --- src/apify/_actor.py | 19 +++---- src/apify/_charging.py | 12 +++- .../storage_clients/_apify/_dataset_client.py | 4 +- .../_file_system/_dataset_client.py | 6 +- .../storage_clients/_ppe_dataset_mixin.py | 25 ++++++++- tests/unit/actor/test_actor_charge.py | 56 +++++++++++++++++++ tests/unit/actor/test_charging_manager.py | 20 +++---- 7 files changed, 111 insertions(+), 31 deletions(-) diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 96a4f3c6..b8af17ed 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -380,14 +380,6 @@ def event_manager(self) -> EventManager: def _charging_manager_implementation(self) -> ChargingManagerImplementation: return ChargingManagerImplementation(self.configuration, self.apify_client) - @cached_property - def _charge_lock(self) -> asyncio.Lock: - """Lock to synchronize charge operations. - - Prevents race conditions between Actor.charge and Actor.push_data calls. - """ - return asyncio.Lock() - @cached_property def _storage_client(self) -> SmartApifyStorageClient: """Storage client used by the Actor. @@ -642,16 +634,18 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non if charged_event_name and charged_event_name.startswith('apify-'): raise ValueError(f'Cannot charge for synthetic event "{charged_event_name}" manually') + charging_manager = self.get_charging_manager() + # Acquire the charge lock to prevent race conditions between concurrent # push_data calls. We need to hold the lock for the entire push_data + charge sequence. - async with self._charge_lock: + async with charging_manager.charge_lock: # No explicit charging requested; synthetic events are handled within dataset.push_data. if charged_event_name is None: dataset = await self.open_dataset() await dataset.push_data(data) return None - pushed_items_count = self.get_charging_manager().calculate_push_data_limit( + pushed_items_count = self.get_charging_manager().compute_push_data_limit( items_count=len(data), event_name=charged_event_name, is_default_dataset=True, @@ -730,8 +724,9 @@ async def charge(self, event_name: str, count: int = 1) -> ChargeResult: count: Number of events to charge for. """ # Acquire lock to prevent race conditions with concurrent charge/push_data calls. - async with self._charge_lock: - return await self.get_charging_manager().charge(event_name, count) + charging_manager = self.get_charging_manager() + async with charging_manager.charge_lock: + return await charging_manager.charge(event_name, count) @overload def on( diff --git a/src/apify/_charging.py b/src/apify/_charging.py index 44baeea0..eac883d0 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import math from contextvars import ContextVar from dataclasses import dataclass @@ -52,6 +53,9 @@ class ChargingManager(Protocol): - Apify platform documentation: https://docs.apify.com/platform/actors/publishing/monetize """ + charge_lock: asyncio.Lock + """Lock to synchronize charge operations. Prevents race conditions between `charge` and `push_data` calls.""" + async def charge(self, event_name: str, count: int = 1) -> ChargeResult: """Charge for a specified number of events - sub-operations of the Actor. @@ -88,14 +92,14 @@ def get_charged_event_count(self, event_name: str) -> int: def get_max_total_charge_usd(self) -> Decimal: """Get the configured maximum total charge for this Actor run.""" - def calculate_push_data_limit( + def compute_push_data_limit( self, items_count: int, event_name: str, *, is_default_dataset: bool, ) -> int: - """Calculate how many items can be pushed and charged within the current budget. + """Compute how many items can be pushed and charged within the current budget. Accounts for both the explicit event and the synthetic `DEFAULT_DATASET_ITEM_EVENT` event, so that the combined cost per item does not exceed the remaining budget. @@ -166,6 +170,8 @@ def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> No self._not_ppe_warning_printed = False self.active = False + self.charge_lock = asyncio.Lock() + async def __aenter__(self) -> None: """Initialize the charging manager - this is called by the `Actor` class and shouldn't be invoked manually.""" # Validate config @@ -370,7 +376,7 @@ def get_max_total_charge_usd(self) -> Decimal: return self._max_total_charge_usd @_ensure_context - def calculate_push_data_limit( + def compute_push_data_limit( self, items_count: int, event_name: str, diff --git a/src/apify/storage_clients/_apify/_dataset_client.py b/src/apify/storage_clients/_apify/_dataset_client.py index 16bab34d..c26676cf 100644 --- a/src/apify/storage_clients/_apify/_dataset_client.py +++ b/src/apify/storage_clients/_apify/_dataset_client.py @@ -142,9 +142,9 @@ async def payloads_generator(items: list[Any]) -> AsyncIterator[str]: for index, item in enumerate(items): yield await self._check_and_serialize(item, index) - async with self._lock: + async with self._lock, self._charge_lock(): items = data if isinstance(data, list) else [data] - limit = self._calculate_limit_for_push(len(items)) + limit = self._compute_limit_for_push(len(items)) items = items[:limit] async for chunk in self._chunk_by_size(payloads_generator(items)): diff --git a/src/apify/storage_clients/_file_system/_dataset_client.py b/src/apify/storage_clients/_file_system/_dataset_client.py index f8da8aef..6949e309 100644 --- a/src/apify/storage_clients/_file_system/_dataset_client.py +++ b/src/apify/storage_clients/_file_system/_dataset_client.py @@ -48,12 +48,12 @@ async def open( @override async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None: - async with self._lock: + async with self._lock, self._charge_lock(): items = data if isinstance(data, list) else [data] - limit = self._calculate_limit_for_push(len(items)) + limit = self._compute_limit_for_push(len(items)) new_item_count = self._metadata.item_count - for item in items: + for item in items[:limit]: new_item_count += 1 await self._push_item(item, new_item_count) diff --git a/src/apify/storage_clients/_ppe_dataset_mixin.py b/src/apify/storage_clients/_ppe_dataset_mixin.py index 627a5384..8de6f45d 100644 --- a/src/apify/storage_clients/_ppe_dataset_mixin.py +++ b/src/apify/storage_clients/_ppe_dataset_mixin.py @@ -1,5 +1,13 @@ +from __future__ import annotations + +from contextlib import asynccontextmanager +from typing import TYPE_CHECKING + from apify._charging import DEFAULT_DATASET_ITEM_EVENT, charging_manager_ctx +if TYPE_CHECKING: + from collections.abc import AsyncIterator + class DatasetClientPpeMixin: """A mixin for dataset clients to add support for PPE pricing model and tracking synthetic events.""" @@ -7,7 +15,7 @@ class DatasetClientPpeMixin: def __init__(self) -> None: self.is_default_dataset = False - def _calculate_limit_for_push(self, items_count: int) -> int: + def _compute_limit_for_push(self, items_count: int) -> int: if self.is_default_dataset and (charging_manager := charging_manager_ctx.get()): max_charged_count = charging_manager.calculate_max_event_charge_count_within_limit( event_name=DEFAULT_DATASET_ITEM_EVENT @@ -21,3 +29,18 @@ async def _charge_for_items(self, count_items: int) -> None: event_name=DEFAULT_DATASET_ITEM_EVENT, count=count_items, ) + + @asynccontextmanager + async def _charge_lock(self) -> AsyncIterator[None]: + """Context manager to acquire the charge lock if PPE charging manager is active.""" + charging_manager = charging_manager_ctx.get() + if charging_manager: + if charging_manager.charge_lock.locked(): + # If the charge lock is already locked, it means we're called from within Actor.push_data which + # already holds the lock. asyncio.Lock is not reentrant, so re-acquiring would deadlock. + yield + else: + async with charging_manager.charge_lock: + yield + else: + yield diff --git a/tests/unit/actor/test_actor_charge.py b/tests/unit/actor/test_actor_charge.py index 3070aa4f..d696010a 100644 --- a/tests/unit/actor/test_actor_charge.py +++ b/tests/unit/actor/test_actor_charge.py @@ -1,3 +1,4 @@ +import asyncio from collections.abc import AsyncGenerator from contextlib import asynccontextmanager from decimal import Decimal @@ -186,6 +187,61 @@ async def test_push_data_charges_synthetic_event_for_default_dataset() -> None: assert setup.charging_mgr.get_charged_event_count('apify-default-dataset-item') == 3 +async def test_charge_lock_concurrent_actor_and_dataset_push() -> None: + """Test that charge_lock properly synchronizes concurrent Actor.push_data and dataset.push_data calls.""" + async with setup_mocked_charging( + Configuration(max_total_charge_usd=Decimal('10.00'), test_pay_per_event=True) + ) as setup: + setup.charging_mgr._pricing_info['event'] = PricingInfoItem(Decimal('0.10'), 'Event') + setup.charging_mgr._pricing_info['apify-default-dataset-item'] = PricingInfoItem( + Decimal('0.10'), 'Dataset item' + ) + + dataset = await Actor.open_dataset() + + # Run concurrent pushes - Actor.push_data and direct dataset.push_data + await asyncio.gather( + Actor.push_data([{'source': 'actor', 'id': i} for i in range(5)], 'event'), + dataset.push_data([{'source': 'dataset', 'id': i} for i in range(5)]), + ) + + # Verify all items were pushed + items = await dataset.get_data() + assert len(items.items) == 10 + + # Verify charging was tracked correctly: + # - Actor.push_data charged 'event' (5) + 'apify-default-dataset-item' (5) + # - dataset.push_data charged 'apify-default-dataset-item' (5) + assert setup.charging_mgr.get_charged_event_count('event') == 5 + assert setup.charging_mgr.get_charged_event_count('apify-default-dataset-item') == 10 + + +async def test_charge_lock_concurrent_with_limited_budget() -> None: + """Test that charge_lock correctly limits items when concurrent pushes compete for limited budget.""" + async with setup_mocked_charging( + Configuration(max_total_charge_usd=Decimal('0.50'), test_pay_per_event=True) + ) as setup: + # Each default dataset item costs $0.10, so max 5 items total + setup.charging_mgr._pricing_info['apify-default-dataset-item'] = PricingInfoItem( + Decimal('0.10'), 'Dataset item' + ) + + dataset = await Actor.open_dataset() + + # Both try to push 5 items, but budget only allows 5 total + await asyncio.gather( + dataset.push_data([{'source': 'a', 'id': i} for i in range(5)]), + dataset.push_data([{'source': 'b', 'id': i} for i in range(5)]), + ) + + # Verify total items pushed does not exceed budget limit + items = await dataset.get_data() + assert len(items.items) == 5 # Budget allows max 5 items at $0.10 each + + # Verify total charged events matches items pushed + assert setup.charging_mgr.get_charged_event_count('apify-default-dataset-item') == 5 + + async def test_charge_with_overdrawn_budget() -> None: configuration = Configuration( max_total_charge_usd=Decimal('0.00025'), diff --git a/tests/unit/actor/test_charging_manager.py b/tests/unit/actor/test_charging_manager.py index 94f31759..3b8f9399 100644 --- a/tests/unit/actor/test_charging_manager.py +++ b/tests/unit/actor/test_charging_manager.py @@ -247,16 +247,16 @@ async def test_get_max_total_charge_usd(mock_client: MagicMock) -> None: assert cm.get_max_total_charge_usd() == Decimal('42.50') -async def test_calculate_push_data_limit_no_ppe(mock_client: MagicMock) -> None: +async def test_compute_push_data_limit_no_ppe(mock_client: MagicMock) -> None: """Returns items_count when no PPE pricing is configured (prices are zero).""" config = _make_config(actor_pricing_info=None, charged_event_counts={}) cm = ChargingManagerImplementation(config, mock_client) async with cm: - result = cm.calculate_push_data_limit(10, 'some-event', is_default_dataset=True) + result = cm.compute_push_data_limit(10, 'some-event', is_default_dataset=True) assert result == 10 -async def test_calculate_push_data_limit_within_budget(mock_client: MagicMock) -> None: +async def test_compute_push_data_limit_within_budget(mock_client: MagicMock) -> None: """Returns full items_count when combined budget is sufficient for all items.""" pricing_info = _make_ppe_pricing_info({'click': Decimal('0.01'), 'apify-default-dataset-item': Decimal('0.01')}) config = _make_config( @@ -268,11 +268,11 @@ async def test_calculate_push_data_limit_within_budget(mock_client: MagicMock) - cm = ChargingManagerImplementation(config, mock_client) async with cm: # combined price = 0.02/item, budget = 10.00, max = 500 - result = cm.calculate_push_data_limit(5, 'click', is_default_dataset=True) + result = cm.compute_push_data_limit(5, 'click', is_default_dataset=True) assert result == 5 -async def test_calculate_push_data_limit_budget_exceeded(mock_client: MagicMock) -> None: +async def test_compute_push_data_limit_budget_exceeded(mock_client: MagicMock) -> None: """Returns capped count when combined price (explicit + synthetic) exceeds budget.""" pricing_info = _make_ppe_pricing_info({'scrape': Decimal('1.00'), 'apify-default-dataset-item': Decimal('1.00')}) config = _make_config( @@ -284,11 +284,11 @@ async def test_calculate_push_data_limit_budget_exceeded(mock_client: MagicMock) cm = ChargingManagerImplementation(config, mock_client) async with cm: # combined price = 2.00/item, budget = 3.00, max = floor(3/2) = 1 - result = cm.calculate_push_data_limit(5, 'scrape', is_default_dataset=True) + result = cm.compute_push_data_limit(5, 'scrape', is_default_dataset=True) assert result == 1 -async def test_calculate_push_data_limit_without_default_dataset(mock_client: MagicMock) -> None: +async def test_compute_push_data_limit_without_default_dataset(mock_client: MagicMock) -> None: """When not pushing to the default dataset, only explicit event price is considered.""" pricing_info = _make_ppe_pricing_info({'scrape': Decimal('1.00'), 'apify-default-dataset-item': Decimal('1.00')}) config = _make_config( @@ -300,11 +300,11 @@ async def test_calculate_push_data_limit_without_default_dataset(mock_client: Ma cm = ChargingManagerImplementation(config, mock_client) async with cm: # explicit price only = 1.00/item, budget = 3.00, max = floor(3/1) = 3 - result = cm.calculate_push_data_limit(5, 'scrape', is_default_dataset=False) + result = cm.compute_push_data_limit(5, 'scrape', is_default_dataset=False) assert result == 3 -async def test_calculate_push_data_limit_exhausted_budget(mock_client: MagicMock) -> None: +async def test_compute_push_data_limit_exhausted_budget(mock_client: MagicMock) -> None: """Returns 0 when the budget is fully exhausted before the push.""" pricing_info = _make_ppe_pricing_info({'scrape': Decimal('1.00')}) config = _make_config( @@ -315,7 +315,7 @@ async def test_calculate_push_data_limit_exhausted_budget(mock_client: MagicMock ) cm = ChargingManagerImplementation(config, mock_client) async with cm: - result = cm.calculate_push_data_limit(5, 'scrape', is_default_dataset=False) + result = cm.compute_push_data_limit(5, 'scrape', is_default_dataset=False) assert result == 0