-
Notifications
You must be signed in to change notification settings - Fork 74
Expand file tree
/
Copy pathcelery.py
More file actions
34 lines (30 loc) · 1.31 KB
/
celery.py
File metadata and controls
34 lines (30 loc) · 1.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import os
from model_engine_server.common.dtos.model_endpoints import BrokerType
from model_engine_server.core.celery import TaskVisibility, celery_app
from model_engine_server.inference.common import unset_sensitive_envvars
# Module-level construction of the Celery app used for async inference.
# unset_sensitive_envvars() is called before any configuration is read from the
# environment, so every lookup below sees the post-call environment.
unset_sensitive_envvars()

# Keep both the raw string (forwarded to celery_app) and the parsed BrokerType
# (used for the SQS branch below). BROKER_TYPE may be unset, in which case
# BrokerType(None) is evaluated -- NOTE(review): presumably raises ValueError.
broker_type_str = os.environ.get("BROKER_TYPE")
broker_type = BrokerType(broker_type_str)

# CELERY_S3_BUCKET is optional at this point: the value is None when unset,
# which is why the declared `str` type needs the ignore.
s3_bucket: str = os.getenv("CELERY_S3_BUCKET")  # type: ignore

celery_kwargs = {
    "name": "model_engine_server.inference.async_inference",
    "modules": ["model_engine_server.inference.async_inference.tasks"],
    # AWS_PROFILE is mandatory: a missing variable raises KeyError here.
    "aws_role": os.environ["AWS_PROFILE"],
    "s3_bucket": s3_bucket,
    # s3_base_path = TODO get from env var/config
    "task_reject_on_worker_lost": False,
    "worker_proc_alive_timeout": 1500,
    "broker_type": broker_type_str,
    # We're using SQS so this only changes task_time_limit
    "task_visibility": TaskVisibility.VISIBILITY_24H,
}

if broker_type == BrokerType.SQS:
    # Register the single queue named by SQS_QUEUE_NAME with the URL from
    # SQS_QUEUE_URL. Either value is None when its variable is unset.
    queue_name = os.environ.get("SQS_QUEUE_NAME")
    queue_url = os.environ.get("SQS_QUEUE_URL")
    celery_kwargs["broker_transport_options"] = {
        "predefined_queues": {queue_name: {"url": queue_url}},
    }

# TODO: is this unused or something? how come we don't have ABS here?
async_inference_service = celery_app(**celery_kwargs)  # type: ignore

if __name__ == "__main__":
    async_inference_service.start()