Skip to content

[flink] add OpenMetrics-based collection mode#23857

Open
barryib wants to merge 10 commits into
DataDog:masterfrom
barryib:feature/flink-openmetrics-collection
Open

[flink] add OpenMetrics-based collection mode#23857
barryib wants to merge 10 commits into
DataDog:masterfrom
barryib:feature/flink-openmetrics-collection

Conversation

@barryib
Copy link
Copy Markdown

@barryib barryib commented May 28, 2026

What does this PR do?

Adds an OpenMetrics-based collection mode to the flink integration. The new mode uses OpenMetricsBaseCheckV2 to scrape Flink's flink-metrics-prometheus reporter endpoint and emits the same flink.* metric namespace as the existing Datadog HTTP Reporter push mode.

The existing push-based mode (Flink's DatadogHttpReporterFactory) is preserved and remains documented — this PR is purely additive.

Motivation

We run Flink on Kubernetes via the Flink Operator. Today, the Datadog Agent's Flink integration assumes Flink pushes metrics directly via its native HTTP Reporter, which requires the Datadog API key to live inside flink-conf.yaml. With the Flink Operator, flink-conf.yaml is mounted from a read-only ConfigMap, so injecting a Kubernetes Secret into it isn't viable — the standard FLINK_PROPERTIES mechanism crashes the pod because the entrypoint tries to append to the read-only file.

We've been working around this with the vals plugin in ArgoCD to template the API key in at deploy time, which is brittle and operationally heavy.

An OpenMetrics-based collection mode solves this cleanly: the Agent already has the API key (standard k8s deployment pattern), and Flink only needs to enable its built-in Prometheus reporter — no secrets in Flink config, no entrypoint hacks, no vals plugin.

There's been parallel work on the Flink side to make the existing Datadog reporter accept DD_API_KEY from the environment (see apache/flink#27432 and FLINK-33994), but that PR has been waiting for committer review for months. Even if it eventually lands, an Agent-side OpenMetrics integration is the better long-term shape — it matches how Datadog handles CoreDNS, Temporal, and other modern k8s-native integrations.

Billing classification — confirmed by Datadog support

I raised this with Datadog support before opening the PR for review. The answer, paraphrased:

The billing classification rule is simple: if a metric name is listed in Datadog's official integration documentation, it is an integration metric and does not count toward your custom metrics quota. The flink.* metrics are documented on the Flink integration page, so they qualify as integration metrics — the base class used internally (OpenMetricsBaseCheckV2) has no effect on billing.

Using the generic openmetrics check produces custom metrics for you because that check has no named integration behind it, so nothing it emits is matched against the documented integration metric list. Your PR solves this by running the scraping logic inside the official flink check, which is what the billing system looks at.

Two caveats from the answer, both addressed:

  1. Classification applies once the PR is merged and shipped in an official integration release. Metrics emitted by the new mode in a development/testing context will count as custom until then — flagging this so anyone testing the branch isn't surprised.
  2. Only metric names listed in the official documentation (backed by metadata.csv) are covered. I confirmed that every entry the new METRIC_MAP produces is already present in metadata.csv — 87 mapped names, 87 documented names, with no extras on either side. So no metric emitted by the new mode would land as custom for an end user on a released agent.

What's in this PR

  • datadog_checks/flink/check.pyFlinkCheck inheriting from OpenMetricsBaseCheckV2, namespace flink. Flink's Prometheus reporter labels every series with a host label that collides with Datadog's reserved hostname tag, so the check promotes host to the metric's hostname and excludes it from the tag set (hostname_label='host', exclude_labels=['host']).
  • datadog_checks/flink/metrics.pyMETRIC_MAP from raw Prometheus names (e.g. flink_jobmanager_Status_JVM_CPU_Load) to namespaced Datadog names (e.g. jobmanager.Status.JVM.CPU.Load). Covers all 87 metrics documented in metadata.csv (JobManager, TaskManager, job, task, operator, and Netty shuffle metrics).
  • datadog_checks/flink/config_models/ — generated via ddev validate models flink --sync.
  • datadog_checks/flink/data/conf.yaml.example — regenerated from assets/configuration/spec.yaml via ddev validate config flink --sync.
  • assets/configuration/spec.yaml — adds init_config/openmetrics and instances/openmetrics templates.
  • tests/ — unit tests with a fixture covering the mapped metrics (tests/fixtures/metrics.txt), an E2E test, and a compose/docker-compose.yaml running the official Flink image with the Prometheus reporter enabled.
  • README.md — new section documenting OpenMetrics collection, with a Kubernetes Autodiscovery example. The existing Datadog HTTP Reporter section is preserved.
  • changelog.d/23857.added — changelog fragment.

Long-term direction (out of scope for this PR)

Once the OpenMetrics mode has been live for a release or two, there's a case for marking the Datadog HTTP Reporter as legacy and eventually deprecating it. The OpenMetrics path is operationally cleaner — no Datadog API key in Flink config, no metrics.scope.* remap required to avoid being billed as custom, native fit with the Agent's Kubernetes Autodiscovery — and it matches how integrations-core handles other modern OSS data systems (CoreDNS, Temporal). Happy to follow up with a separate PR if maintainers agree with the direction.

Testing

ddev validate all flink           # all green
ddev test flink -- -m unit         # unit tests pass
ddev env test flink                # E2E test (requires a Datadog Agent)

The local compose pins flink:1.18-scala_2.12-java17. I tried 1.19 first and the JobManager crashed during config-parser-utils.sh while parsing the FLINK_PROPERTIES-injected YAML — Flink 1.19's new config.yaml format appears to collide with how the entrypoint writes properties. Happy to revisit which Flink version(s) the E2E test should pin if reviewers prefer 1.19 or 1.20.

A note on the CI

This PR registers the new flink test target in .github/workflows/test-all.yml (via ddev validate ci --sync). That file is in pr-all.yml's trigger path filter, so the PR All workflow runs the entire integration test suite on every push. Most of the unrelated failures it surfaces (SAP HANA, WebLogic, IBM ACE, IBM MQ, n8n, SNMP, etc.) are flaky infra issues — DNS resolution to Azure blob storage, docker-compose start timeouts, JVM crashes — that intermittently fail on master too. The Datadog auto-annotation marks most of them as "🔄 Retry job. This looks flaky."

The checks that should actually be green and are owned by this PR:

  • Run Validations / Validate
  • test / j7f02532 / flink (lint, unit, integration, E2E)
  • test / j77217e9 / Datadog Checks Dev on Linux (the is_logs_only fixture was updated to use cisco_asa since flink no longer qualifies)
  • test / jd316aba / ddev on Linux (the code-coverage.datadog.yml registration was added)

Related links

Thanks for taking a look. 🙏

@datadog-official
Copy link
Copy Markdown
Contributor

datadog-official Bot commented May 28, 2026

Pipelines

Fix all issues with BitsAI

⚠️ Warnings

🚦 5 Pipeline jobs failed

PR All | test / j46da136 / JBoss_WildFly   View in Datadog   GitHub Actions

🔄 Retry job. This looks flaky and may succeed on retry. Connection issue: Could not resolve 'ddintegrations.blob.core.windows.net:443'.

PR All | test / j5a9585a / IBM ACE   View in Datadog   GitHub Actions

🔄 Retry job. This looks flaky and may succeed on retry. Could not resolve host: ddintegrations.blob.core.windows.net

PR All | test / jf4d06ee / IBM MQ on Linux   View in Datadog   GitHub Actions

🔄 Retry job. This looks flaky and may succeed on retry. Connection failed: Could not resolve host 'ddintegrations.blob.core.windows.net'.

View all 5 failed jobs.

Useful? React with 👍 / 👎

This comment will be updated automatically if new data arrives.
🔗 Commit SHA: 4c002b8 | Docs | Datadog PR Page | Give us feedback!

@barryib barryib force-pushed the feature/flink-openmetrics-collection branch from 8edf1ee to 7c27fb4 Compare May 28, 2026 09:45
@barryib barryib marked this pull request as ready for review May 28, 2026 11:30
@barryib barryib requested review from a team as code owners May 28, 2026 11:30
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: f83faebe4b

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread flink/datadog_checks/flink/metrics.py Outdated
@barryib barryib force-pushed the feature/flink-openmetrics-collection branch from 0bd3e12 to baea764 Compare May 28, 2026 12:55
@barryib barryib force-pushed the feature/flink-openmetrics-collection branch 2 times, most recently from bb7fb25 to 1c64e2a Compare May 28, 2026 16:21
barryib and others added 8 commits May 28, 2026 22:07
Adds an alternative collection mode that scrapes Flink's
flink-metrics-prometheus reporter via OpenMetricsBaseCheckV2,
complementing the existing Datadog HTTP Reporter push-based mode.

Motivation: the existing push-based collection requires the
Datadog API key to live inside Flink's configuration. On Kubernetes
deployments managed by the Flink Operator, flink-conf.yaml is
mounted from a read-only ConfigMap, which makes secret injection
brittle and forces workarounds like the vals plugin in ArgoCD.

With agent-side OpenMetrics scraping the API key lives with the
Datadog Agent, which is the standard pattern for K8s deployments
and fits cleanly with External Secrets Operator + Secret refs.

Includes:
- FlinkCheck inheriting from OpenMetricsBaseCheckV2
- METRIC_MAP covering the JVM and core jobmanager / taskmanager
  metrics (representative subset; full coverage to follow once
  the approach is agreed)
- Updated configuration spec and conf.yaml.example
- Unit tests with a fixture covering the mapped metrics
- README updated to document both collection modes

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Regenerate conf.yaml.example via 'ddev validate config flink -s'
  (drops two stray lines from the openmetrics_endpoint comment block)
- Add auto-generated config_models/ via 'ddev validate models flink -s'
- Replace non-ASCII em-dashes in README.md with ASCII hyphens
- Rename changelog fragment to use the PR number ('23857.added' instead
  of 'openmetrics-collection.added') so check_pr.py can parse it

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Expand METRIC_MAP from a representative subset (~20) to the full
  87 metrics declared in metadata.csv (script-generated)
- Add tests/compose/docker-compose.yaml with jobmanager + taskmanager
  configured to expose Flink's flink-metrics-prometheus reporter, plus
  tests/test_e2e.py asserting core JVM and cluster metrics
- Add hatch.toml so 'ddev test' picks up the integration
- Set hostname_label + exclude_labels for 'host' in the check's
  default config: Flink's Prometheus reporter labels every series
  with 'host' to identify the source JM/TM, which collides with
  Datadog's reserved hostname tag. Promote it to the metric's
  hostname and drop it from the tag set.
- Tweak unit-test fixtures/expectations to match (gauges instead
  of monotonic counts since Flink emits all counter-like metrics as
  Prometheus gauges)

Verified locally: ddev validate all flink (passes), ddev test flink -m
unit (2 passed), and docker compose up on the new compose file with
/metrics returning Prometheus output as expected.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Add file-level `# flake8: noqa: E501` to flink/metrics.py for the
  METRIC_MAP dict (mirrors the temporal integration's pattern).
- Regenerate .codecov.yml and .github/workflows/test-all.yml via
  `ddev validate ci --sync` so the new flink target is registered.
Flink's Prometheus reporter emits the throughput meter as
`numRecordsOutPerSecond` (full word), per the upstream metric reference:
https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/metrics/.
metadata.csv however documents the DD-side metric with the truncated
suffix `PerSec` — a pre-existing asymmetry inherited from the push
reporter. Spotted by Codex on the PR.

Map the raw `_PerSecond` keys to the truncated DD-side `_PerSec` names
for both `flink_task_numRecordsOutPerSecond` and
`flink_operator_numRecordsOutPerSecond`. Extend the unit fixture and
assertions to lock the mapping in so it can't drift again.
Flink's Prometheus reporter ignores `metrics.scope.task` and
`metrics.scope.operator` overrides -- it uses Flink's internal logical
scope names (`taskmanager_job_task`, `taskmanager_job_task_operator`)
with a hardcoded `flink_` prefix. Verified by scraping a live
docker-compose + StateMachineExample job: the old MAP keys
`flink_task_*` and `flink_operator_*` never match, silently dropping
~35 metrics.

- Rename all task-scope and operator-scope keys in METRIC_MAP to the
  long `flink_taskmanager_job_task[_operator]_*` form. DD-side names
  (right side of map) are unchanged so metadata.csv still applies.
- Drop the `metrics.scope.*` overrides from the test compose -- they
  were no-ops for the Prometheus reporter.
- Drop the equivalent block from the README OpenMetrics section and
  add a note explaining that the Prometheus reporter ignores those
  overrides. The legacy Datadog HTTP Reporter section keeps its
  scope-remap block, where it's still required.
- Update the unit fixture to mirror the raw names Flink actually
  emits so the unit assertion remains a meaningful regression guard.

Live-scrape re-validation: 0 unmatched keys for the workload covered;
the 8 still-unmatched MAP keys (commitsFailed, currentInput2Watermark,
etc.) are conditional metrics not emitted by the smoke job.
After rebasing onto master, the upstream removal of .codecov.yml
(Datadog Code Coverage migration, PR DataDog#23360) means the flink entry
that ddev validate ci --sync added to .codecov.yml is gone. The new
code-coverage.datadog.yml needs the same registration; without it
the CI validation reports "Code coverage config has 1 missing service".
The unit test in datadog_checks_dev/tests/tooling/test_utils.py uses
'flink' as a known-logs-only fixture to exercise the is_logs_only()
helper. After this PR adds OpenMetrics-based metric collection (with
init_config:/instances:), flink is no longer logs-only, so the test
fails. Swap the fixture to 'cisco_asa', which remains a pure-logs
integration on master.
@barryib barryib force-pushed the feature/flink-openmetrics-collection branch from 1c64e2a to b122e70 Compare May 28, 2026 20:10
@barryib
Copy link
Copy Markdown
Author

barryib commented May 28, 2026

PR All shows flake noise on docker-compose/Azure-blob setup steps (also flaking on master); happy to retry if it'd help reviewers.

@barryib barryib changed the title [flink] add OpenMetrics-based collection mode (RFC + billing classification question) [flink] add OpenMetrics-based collection mode May 28, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants