
feat: Metrics to count all non-terminal execution statuses#186

Open
morgan-wowk wants to merge 1 commit into task-status-duration from execution-status-count

Collaborator

@morgan-wowk morgan-wowk commented Mar 24, 2026

TL;DR

Added a metrics polling service that periodically queries the database to emit execution status count gauges for monitoring active (non-terminal) pipeline executions.

Screenshot 2026-03-23 at 5.24.33 PM.png

What changed?

  • Added EXECUTIONS metric unit to MetricUnit enum
  • Created execution_status_count observable gauge to track execution node counts by status
  • Implemented PollingService class in new metrics_poller.py module that:
    • Queries the database every 30 seconds for execution status counts
    • Only tracks active (non-terminal) statuses like PENDING, RUNNING, etc.
    • Thread-safely updates gauge observations with current counts
  • Integrated metrics poller as a daemon thread in the application startup, with automatic detection of OpenTelemetry metrics configuration
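
A minimal sketch of the polling pattern described in the list above, using only the standard library. The names `StatusPoller`, `fetch_counts`, and `ACTIVE_STATUSES` are hypothetical stand-ins, not the PR's code; the actual `PollingService` queries the database and feeds an OpenTelemetry observable gauge.

```python
import threading
from typing import Callable, Iterator

# Hypothetical set of active statuses; the PR defines its own _ACTIVE_STATUSES.
ACTIVE_STATUSES = ("PENDING", "RUNNING")


class StatusPoller:
    """Polls a count source periodically and serves the latest snapshot."""

    def __init__(
        self,
        fetch_counts: Callable[[], dict[str, int]],
        poll_interval_seconds: float = 30.0,
    ) -> None:
        self._fetch_counts = fetch_counts
        self._poll_interval_seconds = poll_interval_seconds
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self._counts = {status: 0 for status in ACTIVE_STATUSES}

    def poll_once(self) -> None:
        fetched = self._fetch_counts()
        # Statuses missing from the result reset to 0; terminal ones are dropped.
        fresh = {status: fetched.get(status, 0) for status in ACTIVE_STATUSES}
        with self._lock:
            self._counts = fresh

    def observe(self) -> Iterator[tuple[str, int]]:
        # Copy under the lock, then yield from the copy outside it.
        with self._lock:
            snapshot = dict(self._counts)
        yield from snapshot.items()

    def run(self) -> None:
        # Loop until stop() is called; wait() doubles as an interruptible sleep.
        while not self._stop.is_set():
            self.poll_once()
            self._stop.wait(self._poll_interval_seconds)

    def stop(self) -> None:
        self._stop.set()
```

Starting `run` with `threading.Thread(target=poller.run, daemon=True)` would mirror the daemon-thread integration mentioned in the last bullet.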

How to test?

  1. Set the OpenTelemetry metrics exporter endpoint environment variable (see examples/observability/otel-jaeger-prometheus)
  2. Start the application and verify the metrics poller thread launches
  3. Create pipeline executions in various active statuses
  4. Check that the execution.status.count gauge emits observations with correct counts for each status
  5. Verify that terminal statuses (SUCCEEDED, FAILED) are not included in the gauge metrics

Why make this change?

This enables real-time monitoring of pipeline execution health by providing visibility into how many executions are currently in each active state, which is essential for operational observability.

Collaborator Author

This stack of pull requests is managed by Graphite. Learn more about stacking.

@morgan-wowk morgan-wowk force-pushed the execution-status-count branch 2 times, most recently from ccde576 to cfdf72a Compare March 24, 2026 00:35
@morgan-wowk morgan-wowk marked this pull request as ready for review March 24, 2026 00:40
@morgan-wowk morgan-wowk requested a review from Ark-kun as a code owner March 24, 2026 00:40
@morgan-wowk morgan-wowk force-pushed the execution-status-count branch from cfdf72a to 365b4d3 Compare March 24, 2026 00:58
…ng service

    Replace the UpDownCounter-based execution.queue.depth metric (which required
    DB seeding on startup and drifted on crashes) with a PollingService daemon
    thread that runs COUNT GROUP BY status every 30s and emits an ObservableGauge
    per active (non-terminal) execution status.

    - Add cloud_pipelines_backend/instrumentation/metrics_poller.py with
      PollingService, configure_logging(), and _ACTIVE_STATUSES
    - Remove execution_queue_depth, seed_queue_depth() from metrics.py; add
      execution_status_count ObservableGauge
    - Remove _seed_metrics() and +1/-1 queue depth bookkeeping from orchestrator_sql.py
    - Add metrics poller region to start_local.py with OTel guard
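
The `COUNT ... GROUP BY status` query that the commit message describes can be sketched with the standard library's `sqlite3`. The `execution_node` table, its `status` column, and the status values are assumptions for illustration; the PR's actual schema and query code are not shown on this page.

```python
import sqlite3

# Hypothetical active statuses; the PR keeps its own list in _ACTIVE_STATUSES.
ACTIVE_STATUSES = ("PENDING", "RUNNING")


def count_active_statuses(conn: sqlite3.Connection) -> dict[str, int]:
    """Return {status: count} for active statuses, including zero counts."""
    placeholders = ", ".join("?" for _ in ACTIVE_STATUSES)
    rows = conn.execute(
        f"SELECT status, COUNT(*) FROM execution_node "
        f"WHERE status IN ({placeholders}) GROUP BY status",
        ACTIVE_STATUSES,
    ).fetchall()
    # Start from zeros so statuses with no rows are still reported.
    counts = {status: 0 for status in ACTIVE_STATUSES}
    counts.update(dict(rows))
    return counts


# Hypothetical in-memory table with a mix of active and terminal rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE execution_node (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany(
    "INSERT INTO execution_node (status) VALUES (?)",
    [("RUNNING",), ("RUNNING",), ("SUCCEEDED",), ("FAILED",)],
)
active_counts = count_active_statuses(conn)
```

Because each poll recomputes the counts from the table, there is no running total to seed on startup or to drift after a crash.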
@morgan-wowk morgan-wowk force-pushed the task-status-duration branch from 949e41c to c8256fc Compare March 24, 2026 01:16
@morgan-wowk morgan-wowk force-pushed the execution-status-count branch from 365b4d3 to a3c0001 Compare March 24, 2026 01:16
self._poll_interval_seconds = poll_interval_seconds
self._lock = threading.Lock()
# Initialize all active statuses to 0
self._counts: dict[str, int] = {s.value: 0 for s in _ACTIVE_STATUSES}
Collaborator

Nit: used in two places, might make sense to put into a function.

# Register our observe method as the gauge callback.
# The OTel SDK stores callbacks in _callbacks; we append after creation
# since create_observable_gauge is called at module load time in metrics.py.
app_metrics.execution_status_count._callbacks.append(self._observe)
Collaborator

You're accessing a private attribute. Is it possible to add a function in metrics.py that sets up execution_status_count with its callbacks?

Reason for this is:

  • The private attribute is not meant to be accessed from outside the SDK and could change, which would break this code in the future.
  • Will there ever be a case where you append more than one callback? If so, can you elaborate on that use case? My thought is you would only ever need to "set" one callback.

Collaborator Author

I'll see what we can do!

Comment on lines +237 to +239
run_configured_metrics_poller = lambda: run_metrics_poller(
    db_engine=db_engine,
)
Collaborator

It's better to do

def run_configured_metrics_poller() -> None:
    run_metrics_poller(db_engine=db_engine)

Instead of assigning a lambda to a variable. The reasons are 1) troubleshooting: a stack trace will just say "<lambda>" instead of the actual function name, and 2) a lambda cannot carry type hints or a docstring, so it cannot be a strongly typed function for type checking.

Also, if we had a linter, there are some that would complain about this.

Good use cases for lambda are inline, throwaway functions like:

sorted(items, key=lambda x: x.created_at)
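
The naming difference mentioned above can be seen directly in a small sketch. Here `run_metrics_poller` and `db_engine` are hypothetical stand-ins for the real entry point and engine.

```python
def run_metrics_poller(db_engine: object) -> str:
    # Hypothetical stand-in for the real poller entry point.
    return f"polling with {db_engine}"


db_engine = "engine"  # placeholder value for illustration

# Lambda assigned to a variable: the function object has no real name.
run_with_lambda = lambda: run_metrics_poller(db_engine=db_engine)


def run_configured_metrics_poller() -> str:
    """Run the metrics poller with the configured engine."""
    return run_metrics_poller(db_engine=db_engine)


lambda_name = run_with_lambda.__name__  # "<lambda>"
def_name = run_configured_metrics_poller.__name__  # "run_configured_metrics_poller"
```

Tracebacks and thread names derive from `__name__`, which is why the `def` form is easier to troubleshoot.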

Collaborator Author

For now this is intentional, to match the existing code standards set by the orchestrator. It matches the orchestrator code exactly. I'm not sure I want to introduce an alternative format right now unless @Ark-kun would like to make the switch.

I do like all your points for not using a lambda in this case.

Comment on lines +82 to +85
with self._lock:
    counts = self._counts.copy()
for status_value, count in counts.items():
    yield otel_metrics.Observation(count, {"execution.status": status_value})
Collaborator

Curious, when will observe be called? Will there ever be a race condition where observe is called before the first poll has happened or completed? Would it make sense to put a flag here indicating that at least 1 poll has completed?

Collaborator Author

The lock will prevent it from copying counts that are currently being modified.

As for there being at least 1 iteration, I'll add a check.
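
A sketch of what such a check could look like, using only the standard library. `GuardedCounts` and its methods are hypothetical names; the PR's eventual check may differ. The idea is that the observe path emits nothing until one poll has completed, so the initial zeros are never reported as real counts.

```python
import threading
from typing import Iterator


class GuardedCounts:
    """Holds polled counts and withholds observations until the first poll."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._counts: dict[str, int] = {}
        self._has_polled = False

    def update(self, counts: dict[str, int]) -> None:
        # Called by the polling thread after each successful query.
        with self._lock:
            self._counts = dict(counts)
            self._has_polled = True

    def observe(self) -> Iterator[tuple[str, int]]:
        with self._lock:
            if not self._has_polled:
                return  # no poll yet: emit nothing rather than misleading zeros
            snapshot = dict(self._counts)
        yield from snapshot.items()
```

Setting the flag inside the same lock as the counts keeps the flag and the data consistent with each other.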

2 participants