-
Notifications
You must be signed in to change notification settings - Fork 22
fix: Keep track of synthetic apify-default-dataset-item events #814
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
209ee79
d49ca40
b097706
7347d30
55f13e8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -637,21 +637,23 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non | |
|
|
||
| data = data if isinstance(data, list) else [data] | ||
|
|
||
| # No charging, just push the data without locking. | ||
| if charged_event_name is None: | ||
| dataset = await self.open_dataset() | ||
| await dataset.push_data(data) | ||
| return None | ||
| if charged_event_name and charged_event_name.startswith('apify-'): | ||
| raise ValueError(f'Cannot charge for synthetic event "{charged_event_name}" manually') | ||
|
|
||
| # If charging is requested, acquire the charge lock to prevent race conditions between concurrent | ||
| # Acquire the charge lock to prevent race conditions between concurrent | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Both flows (with and without `charged_event_name`) […] Possible solution? Move the lock to the charging manager? Or expose the Actor's lock to the dataset client? Not sure whether this is really a problem in practice. @janbuchar? |
||
| # push_data calls. We need to hold the lock for the entire push_data + charge sequence. | ||
| async with self._charge_lock: | ||
| max_charged_count = self.get_charging_manager().calculate_max_event_charge_count_within_limit( | ||
| charged_event_name | ||
| ) | ||
| # No explicit charging requested; synthetic events are handled within dataset.push_data. | ||
| if charged_event_name is None: | ||
| dataset = await self.open_dataset() | ||
| await dataset.push_data(data) | ||
| return None | ||
|
|
||
| # Push as many items as we can charge for. | ||
| pushed_items_count = min(max_charged_count, len(data)) if max_charged_count is not None else len(data) | ||
| pushed_items_count = self.get_charging_manager().calculate_push_data_limit( | ||
| items_count=len(data), | ||
| event_name=charged_event_name, | ||
| is_default_dataset=True, | ||
| ) | ||
|
|
||
| dataset = await self.open_dataset() | ||
|
|
||
|
|
@@ -660,6 +662,7 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non | |
| elif pushed_items_count > 0: | ||
| await dataset.push_data(data) | ||
|
|
||
| # Only charge explicit events; synthetic events will be processed within the client. | ||
| return await self.get_charging_manager().charge( | ||
| event_name=charged_event_name, | ||
| count=pushed_items_count, | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,6 +1,7 @@ | ||||||
| from __future__ import annotations | ||||||
|
|
||||||
| import math | ||||||
| from contextvars import ContextVar | ||||||
| from dataclasses import dataclass | ||||||
| from datetime import datetime, timezone | ||||||
| from decimal import Decimal | ||||||
|
|
@@ -31,6 +32,14 @@ | |||||
|
|
||||||
| run_validator = TypeAdapter[ActorRun | None](ActorRun | None) | ||||||
|
|
||||||
| DEFAULT_DATASET_ITEM_EVENT = 'apify-default-dataset-item' | ||||||
|
|
||||||
| # Context variable to hold the current `ChargingManager` instance, if any. This allows PPE-aware dataset clients to | ||||||
| # access the charging manager without needing to pass it explicitly. | ||||||
| charging_manager_ctx: ContextVar[ChargingManagerImplementation | None] = ContextVar( | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Shouldn't we use the protocol as a type hint rather than its implementation?
Suggested change
|
||||||
| 'charging_manager_ctx', default=None | ||||||
| ) | ||||||
|
|
||||||
|
|
||||||
| @docs_group('Charging') | ||||||
| class ChargingManager(Protocol): | ||||||
|
|
@@ -81,6 +90,28 @@ def get_charged_event_count(self, event_name: str) -> int: | |||||
| def get_max_total_charge_usd(self) -> Decimal: | ||||||
| """Get the configured maximum total charge for this Actor run.""" | ||||||
|
|
||||||
| def calculate_push_data_limit( | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please use "compute" instead of "calculate" (I know "calculate" was used before as well, but let's fix it). |
||||||
| self, | ||||||
| items_count: int, | ||||||
| event_name: str, | ||||||
| *, | ||||||
| is_default_dataset: bool, | ||||||
| ) -> int: | ||||||
| """Calculate how many items can be pushed and charged within the current budget. | ||||||
|
|
||||||
| Accounts for both the explicit event and the synthetic `DEFAULT_DATASET_ITEM_EVENT` event, | ||||||
| so that the combined cost per item does not exceed the remaining budget. | ||||||
|
|
||||||
| Args: | ||||||
| items_count: The number of items to be pushed. | ||||||
| event_name: The explicit event name to charge for each item. | ||||||
| is_default_dataset: Whether the data is pushed to the default dataset. | ||||||
| If True, the synthetic event cost is included in the combined price. | ||||||
|
|
||||||
| Returns: | ||||||
| Max number of items that can be pushed within the budget. | ||||||
| """ | ||||||
|
|
||||||
|
|
||||||
| @docs_group('Charging') | ||||||
| @dataclass(frozen=True) | ||||||
|
|
@@ -190,6 +221,11 @@ async def __aenter__(self) -> None: | |||||
|
|
||||||
| self._charging_log_dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME) | ||||||
|
|
||||||
| # if the Actor runs with the pay-per-event pricing model, set the context variable so that PPE-aware dataset | ||||||
| # clients can access the charging manager and charge for synthetic events. | ||||||
| if self._pricing_model == 'PAY_PER_EVENT': | ||||||
| charging_manager_ctx.set(self) | ||||||
|
|
||||||
| async def __aexit__( | ||||||
| self, | ||||||
| exc_type: type[BaseException] | None, | ||||||
|
|
@@ -199,6 +235,7 @@ async def __aexit__( | |||||
| if not self.active: | ||||||
| raise RuntimeError('Exiting an uninitialized ChargingManager') | ||||||
|
|
||||||
| charging_manager_ctx.set(None) | ||||||
| self.active = False | ||||||
|
|
||||||
| @ensure_context | ||||||
|
|
@@ -258,7 +295,11 @@ def calculate_chargeable() -> dict[str, int | None]: | |||||
| if self._actor_run_id is None: | ||||||
| raise RuntimeError('Actor run ID not configured') | ||||||
|
|
||||||
| if event_name in self._pricing_info: | ||||||
| if event_name.startswith('apify-'): | ||||||
| # Synthetic events (e.g. apify-default-dataset-item) are tracked internally only, | ||||||
| # the platform handles them automatically based on dataset writes. | ||||||
| pass | ||||||
| elif event_name in self._pricing_info: | ||||||
| await self._client.run(self._actor_run_id).charge(event_name, charged_count) | ||||||
| else: | ||||||
| logger.warning(f"Attempting to charge for an unknown event '{event_name}'") | ||||||
|
|
@@ -300,14 +341,7 @@ def calculate_total_charged_amount(self) -> Decimal: | |||||
|
|
||||||
| @ensure_context | ||||||
| def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int | None: | ||||||
| pricing_info = self._pricing_info.get(event_name) | ||||||
|
|
||||||
| if pricing_info is not None: | ||||||
| price = pricing_info.price | ||||||
| elif not self._is_at_home: | ||||||
| price = Decimal(1) # Use a nonzero price for local development so that the maximum budget can be reached | ||||||
| else: | ||||||
| price = Decimal() | ||||||
| price = self._get_event_price(event_name) | ||||||
|
|
||||||
| if not price: | ||||||
| return None | ||||||
|
|
@@ -337,6 +371,25 @@ def get_charged_event_count(self, event_name: str) -> int: | |||||
| def get_max_total_charge_usd(self) -> Decimal: | ||||||
| return self._max_total_charge_usd | ||||||
|
|
||||||
| @ensure_context | ||||||
| def calculate_push_data_limit( | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please use "compute" instead of "calculate" (I know "calculate" was used before as well, but let's fix it). |
||||||
| self, | ||||||
| items_count: int, | ||||||
| event_name: str, | ||||||
| *, | ||||||
| is_default_dataset: bool, | ||||||
| ) -> int: | ||||||
| explicit_price = self._get_event_price(event_name) | ||||||
| synthetic_price = self._get_event_price(DEFAULT_DATASET_ITEM_EVENT) if is_default_dataset else Decimal(0) | ||||||
| combined_price = explicit_price + synthetic_price | ||||||
|
|
||||||
| if not combined_price: | ||||||
| return items_count | ||||||
|
|
||||||
| result = (self._max_total_charge_usd - self.calculate_total_charged_amount()) / combined_price | ||||||
| max_count = max(0, math.floor(result)) if result.is_finite() else items_count | ||||||
| return min(items_count, max_count) | ||||||
|
|
||||||
| async def _fetch_pricing_info(self) -> _FetchedPricingInfoDict: | ||||||
| """Fetch pricing information from environment variables or API.""" | ||||||
| # Check if pricing info is available via environment variables | ||||||
|
|
@@ -370,6 +423,12 @@ async def _fetch_pricing_info(self) -> _FetchedPricingInfoDict: | |||||
| max_total_charge_usd=self._configuration.max_total_charge_usd or Decimal('inf'), | ||||||
| ) | ||||||
|
|
||||||
| def _get_event_price(self, event_name: str) -> Decimal: | ||||||
| pricing_info = self._pricing_info.get(event_name) | ||||||
| if pricing_info is not None: | ||||||
| return pricing_info.price | ||||||
| return Decimal(0) if self._is_at_home else Decimal(1) | ||||||
|
|
||||||
|
|
||||||
| @dataclass | ||||||
| class ChargingStateItem: | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from typing import TYPE_CHECKING, Any | ||
|
|
||
| from typing_extensions import Self, override | ||
|
|
||
| from crawlee.storage_clients._file_system import FileSystemDatasetClient | ||
|
|
||
| from apify.storage_clients._ppe_dataset_mixin import DatasetClientPpeMixin | ||
|
|
||
| if TYPE_CHECKING: | ||
| from crawlee.configuration import Configuration | ||
|
|
||
|
|
||
| class ApifyFileSystemDatasetClient(FileSystemDatasetClient, DatasetClientPpeMixin): | ||
Mantisus marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """Apify-specific implementation of the `FileSystemDatasetClient`. | ||
|
|
||
| It extends the functionality of `FileSystemDatasetClient` using `DatasetClientPpeMixin` and updates `push_data` to | ||
| limit and charge for the synthetic `apify-default-dataset-item` event. This is necessary for consistent behavior | ||
| when locally testing the `PAY_PER_EVENT` pricing model. | ||
| """ | ||
|
|
||
| def __init__(self, *args: Any, **kwargs: Any) -> None: | ||
| FileSystemDatasetClient.__init__(self, *args, **kwargs) | ||
| DatasetClientPpeMixin.__init__(self) | ||
|
|
||
| @override | ||
| @classmethod | ||
| async def open( | ||
| cls, | ||
| *, | ||
| id: str | None, | ||
| name: str | None, | ||
| alias: str | None, | ||
| configuration: Configuration, | ||
| ) -> Self: | ||
|
|
||
| dataset_client = await super().open( | ||
| id=id, | ||
| name=name, | ||
| alias=alias, | ||
| configuration=configuration, | ||
| ) | ||
|
|
||
| dataset_client.is_default_dataset = all(v is None for v in (id, name, alias)) | ||
|
|
||
| return dataset_client | ||
|
|
||
| @override | ||
| async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None: | ||
Mantisus marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| async with self._lock: | ||
| items = data if isinstance(data, list) else [data] | ||
| limit = self._calculate_limit_for_push(len(items)) | ||
|
|
||
| new_item_count = self._metadata.item_count | ||
| for item in items: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Shouldn't we iterate only over `items[:limit]`? |
||
| new_item_count += 1 | ||
| await self._push_item(item, new_item_count) | ||
|
|
||
| # now update metadata under the same lock | ||
| await self._update_metadata( | ||
| update_accessed_at=True, | ||
| update_modified_at=True, | ||
| new_item_count=new_item_count, | ||
| ) | ||
|
|
||
| await self._charge_for_items(limit) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| from apify._charging import DEFAULT_DATASET_ITEM_EVENT, charging_manager_ctx | ||
|
|
||
|
|
||
| class DatasetClientPpeMixin: | ||
| """A mixin for dataset clients to add support for PPE pricing model and tracking synthetic events.""" | ||
|
|
||
| def __init__(self) -> None: | ||
| self.is_default_dataset = False | ||
|
|
||
| def _calculate_limit_for_push(self, items_count: int) -> int: | ||
| if self.is_default_dataset and (charging_manager := charging_manager_ctx.get()): | ||
| max_charged_count = charging_manager.calculate_max_event_charge_count_within_limit( | ||
| event_name=DEFAULT_DATASET_ITEM_EVENT | ||
| ) | ||
| return min(max_charged_count, items_count) if max_charged_count is not None else items_count | ||
| return items_count | ||
|
|
||
| async def _charge_for_items(self, count_items: int) -> None: | ||
| if self.is_default_dataset and (charging_manager := charging_manager_ctx.get()): | ||
| await charging_manager.charge( | ||
| event_name=DEFAULT_DATASET_ITEM_EVENT, | ||
| count=count_items, | ||
| ) |
Uh oh!
There was an error while loading. Please reload this page.