-
Notifications
You must be signed in to change notification settings - Fork 705
Expand file tree
/
Copy pathscheduler.py
More file actions
140 lines (115 loc) · 4.56 KB
/
scheduler.py
File metadata and controls
140 lines (115 loc) · 4.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""Scheduler process entry point."""
import asyncio
import logging
import signal
import sentry_sdk
from apscheduler.jobstores.base import BaseJobStore
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.triggers.cron import CronTrigger
from intentkit.config.config import config
from intentkit.config.db import init_db
from intentkit.config.redis import (
clean_heartbeat,
get_redis,
init_redis,
send_heartbeat,
)
from intentkit.core.credit.plan_credit import issue_all_plan_credits
from intentkit.core.scheduler import create_scheduler
from intentkit.utils.alert import cleanup_alert
from app.services.twitter.oauth2_refresh import refresh_expiring_tokens
logger = logging.getLogger(__name__)
if config.sentry_dsn:
_ = sentry_sdk.init(
dsn=config.sentry_dsn,
sample_rate=config.sentry_sample_rate,
# traces_sample_rate=config.sentry_traces_sample_rate,
# profiles_sample_rate=config.sentry_profiles_sample_rate,
environment=config.env,
release=config.release,
server_name="intent-scheduler",
)
if __name__ == "__main__":
async def send_scheduler_heartbeat():
"""Send a heartbeat signal to Redis to indicate the scheduler is running."""
logger.info("Sending scheduler heartbeat")
try:
redis_client = get_redis()
await send_heartbeat(redis_client, "scheduler")
logger.info("Sent scheduler heartbeat successfully")
except Exception as e:
logger.error("Error sending scheduler heartbeat: %s", e)
async def main():
# Create a shutdown event for graceful termination
shutdown_event = asyncio.Event()
# Initialize database
await init_db(**config.db)
# Initialize Redis
_ = await init_redis(
host=config.redis_host,
port=config.redis_port,
db=config.redis_db,
password=config.redis_password,
ssl=config.redis_ssl,
)
# Set up signal handlers for graceful shutdown
loop = asyncio.get_running_loop()
# Define an async function to set the shutdown event
async def set_shutdown():
shutdown_event.set()
# Register signal handlers
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, lambda: asyncio.create_task(set_shutdown()))
# Define the cleanup function that will be called on exit
async def cleanup_resources():
try:
redis_client = get_redis()
await clean_heartbeat(redis_client, "scheduler")
except Exception as e:
logger.error("Error cleaning up heartbeat: %s", e)
cleanup_alert()
# Initialize scheduler
jobstores: dict[str, BaseJobStore] = {}
jobstores["default"] = MemoryJobStore()
logger.info("scheduler using in-memory job store")
scheduler = create_scheduler(jobstores=jobstores)
_ = scheduler.add_job(
refresh_expiring_tokens,
trigger=CronTrigger(minute="*/5", timezone="UTC"),
id="refresh_twitter_tokens",
name="Refresh expiring Twitter tokens",
replace_existing=True,
)
_ = scheduler.add_job(
issue_all_plan_credits,
trigger=CronTrigger(minute="0", timezone="UTC"),
id="issue_plan_credits",
name="Issue monthly plan credits to eligible teams",
replace_existing=True,
)
_ = scheduler.add_job(
send_scheduler_heartbeat,
trigger=CronTrigger(minute="*", timezone="UTC"),
id="scheduler_heartbeat",
name="Scheduler Heartbeat",
replace_existing=True,
)
try:
logger.info("Starting scheduler process...")
scheduler.start()
# Wait for shutdown event
logger.info(
"Scheduler process running. Press Ctrl+C or send SIGTERM to exit."
)
_ = await shutdown_event.wait()
logger.info("Received shutdown signal. Shutting down gracefully...")
except Exception as e:
logger.error("Error in scheduler process: %s", e)
finally:
# Run the cleanup code and shutdown the scheduler
await cleanup_resources()
if scheduler.running:
scheduler.shutdown()
# Run the async main function
# We handle all signals inside the main function, so we don't need to handle KeyboardInterrupt here
asyncio.run(main())