Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/mypy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Python application - MyPy

on:
push:
branches: [ main ]
branches: [ "main", "develop" ]
pull_request:
branches: [ main ]
branches: [ "main", "develop" ]

jobs:
build:
Expand All @@ -16,7 +16,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ['3.9', '3.10', '3.11', '3.12', '3.13']
python-version: ['3.10', '3.11', '3.12', '3.13']
name: Python ${{ matrix.python-version }} sample

steps:
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ repos:
rev: v3.20.0
hooks:
- id: pyupgrade
args: [--py39-plus]
args: [--py310-plus]
8 changes: 4 additions & 4 deletions lacuscore/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,16 +359,16 @@ def check_viewport(cls, viewport: dict[str, int] | None) -> dict[str, int] | Non
return viewport
raise CaptureSettingsError(f'A viewport must have a width and a height: {viewport}')

def redis_dump(self) -> Mapping[str | bytes, bytes | float | int | str]:
mapping_capture: dict[str | bytes, bytes | float | int | str] = {}
def redis_dump(self) -> Mapping[str | bytes, bytes | str]:
mapping_capture: dict[str | bytes, bytes | str] = {}
for key, value in dict(self).items():
if value is None:
continue
if isinstance(value, bool):
mapping_capture[key] = 1 if value else 0
mapping_capture[key] = '1' if value else '0'
elif isinstance(value, (list, dict)):
if value:
mapping_capture[key] = orjson.dumps(value)
elif isinstance(value, (bytes, float, int, str)) and value not in ['', b'']: # we're ok with 0 for example
mapping_capture[key] = value
mapping_capture[key] = str(value)
return mapping_capture
107 changes: 60 additions & 47 deletions lacuscore/lacuscore.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@
from dns.exception import DNSException
from dns.exception import Timeout as DNSTimeout

from glide_sync import ExpiryType, ExpirySet, ConditionalChange, RequestError
from glide_sync import GlideClient, Batch
from playwrightcapture import Capture, PlaywrightCaptureException, InvalidPlaywrightParameter, TrustedTimestampSettings
from pydantic import ValidationError
from redis import Redis
from redis.exceptions import ConnectionError as RedisConnectionError
from redis.exceptions import DataError

from . import task_logger
from .helpers import (
Expand Down Expand Up @@ -107,7 +106,7 @@ class LacusCore():
:param max_retries: How many times should we re-try a capture if it failed.
"""

def __init__(self, redis_connector: Redis[bytes], /, *,
def __init__(self, redis_connector: GlideClient, /, *,
max_capture_time: int=3600,
expire_results: int=36000,
tor_proxy: dict[str, str] | str | None=None,
Expand Down Expand Up @@ -305,14 +304,15 @@ def enqueue(self, *,
self.master_logger.warning('Cannot trigger trusted timestamp, the remote timestamper service settings are missing.')
to_enqueue.with_trusted_timestamps = False

p = self.redis.pipeline()
p.set(f'lacus:query_hash:{hash_query}', perma_uuid, nx=True, ex=recapture_interval)
p.hset(f'lacus:capture_settings:{perma_uuid}', mapping=to_enqueue.redis_dump())
p = Batch(is_atomic=True)
p.set(f'lacus:query_hash:{hash_query}', perma_uuid,
conditional_set=ConditionalChange.ONLY_IF_DOES_NOT_EXIST, expiry=ExpirySet(ExpiryType.SEC, recapture_interval))
p.hset(f'lacus:capture_settings:{perma_uuid}', to_enqueue.redis_dump())
p.expire(f'lacus:capture_settings:{perma_uuid}', self.max_capture_time * 10)
p.zadd('lacus:to_capture', {perma_uuid: priority if priority is not None else 0})
try:
p.execute()
except DataError:
self.redis.exec(p, raise_on_error=True)
except RequestError:
self.master_logger.exception(f'Unable to enqueue: {to_enqueue}')
raise CaptureSettingsError(f'Unable to enqueue: {to_enqueue}')
return perma_uuid
Expand Down Expand Up @@ -372,7 +372,7 @@ def get_capture_status(self, uuid: str) -> CaptureStatus:
return CaptureStatus.QUEUED
if self.redis.zscore('lacus:ongoing', uuid) is not None:
return CaptureStatus.ONGOING
if self.redis.exists(f'lacus:capture_settings:{uuid}'):
if self.redis.exists([f'lacus:capture_settings:{uuid}']):
# we might have a race condition between when the UUID is popped out of lacus:to_capture,
# and pushed in lacus:ongoing.
# if that's the case, we wait for a sec and check lacus:ongoing again
Expand All @@ -388,11 +388,11 @@ def get_capture_status(self, uuid: str) -> CaptureStatus:
# The capture is actually ongoing now
return CaptureStatus.ONGOING
# The UUID is still nowhere to be found, it's broken.
self.redis.delete(f'lacus:capture_settings:{uuid}')
self.redis.delete([f'lacus:capture_settings:{uuid}'])
return CaptureStatus.UNKNOWN
if self.redis.exists(f'lacus:capture_results_hash:{uuid}'):
if self.redis.exists([f'lacus:capture_results_hash:{uuid}']):
return CaptureStatus.DONE
if self.redis.exists(f'lacus:capture_results:{uuid}'):
if self.redis.exists([f'lacus:capture_results:{uuid}']):
# TODO: remove in 1.8.* - old format used last in 1.6, and kept no more than 10H in redis
return CaptureStatus.DONE
return CaptureStatus.UNKNOWN
Expand All @@ -402,22 +402,23 @@ async def consume_queue(self, max_consume: int) -> AsyncIterator[Task[None]]:

:yield: Captures.
"""
value: list[tuple[bytes, float]]
while max_consume > 0:
value = self.redis.zpopmax('lacus:to_capture')
if not value:
# Nothing to capture
break
if not value[0]:
v = list(value.items())
if not v[0]:
continue
max_consume -= 1
uuid: str = value[0][0].decode()
priority: int = int(value[0][1])
uuid: str = v[0][0].decode()
priority: int = int(v[0][1])

logger = LacusCoreLogAdapter(self.master_logger, {'uuid': uuid})
yield task_logger.create_task(self._capture(uuid, priority), name=uuid,
logger=logger,
message='Capture raised an uncaught exception')
# Make sur the task starts.
# Make sure the task starts.
await asyncio.sleep(0.1)

async def _capture(self, uuid: str, priority: int) -> None:
Expand All @@ -433,7 +434,7 @@ async def _capture(self, uuid: str, priority: int) -> None:

logger = LacusCoreLogAdapter(self.master_logger, {'uuid': uuid})
self.redis.zadd('lacus:ongoing', {uuid: time.time()})
stats_pipeline = self.redis.pipeline()
stats_pipeline = Batch(is_atomic=True)
today = date.today().isoformat()

retry = False
Expand Down Expand Up @@ -569,7 +570,7 @@ async def _capture(self, uuid: str, priority: int) -> None:
cookies.append(cookie)
try:
logger.debug(f'Capturing {url}')
stats_pipeline.sadd(f'stats:{today}:captures', url)
stats_pipeline.sadd(f'stats:{today}:captures', [url])
async with Capture(
browser=browser_engine,
device_name=to_capture.device_name,
Expand Down Expand Up @@ -645,9 +646,9 @@ async def _capture(self, uuid: str, priority: int) -> None:
# PlaywrightCapture considers this capture eligible for a retry
logger.info('PlaywrightCapture considers it elligible for a retry.')
raise RetryCapture('PlaywrightCapture considers it elligible for a retry.')
elif self.redis.exists(f'lacus:capture_retry:{uuid}'):
elif self.redis.exists([f'lacus:capture_retry:{uuid}']):
# this is a retry that worked
stats_pipeline.sadd(f'stats:{today}:retry_success', url)
stats_pipeline.sadd(f'stats:{today}:retry_success', [url])
except RetryCapture:
if max_retries == 0:
error_msg = result['error'] if result.get('error') else 'Unknown error'
Expand All @@ -659,9 +660,8 @@ async def _capture(self, uuid: str, priority: int) -> None:
# No retry yet
logger.debug(f'Retrying {url} for the first time.')
retry = True
self.redis.setex(f'lacus:capture_retry:{uuid}',
self.max_capture_time * (max_retries + 10),
max_retries - 1)
self.redis.set(f'lacus:capture_retry:{uuid}', str(max_retries - 1),
expiry=ExpirySet(ExpiryType.SEC, self.max_capture_time * (max_retries + 10)))
else:
current_retry = int(_current_retry.decode())
if current_retry > 0:
Expand All @@ -671,7 +671,7 @@ async def _capture(self, uuid: str, priority: int) -> None:
else:
error_msg = result['error'] if result.get('error') else 'Unknown error'
logger.info(f'Retried too many times {url}: {error_msg}')
stats_pipeline.sadd(f'stats:{today}:retry_failed', url)
stats_pipeline.sadd(f'stats:{today}:retry_failed', [url])
except CaptureError:
if not result:
result = {'error': "No result key, shouldn't happen"}
Expand Down Expand Up @@ -712,55 +712,65 @@ async def _capture(self, uuid: str, priority: int) -> None:
if self.redis.zcard('lacus:to_capture') == 0:
# Just wait a little bit before retrying
await asyncio.sleep(random.randint(5, 10))
p = self.redis.pipeline()
p.zrem('lacus:ongoing', uuid)
p = Batch(is_atomic=True)
p.zrem('lacus:ongoing', [uuid])
p.zadd('lacus:to_capture', {uuid: priority - 1})
p.execute()
try:
self.redis.exec(p, raise_on_error=True)
except RequestError as e:
logger.warning(f'Unable to trigger a retry: {e}')
else:
retry_redis_error = 3
while retry_redis_error > 0:
try:
p = self.redis.pipeline()
p = Batch(is_atomic=True)
if result:
self._store_capture_response(p, uuid, result)
else:
logger.warning('Got no result at all for the capture.')
result = {'error': 'No result at all for the capture, Playwright failed.'}
self._store_capture_response(p, uuid, result)
p.delete(f'lacus:capture_settings:{uuid}')
p.zrem('lacus:ongoing', uuid)
p.execute()
p.delete([f'lacus:capture_settings:{uuid}'])
p.zrem('lacus:ongoing', [uuid])
self.redis.exec(p, raise_on_error=True)
break
except RedisConnectionError as e:
except RequestError as e:
logger.warning(f'Unable to store capture result - Redis Connection Error: {e}')
retry_redis_error -= 1
await asyncio.sleep(random.randint(5, 10))
else:
# Unrecoverable redis error, remove the capture settings
p = self.redis.pipeline()
p.delete(f'lacus:capture_settings:{uuid}')
p.zrem('lacus:ongoing', uuid)
p = Batch(is_atomic=True)
p.delete([f'lacus:capture_settings:{uuid}'])
p.zrem('lacus:ongoing', [uuid])
result = {'error': "Unable to store the result of the capture in redis (probably a huge download)."}
self._store_capture_response(p, uuid, result)
p.execute()
try:
self.redis.exec(p, raise_on_error=True)
except RequestError as e:
logger.warning(f'Unable to store Playwright error: {e}')

stats_pipeline.zincrby(f'stats:{today}:errors', 1, 'Redis Connection')
logger.critical('Unable to connect to redis and to push the result of the capture.')

# Expire stats in 10 days
expire_time = timedelta(days=10)
expire_time = timedelta(days=10).seconds
stats_pipeline.expire(f'stats:{today}:errors', expire_time)
stats_pipeline.expire(f'stats:{today}:retry_failed', expire_time)
stats_pipeline.expire(f'stats:{today}:retry_success', expire_time)
stats_pipeline.expire(f'stats:{today}:captures', expire_time)
stats_pipeline.execute()
try:
self.redis.exec(stats_pipeline, raise_on_error=True)
except RequestError as e:
logger.warning(f'Unable to store stats: {e}')

def _store_capture_response(self, pipeline: Redis, capture_uuid: str, results: CaptureResponse, # type: ignore[type-arg]
def _store_capture_response(self, pipeline: Batch, capture_uuid: str, results: CaptureResponse,
root_key: str | None=None) -> None:
logger = LacusCoreLogAdapter(self.master_logger, {'uuid': capture_uuid})
if root_key is None:
root_key = f'lacus:capture_results_hash:{capture_uuid}'

hash_to_set = {}
hash_to_set: dict[str, bytes] = {}
try:
if results.get('har'):
hash_to_set['har'] = pickle.dumps(results['har'])
Expand Down Expand Up @@ -797,7 +807,7 @@ def _store_capture_response(self, pipeline: Redis, capture_uuid: str, results: C
# these entries can be stored directly
hash_to_set[key] = results[key] # type: ignore[literal-required]
if hash_to_set:
pipeline.hset(root_key, mapping=hash_to_set) # type: ignore[arg-type]
pipeline.hset(root_key, hash_to_set) # type: ignore[arg-type]
# Make sure the key expires
pipeline.expire(root_key, self.expire_results)
else:
Expand Down Expand Up @@ -859,11 +869,14 @@ def clear_capture(self, uuid: str, reason: str) -> None:
return
logger.warning(f'Clearing capture: {reason}')
result: CaptureResponse = {'error': reason}
p = self.redis.pipeline()
p = Batch(is_atomic=True)
self._store_capture_response(p, uuid, result)
p.delete(f'lacus:capture_settings:{uuid}')
p.zrem('lacus:ongoing', uuid)
p.execute()
p.delete([f'lacus:capture_settings:{uuid}'])
p.zrem('lacus:ongoing', [uuid])
try:
self.redis.exec(p, raise_on_error=True)
except RequestError as e:
logger.warning(f'Error while clearing capture: {e}.')

async def __get_ips(self, logger: LacusCoreLogAdapter, hostname: str) -> list[IPv4Address | IPv6Address]:
# We need to use dnspython for resolving because socket.getaddrinfo will sometimes be stuck for ~10s
Expand Down
Loading