Commit ccde576
feat(instrumentation): replace queue depth counter with metrics polling service
Replace the UpDownCounter-based execution.queue.depth metric (which required DB seeding on startup and drifted on crashes) with a PollingService daemon thread that runs a COUNT / GROUP BY status query every 30s and emits an ObservableGauge per active (non-terminal) execution status.

- Add cloud_pipelines_backend/instrumentation/metrics_poller.py with PollingService, configure_logging(), and _ACTIVE_STATUSES
- Remove execution_queue_depth and seed_queue_depth() from metrics.py; add the execution_status_count ObservableGauge
- Remove _seed_metrics() and the +1/-1 queue depth bookkeeping from orchestrator_sql.py
- Add a metrics poller region to start_local.py with an OTel guard
1 parent 949e41c commit ccde576
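Context on the instrument swap: an UpDownCounter holds state as a sum of +1/-1 deltas, so a crash between the increment and the decrement corrupts the value for good, and every restart has to re-seed it from the DB. An ObservableGauge instead re-reads ground truth each time metrics are collected. A minimal sketch of the contrast, using only the public OTel Python API (names and values are illustrative, not project code):

```python
from opentelemetry import metrics

meter = metrics.get_meter("example")

# Old style: deltas. A crash between add(1) and add(-1) leaves the
# reported depth drifted forever, and startup needs seeding from the DB.
queue_depth = meter.create_up_down_counter("execution.queue.depth")
queue_depth.add(1)   # task enqueued
queue_depth.add(-1)  # task dequeued


# New style: a callback recomputes the current value at every metric
# collection, so the exported value cannot drift.
def observe(_options: metrics.CallbackOptions):
    current = 42  # in the real service this comes from a DB COUNT query
    yield metrics.Observation(current, {"execution.status": "QUEUED"})


meter.create_observable_gauge("execution.status.count", callbacks=[observe])
```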

3 files changed

Lines changed: 149 additions & 4 deletions

cloud_pipelines_backend/instrumentation/metrics.py

Lines changed: 8 additions & 0 deletions
@@ -32,6 +32,7 @@ class MetricUnit(str, enum.Enum):
 
     SECONDS = "s"
     ERRORS = "{error}"
+    EXECUTIONS = "{execution}"
 
 
 # ---------------------------------------------------------------------------
@@ -51,6 +52,13 @@ class MetricUnit(str, enum.Enum):
     unit=MetricUnit.SECONDS,
 )
 
+execution_status_count = orchestrator_meter.create_observable_gauge(
+    name="execution.status.count",
+    callbacks=[],
+    description="Number of execution nodes in each active (non-terminal) status",
+    unit=MetricUnit.EXECUTIONS,
+)
+
 
 def record_status_transition(
     from_status: str,
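Note that execution_status_count is created at import time with callbacks=[]; the PollingService in the new module below registers its callback afterwards by appending to the SDK-private _callbacks list. A hypothetical alternative that stays on the public API would defer gauge creation until the callback exists, at the cost of moving the instrument definition out of metrics.py. A self-contained sketch (meter name and class are assumptions, not what this commit does):

```python
# Hypothetical variant: create the gauge only once its callback exists,
# registering it through the public create_observable_gauge() signature
# instead of appending to the SDK-private _callbacks list afterwards.
import typing

from opentelemetry import metrics as otel_metrics

meter = otel_metrics.get_meter("cloud_pipelines.orchestrator")  # assumed name


class LazyGaugeService:
    def __init__(self) -> None:
        self._counts: dict[str, int] = {}
        self._gauge = meter.create_observable_gauge(
            name="execution.status.count",
            callbacks=[self._observe],  # registered at creation time
            unit="{execution}",
        )

    def _observe(
        self, _options: otel_metrics.CallbackOptions
    ) -> typing.Iterable[otel_metrics.Observation]:
        for status, count in self._counts.items():
            yield otel_metrics.Observation(count, {"execution.status": status})
```

Keeping every instrument definition centralized in metrics.py is presumably why the commit opts for late registration instead.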
cloud_pipelines_backend/instrumentation/metrics_poller.py

Lines changed: 110 additions & 0 deletions

@@ -0,0 +1,110 @@
+"""Metrics poller.
+
+Periodically queries the DB and updates ObservableGauges. Currently emits
+execution status counts; add new DB-backed metrics here as needed.
+
+Only fluctuating (non-terminal) statuses are emitted as status count gauges —
+terminal statuses like SUCCEEDED and FAILED only ever climb and are not useful
+as gauges.
+"""
+
+import logging
+import os
+import threading
+import time
+import typing
+
+import sqlalchemy as sql
+from opentelemetry import metrics as otel_metrics
+from sqlalchemy import orm
+
+from cloud_pipelines_backend import backend_types_sql as bts
+from cloud_pipelines_backend.instrumentation import metrics as app_metrics
+from cloud_pipelines_backend.instrumentation import structured_logging
+
+_logger = logging.getLogger(__name__)
+
+
+def configure_logging() -> logging.Logger:
+    """Configure logging for the metrics poller process.
+
+    Reads LOG_LEVEL from the environment (default: INFO). Sets up a stderr
+    handler with the context-aware formatter used across other tangle services.
+    Returns the __main__ logger for use in the entrypoint.
+    """
+    log_level_name = os.environ.get("LOG_LEVEL", "INFO").upper()
+    log_level = logging.getLevelNamesMapping().get(log_level_name, logging.INFO)
+
+    handler = logging.StreamHandler()
+    handler.setLevel(log_level)
+    handler.setFormatter(structured_logging.ContextAwareFormatter())
+    handler.addFilter(structured_logging.LoggingContextFilter())
+
+    root = logging.getLogger()
+    root.setLevel(log_level)
+    root.handlers.clear()
+    root.addHandler(handler)
+
+    return logging.getLogger("__main__")
+
+
+# All statuses minus terminal (ended) ones — these fluctuate up and down
+_ACTIVE_STATUSES: frozenset[bts.ContainerExecutionStatus] = (
+    frozenset(bts.ContainerExecutionStatus) - bts.CONTAINER_STATUSES_ENDED
+)
+
+
+class PollingService:
+    """Polls the DB periodically and emits execution status count gauges."""
+
+    def __init__(
+        self,
+        *,
+        session_factory: typing.Callable[[], orm.Session],
+        poll_interval_seconds: float = 30.0,
+    ) -> None:
+        self._session_factory = session_factory
+        self._poll_interval_seconds = poll_interval_seconds
+        self._lock = threading.Lock()
+        # Initialize all active statuses to 0
+        self._counts: dict[str, int] = {s.value: 0 for s in _ACTIVE_STATUSES}
+        # Register our observe method as the gauge callback.
+        # The OTel SDK stores callbacks in _callbacks; we append after creation
+        # since create_observable_gauge is called at module load time in metrics.py.
+        app_metrics.execution_status_count._callbacks.append(self._observe)
+
+    def run_loop(self) -> None:
+        while True:
+            try:
+                self._poll()
+            except Exception:
+                _logger.exception("Metrics PollingService: error polling DB")
+            time.sleep(self._poll_interval_seconds)
+
+    def _poll(self) -> None:
+        with self._session_factory() as session:
+            rows = session.execute(
+                sql.select(
+                    bts.ExecutionNode.container_execution_status,
+                    sql.func.count().label("count"),
+                )
+                .where(
+                    bts.ExecutionNode.container_execution_status.in_(_ACTIVE_STATUSES)
+                )
+                .group_by(bts.ExecutionNode.container_execution_status)
+            ).all()
+        new_counts = {s.value: 0 for s in _ACTIVE_STATUSES}
+        for status, count in rows:
+            if status is not None:
+                new_counts[status.value] = count
+        with self._lock:
+            self._counts = new_counts
+        _logger.debug(f"Metrics PollingService: polled status counts: {new_counts}")
+
+    def _observe(
+        self, _options: otel_metrics.CallbackOptions
+    ) -> typing.Iterable[otel_metrics.Observation]:
+        with self._lock:
+            counts = self._counts.copy()
+        for status_value, count in counts.items():
+            yield otel_metrics.Observation(count, {"execution.status": status_value})
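The module ships configure_logging() for running the poller as its own process, though this commit wires it into start_local.py (below) rather than adding a __main__ block. A hypothetical standalone entrypoint built from the API shown above (the database URL is an assumption):

```python
# Hypothetical standalone entrypoint; the commit wires the poller into
# start_local.py instead. The SQLite URL here is an assumption.
import sqlalchemy
from sqlalchemy import orm

from cloud_pipelines_backend.instrumentation import metrics_poller

if __name__ == "__main__":
    logger = metrics_poller.configure_logging()
    engine = sqlalchemy.create_engine("sqlite:///local.db")  # assumed URL
    # Mirrors the sessionmaker arguments used in start_local.py.
    session_factory = orm.sessionmaker(
        autocommit=False, autoflush=False, bind=engine
    )
    logger.info("Starting metrics poller")
    metrics_poller.PollingService(session_factory=session_factory).run_loop()
```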

start_local.py

Lines changed: 31 additions & 4 deletions

@@ -211,6 +211,35 @@ def run_orchestrator(
 # endregion
 
 
+# region: Metrics poller initialization
+
+from cloud_pipelines_backend.instrumentation import metrics_poller
+from cloud_pipelines_backend.instrumentation.opentelemetry._internal import (
+    configuration as otel_configuration,
+)
+
+
+def run_metrics_poller(*, db_engine: sqlalchemy.Engine) -> None:
+    otel_config = otel_configuration.resolve()
+    if otel_config is None or otel_config.metrics is None:
+        logger.info(
+            f"No OTel metrics endpoint configured"
+            f" (set {otel_configuration.EnvVar.METRIC_EXPORTER_ENDPOINT})"
+            f" — metrics poller not starting"
+        )
+        return
+    session_factory = orm.sessionmaker(
+        autocommit=False, autoflush=False, bind=db_engine
+    )
+    metrics_poller.PollingService(session_factory=session_factory).run_loop()
+
+
+run_configured_metrics_poller = lambda: run_metrics_poller(
+    db_engine=db_engine,
+)
+# endregion
+
+
 # region: API Server initialization
 import contextlib
 import threading
@@ -228,10 +257,8 @@ def run_orchestrator(
 @contextlib.asynccontextmanager
 async def lifespan(app: fastapi.FastAPI):
     database_ops.initialize_and_migrate_db(db_engine=db_engine)
-    threading.Thread(
-        target=run_configured_orchestrator,
-        daemon=True,
-    ).start()
+    threading.Thread(target=run_configured_orchestrator, daemon=True).start()
+    threading.Thread(target=run_configured_metrics_poller, daemon=True).start()
     if os.environ.get("GOOGLE_CLOUD_SHELL") == "true":
         # TODO: Find a way to get fastapi/starlette/uvicorn port
         port = 8000
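Because gauge callbacks only run at collection time, the wiring pattern can be sanity-checked without a real exporter by forcing a collection with the OTel SDK's in-memory reader. A sketch, independent of the project code:

```python
# Sketch: verify that an ObservableGauge callback runs at collection time,
# using the SDK's in-memory reader (no exporter or endpoint needed).
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader

reader = InMemoryMetricReader()
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))
meter = metrics.get_meter("test")


def observe(_options: metrics.CallbackOptions):
    yield metrics.Observation(3, {"execution.status": "RUNNING"})


meter.create_observable_gauge("execution.status.count", callbacks=[observe])

data = reader.get_metrics_data()  # forces a collection; the callback runs here
point = data.resource_metrics[0].scope_metrics[0].metrics[0].data.data_points[0]
assert point.value == 3
assert point.attributes["execution.status"] == "RUNNING"
```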
