Skip to content

Commit ec124e6

Browse files
committed
add stop worker functionality
1 parent 2a34940 commit ec124e6

4 files changed

Lines changed: 365 additions & 143 deletions

File tree

model/worker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class WorkerStatus:
1717
READY = "READY"
1818
RUNNING = "RUNNING"
1919
FAILED = "FAILED"
20+
STOPPING = "STOPPING"
2021
STOPPED = "STOPPED"
2122

2223

queuer.py

Lines changed: 111 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from .queuer_next_interval import QueuerNextIntervalMixin
2929
from .queuer_listener import QueuerListenerMixin
3030
from .queuer_master import QueuerMasterMixin
31+
from .queuer_worker import QueuerWorkerMixin
3132

3233
logger = get_logger(__name__)
3334

@@ -68,6 +69,7 @@ class Queuer(
6869
QueuerNextIntervalMixin,
6970
QueuerListenerMixin,
7071
QueuerMasterMixin,
72+
QueuerWorkerMixin,
7173
):
7274
"""
7375
Main queuing system class.
@@ -113,6 +115,7 @@ def __init__(
113115

114116
# Configuration
115117
self.job_poll_interval: timedelta = timedelta(minutes=5)
118+
self.worker_poll_interval: timedelta = timedelta(seconds=30)
116119
self.retention_archive: timedelta = timedelta(days=30)
117120

118121
# Active runners (jobs currently executing)
@@ -219,177 +222,96 @@ def start(self, master_settings: Optional[MasterSettings] = None) -> None:
219222
logger.info("Queuer started")
220223

221224
def stop(self) -> None:
222-
"""Stop the queuer."""
223-
# Store whether we were running to decide if we need to close connections
224-
was_running = getattr(self, "_running", False)
225+
"""
226+
Stop stops the queuer by closing the job listeners, cancelling all queued and running jobs,
227+
and cancelling the context to stop the queuer.
228+
"""
229+
# Check if already stopped (database is None means we've already cleaned up)
230+
if self.database is None:
231+
return # Already stopped
232+
233+
# Mark as not running immediately to prevent re-entrant calls
234+
was_running = self.running
235+
self.running = False
225236

226-
# Stop tickers first to prevent new work from being queued
227-
if self.heartbeat_ticker:
237+
# Stop tickers first to prevent them from calling stop() recursively
238+
if hasattr(self, "heartbeat_ticker") and self.heartbeat_ticker:
228239
try:
229240
self.heartbeat_ticker.stop()
230241
except Exception as e:
231242
logger.warning(f"Error stopping heartbeat ticker: {e}")
232243

233-
if self.poll_job_ticker:
244+
if hasattr(self, "poll_job_ticker") and self.poll_job_ticker:
234245
try:
235246
self.poll_job_ticker.stop()
236247
except Exception as e:
237248
logger.warning(f"Error stopping poll job ticker: {e}")
238249

239-
# Close database listeners
250+
# Close db listeners
240251
if self.job_db_listener:
241252
try:
242-
# Use go_func but wait for completion
243253
task = go_func(self.job_db_listener.stop, use_mp=False)
244-
task.get_results(timeout=3.0) # Wait up to 3 seconds total
254+
task.get_results(timeout=3.0)
245255
except Exception as e:
246256
# Only log if it's not already closed
247-
if "closed" not in str(e).lower():
248-
logger.warning(f"Error stopping job listener: {e}")
249-
250-
# Wait for database listener runners to complete
251-
if self.job_db_listener_runner:
252-
try:
253-
self.job_db_listener_runner.get_results(timeout=2.0)
254-
self.job_db_listener_runner = None
255-
except Exception as e:
256-
logger.warning(f"Error waiting for job listener runner: {e}")
257+
if "Listener has been closed" not in str(e):
258+
logger.error(f"Error closing job insert listener: {e}")
257259

258260
if self.job_archive_db_listener:
259261
try:
260262
task = go_func(self.job_archive_db_listener.stop, use_mp=False)
261-
task.get_results(timeout=3.0) # Wait up to 3 seconds total
263+
task.get_results(timeout=3.0)
262264
except Exception as e:
263-
if "closed" not in str(e).lower():
264-
logger.warning(f"Error stopping job archive listener: {e}")
265+
if "Listener has been closed" not in str(e):
266+
logger.error(f"Error closing job archive listener: {e}")
265267

266-
# Wait for database archive listener runner to complete
267-
if self.job_archive_db_listener_runner:
268-
try:
269-
self.job_archive_db_listener_runner.get_results(timeout=2.0)
270-
self.job_archive_db_listener_runner = None
271-
except Exception as e:
272-
logger.warning(f"Error waiting for job archive listener runner: {e}")
273-
274-
# Update worker status to stopped (only if we were running)
268+
# Update worker status to stopped
269+
err = None
275270
worker_rid = None
276-
if was_running:
277-
try:
278-
with self.worker_mutex:
271+
if self.database and self.database.instance:
272+
with self.worker_mutex:
273+
if self.worker:
279274
self.worker.status = WorkerStatus.STOPPED
280275
worker_updated = self.db_worker.update_worker(self.worker)
281276
if worker_updated:
277+
self.worker = worker_updated
282278
worker_rid = self.worker.rid
283-
except Exception as e:
284-
logger.error(f"Error updating worker status to stopped: {e}")
285-
else:
286-
# If we were never running, just get the worker_rid for job cancellation
287-
worker_rid = self.worker.rid
279+
else:
280+
err = Exception("Failed to update worker")
288281

289-
# Cancel all queued and running jobs
282+
if err:
283+
logger.error(f"Error updating worker status to stopped: {err}")
284+
return
285+
286+
# Cancel all queued and running jobs (only if we have a valid worker RID)
290287
if worker_rid:
291288
try:
292289
self.cancel_all_jobs_by_worker(worker_rid, 100)
293290
except Exception as e:
294291
logger.error(f"Error cancelling all jobs by worker: {e}")
292+
return
295293

296-
# Cancel the context equivalent - set cancellation event
297-
if self._cancel_event:
298-
try:
299-
self._cancel_event.set()
300-
except Exception as e:
301-
logger.warning(f"Error setting cancel event: {e}")
302-
303-
# Signal that we're stopping
304-
self.running = False
305-
self._stopped.set()
306-
307-
# Cancel all active runners - similar to Go's job cancellation
308-
active_runner_count = len(self.active_runners)
309-
if active_runner_count > 0:
310-
logger.info(f"Cancelling {active_runner_count} active runners")
311-
294+
# Cancel all active runners
312295
for runner_id, runner in list(self.active_runners.items()):
313-
try:
314-
runner.cancel()
315-
if runner_id in self.active_runners:
316-
del self.active_runners[runner_id]
317-
logger.debug(f"Removed runner {runner_id} from active_runners")
296+
runner.cancel()
297+
del self.active_runners[runner_id]
318298

319-
except Exception as e:
320-
logger.warning(f"Error cancelling runner {runner_id}: {e}")
321-
322-
# Final check
323-
remaining_runners = len(self.active_runners)
324-
if remaining_runners > 0:
325-
logger.warning(f"{remaining_runners} runners could not be cleaned up")
326-
else:
327-
logger.debug("All active runners cleaned up successfully")
328-
329-
# Cleanup broadcasters and listeners
330-
try:
331-
broadcaster_count = 0
332-
if self.job_insert_broadcaster:
333-
listener_count = len(self.job_insert_broadcaster.listeners)
334-
if listener_count > 0:
335-
logger.debug(
336-
f"Clearing {listener_count} listeners from job_insert_broadcaster"
337-
)
338-
self.job_insert_broadcaster.listeners.clear()
339-
broadcaster_count += 1
340-
341-
if self.job_update_broadcaster:
342-
listener_count = len(self.job_update_broadcaster.listeners)
343-
if listener_count > 0:
344-
logger.debug(
345-
f"Clearing {listener_count} listeners from job_update_broadcaster"
346-
)
347-
self.job_update_broadcaster.listeners.clear()
348-
broadcaster_count += 1
349-
350-
if self.job_delete_broadcaster:
351-
listener_count = len(self.job_delete_broadcaster.listeners)
352-
if listener_count > 0:
353-
logger.debug(
354-
f"Clearing {listener_count} listeners from job_delete_broadcaster"
355-
)
356-
self.job_delete_broadcaster.listeners.clear()
357-
broadcaster_count += 1
358-
359-
if broadcaster_count > 0:
360-
logger.debug(f"Cleaned up {broadcaster_count} broadcasters")
361-
362-
except Exception as e:
363-
logger.warning(f"Error during broadcaster cleanup: {e}")
364-
365-
# Cleanup AsyncIO resources
366-
try:
367-
try:
368-
loop = asyncio.get_running_loop()
369-
pending_tasks = [
370-
task for task in asyncio.all_tasks(loop) if not task.done()
371-
]
372-
if pending_tasks:
373-
logger.debug(f"Cancelling {len(pending_tasks)} pending async tasks")
374-
for task in pending_tasks:
375-
task.cancel()
376-
except RuntimeError:
377-
pass
378-
379-
# Clear the cancel event
380-
self._cancel_event = None
299+
# Cancel the context to stop the queuer
300+
if self._cancel_event:
301+
self._cancel_event.set()
381302

382-
except Exception as e:
383-
logger.warning(f"Error during AsyncIO cleanup: {e}")
303+
# Wait a moment for background goroutines to finish gracefully
304+
time.sleep(0.1)
384305

306+
# Close database connection
385307
if self.database:
308+
logger.info("Closing database connection")
386309
try:
387-
logger.info(f"Closing database '{self.name}'")
388310
self.database.close()
389311
except Exception as e:
390-
logger.error(f"Error closing main database connection: {e}")
312+
logger.error(f"Error closing database connection: {e}")
391313

392-
logger.info(f"Queuer '{self.worker.name}' stopped")
314+
logger.info("Queuer stopped")
393315

394316
# Job notification listeners
395317
async def _handle_job_notification(self, notification: str) -> None:
@@ -524,31 +446,77 @@ def _wait_for_listeners_ready(self, timeout_seconds: float = 3.0) -> None:
524446

525447
# Tickers
526448
def _heartbeat_func(self) -> None:
527-
"""Send periodic heartbeats - only updates database, not queuer state."""
449+
"""Send periodic heartbeats and handle worker status changes."""
528450
try:
529-
# Get current worker with mutex
451+
logger.debug("Sending worker heartbeat...")
452+
453+
# Get current worker with read lock
530454
with self.worker_mutex:
531-
current_worker = self.worker
532-
533-
updated_worker: Optional[Worker] = None
534-
if current_worker:
535-
# Update timestamp and save to database
536-
current_worker.updated_at = datetime.now()
537-
updated_worker = self.db_worker.update_worker(current_worker)
538-
logger.debug(
539-
f"Updated worker heartbeat timestamp: {current_worker.updated_at}"
455+
worker = self.worker
456+
457+
if worker is None:
458+
return
459+
460+
# Select worker from database for heartbeat
461+
worker_from_db = self.db_worker.select_worker(worker.rid)
462+
if not worker_from_db:
463+
logger.error("Error selecting worker for heartbeat")
464+
return
465+
466+
# Handle worker status
467+
if worker_from_db.status == WorkerStatus.STOPPED:
468+
logger.info(
469+
f"Stopping worker... (worker_status: {worker_from_db.status})"
540470
)
471+
try:
472+
self.stop()
473+
except Exception as e:
474+
logger.error(f"Error stopping queuer: {e}")
475+
return
476+
477+
elif worker_from_db.status == WorkerStatus.STOPPING:
478+
if worker_from_db.max_concurrency != 0:
479+
logger.info(
480+
f"Gracefully stopping worker... (worker_status: {worker_from_db.status})"
481+
)
482+
worker_from_db.max_concurrency = 0
483+
worker_from_db = self.db_worker.update_worker(worker_from_db)
484+
if not worker_from_db:
485+
logger.error("Error updating worker concurrency")
486+
return
487+
elif len(self.active_runners) == 0:
488+
logger.info(
489+
f"All running jobs finished, stopping worker... (worker_status: {worker_from_db.status})"
490+
)
491+
worker_from_db.status = WorkerStatus.STOPPED
492+
worker_from_db = self.db_worker.update_worker(worker_from_db)
493+
if not worker_from_db:
494+
logger.error("Error updating worker status to stopped")
495+
return
496+
try:
497+
self.stop()
498+
except Exception as e:
499+
logger.error(f"Error stopping queuer: {e}")
500+
return
501+
502+
else:
503+
# Default case: update worker heartbeat
504+
worker_from_db = self.db_worker.update_worker(worker)
505+
if not worker_from_db:
506+
logger.error("Error updating worker heartbeat")
507+
return
508+
509+
# Update local worker with write lock
510+
with self.worker_mutex:
511+
self.worker = worker_from_db
541512

542-
if updated_worker:
543-
with self.worker_mutex:
544-
self.worker = updated_worker
545513
except Exception as e:
546514
logger.error(f"Heartbeat error: {e}")
547515

548516
def _start_heartbeat_ticker(self) -> None:
549517
"""Start heartbeat ticker using threading."""
550518
self.heartbeat_ticker = Ticker(
551-
timedelta(seconds=30),
519+
self.worker_poll_interval,
552520
self._heartbeat_func,
553521
use_mp=False,
554522
)

0 commit comments

Comments (0)