feat: Metrics to count all non-terminal execution statuses#186
feat: Metrics to count all non-terminal execution statuses#186morgan-wowk wants to merge 1 commit intotask-status-durationfrom
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
ccde576 to
cfdf72a
Compare
cfdf72a to
365b4d3
Compare
…ng service
Replace the UpDownCounter-based execution.queue.depth metric (which required
DB seeding on startup and drifted on crashes) with a PollingService daemon
thread that runs COUNT GROUP BY status every 30s and emits an ObservableGauge
per active (non-terminal) execution status.
- Add cloud_pipelines_backend/instrumentation/metrics_poller.py with
PollingService, configure_logging(), and _ACTIVE_STATUSES
- Remove execution_queue_depth, seed_queue_depth() from metrics.py; add
execution_status_count ObservableGauge
- Remove _seed_metrics() and +1/-1 queue depth bookkeeping from orchestrator_sql.py
- Add metrics poller region to start_local.py with OTel guard
949e41c to
c8256fc
Compare
365b4d3 to
a3c0001
Compare
| self._poll_interval_seconds = poll_interval_seconds | ||
| self._lock = threading.Lock() | ||
| # Initialize all active statuses to 0 | ||
| self._counts: dict[str, int] = {s.value: 0 for s in _ACTIVE_STATUSES} |
There was a problem hiding this comment.
Nit: used in two places, might make sense to put into a function.
| # Register our observe method as the gauge callback. | ||
| # The OTel SDK stores callbacks in _callbacks; we append after creation | ||
| # since create_observable_gauge is called at module load time in metrics.py. | ||
| app_metrics.execution_status_count._callbacks.append(self._observe) |
There was a problem hiding this comment.
You're calling a private variable, is it possible to make a function set sets up execution_status_count with the callbacks in metric.py?
Reason for this is:
- The private variable could change since this is not meant to be called, which will have issues if it does in the future.
- Will there ever be a case you're appending more than one callback? If so, can you elaborate that use case. My thought is you would only ever need to "set" one callback.
There was a problem hiding this comment.
I'll see what we can do!
| run_configured_metrics_poller = lambda: run_metrics_poller( | ||
| db_engine=db_engine, | ||
| ) |
There was a problem hiding this comment.
It's better to do
def run_configured_metrics_poller() -> None:
run_metrics_poller(db_engine=db_engine)
Instead of lambda for variables. Reason why is 1) troubleshooting, if there was a stack trace it'll just say "lambda" instead of the actual function name and 2) cannot have type hinting/docstrings to have a strongly type function for type checking.
Also, if we had a linter, there are some that would complain about this.
Use cases for when using lambda are inline/throwaway like:
sorted(items, key=lambda x: x.created_at)
There was a problem hiding this comment.
For now this is intentional to match existing code standards set by the orchestrator. It matches the orchestrator code exactly. I'm not sure I want to introduce an alternative format right now unless @Ark-kun would like make the switch.
I do like all your points for not using a lambda in this case.
| with self._lock: | ||
| counts = self._counts.copy() | ||
| for status_value, count in counts.items(): | ||
| yield otel_metrics.Observation(count, {"execution.status": status_value}) |
There was a problem hiding this comment.
Curious, when will observe be called? Will there ever be a race condition where observe is called before the poll happened/completed? Would it make sense to put a flag here that at last 1 poll completed?
There was a problem hiding this comment.
The lock will prevent it from copying counts that are currently being modified.
As for there being at least 1 iteration, i'll add a check.

TL;DR
Added a metrics polling service that periodically queries the database to emit execution status count gauges for monitoring active (non-terminal) pipeline executions.
What changed?
EXECUTIONSmetric unit toMetricUnitenumexecution_status_countobservable gauge to track execution node counts by statusPollingServiceclass in newmetrics_poller.pymodule that:How to test?
examples/observability/otel-jaeger-prometheus)execution.status.countgauge emits observations with correct counts for each statusWhy make this change?
This enables real-time monitoring of pipeline execution health by providing visibility into how many executions are currently in each active state, which is essential for operational observability.