diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index de673780..0ac74109 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -198,3 +198,6 @@ async def reclaim_request( @override async def is_empty(self) -> bool: return await self._implementation.is_empty() + + async def is_finished(self) -> bool: + return await self._implementation.is_finished() diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py index c196b098..09233539 100644 --- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py @@ -8,6 +8,7 @@ from cachetools import LRUCache +from crawlee.storage_clients._base import RequestQueueClient from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata from ._models import ApifyRequestQueueMetadata, CachedRequest, RequestQueueHead @@ -23,6 +24,9 @@ logger = getLogger(__name__) +_CRAWLEE_SUPPORTS_IS_FINISHED = hasattr(RequestQueueClient, 'is_finished') + + class ApifyRequestQueueSharedClient: """Internal request queue client implementation for multi-consumer scenarios on the Apify platform. @@ -289,8 +293,30 @@ async def reclaim_request( async def is_empty(self) -> bool: """Specific implementation of this method for the RQ shared access mode.""" + if not _CRAWLEE_SUPPORTS_IS_FINISHED: + return await self._old_is_empty() + # Check _list_head. # Without the lock the `is_empty` is prone to falsely report True with some low probability race condition. + async with self._fetch_lock: + return await self._is_empty() + + async def is_finished(self) -> bool: + """Specific implementation of this method for the RQ shared access mode.""" + if not _CRAWLEE_SUPPORTS_IS_FINISHED: + return await self._old_is_empty() + + async with self._fetch_lock: + # Order of operations is important here, because affects on `_queue_has_locked_requests`. + return await self._is_empty() and not self._queue_has_locked_requests + + async def _is_empty(self) -> bool: + """Check whether anything is available to fetch. Lock-free core of `is_empty`, caller must hold the lock.""" + head = await self._list_head(limit=1) + return len(head.items) == 0 + + async def _old_is_empty(self) -> bool: + """Temporary workaround for compatibility with Crawlee versions earlier than 1.8.0.""" async with self._fetch_lock: head = await self._list_head(limit=1) return len(head.items) == 0 and not self._queue_has_locked_requests diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py index 4029300d..de4de538 100644 --- a/src/apify/storage_clients/_apify/_request_queue_single_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py @@ -7,6 +7,7 @@ from cachetools import LRUCache +from crawlee.storage_clients._base import RequestQueueClient from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata from ._utils import to_crawlee_request, unique_key_to_request_id @@ -21,6 +22,9 @@ logger = getLogger(__name__) +_CRAWLEE_SUPPORTS_IS_FINISHED = hasattr(RequestQueueClient, 'is_finished') + + class ApifyRequestQueueSingleClient: """Internal request queue client implementation for single-consumer scenarios on the Apify platform. @@ -277,7 +281,21 @@ async def reclaim_request( async def is_empty(self) -> bool: """Specific implementation of this method for the RQ single access mode.""" - # Without the lock the `is_empty` is prone to falsely report True with some low probability race condition. + if not _CRAWLEE_SUPPORTS_IS_FINISHED: + return await self._old_is_empty() + + await self._ensure_head_is_non_empty() + return not self._head_requests + + async def is_finished(self) -> bool: + """Specific implementation of this method for the RQ single access mode.""" + if not _CRAWLEE_SUPPORTS_IS_FINISHED: + return await self._old_is_empty() + + return await self.is_empty() and not self._requests_in_progress + + async def _old_is_empty(self) -> bool: + """Temporary workaround for compatibility with Crawlee versions earlier than 1.8.0.""" await self._ensure_head_is_non_empty() return not self._head_requests and not self._requests_in_progress diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 955d9e81..3866552a 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -11,6 +11,7 @@ from apify_client._models import BatchAddResult, RequestDraft from crawlee import service_locator from crawlee.crawlers import BasicCrawler +from crawlee.storage_clients._base import RequestQueueClient from .._utils import generate_unique_resource_name, poll_until_condition from apify import Actor, Request @@ -1227,10 +1228,18 @@ async def test_force_cloud( assert str(request_queue_request.url) == 'http://example.com' -async def test_request_queue_is_finished( +async def test_request_queue_is_finished_and_is_empty( request_queue_apify: RequestQueue, rq_poll_timeout: int, ) -> None: + """Test that `is_empty` and `is_finished` behave correctly with Apify request queue.""" + + assert await poll_until_condition(request_queue_apify.is_empty, timeout=rq_poll_timeout, backoff_factor=2), ( + 'RequestQueue should be empty initially.' + ) + assert await poll_until_condition(request_queue_apify.is_finished, timeout=rq_poll_timeout, backoff_factor=2), ( + 'RequestQueue should be finished initially.' + ) await request_queue_apify.add_request(Request.from_url('http://example.com')) assert not await request_queue_apify.is_finished() @@ -1239,11 +1248,24 @@ async def test_request_queue_is_finished( request_queue_apify.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2 ) assert fetched is not None - assert not await request_queue_apify.is_finished(), ( - 'RequestQueue should not be finished unless the request is marked as handled.' - ) + + if hasattr(RequestQueueClient, 'is_finished'): + assert await poll_until_condition(request_queue_apify.is_empty, timeout=rq_poll_timeout, backoff_factor=2), ( + 'RequestQueue should be empty because queue does not contain any requests for fetching.' + ) + assert not await request_queue_apify.is_finished(), ( + 'RequestQueue should not be finished unless the request is marked as handled.' + ) + else: + assert not await poll_until_condition( + request_queue_apify.is_empty, timeout=rq_poll_timeout, backoff_factor=2 + ), 'RequestQueue should not be empty because queue contains a request in progress.' + assert not await request_queue_apify.is_finished(), ( + 'RequestQueue should not be finished unless the request is marked as handled.' + ) await request_queue_apify.mark_request_as_handled(fetched) + assert await poll_until_condition(request_queue_apify.is_empty, timeout=rq_poll_timeout, backoff_factor=2) assert await poll_until_condition(request_queue_apify.is_finished, timeout=rq_poll_timeout, backoff_factor=2)