Skip to content

Commit 6e2a2e9

Browse files
manan164 and claude committed
Merge main into fix/thread-safety-and-metrics-registry
Resolve conflicts after PR #375 was merged to main. Keep thread safety improvements (RLock in MetricsCollector, process lock in TaskHandler) while adopting main's cleaner lazy-init metrics counter and simpler process cleanup logic. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2 parents 21ba23f + 8582146 commit 6e2a2e9

3 files changed

Lines changed: 36 additions & 55 deletions

File tree

src/conductor/client/automator/task_handler.py

Lines changed: 36 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -255,22 +255,7 @@ def __init__(
255255

256256
self.__create_task_runner_processes(workers, configuration, metrics_settings)
257257
self.__create_metrics_provider_process(metrics_settings)
258-
259-
# Initialize worker restart counter directly using prometheus_client (if metrics enabled).
260-
# We use prometheus_client directly in the parent process instead of MetricsCollector
261-
# to avoid registry confusion between parent and worker processes.
262258
self._worker_restart_counter = None
263-
if metrics_settings is not None:
264-
try:
265-
from prometheus_client import Counter
266-
self._worker_restart_counter = Counter(
267-
name='worker_restart_total',
268-
documentation='Number of times TaskHandler restarted a worker subprocess',
269-
labelnames=['taskType']
270-
)
271-
logger.debug("Initialized worker_restart_total counter in parent process")
272-
except Exception as e:
273-
logger.debug("Failed to initialize worker restart counter: %s", e)
274259

275260
# Optional supervision: monitor worker processes and (optionally) restart on failure.
276261
self.monitor_processes = monitor_processes
@@ -441,52 +426,58 @@ def __restart_worker_process(self, index: int) -> None:
441426
attempt = self._restart_counts[index] + 1
442427

443428
# Exponential backoff per-worker to avoid tight crash loops
444-
backoff = min(self.restart_backoff_seconds * (2 ** self._restart_counts[index]), self.restart_backoff_max_seconds)
429+
backoff = min(self.restart_backoff_seconds * (2 ** max(self._restart_counts[index], 0)), self.restart_backoff_max_seconds)
445430
self._next_restart_at[index] = now + backoff
446431

447432
try:
448433
# Reap the old process (avoid accumulating zombies on repeated restarts).
449434
old_process = self.task_runner_processes[index]
450-
old_pid = getattr(old_process, "pid", None)
451435
try:
452436
if old_process is not None and old_process.exitcode is not None:
453-
# Give process a bit more time to clean up
454-
old_process.join(timeout=0.5)
437+
old_process.join(timeout=0.0)
455438
try:
456439
old_process.close()
457-
logger.debug("Cleaned up old worker process (worker=%s, pid=%s)", worker.get_task_definition_name(), old_pid)
458-
except Exception as close_err:
459-
logger.debug("Failed to close old worker process (worker=%s, pid=%s): %s",
460-
worker.get_task_definition_name(), old_pid, close_err)
461-
except Exception as join_err:
462-
logger.debug("Failed to join old worker process (worker=%s, pid=%s): %s",
463-
worker.get_task_definition_name(), old_pid, join_err)
440+
except Exception:
441+
pass
442+
except Exception:
443+
pass
464444

465445
new_process = self.__build_process_for_worker(worker)
466446
self.task_runner_processes[index] = new_process
467447
new_process.start()
468448
self._restart_counts[index] = attempt
469449
self.__inc_worker_restart_metric(worker.get_task_definition_name())
470450
logger.info(
471-
"Restarted worker process (worker=%s, attempt=%s, old_pid=%s, new_pid=%s, next_backoff=%ss)",
451+
"Restarted worker process (worker=%s, attempt=%s, pid=%s, next_backoff=%ss)",
472452
worker.get_task_definition_name(),
473453
attempt,
474-
old_pid,
475454
new_process.pid,
476455
backoff
477456
)
478457
except Exception as e:
479458
logger.error("Failed to restart worker process (worker=%s): %s", worker.get_task_definition_name(), e)
480459

481460
def __inc_worker_restart_metric(self, task_type: str) -> None:
482-
"""Best-effort counter increment for worker subprocess restarts."""
483-
if self._worker_restart_counter is None:
461+
"""Best-effort counter increment for worker subprocess restarts (requires metrics_settings)."""
462+
if self._metrics_settings is None:
484463
return
485464

486465
try:
487-
# Increment the prometheus counter directly.
488-
# This writes to the shared multiprocess metrics directory.
489-
self._worker_restart_counter.labels(taskType=task_type).inc()
466+
# Avoid instantiating MetricsCollector here: it keeps a global registry which can be problematic
467+
# when multiple TaskHandlers/tests use different PROMETHEUS_MULTIPROC_DIR values in one process.
468+
from conductor.client.telemetry import metrics_collector as mc
469+
470+
mc._ensure_prometheus_imported()
471+
if self._worker_restart_counter is None:
472+
# Use a dedicated registry to avoid duplicate metric registration errors in the default registry.
473+
registry = mc.CollectorRegistry()
474+
self._worker_restart_counter = mc.Counter(
475+
name=MetricName.WORKER_RESTART,
476+
documentation=MetricDocumentation.WORKER_RESTART,
477+
labelnames=[MetricLabel.TASK_TYPE.value],
478+
registry=registry,
479+
)
480+
self._worker_restart_counter.labels(task_type).inc()
490481
except Exception as e:
491482
# Metrics should never break worker supervision.
492483
logger.debug("Failed to increment worker_restart metric: %s", e)
@@ -511,19 +502,17 @@ def __build_process_for_worker(self, worker: WorkerInterface) -> Process:
511502

512503
def get_worker_process_status(self) -> List[Dict[str, Any]]:
513504
"""Return basic worker process status for health checks / observability."""
514-
# Lock to ensure consistent snapshot of process state
515-
with self._process_lock:
516-
statuses: List[Dict[str, Any]] = []
517-
for i, worker in enumerate(self.workers):
518-
process = self.task_runner_processes[i] if i < len(self.task_runner_processes) else None
519-
statuses.append({
520-
"worker": worker.get_task_definition_name(),
521-
"pid": getattr(process, "pid", None),
522-
"alive": process.is_alive() if process is not None else False,
523-
"exitcode": getattr(process, "exitcode", None),
524-
"restart_count": self._restart_counts[i] if i < len(self._restart_counts) else 0,
525-
})
526-
return statuses
505+
statuses: List[Dict[str, Any]] = []
506+
for i, worker in enumerate(self.workers):
507+
process = self.task_runner_processes[i] if i < len(self.task_runner_processes) else None
508+
statuses.append({
509+
"worker": worker.get_task_definition_name(),
510+
"pid": getattr(process, "pid", None),
511+
"alive": process.is_alive() if process is not None else False,
512+
"exitcode": getattr(process, "exitcode", None),
513+
"restart_count": self._restart_counts[i] if i < len(self._restart_counts) else 0,
514+
})
515+
return statuses
527516

528517
def is_healthy(self) -> bool:
529518
"""True if all worker processes are alive."""
@@ -541,7 +530,7 @@ def __start_task_runner_processes(self):
541530
task_runner_process.start()
542531
worker = self.workers[i]
543532
paused_status = "PAUSED" if getattr(worker, "paused", False) else "ACTIVE"
544-
logger.info("Started worker '%s' [%s] (pid=%s)", worker.get_task_definition_name(), paused_status, task_runner_process.pid)
533+
logger.debug("Started worker '%s' [%s]", worker.get_task_definition_name(), paused_status)
545534
n = n + 1
546535
logger.info("Started %s TaskRunner process(es)", n)
547536

src/conductor/client/http/async_rest.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,6 @@ async def _reset_connection(self) -> None:
8888
except Exception:
8989
pass
9090
self.connection = self._create_default_httpx_client()
91-
# Log at debug level for diagnostics
92-
import logging
93-
logger = logging.getLogger(__name__)
94-
logger.debug("Reset HTTP connection after protocol error (HTTP/2 enabled: %s)", self._http2_enabled)
9591

9692
async def __aenter__(self):
9793
"""Async context manager entry."""

src/conductor/client/http/rest.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,6 @@ def _reset_connection(self) -> None:
9191
except Exception:
9292
pass
9393
self.connection = self._create_default_httpx_client()
94-
# Log at debug level for diagnostics
95-
import logging
96-
logger = logging.getLogger(__name__)
97-
logger.debug("Reset HTTP connection after protocol error (HTTP/2 enabled: %s)", self._http2_enabled)
9894

9995
def __del__(self):
10096
"""Cleanup httpx client on object destruction."""

0 commit comments

Comments (0)