[flink] add OpenMetrics-based collection mode #23857
Adds an alternative collection mode that scrapes Flink's flink-metrics-prometheus reporter via OpenMetricsBaseCheckV2, complementing the existing Datadog HTTP Reporter push-based mode.

Motivation: the existing push-based collection requires the Datadog API key to live inside Flink's configuration. On Kubernetes deployments managed by the Flink Operator, flink-conf.yaml is mounted from a read-only ConfigMap, which makes secret injection brittle and forces workarounds like the vals plugin in ArgoCD. With agent-side OpenMetrics scraping, the API key lives with the Datadog Agent, which is the standard pattern for K8s deployments and fits cleanly with External Secrets Operator + Secret refs.

Includes:
- FlinkCheck inheriting from OpenMetricsBaseCheckV2
- METRIC_MAP covering the JVM and core jobmanager / taskmanager metrics (representative subset; full coverage to follow once the approach is agreed)
- Updated configuration spec and conf.yaml.example
- Unit tests with a fixture covering the mapped metrics
- README updated to document both collection modes

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Regenerate conf.yaml.example via 'ddev validate config flink -s'
(drops two stray lines from the openmetrics_endpoint comment block)
- Add auto-generated config_models/ via 'ddev validate models flink -s'
- Replace non-ASCII em-dashes in README.md with ASCII hyphens
- Rename changelog fragment to use the PR number ('23857.added' instead
of 'openmetrics-collection.added') so check_pr.py can parse it
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Expand METRIC_MAP from a representative subset (~20) to the full 87 metrics declared in metadata.csv (script-generated)
- Add tests/compose/docker-compose.yaml with jobmanager + taskmanager configured to expose Flink's flink-metrics-prometheus reporter, plus tests/test_e2e.py asserting core JVM and cluster metrics
- Add hatch.toml so 'ddev test' picks up the integration
- Set hostname_label + exclude_labels for 'host' in the check's default config: Flink's Prometheus reporter labels every series with 'host' to identify the source JM/TM, which collides with Datadog's reserved hostname tag. Promote it to the metric's hostname and drop it from the tag set.
- Tweak unit-test fixtures/expectations to match (gauges instead of monotonic counts since Flink emits all counter-like metrics as Prometheus gauges)

Verified locally: ddev validate all flink (passes), ddev test flink -m unit (2 passed), and docker compose up on the new compose file with /metrics returning Prometheus output as expected.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
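The `host` label handling in the commit above can be pictured with a small standalone sketch. It does not call the Agent's code: `split_host_label` is a hypothetical helper and the label values are made up. It only illustrates the intended effect of setting `hostname_label` and `exclude_labels` to `host`, under the assumption that the remaining labels become `key:value` tags.

```python
from typing import Dict, List, Optional, Tuple


def split_host_label(
    labels: Dict[str, str],
    hostname_label: str = "host",
    exclude_labels: Tuple[str, ...] = ("host",),
) -> Tuple[Optional[str], List[str]]:
    """Return (hostname, tags) for one scraped series.

    Hypothetical helper: mimics the effect described in the commit message,
    not the actual OpenMetricsBaseCheckV2 implementation.
    """
    # Promote the configured label to the metric's hostname, if present.
    hostname = labels.get(hostname_label)
    # Every other label becomes a key:value tag, minus the excluded ones.
    tags = sorted(f"{k}:{v}" for k, v in labels.items() if k not in exclude_labels)
    return hostname, tags


# Made-up labels resembling what a TaskManager series might carry.
hostname, tags = split_host_label({"host": "taskmanager-1", "tm_id": "abc123"})
print(hostname, tags)
```

With both options set, the `host` value ends up as the series hostname and no `host:...` tag remains, which is what avoids the collision with the reserved tag.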
- Add file-level `# flake8: noqa: E501` to flink/metrics.py for the METRIC_MAP dict (mirrors the temporal integration's pattern).
- Regenerate .codecov.yml and .github/workflows/test-all.yml via `ddev validate ci --sync` so the new flink target is registered.
Flink's Prometheus reporter emits the throughput meter as `numRecordsOutPerSecond` (full word), per the upstream metric reference: https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/metrics/. metadata.csv, however, documents the DD-side metric with the truncated suffix `PerSec`, a pre-existing asymmetry inherited from the push reporter. Spotted by Codex on the PR.

Map the raw `_PerSecond` keys to the truncated DD-side `_PerSec` names for both `flink_task_numRecordsOutPerSecond` and `flink_operator_numRecordsOutPerSecond`.

Extend the unit fixture and assertions to lock the mapping in so it can't drift again.
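The suffix fix in the commit above can be sketched as a pair of map entries plus a guard. The raw keys are the ones named in the commit message. The right-hand Datadog-side names and the `mismatched_per_second` helper are assumptions made for illustration, so metadata.csv remains the authority on the exact spelling.

```python
from typing import Dict, List

# Raw Prometheus name -> Datadog-side name (the check's namespace adds `flink.`).
# The right-hand names are assumed from the truncated `PerSec` suffix described
# in the commit message; they are not copied from the real METRIC_MAP.
PER_SECOND_MAP: Dict[str, str] = {
    "flink_task_numRecordsOutPerSecond": "task.numRecordsOutPerSec",
    "flink_operator_numRecordsOutPerSecond": "operator.numRecordsOutPerSec",
}


def mismatched_per_second(metric_map: Dict[str, str]) -> List[str]:
    """Return raw `...PerSecond` keys whose mapped name lacks the `PerSec` suffix.

    Hypothetical regression guard, not part of the integration.
    """
    return sorted(
        raw
        for raw, dd_name in metric_map.items()
        if raw.endswith("PerSecond") and not dd_name.endswith("PerSec")
    )


print(mismatched_per_second(PER_SECOND_MAP))
```

A guard like this fails loudly if someone later "corrects" the right-hand side to the full `PerSecond` spelling and drifts away from the documented names.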
Flink's Prometheus reporter ignores `metrics.scope.task` and `metrics.scope.operator` overrides -- it uses Flink's internal logical scope names (`taskmanager_job_task`, `taskmanager_job_task_operator`) with a hardcoded `flink_` prefix. Verified by scraping a live docker-compose + StateMachineExample job: the old MAP keys `flink_task_*` and `flink_operator_*` never match, silently dropping ~35 metrics.

- Rename all task-scope and operator-scope keys in METRIC_MAP to the long `flink_taskmanager_job_task[_operator]_*` form. DD-side names (right side of map) are unchanged so metadata.csv still applies.
- Drop the `metrics.scope.*` overrides from the test compose -- they were no-ops for the Prometheus reporter.
- Drop the equivalent block from the README OpenMetrics section and add a note explaining that the Prometheus reporter ignores those overrides. The legacy Datadog HTTP Reporter section keeps its scope-remap block, where it's still required.
- Update the unit fixture to mirror the raw names Flink actually emits so the unit assertion remains a meaningful regression guard.

Live-scrape re-validation: 0 unmatched keys for the workload covered; the 8 still-unmatched MAP keys (commitsFailed, currentInput2Watermark, etc.) are conditional metrics not emitted by the smoke job.
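The live-scrape re-validation described above amounts to comparing the metric names in a `/metrics` payload against the map's keys. A minimal sketch of that comparison follows, using only the standard library. The helper names, the sample payload, and the three-entry map are illustrative assumptions, not the script that was actually used.

```python
import re
from typing import Dict, Set, Tuple

# Metric name at the start of a sample line in the Prometheus text format.
_SAMPLE_NAME = re.compile(r"^([a-zA-Z_:][a-zA-Z0-9_:]*)")


def scraped_names(payload: str) -> Set[str]:
    """Collect metric names from a Prometheus text payload, skipping comments."""
    names = set()
    for line in payload.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # blank lines and # HELP / # TYPE lines carry no samples
        match = _SAMPLE_NAME.match(line)
        if match:
            names.add(match.group(1))
    return names


def audit(metric_map: Dict[str, str], payload: str) -> Tuple[Set[str], Set[str]]:
    """Return (map keys never seen in the scrape, scraped names missing from the map)."""
    seen = scraped_names(payload)
    keys = set(metric_map)
    return keys - seen, seen - keys


# Illustrative data only: a tiny payload and a three-entry map.
SAMPLE_PAYLOAD = """\
# TYPE flink_taskmanager_job_task_numRecordsOut gauge
flink_taskmanager_job_task_numRecordsOut{host="tm1",task_name="Source"} 42.0
flink_jobmanager_Status_JVM_CPU_Load{host="jm1"} 0.01
"""
SAMPLE_MAP = {
    "flink_taskmanager_job_task_numRecordsOut": "task.numRecordsOut",
    "flink_jobmanager_Status_JVM_CPU_Load": "jobmanager.Status.JVM.CPU.Load",
    "flink_task_numRecordsOut": "task.numRecordsOut",  # old short-form key
}

never_emitted, unmapped = audit(SAMPLE_MAP, SAMPLE_PAYLOAD)
print(never_emitted, unmapped)
```

In this toy run the old short-form key shows up as never emitted, which is the same symptom the commit describes for the `flink_task_*` and `flink_operator_*` keys. Keys for conditional metrics would also appear in that set, so a non-empty result needs a human look rather than an automatic failure.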
After rebasing onto master, the upstream removal of .codecov.yml (Datadog Code Coverage migration, PR DataDog#23360) means the flink entry that ddev validate ci --sync added to .codecov.yml is gone. The new code-coverage.datadog.yml needs the same registration; without it the CI validation reports "Code coverage config has 1 missing service".
The unit test in datadog_checks_dev/tests/tooling/test_utils.py uses 'flink' as a known-logs-only fixture to exercise the is_logs_only() helper. After this PR adds OpenMetrics-based metric collection (with init_config:/instances:), flink is no longer logs-only, so the test fails. Swap the fixture to 'cisco_asa', which remains a pure-logs integration on master.
PR All shows flake noise on docker-compose/Azure-blob setup steps (also flaking on master); happy to retry if it'd help reviewers.
What does this PR do?
Adds an OpenMetrics-based collection mode to the `flink` integration. The new mode uses `OpenMetricsBaseCheckV2` to scrape Flink's `flink-metrics-prometheus` reporter endpoint and emits the same `flink.*` metric namespace as the existing Datadog HTTP Reporter push mode.

The existing push-based mode (Flink's `DatadogHttpReporterFactory`) is preserved and remains documented; this PR is purely additive.

Motivation
We run Flink on Kubernetes via the Flink Operator. Today, the Datadog Agent's Flink integration assumes Flink pushes metrics directly via its native HTTP Reporter, which requires the Datadog API key to live inside `flink-conf.yaml`. With the Flink Operator, `flink-conf.yaml` is mounted from a read-only ConfigMap, so injecting a Kubernetes Secret into it isn't viable: the standard `FLINK_PROPERTIES` mechanism crashes the pod because the entrypoint tries to append to the read-only file.

We've been working around this with the `vals` plugin in ArgoCD to template the API key in at deploy time, which is brittle and operationally heavy.

An OpenMetrics-based collection mode solves this cleanly: the Agent already has the API key (standard k8s deployment pattern), and Flink only needs to enable its built-in Prometheus reporter. No secrets in Flink config, no entrypoint hacks, no `vals` plugin.

There's been parallel work on the Flink side to make the existing Datadog reporter accept `DD_API_KEY` from the environment (see apache/flink#27432 and FLINK-33994), but that PR has been waiting for committer review for months. Even if it eventually lands, an Agent-side OpenMetrics integration is the better long-term shape: it matches how Datadog handles CoreDNS, Temporal, and other modern k8s-native integrations.

Billing classification - confirmed by Datadog support
I raised this with Datadog support before opening the PR for review. The answer, paraphrased:
Two caveats from the answer, both addressed:
metadata.csv) are covered. I confirmed that every entry the new `METRIC_MAP` produces is already present in `metadata.csv`: 87 mapped names, 87 documented names, with no extras on either side. So no metric emitted by the new mode would land as custom for an end user on a released agent.

What's in this PR
- `datadog_checks/flink/check.py`: `FlinkCheck` inheriting from `OpenMetricsBaseCheckV2`, namespace `flink`. Flink's Prometheus reporter labels every series with a `host` label that collides with Datadog's reserved hostname tag, so the check promotes `host` to the metric's hostname and excludes it from the tag set (`hostname_label='host'`, `exclude_labels=['host']`).
- `datadog_checks/flink/metrics.py`: `METRIC_MAP` from raw Prometheus names (e.g. `flink_jobmanager_Status_JVM_CPU_Load`) to namespaced Datadog names (e.g. `jobmanager.Status.JVM.CPU.Load`). Covers all 87 metrics documented in `metadata.csv` (JobManager, TaskManager, job, task, operator, and Netty shuffle metrics).
- `datadog_checks/flink/config_models/`: generated via `ddev validate models flink --sync`.
- `datadog_checks/flink/data/conf.yaml.example`: regenerated from `assets/configuration/spec.yaml` via `ddev validate config flink --sync`.
- `assets/configuration/spec.yaml`: adds `init_config/openmetrics` and `instances/openmetrics` templates.
- `tests/`: unit tests with a fixture covering the mapped metrics (`tests/fixtures/metrics.txt`), an E2E test, and a `compose/docker-compose.yaml` running the official Flink image with the Prometheus reporter enabled.
- `README.md`: new section documenting OpenMetrics collection, with a Kubernetes Autodiscovery example. The existing Datadog HTTP Reporter section is preserved.
- `changelog.d/23857.added`: changelog fragment.

Long-term direction (out of scope for this PR)
Once the OpenMetrics mode has been live for a release or two, there's a case for marking the Datadog HTTP Reporter as legacy and eventually deprecating it. The OpenMetrics path is operationally cleaner (no Datadog API key in Flink config, no `metrics.scope.*` remap required to avoid being billed as custom, native fit with the Agent's Kubernetes Autodiscovery), and it matches how integrations-core handles other modern OSS data systems (CoreDNS, Temporal). Happy to follow up with a separate PR if maintainers agree with the direction.

Testing
The local compose pins `flink:1.18-scala_2.12-java17`. I tried 1.19 first and the JobManager crashed during `config-parser-utils.sh` while parsing the `FLINK_PROPERTIES`-injected YAML; Flink 1.19's new `config.yaml` format appears to collide with how the entrypoint writes properties. Happy to revisit which Flink version(s) the E2E test should pin if reviewers prefer 1.19 or 1.20.

A note on the CI
This PR registers the new `flink` test target in `.github/workflows/test-all.yml` (via `ddev validate ci --sync`). That file is in `pr-all.yml`'s trigger path filter, so the PR All workflow runs the entire integration test suite on every push. Most of the unrelated failures it surfaces (SAP HANA, WebLogic, IBM ACE, IBM MQ, n8n, SNMP, etc.) are flaky infra issues (DNS resolution to Azure blob storage, docker-compose start timeouts, JVM crashes) that intermittently fail on `master` too. The Datadog auto-annotation marks most of them as "🔄 Retry job. This looks flaky."

The checks that should actually be green and are owned by this PR:
- `Run Validations / Validate`
- `test / j7f02532 / flink` (lint, unit, integration, E2E)
- `test / j77217e9 / Datadog Checks Dev on Linux` (the `is_logs_only` fixture was updated to use `cisco_asa` since flink no longer qualifies)
- `test / jd316aba / ddev on Linux` (the `code-coverage.datadog.yml` registration was added)

Related links
Thanks for taking a look. 🙏