49 changes: 30 additions & 19 deletions src/apify/storage_clients/_apify/_request_queue_shared_client.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import contextlib
from collections import deque
from datetime import datetime, timedelta, timezone
from logging import getLogger
@@ -73,9 +74,6 @@ def __init__(
self._queue_has_locked_requests: bool | None = None
"""Whether the queue contains requests currently locked by other clients."""

self._should_check_for_forefront_requests = False
"""Flag indicating whether to refresh the queue head to check for newly added forefront requests."""

self._fetch_lock = asyncio.Lock()
"""Lock to prevent race conditions during concurrent fetch operations."""

@@ -106,7 +104,7 @@ async def add_batch_of_requests(
)

else:
# Add new request to the cache.
# Add new request to the cache, hydrated so subsequent fetches don't need an API roundtrip.
processed_request = ProcessedRequest(
id=request_id,
unique_key=request.unique_key,
@@ -116,6 +114,7 @@
self._cache_request(
request_id,
processed_request,
hydrated_request=request,
)
new_requests.append(request)

@@ -137,9 +136,23 @@
api_response.processed_requests.extend(already_present_requests)

# Remove unprocessed requests from the cache
unprocessed_ids = set[str]()
for unprocessed_request in api_response.unprocessed_requests:
unprocessed_request_id = unique_key_to_request_id(unprocessed_request.unique_key)
self._requests_cache.pop(unprocessed_request_id, None)
unprocessed_ids.add(unprocessed_request_id)

# When adding to the forefront, eagerly insert successfully-added requests at the front of the
# local queue head. The Apify API has a propagation delay between `batch_add_requests(forefront)`
# and the result being reflected in `list_and_lock_head` (#808), so trusting the local intent
# makes the new requests fetchable immediately. Iterate in reverse so the first request in the
# batch ends up at the very front.
if forefront:
for request in reversed(new_requests):
request_id = unique_key_to_request_id(request.unique_key)
if request_id in unprocessed_ids or request_id in self._queue_head:
continue
self._queue_head.appendleft(request_id)

else:
api_response = AddRequestsResponse.model_validate(
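
For readers skimming the hunk above, a minimal standalone sketch of the dedupe-then-appendleft idiom the forefront branch relies on (ids and sets below are made up for illustration, not taken from the SDK): iterating the batch in reverse means the first request of the batch ends up at the very front of the local head.

    from collections import deque

    queue_head: deque[str] = deque(["r3", "r4"])   # existing local queue head
    new_batch = ["r1", "r2", "r3"]                  # ids just added with forefront=True
    unprocessed_ids = {"r2"}                        # ids the API reported as unprocessed

    # Reverse iteration so new_batch[0] lands at the very front; skip ids the API
    # rejected and ids already present in the head.
    for request_id in reversed(new_batch):
        if request_id in unprocessed_ids or request_id in queue_head:
            continue
        queue_head.appendleft(request_id)

    assert list(queue_head) == ["r1", "r3", "r4"]
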
@@ -274,10 +287,15 @@ async def reclaim_request(
hydrated_request=request,
)

# If we're adding to the forefront, we need to check for forefront requests
# in the next list_head call
# When reclaiming to the forefront, eagerly insert the request at the front of the local
# queue head. The Apify API has a propagation delay between `update_request(forefront=true)`
# and the result being reflected in `list_and_lock_head` (#808), so trusting the local intent
# avoids returning a different request on the next fetch. Dedupe in case the request_id was
# re-added by a concurrent refresh between fetch and reclaim.
if forefront:
self._should_check_for_forefront_requests = True
with contextlib.suppress(ValueError):
self._queue_head.remove(request_id)
self._queue_head.appendleft(request_id)

except Exception:
logger.exception(f'Error reclaiming request {request.unique_key}')
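
For reference, a minimal sketch (values hypothetical) of the suppress/remove/appendleft move used in the forefront branch above: `deque.remove` raises `ValueError` when the id is absent, and suppressing it makes moving the request to the front idempotent.

    import contextlib
    from collections import deque

    queue_head: deque[str] = deque(["r2", "r3"])
    request_id = "r3"   # request being reclaimed to the forefront

    # Drop the id if a concurrent head refresh re-added it, then push it to the front.
    with contextlib.suppress(ValueError):
        queue_head.remove(request_id)
    queue_head.appendleft(request_id)

    assert list(queue_head) == ["r3", "r2"]
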
@@ -317,7 +335,7 @@ async def _get_request_by_id(self, request_id: str) -> Request | None:
async def _ensure_head_is_non_empty(self) -> None:
"""Ensure that the queue head has requests if they are available in the queue."""
# If queue head has adequate requests, skip fetching more
if len(self._queue_head) > 1 and not self._should_check_for_forefront_requests:
if len(self._queue_head) > 1:
return

# Fetch requests from the API and populate the queue head
@@ -404,8 +422,10 @@ async def _list_head(
Returns:
A collection of requests from the beginning of the queue.
"""
# Return from cache if available and we're not checking for new forefront requests
if self._queue_head and not self._should_check_for_forefront_requests:
# Return from cache if available. Local-head updates from `add_batch_of_requests(forefront=True)` and
# `reclaim_request(forefront=True)` keep the cache authoritative for our own forefront ops, so we don't
# need to force an API refresh after them.
if self._queue_head:
logger.debug(f'Using cached queue head with {len(self._queue_head)} requests')
# Create a list of requests from the cached queue head
items = []
@@ -424,11 +444,6 @@
lock_time=None,
queue_has_locked_requests=self._queue_has_locked_requests,
)
leftover_buffer = list[str]()
if self._should_check_for_forefront_requests:
leftover_buffer = list(self._queue_head)
self._queue_head.clear()
self._should_check_for_forefront_requests = False

# Otherwise fetch from API
response = await self._api_client.list_and_lock_head(
@@ -469,10 +484,6 @@
)
self._queue_head.append(request_id)

for leftover_id in leftover_buffer:
# After adding new requests to the forefront, any existing leftover locked request is kept in the end.
self._queue_head.append(leftover_id)

return RequestQueueHead.model_validate(response)

def _cache_request(