Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions harness/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from conductor.client.configuration.settings.metrics_settings import MetricsSettings
from conductor.client.http.models.task_def import TaskDef
from conductor.client.orkes_clients import OrkesClients
from conductor.client.telemetry.metrics_factory import create_metrics_collector
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.simple_task import SimpleTask

Expand Down Expand Up @@ -67,7 +68,13 @@ def register_metadata(clients: OrkesClients) -> None:

def main() -> None:
configuration = Configuration()
clients = OrkesClients(configuration)

metrics_port = env_int_or_default("HARNESS_METRICS_PORT", 9991)
metrics_settings = MetricsSettings(http_port=metrics_port)

metrics_collector = create_metrics_collector(metrics_settings)
print(f"Prometheus metrics server started on port {metrics_port} ({metrics_collector.collector_name()} metrics)")
clients = OrkesClients(configuration, metrics_collector=metrics_collector)

register_metadata(clients)

Expand All @@ -80,10 +87,6 @@ def main() -> None:
worker = SimulatedTaskWorker(task_name, codename, sleep_seconds, batch_size, poll_interval_ms)
workers.append(worker)

metrics_port = env_int_or_default("HARNESS_METRICS_PORT", 9991)
metrics_settings = MetricsSettings(http_port=metrics_port)
print(f"Prometheus metrics will be served on port {metrics_port}")

task_handler = TaskHandler(
workers=workers,
configuration=configuration,
Expand All @@ -93,7 +96,6 @@ def main() -> None:

workflow_executor = clients.get_workflow_executor()
governor = WorkflowGovernor(workflow_executor, WORKFLOW_NAME, workflows_per_sec)
governor.start()

main_pid = os.getpid()
shutting_down = False
Expand All @@ -111,7 +113,13 @@ def shutdown(signum, frame):
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)

# Fork worker processes BEFORE starting the governor thread. The parent's
# MetricsCollector uses prometheus_client's multiprocess mode, which guards
# its mmapped value files with a threading.Lock. If the governor thread is
# already running at fork() time, a child can inherit that lock in a held
# state and deadlock on its first metric write.
task_handler.start_processes()
governor.start()
task_handler.join_processes()


Expand Down
4 changes: 4 additions & 0 deletions harness/manifests/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ spec:
- name: HARNESS_POLL_INTERVAL_MS
value: "100"

# === METRICS MODE ===
- name: WORKER_CANONICAL_METRICS
value: "true"

ports:
- name: metrics
containerPort: 9991
Expand Down
23 changes: 21 additions & 2 deletions src/conductor/client/automator/async_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from conductor.client.http.rest import AuthorizationException, ApiException
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient
from conductor.client.orkes.orkes_schema_client import OrkesSchemaClient
from conductor.client.telemetry.metrics_collector import MetricsCollector
from conductor.client.telemetry.metrics_factory import create_metrics_collector
from conductor.client.worker.worker_interface import WorkerInterface
from conductor.client.worker.worker_config import resolve_worker_config, get_worker_config_oneline
from conductor.client.worker.exception import NonRetryableException
Expand Down Expand Up @@ -88,7 +88,7 @@ def __init__(

self.metrics_collector = None
if metrics_settings is not None:
self.metrics_collector = MetricsCollector(
self.metrics_collector = create_metrics_collector(
metrics_settings
)
# Register metrics collector as event listener
Expand Down Expand Up @@ -863,6 +863,7 @@ async def __async_update_task(self, task_result: TaskResult):
if attempt > 0:
# Exponential backoff: [10s, 20s, 30s] before retry
await asyncio.sleep(attempt * 10)
update_start = time.time()
try:
if self._use_update_v2:
next_task = await self.async_task_client.update_task_v2(body=task_result)
Expand All @@ -873,6 +874,10 @@ async def __async_update_task(self, task_result: TaskResult):
task_definition_name,
next_task.task_id if next_task else None
)
if self.metrics_collector is not None:
self.metrics_collector.record_task_update_time(
task_definition_name, time.time() - update_start, status="SUCCESS"
)
return next_task
else:
await self.async_task_client.update_task(body=task_result)
Expand All @@ -882,6 +887,10 @@ async def __async_update_task(self, task_result: TaskResult):
task_result.workflow_instance_id,
task_definition_name,
)
if self.metrics_collector is not None:
self.metrics_collector.record_task_update_time(
task_definition_name, time.time() - update_start, status="SUCCESS"
)
return None
except ApiException as e:
if e.status in (404, 405) and self._use_update_v2:
Expand All @@ -895,12 +904,19 @@ async def __async_update_task(self, task_result: TaskResult):
# Retry immediately with v1
try:
await self.async_task_client.update_task(body=task_result)
if self.metrics_collector is not None:
self.metrics_collector.record_task_update_time(
task_definition_name, time.time() - update_start, status="SUCCESS"
)
return None
except Exception as fallback_e:
last_exception = fallback_e
continue
last_exception = e
if self.metrics_collector is not None:
self.metrics_collector.record_task_update_time(
task_definition_name, time.time() - update_start, status="FAILURE"
)
self.metrics_collector.increment_task_update_error(
task_definition_name, type(e)
)
Expand All @@ -917,6 +933,9 @@ async def __async_update_task(self, task_result: TaskResult):
except Exception as e:
last_exception = e
if self.metrics_collector is not None:
self.metrics_collector.record_task_update_time(
task_definition_name, time.time() - update_start, status="FAILURE"
)
self.metrics_collector.increment_task_update_error(
task_definition_name, type(e)
)
Expand Down
23 changes: 17 additions & 6 deletions src/conductor/client/automator/task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from conductor.client.event.task_runner_events import TaskRunnerEvent
from conductor.client.event.sync_event_dispatcher import SyncEventDispatcher
from conductor.client.event.sync_listener_register import register_task_runner_listener
from conductor.client.telemetry.metrics_collector import MetricsCollector
from conductor.client.telemetry.metrics_collector_base import MetricsCollectorBase
from conductor.client.telemetry.model.metric_documentation import MetricDocumentation
from conductor.client.telemetry.model.metric_label import MetricLabel
from conductor.client.telemetry.model.metric_name import MetricName
Expand All @@ -33,13 +33,24 @@
)

_decorated_functions = {}
_VALID_MP_START_METHODS = {"spawn", "fork", "forkserver"}
_mp_fork_set = False
if not _mp_fork_set:
try:
if platform == "win32":
set_start_method("spawn")
else:
set_start_method("fork")
# The prometheus_client library holds a module-level threading lock;
# forking while that lock is held causes a deadlock in child processes.
# Set CONDUCTOR_MP_START_METHOD=spawn to avoid this if you hit the
# deadlock. Default is fork for backward compatibility (spawn requires
# all Process arguments to be picklable).
_default_method = "spawn" if platform == "win32" else "fork"
_method = os.environ.get("CONDUCTOR_MP_START_METHOD", "").strip().lower() or _default_method
if _method not in _VALID_MP_START_METHODS:
logger.warning(
"Ignoring invalid CONDUCTOR_MP_START_METHOD=%r; falling back to %r",
_method, _default_method,
)
_method = _default_method
set_start_method(_method)
_mp_fork_set = True
except Exception as e:
logger.info("error when setting multiprocessing.set_start_method - maybe the context is set %s", e.args)
Expand Down Expand Up @@ -345,7 +356,7 @@ def __create_metrics_provider_process(self, metrics_settings: MetricsSettings) -
self.metrics_provider_process = None
return
self.metrics_provider_process = Process(
target=MetricsCollector.provide_metrics,
target=MetricsCollectorBase.provide_metrics,
args=(metrics_settings,)
)
logger.info("Created MetricsProvider process")
Expand Down
23 changes: 21 additions & 2 deletions src/conductor/client/automator/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from conductor.client.http.rest import AuthorizationException, ApiException
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient
from conductor.client.orkes.orkes_schema_client import OrkesSchemaClient
from conductor.client.telemetry.metrics_collector import MetricsCollector
from conductor.client.telemetry.metrics_factory import create_metrics_collector
from conductor.client.worker.worker import ASYNC_TASK_RUNNING
from conductor.client.worker.worker_interface import WorkerInterface
from conductor.client.worker.worker_config import resolve_worker_config, get_worker_config_oneline
Expand Down Expand Up @@ -69,7 +69,7 @@ def __init__(

self.metrics_collector = None
if metrics_settings is not None:
self.metrics_collector = MetricsCollector(
self.metrics_collector = create_metrics_collector(
metrics_settings
)
# Register metrics collector as event listener
Expand Down Expand Up @@ -972,6 +972,7 @@ def __update_task(self, task_result: TaskResult):
if attempt > 0:
# Exponential backoff: [10s, 20s, 30s] before retry
time.sleep(attempt * 10)
update_start = time.time()
try:
if self._use_update_v2:
next_task = self.task_client.update_task_v2(body=task_result)
Expand All @@ -982,6 +983,10 @@ def __update_task(self, task_result: TaskResult):
task_definition_name,
next_task.task_id if next_task else None
)
if self.metrics_collector is not None:
self.metrics_collector.record_task_update_time(
task_definition_name, time.time() - update_start, status="SUCCESS"
)
return next_task
else:
self.task_client.update_task(body=task_result)
Expand All @@ -991,6 +996,10 @@ def __update_task(self, task_result: TaskResult):
task_result.workflow_instance_id,
task_definition_name,
)
if self.metrics_collector is not None:
self.metrics_collector.record_task_update_time(
task_definition_name, time.time() - update_start, status="SUCCESS"
)
return None
except ApiException as e:
if e.status in (404, 405) and self._use_update_v2:
Expand All @@ -1004,12 +1013,19 @@ def __update_task(self, task_result: TaskResult):
# Retry immediately with v1
try:
self.task_client.update_task(body=task_result)
if self.metrics_collector is not None:
self.metrics_collector.record_task_update_time(
task_definition_name, time.time() - update_start, status="SUCCESS"
)
return None
except Exception as fallback_e:
last_exception = fallback_e
continue
last_exception = e
if self.metrics_collector is not None:
self.metrics_collector.record_task_update_time(
task_definition_name, time.time() - update_start, status="FAILURE"
)
self.metrics_collector.increment_task_update_error(
task_definition_name, type(e)
)
Expand Down Expand Up @@ -1044,6 +1060,9 @@ def __update_task(self, task_result: TaskResult):
except Exception as e:
last_exception = e
if self.metrics_collector is not None:
self.metrics_collector.record_task_update_time(
task_definition_name, time.time() - update_start, status="FAILURE"
)
self.metrics_collector.increment_task_update_error(
task_definition_name, type(e)
)
Expand Down
19 changes: 18 additions & 1 deletion src/conductor/client/configuration/settings/metrics_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ def __init__(
directory: Optional[str] = None,
file_name: str = "metrics.log",
update_interval: float = 0.1,
http_port: Optional[int] = None):
http_port: Optional[int] = None,
clean_directory: bool = True):
"""
Configure metrics collection settings.

Expand All @@ -40,13 +41,19 @@ def __init__(
If None:
- Metrics will be written to file at {directory}/{file_name}
- No HTTP server will be started
clean_directory: If True (default), remove stale prometheus_client
.db files from the directory on init. Without this,
metric names from previous configurations persist in the
multiprocess directory and appear in /metrics output.
"""
if directory is None:
directory = get_default_temporary_folder()
self.__set_dir(directory)
self.file_name = file_name
self.update_interval = update_interval
self.http_port = http_port
if clean_directory:
self._clean_stale_db_files()

def __set_dir(self, dir: str) -> None:
if not os.path.isdir(dir):
Expand All @@ -57,3 +64,13 @@ def __set_dir(self, dir: str) -> None:
"Failed to create metrics temporary folder, reason: %s", e)

self.directory = dir

def _clean_stale_db_files(self) -> None:
    """Delete leftover prometheus_client multiprocess ``*.db`` value files.

    Stale files from a previous run would otherwise resurface old metric
    names in the /metrics output, so this is a best-effort cleanup: a file
    that cannot be removed is logged at debug level and skipped.
    """
    import glob
    for db_file in glob.glob(os.path.join(self.directory, "*.db")):
        try:
            os.remove(db_file)
        except Exception as e:
            logger.debug("Could not remove stale metrics db file %s: %s", db_file, e)
5 changes: 3 additions & 2 deletions src/conductor/client/orkes/orkes_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@


class OrkesBaseClient(object):
def __init__(self, configuration: Configuration):
self.api_client = ApiClient(configuration)
def __init__(self, configuration: Configuration, metrics_collector=None):
    """Initialize the shared Orkes client plumbing.

    Args:
        configuration: Conductor server connection settings, passed through
            to the underlying ``ApiClient``.
        metrics_collector: Optional collector forwarded to the ``ApiClient``
            so HTTP-level metrics can be recorded; ``None`` disables it.
    """
    self.metrics_collector = metrics_collector
    self.api_client = ApiClient(configuration, metrics_collector=metrics_collector)
    logger_name = Configuration.get_logging_formatted_name(__name__)
    self.logger = logging.getLogger(logger_name)
Expand Down
44 changes: 40 additions & 4 deletions src/conductor/client/orkes/orkes_workflow_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import annotations
import json
from typing import Optional, List, Dict

from conductor.client.configuration.configuration import Configuration
Expand All @@ -18,9 +19,10 @@
class OrkesWorkflowClient(OrkesBaseClient, WorkflowClient):
def __init__(
self,
configuration: Configuration
configuration: Configuration,
metrics_collector=None,
):
super(OrkesWorkflowClient, self).__init__(configuration)
super(OrkesWorkflowClient, self).__init__(configuration, metrics_collector=metrics_collector)

def start_workflow_by_name(
self,
Expand All @@ -38,10 +40,44 @@ def start_workflow_by_name(
if priority:
kwargs.update({"priority": priority})

return self.workflowResourceApi.start_workflow1(input, name, **kwargs)
self._record_workflow_input_payload_size(name, version, input)
try:
return self.workflowResourceApi.start_workflow1(input, name, **kwargs)
except Exception as e:
self._record_workflow_start_error(name, e)
raise

def start_workflow(self, start_workflow_request: StartWorkflowRequest) -> str:
return self.workflowResourceApi.start_workflow(start_workflow_request)
self._record_workflow_input_payload_size(
start_workflow_request.name,
start_workflow_request.version,
getattr(start_workflow_request, "input", None),
)
try:
return self.workflowResourceApi.start_workflow(start_workflow_request)
except Exception as e:
self._record_workflow_start_error(start_workflow_request.name, e)
raise

def _record_workflow_input_payload_size(self, name: str, version, workflow_input) -> None:
    """Best-effort: report the JSON-encoded size in bytes of a workflow's input.

    Does nothing when no metrics collector is configured. Any serialization
    or collector error is swallowed so that telemetry can never break a
    workflow start.
    """
    collector = self.metrics_collector
    if collector is None:
        return
    # None input is measured as an empty object, matching an omitted payload.
    payload = {} if workflow_input is None else workflow_input
    try:
        version_label = "" if version is None else str(version)
        size_in_bytes = len(json.dumps(payload, default=str).encode("utf-8"))
        collector.record_workflow_input_payload_size(name, version_label, size_in_bytes)
    except Exception:
        pass

def _record_workflow_start_error(self, name: str, exception: Exception) -> None:
    """Best-effort: count a failed workflow start against *name*.

    Silently does nothing when metrics are disabled or the collector itself
    raises, so telemetry can never mask the original start failure.
    """
    collector = self.metrics_collector
    if collector is None:
        return
    try:
        collector.increment_workflow_start_error(name, exception)
    except Exception:
        pass

def execute_workflow(
self,
Expand Down
Loading
Loading