-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdependencies.py
More file actions
92 lines (72 loc) · 3.05 KB
/
dependencies.py
File metadata and controls
92 lines (72 loc) · 3.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import boto3
from botocore.config import Config
from botocore.utils import fix_s3_host
from fastapi import Depends, Request
from fastapi_cloudauth.cognito import CognitoClaims # type: ignore
from workerfacing_api import settings
from workerfacing_api.core import auth, filesystem, queue
# S3 client setup
s3_client = None
if settings.s3_bucket:
s3_client = boto3.client(
"s3",
region_name=settings.s3_region,
config=Config(signature_version="v4", s3={"addressing_style": "path"}),
)
# this and config=... required to avoid DNS problems with new buckets
s3_client.meta.events.unregister("before-sign.s3", fix_s3_host)
# Queue
queue_db_url = settings.queue_db_url
retry_different = settings.retry_different
if queue_db_url.startswith("sqlite"):
queue_: queue.RDSJobQueue = queue.SQLiteRDSJobQueue(
db_url=queue_db_url,
retry_different=retry_different,
s3_client=s3_client,
s3_bucket=settings.s3_bucket,
)
else:
queue_ = queue.RDSJobQueue(db_url=queue_db_url, retry_different=retry_different)
def queue_dep() -> queue.RDSJobQueue:
return queue_
# App-internal authentication (i.e. user-facing API <-> worker-facing API)
authorizer = auth.APIKeyDependency(key=settings.internal_api_key_secret)
# Worker authentication
# Lazy initialization to avoid HTTP calls during module import (important for testing)
_current_user_dep: auth.WorkerGroupCognitoCurrentUser | None = None
def _get_current_user_dep() -> auth.WorkerGroupCognitoCurrentUser:
global _current_user_dep
if _current_user_dep is None:
_current_user_dep = auth.WorkerGroupCognitoCurrentUser(
region=settings.cognito_region,
userPoolId=settings.cognito_user_pool_id,
client_id=settings.cognito_client_id,
)
return _current_user_dep
# Create a property-like object that behaves like the original current_user_dep
# but initializes lazily on first access
class _CurrentUserDepProxy:
def __call__(self, *args, **kwargs): # type: ignore
return _get_current_user_dep()(*args, **kwargs)
def __getattr__(self, name): # type: ignore
return getattr(_get_current_user_dep(), name)
current_user_dep = _CurrentUserDepProxy()
async def current_user_global_dep(
request: Request, current_user: CognitoClaims = Depends(current_user_dep)
) -> CognitoClaims:
request.state.current_user = current_user
return current_user
# Filesystem
async def filesystem_dep() -> filesystem.FileSystem:
if settings.filesystem == "s3":
if s3_client is None or settings.s3_bucket is None:
raise ValueError("S3 bucket or client not configured")
return filesystem.S3Filesystem(s3_client, settings.s3_bucket)
elif settings.filesystem == "local":
if settings.user_data_root_path is None:
raise ValueError("Local filesystem requires user_data_root_path")
return filesystem.LocalFilesystem(
settings.user_data_root_path, settings.user_data_root_path
)
else:
raise ValueError("Invalid filesystem setting")