From 3e085c6cbbee1926a139c0bc38773fa683499d9b Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Fri, 26 Jun 2026 10:18:29 +0200 Subject: [PATCH 1/9] chore(deps): Bump tokio --- Cargo.lock | 12 ++++++------ Cargo.toml | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2b0c74a..e515f51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1720,7 +1720,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.3", + "socket2 0.5.10", "tokio", "tower-service", "tracing", @@ -2989,7 +2989,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls 0.23.38", - "socket2 0.6.3", + "socket2 0.5.10", "thiserror 2.0.18", "tokio", "tracing", @@ -3027,7 +3027,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.6.3", + "socket2 0.5.10", "tracing", "windows-sys 0.60.2", ] @@ -4242,9 +4242,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.52.1" +version = "1.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b67dee974fe86fd92cc45b7a95fdd2f99a36a6d7b0d431a231178d3d670bbcc6" +checksum = "8fc7f01b389ac15039e4dc9531aa973a135d7a4135281b12d7c1bc79fd57fffe" dependencies = [ "bytes", "libc", @@ -5001,7 +5001,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a8d9e73..2d820ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,7 +80,7 @@ sqlx = { version = "0.8.2", features = [ "postgres", ] } strum = { version = "0.28", features = ["derive"] } -tokio = "1.39" +tokio = "1.52" tower = "0.5" tower-http = { version = "0.6", features = ["compression-full", "tracing"] } tracing = "0.1" From 0a32fb3f3e08ce759199cf2917aabec8963b9335 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Fri, 26 Jun 2026 11:46:02 +0200 Subject: [PATCH 2/9] chore(deps): Bump otel related crates --- Cargo.lock | 251 +++++++++++++++++------------------------------------ Cargo.toml | 14 +-- 2 files changed, 88 insertions(+), 177 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e515f51..0b02720 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,7 +96,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -107,7 +107,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -294,40 +294,13 @@ dependencies = [ "fs_extra", ] -[[package]] -name = "axum" -version = "0.7.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" -dependencies = [ - "async-trait", - "axum-core 0.4.5", - "bytes", - "futures-util", - "http 1.4.0", - "http-body 1.0.1", - "http-body-util", - "itoa", - "matchit 0.7.3", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "rustversion", - "serde", - "sync_wrapper 1.0.2", - "tower 0.5.3", - "tower-layer", - "tower-service", -] - [[package]] name = "axum" version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90" dependencies = [ - "axum-core 0.5.6", + "axum-core", "bytes", "form_urlencoded", "futures-util", @@ -337,7 +310,7 @@ dependencies = [ "hyper 1.9.0", "hyper-util", "itoa", - "matchit 0.8.4", + "matchit", "memchr", "mime", "percent-encoding", @@ -348,32 +321,12 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.2", "tokio", - "tower 0.5.3", + "tower", "tower-layer", "tower-service", "tracing", ] -[[package]] -name = "axum-core" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" -dependencies = [ - "async-trait", - "bytes", - "futures-util", - "http 1.4.0", - "http-body 1.0.1", - "http-body-util", - "mime", - "pin-project-lite", - "rustversion", - "sync_wrapper 1.0.2", - "tower-layer", - "tower-service", -] - [[package]] name = "axum-core" version = "0.5.6" @@ -399,8 +352,8 @@ version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be44683b41ccb9ab2d23a5230015c9c3c55be97a25e4428366de8873103f7970" dependencies = [ - "axum 0.8.9", - "axum-core 0.5.6", + "axum", + "axum-core", "bytes", "futures-core", "futures-util", @@ -1074,7 +1027,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -1371,7 +1324,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.14.0", + "indexmap", "slab", "tokio", "tokio-util", @@ -1390,19 +1343,13 @@ dependencies = [ "futures-core", "futures-sink", "http 1.4.0", - "indexmap 2.14.0", + "indexmap", "slab", "tokio", "tokio-util", "tracing", ] -[[package]] -name = "hashbrown" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" - [[package]] name = "hashbrown" version = "0.15.5" @@ -1720,7 +1667,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.3", "tokio", "tower-service", "tracing", @@ -1859,16 +1806,6 @@ dependencies = [ "icu_properties", ] -[[package]] -name = "indexmap" -version = "1.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" -dependencies = [ - "autocfg", - "hashbrown 0.12.3", -] - [[package]] name = "indexmap" version = "2.14.0" @@ -2137,7 +2074,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tokio-util", - "tower 0.5.3", + "tower", "tower-http", "tracing", ] @@ -2284,12 +2221,6 @@ dependencies = [ "regex-automata", ] -[[package]] -name = "matchit" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" - [[package]] name = "matchit" version = "0.8.4" @@ -2345,7 +2276,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -2446,92 +2377,88 @@ checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" [[package]] name = "opentelemetry" -version = "0.24.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c365a63eec4f55b7efeceb724f1336f26a9cf3427b70e59e2cd2a5b947fba96" +checksum = "b0142c63252a9e054e68a4c61a5778f7b14f576274d593f8ce883d191a099682" dependencies = [ "futures-core", "futures-sink", "js-sys", - "once_cell", "pin-project-lite", - "thiserror 1.0.69", + "thiserror 2.0.18", + "tracing", ] [[package]] name = "opentelemetry-http" -version = "0.13.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad31e9de44ee3538fb9d64fe3376c1362f406162434609e79aea2a41a0af78ab" +checksum = "5683015d09e2df236ef005b17f6f196f0d5f6313c4fa43a7b6a53b52776e4331" dependencies = [ "async-trait", "bytes", "http 1.4.0", "opentelemetry", + "reqwest 0.13.2", ] [[package]] name = "opentelemetry-otlp" -version = "0.17.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b925a602ffb916fb7421276b86756027b37ee708f9dce2dbdcc51739f07e727" +checksum = "9966929966d17620d7c316c643ba62631826e10021409357772d5eea84f62c35" dependencies = [ - "async-trait", - "futures-core", "http 1.4.0", "opentelemetry", + "opentelemetry-http", "opentelemetry-proto", "opentelemetry_sdk", "prost", + "reqwest 0.13.2", "serde", "serde_json", - "thiserror 1.0.69", - "tokio", + "thiserror 2.0.18", "tonic", ] [[package]] name = "opentelemetry-prometheus" -version = "0.17.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc4191ce34aa274621861a7a9d68dbcf618d5b6c66b10081631b61fd81fbc015" +checksum = "2c0359983e7f79cf33c9abd89e5d7ddf67c46c419d0148598022d70e70c01aba" dependencies = [ "once_cell", "opentelemetry", "opentelemetry_sdk", "prometheus", - "protobuf", + "tracing", ] [[package]] name = "opentelemetry-proto" -version = "0.7.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ee9f20bff9c984511a02f082dc8ede839e4a9bf15cc2487c8d6fea5ad850d9" +checksum = "56d658ba1faf63f7b9c492cfbe6e0ec365440a16132d3270c1065f7b33f1b638" dependencies = [ "opentelemetry", "opentelemetry_sdk", "prost", - "tonic", ] [[package]] name = "opentelemetry_sdk" -version = "0.24.1" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "692eac490ec80f24a17828d49b40b60f5aeaccdfe6a503f939713afd22bc28df" +checksum = "9b59f80e1ac4d5ff7a2db8fb6c80badb7f0f3f858211fba08dd9aaec750894f9" dependencies = [ - "async-trait", "futures-channel", "futures-executor", "futures-util", - "glob", - "once_cell", "opentelemetry", "percent-encoding", - "rand 0.8.6", - "serde_json", - "thiserror 1.0.69", + "portable-atomic", + "rand 0.9.4", + "thiserror 2.0.18", "tokio", "tokio-stream", ] @@ -2823,9 +2750,9 @@ dependencies = [ [[package]] name = "prometheus" -version = "0.13.4" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a" dependencies = [ "cfg-if", "fnv", @@ -2833,14 +2760,14 @@ dependencies = [ "memchr", "parking_lot", "protobuf", - "thiserror 1.0.69", + "thiserror 2.0.18", ] [[package]] name = "prost" -version = "0.13.5" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +checksum = "528ac67416ff8646872a3c02cad9cc4ee5dc9f9540c9b10771855c95cb2e5ae1" dependencies = [ "bytes", "prost-derive", @@ -2848,9 +2775,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.13.5" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +checksum = "b570b25f7617e43d59005d0990ccb79e950a423952cea19671b7a876da390adf" dependencies = [ "anyhow", "itertools 0.14.0", @@ -2861,9 +2788,23 @@ dependencies = [ [[package]] name = "protobuf" -version = "2.28.0" +version = "3.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-support" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror 1.0.69", +] [[package]] name = "prusto" @@ -2989,7 +2930,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls 0.23.38", - "socket2 0.5.10", + "socket2 0.6.3", "thiserror 2.0.18", "tokio", "tracing", @@ -3027,7 +2968,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.3", "tracing", "windows-sys 0.60.2", ] @@ -3266,7 +3207,9 @@ dependencies = [ "bytes", "cookie", "cookie_store", + "futures-channel", "futures-core", + "futures-util", "http 1.4.0", "http-body 1.0.1", "http-body-util", @@ -3286,7 +3229,7 @@ dependencies = [ "sync_wrapper 1.0.2", "tokio", "tokio-rustls 0.26.4", - "tower 0.5.3", + "tower", "tower-http", "tower-service", "url", @@ -3450,7 +3393,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -3647,7 +3590,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.14.0", + "indexmap", "itoa", "ryu", "serde", @@ -3782,7 +3725,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -3836,7 +3779,7 @@ dependencies = [ "futures-util", "hashbrown 0.15.5", "hashlink", - "indexmap 2.14.0", + "indexmap", "log", "memchr", "once_cell", @@ -4328,7 +4271,7 @@ version = "0.25.11+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b59c4d22ed448339746c59b905d24568fcbb3ab65a500494f7b8c3e97739f2b" dependencies = [ - "indexmap 2.14.0", + "indexmap", "toml_datetime", "toml_parser", "winnow", @@ -4345,50 +4288,20 @@ dependencies = [ [[package]] name = "tonic" -version = "0.12.3" +version = "0.14.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +checksum = "ac2a5518c70fa84342385732db33fb3f44bc4cc748936eb5833d2df34d6445ef" dependencies = [ - "async-stream", - "async-trait", - "axum 0.7.9", "base64 0.22.1", "bytes", "flate2", - "h2 0.4.13", "http 1.4.0", "http-body 1.0.1", "http-body-util", - "hyper 1.9.0", - "hyper-timeout", - "hyper-util", "percent-encoding", "pin-project", - "prost", - "socket2 0.5.10", - "tokio", + "sync_wrapper 1.0.2", "tokio-stream", - "tower 0.4.13", - "tower-layer", - "tower-service", - "tracing", -] - -[[package]] -name = "tower" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" -dependencies = [ - "futures-core", - "futures-util", - "indexmap 1.9.3", - "pin-project", - "pin-project-lite", - "rand 0.8.6", - "slab", - "tokio", - "tokio-util", "tower-layer", "tower-service", "tracing", @@ -4431,7 +4344,7 @@ dependencies = [ "pin-project-lite", "tokio", "tokio-util", - "tower 0.5.3", + "tower", "tower-layer", "tower-service", "tracing", @@ -4495,14 +4408,12 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.25.0" +version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9784ed4da7d921bc8df6963f8c80a0e4ce34ba6ba76668acadd3edbd985ff3b" +checksum = "adbc64cba7137545b8044cb1fe9814f7aacf3c6b5f9b45be8bb5db538befdb26" dependencies = [ "js-sys", - "once_cell", "opentelemetry", - "opentelemetry_sdk", "smallvec", "tracing", "tracing-core", @@ -4545,7 +4456,7 @@ name = "trino-lb" version = "0.6.0" dependencies = [ "askama", - "axum 0.8.9", + "axum", "axum-extra", "axum-server", "chrono", @@ -4579,7 +4490,7 @@ dependencies = [ "snafu", "strum", "tokio", - "tower 0.5.3", + "tower", "tower-http", "tracing", "tracing-opentelemetry", @@ -4915,7 +4826,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" dependencies = [ "anyhow", - "indexmap 2.14.0", + "indexmap", "wasm-encoder", "wasmparser", ] @@ -4928,7 +4839,7 @@ checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ "bitflags 2.11.1", "hashbrown 0.15.5", - "indexmap 2.14.0", + "indexmap", "semver", ] @@ -5398,7 +5309,7 @@ checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" dependencies = [ "anyhow", "heck", - "indexmap 2.14.0", + "indexmap", "prettyplease", "syn", "wasm-metadata", @@ -5429,7 +5340,7 @@ checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", "bitflags 2.11.1", - "indexmap 2.14.0", + "indexmap", "log", "serde", "serde_derive", @@ -5448,7 +5359,7 @@ checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" dependencies = [ "anyhow", "id-arena", - "indexmap 2.14.0", + "indexmap", "log", "semver", "serde", diff --git a/Cargo.toml b/Cargo.toml index 2d820ac..99b5f63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,12 +40,12 @@ kube = { version = "3.0", default-features = false, features = [ main_error = "0.1" num_cpus = "1.16" number_prefix = "0.4" -opentelemetry = "0.24" -opentelemetry_sdk = { version = "0.24", features = ["rt-tokio"] } -opentelemetry-http = "0.13" -opentelemetry-otlp = { version = "0.17", features = ["serialize", "gzip-tonic"] } -opentelemetry-prometheus = "0.17" -prometheus = "0.13" +opentelemetry = "0.32" +opentelemetry_sdk = { version = "0.32", features = ["rt-tokio"] } +opentelemetry-http = "0.32" +opentelemetry-otlp = { version = "0.32", features = ["serialize", "gzip-tonic"] } +opentelemetry-prometheus = "0.32" +prometheus = "0.14" prusto = "0.5" pyo3 = { version = "0.28", features = ["auto-initialize"] } rand = "0.10" @@ -84,7 +84,7 @@ tokio = "1.52" tower = "0.5" tower-http = { version = "0.6", features = ["compression-full", "tracing"] } tracing = "0.1" -tracing-opentelemetry = "0.25" +tracing-opentelemetry = "0.33" tracing-subscriber = { version = "0.3", features = ["env-filter"] } trait-variant = "0.1" url = { version = "2.5", features = ["serde"] } From 7a4e42fd19c6e8ad8fdde4d8e4d564a19d9d4845 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Fri, 26 Jun 2026 11:47:30 +0200 Subject: [PATCH 3/9] refactor(metrics): Use the new metrics APIs --- trino-lb/src/metrics.rs | 173 +++++++++++++++++----------------------- 1 file changed, 74 insertions(+), 99 deletions(-) diff --git a/trino-lb/src/metrics.rs b/trino-lb/src/metrics.rs index a711755..2f1b145 100644 --- a/trino-lb/src/metrics.rs +++ b/trino-lb/src/metrics.rs @@ -7,10 +7,9 @@ use std::{ use futures::future::try_join_all; use opentelemetry::{ KeyValue, - metrics::{Counter, Histogram, MetricsError}, + metrics::{Counter, Histogram}, }; use prometheus::Registry; -use snafu::{ResultExt, Snafu}; use tokio::{ runtime::Builder, sync::mpsc::{UnboundedReceiver, UnboundedSender}, @@ -25,12 +24,6 @@ use trino_lb_persistence::{Persistence, PersistenceImplementation}; use crate::trino_client::ClusterInfo; -#[derive(Snafu, Debug)] -pub enum Error { - #[snafu(display("Failed to register metrics callback"))] - RegisterMetricsCallback { source: MetricsError }, -} - pub struct Metrics { pub registry: Registry, pub http_counter: Counter, @@ -46,79 +39,61 @@ impl Metrics { registry: Registry, persistence: Arc, config: &Config, - ) -> Result { + ) -> Self { let meter = opentelemetry::global::meter("trino-lb"); let http_counter = meter .u64_counter("http_requests_total") .with_unit("requests") .with_description("Total number of HTTP requests made.") - .init(); + .build(); let queued_time = meter .u64_histogram("query_queued_duration") .with_unit("ms") .with_description("The time queries where queued in trino-lb") - .init(); + .build(); let cluster_infos = Arc::new(RwLock::new(HashMap::::new())); - let cluster_counts_per_state_metric = meter - .u64_observable_gauge("cluster_counts_per_state") - .with_unit("clusters") - .with_description("The number of active or inactive clusters for each cluster group") - .init(); - - let cluster_queries_metric = meter + // As of opentelemetry 0.32 the callback is attached to the instrument builder via + // `with_callback` instead of the removed `Meter::register_callback`. The built instrument + // handle is not needed afterwards: the callback is registered with the SDK on `build()`. + let cluster_infos_for_callback = Arc::clone(&cluster_infos); + let _cluster_queries_metric = meter .u64_observable_gauge("cluster_queries") .with_unit("queries") .with_description( "The number of running, queued or blocked queries on a specific Trino cluster", ) - .init(); - - let queued_queries_metric = meter - .u64_observable_gauge("queued_queries") - .with_unit("queries") - .with_description("The number of queries queued across all trino-lb instances") - .init(); - - let cluster_infos_for_callback = Arc::clone(&cluster_infos); - meter - .register_callback(&[cluster_queries_metric.as_any()], move |observer| { + .with_callback(move |observer| { if let Ok(cluster_query_counters) = cluster_infos_for_callback.read() { for (cluster, counter) in cluster_query_counters.deref() { - observer.observe_u64( - &cluster_queries_metric, + observer.observe( counter.running_queries, - [ + &[ KeyValue::new("cluster", cluster.to_string()), KeyValue::new("state", "running"), - ] - .as_ref(), + ], ); - observer.observe_u64( - &cluster_queries_metric, + observer.observe( counter.queued_queries, - [ + &[ KeyValue::new("cluster", cluster.to_string()), KeyValue::new("state", "queued"), - ] - .as_ref(), + ], ); - observer.observe_u64( - &cluster_queries_metric, + observer.observe( counter.blocked_queries, - [ + &[ KeyValue::new("cluster", cluster.to_string()), KeyValue::new("state", "blocked"), - ] - .as_ref(), + ], ); } } }) - .context(RegisterMetricsCallbackSnafu)?; + .build(); // All of this mess can be removed once https://github.com/open-telemetry/opentelemetry-rust/issues/1376 is supported. let (ping_sender, ping_receiver) = tokio::sync::mpsc::unbounded_channel::<()>(); @@ -140,8 +115,11 @@ impl Metrics { )) }); - meter - .register_callback(&[queued_queries_metric.as_any()], move |observer| { + let _queued_queries_metric = meter + .u64_observable_gauge("queued_queries") + .with_unit("queries") + .with_description("The number of queries queued across all trino-lb instances") + .with_callback(move |observer| { if let Err(err) = ping_sender.send(()) { error!(error = ?err, "Failed to send ping for queued_queries metric"); return; @@ -172,15 +150,14 @@ impl Metrics { if let Some(queued_queries) = queued_queries { for (cluster_group, queued) in queued_queries { - observer.observe_u64( - &queued_queries_metric, + observer.observe( queued, - [KeyValue::new("cluster-group", cluster_group)].as_ref(), + &[KeyValue::new("cluster-group", cluster_group)], ); } } }) - .context(RegisterMetricsCallbackSnafu)?; + .build(); // All of this mess can be removed once https://github.com/open-telemetry/opentelemetry-rust/issues/1376 is supported. let (ping_sender, ping_receiver) = tokio::sync::mpsc::unbounded_channel::<()>(); @@ -202,63 +179,61 @@ impl Metrics { )) }); - meter - .register_callback( - &[cluster_counts_per_state_metric.as_any()], - move |observer| { - if let Err(err) = ping_sender.send(()) { - error!(error = ?err, "Failed to send ping for cluster_counts_per_state metric"); - return; - } - let cluster_counts = std::thread::scope(|s| { - s.spawn(|| { - let mut receiver = match metrics_receiver.write() { - Ok(r) => r, - Err(err) => { - error!(error = %err, "Failed to acquire write lock for cluster_counts_per_state metric"); - return None; - } - }; - match receiver.blocking_recv() { - Some(v) => Some(v), - None => { - error!("cluster_counts_per_state metrics channel closed"); - None - } + let _cluster_counts_per_state_metric = meter + .u64_observable_gauge("cluster_counts_per_state") + .with_unit("clusters") + .with_description("The number of active or inactive clusters for each cluster group") + .with_callback(move |observer| { + if let Err(err) = ping_sender.send(()) { + error!(error = ?err, "Failed to send ping for cluster_counts_per_state metric"); + return; + } + let cluster_counts = std::thread::scope(|s| { + s.spawn(|| { + let mut receiver = match metrics_receiver.write() { + Ok(r) => r, + Err(err) => { + error!(error = %err, "Failed to acquire write lock for cluster_counts_per_state metric"); + return None; } - }) - .join() - .unwrap_or_else(|err| { - error!(error = ?err, "cluster_counts_per_state metrics thread panicked"); - None - }) - }); - - if let Some(cluster_counts) = cluster_counts { - for (cluster_group, counts) in cluster_counts { - for (state, count) in counts { - observer.observe_u64( - &cluster_counts_per_state_metric, - count, - [ - KeyValue::new("cluster-group", cluster_group.clone()), - KeyValue::new::<_, &str>("state", state.into()), - ] - .as_ref(), - ); + }; + match receiver.blocking_recv() { + Some(v) => Some(v), + None => { + error!("cluster_counts_per_state metrics channel closed"); + None } } + }) + .join() + .unwrap_or_else(|err| { + error!(error = ?err, "cluster_counts_per_state metrics thread panicked"); + None + }) + }); + + if let Some(cluster_counts) = cluster_counts { + for (cluster_group, counts) in cluster_counts { + for (state, count) in counts { + observer.observe( + count, + &[ + KeyValue::new("cluster-group", cluster_group.clone()), + KeyValue::new::<_, &str>("state", state.into()), + ], + ); + } } - }, - ) - .context(RegisterMetricsCallbackSnafu)?; + } + }) + .build(); - Ok(Self { + Self { registry, http_counter, queued_time, cluster_infos, - }) + } } } From 585766390749e30e54f30545a47c3578501d17f2 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Fri, 26 Jun 2026 13:36:33 +0200 Subject: [PATCH 4/9] refactor(telemetry): Use the new telemetry APIs --- Cargo.lock | 50 ++++++++++-- Cargo.toml | 15 +++- trino-lb/src/main.rs | 24 +++--- trino-lb/src/tracing.rs | 168 ++++++++++++++++++++++------------------ 4 files changed, 163 insertions(+), 94 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b02720..8c64209 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2399,7 +2399,6 @@ dependencies = [ "bytes", "http 1.4.0", "opentelemetry", - "reqwest 0.13.2", ] [[package]] @@ -2410,15 +2409,15 @@ checksum = "9966929966d17620d7c316c643ba62631826e10021409357772d5eea84f62c35" dependencies = [ "http 1.4.0", "opentelemetry", - "opentelemetry-http", "opentelemetry-proto", "opentelemetry_sdk", "prost", - "reqwest 0.13.2", "serde", "serde_json", "thiserror 2.0.18", + "tokio", "tonic", + "tonic-types", ] [[package]] @@ -2443,6 +2442,8 @@ dependencies = [ "opentelemetry", "opentelemetry_sdk", "prost", + "tonic", + "tonic-prost", ] [[package]] @@ -2459,8 +2460,6 @@ dependencies = [ "portable-atomic", "rand 0.9.4", "thiserror 2.0.18", - "tokio", - "tokio-stream", ] [[package]] @@ -2786,6 +2785,15 @@ dependencies = [ "syn", ] +[[package]] +name = "prost-types" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f94967dc7688f3054c7fac87473ffae4cc4c3904800e2d9f5b857246d8963b0a" +dependencies = [ + "prost", +] + [[package]] name = "protobuf" version = "3.7.2" @@ -3207,9 +3215,7 @@ dependencies = [ "bytes", "cookie", "cookie_store", - "futures-channel", "futures-core", - "futures-util", "http 1.4.0", "http-body 1.0.1", "http-body-util", @@ -4292,21 +4298,49 @@ version = "0.14.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac2a5518c70fa84342385732db33fb3f44bc4cc748936eb5833d2df34d6445ef" dependencies = [ + "async-trait", "base64 0.22.1", "bytes", "flate2", "http 1.4.0", "http-body 1.0.1", "http-body-util", + "hyper 1.9.0", + "hyper-timeout", + "hyper-util", "percent-encoding", "pin-project", "sync_wrapper 1.0.2", + "tokio", "tokio-stream", + "tower", "tower-layer", "tower-service", "tracing", ] +[[package]] +name = "tonic-prost" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50849f68853be452acf590cde0b146665b8d507b3b8af17261df47e02c209ea0" +dependencies = [ + "bytes", + "prost", + "tonic", +] + +[[package]] +name = "tonic-types" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73ab1b02061f83d519bba3caa167f88f261ef05720ab8ebc954ade70de3348e8" +dependencies = [ + "prost", + "prost-types", + "tonic", +] + [[package]] name = "tower" version = "0.5.3" @@ -4315,7 +4349,9 @@ checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" dependencies = [ "futures-core", "futures-util", + "indexmap", "pin-project-lite", + "slab", "sync_wrapper 1.0.2", "tokio", "tokio-util", diff --git a/Cargo.toml b/Cargo.toml index 99b5f63..28801be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,9 +41,20 @@ main_error = "0.1" num_cpus = "1.16" number_prefix = "0.4" opentelemetry = "0.32" -opentelemetry_sdk = { version = "0.32", features = ["rt-tokio"] } +opentelemetry_sdk = "0.32" opentelemetry-http = "0.32" -opentelemetry-otlp = { version = "0.32", features = ["serialize", "gzip-tonic"] } +# `default-features = false` because the 0.32 defaults switched from tonic/gRPC to an http-proto +# (reqwest) exporter we don't use; we re-list only the tonic-based features we need. +opentelemetry-otlp = { version = "0.32", default-features = false, features = [ + "grpc-tonic", + "gzip-tonic", + "serialize", + "trace", + "metrics", + # On by default in 0.32; re-listed because `default-features = false` would otherwise drop it. + # Lets the exporter report its own failures (e.g. collector unreachable) via `tracing`. + "internal-logs", +] } opentelemetry-prometheus = "0.32" prometheus = "0.14" prusto = "0.5" diff --git a/trino-lb/src/main.rs b/trino-lb/src/main.rs index 4722183..b02c35f 100644 --- a/trino-lb/src/main.rs +++ b/trino-lb/src/main.rs @@ -7,7 +7,6 @@ use maintenance::{ leftover_queries::LeftoverQueryDetector, query_count_fetcher, query_count_fetcher::QueryCountFetcher, }; -use opentelemetry::global::shutdown_tracer_provider; use routing::Router; use scaling::Scaler; use snafu::{ResultExt, Snafu}; @@ -146,14 +145,13 @@ async fn start() -> Result<(), MainError> { } }); - let metrics = Arc::new( - tracing::init( - config.trino_lb.tracing.as_ref(), - Arc::clone(&persistence), - &config, - ) - .context(SetUpTracingSnafu)?, - ); + let (metrics, tracer_provider) = tracing::init( + config.trino_lb.tracing.as_ref(), + Arc::clone(&persistence), + &config, + ) + .context(SetUpTracingSnafu)?; + let metrics = Arc::new(metrics); let cluster_group_manager = ClusterGroupManager::new( Arc::clone(&persistence), @@ -191,7 +189,13 @@ async fn start() -> Result<(), MainError> { .await .context(StartHttpServerSnafu)?; - shutdown_tracer_provider(); + // The global `shutdown_tracer_provider` helper was removed in opentelemetry 0.32; shut down via + // the provider handle returned by `tracing::init` instead so pending spans are flushed. + if let Some(tracer_provider) = tracer_provider + && let Err(error) = tracer_provider.shutdown() + { + ::tracing::warn!(%error, "Failed to shut down the OpenTelemetry tracer provider"); + } Ok(()) } diff --git a/trino-lb/src/tracing.rs b/trino-lb/src/tracing.rs index 31d4a56..c02cc19 100644 --- a/trino-lb/src/tracing.rs +++ b/trino-lb/src/tracing.rs @@ -1,20 +1,13 @@ use std::{sync::Arc, time::Duration}; -use opentelemetry::{ - Context, KeyValue, global, - metrics::MetricsError, - trace::{TraceError, TracerProvider}, -}; +use opentelemetry::{Context, global, trace::TracerProvider}; use opentelemetry_http::HeaderInjector; -use opentelemetry_otlp::{TonicExporterBuilder, WithExportConfig}; +use opentelemetry_otlp::{MetricExporter, SpanExporter, WithExportConfig, WithTonicConfig}; use opentelemetry_sdk::{ Resource, - metrics::{ - Aggregation, Instrument, SdkMeterProvider, Stream, - reader::{DefaultAggregationSelector, DefaultTemporalitySelector}, - }, + metrics::{Aggregation, Instrument, PeriodicReader, SdkMeterProvider, Stream, Temporality}, propagation::TraceContextPropagator, - trace::{self, RandomIdGenerator, Sampler}, + trace::{RandomIdGenerator, Sampler, SdkTracerProvider}, }; use snafu::{ResultExt, Snafu}; use tracing::{level_filters::LevelFilter, subscriber::SetGlobalDefaultError}; @@ -22,50 +15,62 @@ use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt}; use trino_lb_core::config::{Config, TrinoLbTracingConfig}; use trino_lb_persistence::PersistenceImplementation; -use crate::metrics::{self, Metrics}; +use crate::metrics::Metrics; #[derive(Snafu, Debug)] pub enum Error { - #[snafu(display("Failed to install tokio batch runtime"))] - InstallTokioBatchRuntime { source: TraceError }, + #[snafu(display("Failed to build the OTLP span exporter"))] + BuildSpanExporter { + source: opentelemetry_otlp::ExporterBuildError, + }, - #[snafu(display("Failed to create metrics pipeline"))] - CreateMetricsPipeline { source: MetricsError }, + #[snafu(display("Failed to build the OTLP metric exporter"))] + BuildMetricExporter { + source: opentelemetry_otlp::ExporterBuildError, + }, #[snafu(display("Failed to create OpenTelemetry Prometheus exporter"))] - CreateOpenTelemetryPrometheusExporter { source: MetricsError }, + CreateOpenTelemetryPrometheusExporter { + source: opentelemetry_sdk::error::OTelSdkError, + }, #[snafu(display("Failed to set global tracing subscriber"))] SetGlobalTracingSubscriber { source: SetGlobalDefaultError }, - - #[snafu(display("Failed to set up metrics"))] - SetUpMetrics { source: metrics::Error }, } +/// Sets up tracing and metrics. +/// +/// Returns the [`Metrics`] handle and, if OTLP tracing is enabled, the [`SdkTracerProvider`] so the +/// caller can flush and shut it down on exit (the global `shutdown_tracer_provider` helper was +/// removed in opentelemetry 0.32). pub fn init( tracing_config: Option<&TrinoLbTracingConfig>, persistence: Arc, config: &Config, -) -> Result { +) -> Result<(Metrics, Option), Error> { let env_filter_layer = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) .from_env_lossy(); let console_output_layer = tracing_subscriber::fmt::layer().with_filter(env_filter_layer); let mut layers = vec![console_output_layer.boxed()]; + let mut tracer_provider = None; if let Some(tracing_config) = tracing_config && tracing_config.enabled { let env_filter_layer = EnvFilter::builder() .with_default_directive(LevelFilter::DEBUG.into()) .from_env_lossy(); + + let provider = otel_tracer_provider(tracing_config)?; layers.push( tracing_opentelemetry::layer() .with_error_records_to_exceptions(true) - .with_tracer(otel_tracer(tracing_config)?) + .with_tracer(provider.tracer("trino-lb")) .with_filter(env_filter_layer) .boxed(), ); + tracer_provider = Some(provider); } let registry = prometheus::Registry::new(); @@ -85,84 +90,97 @@ pub fn init( opentelemetry::global::set_meter_provider(meter_provider); opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); - let metrics = Metrics::new(registry, persistence, config).context(SetUpMetricsSnafu)?; + let metrics = Metrics::new(registry, persistence, config); - Ok(metrics) + Ok((metrics, tracer_provider)) } -fn otel_tracer(tracing_config: &TrinoLbTracingConfig) -> Result { - let provider = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(exporter(tracing_config)) - .with_trace_config( - trace::Config::default() - .with_sampler(Sampler::AlwaysOn) - .with_id_generator(RandomIdGenerator::default()) - .with_max_attributes_per_span(16) - .with_max_events_per_span(16) - .with_resource(Resource::new(vec![KeyValue::new( - "service.name", - "trino-lb", - )])), - ) - .install_batch(opentelemetry_sdk::runtime::Tokio) - .context(InstallTokioBatchRuntimeSnafu)?; +fn otel_tracer_provider(tracing_config: &TrinoLbTracingConfig) -> Result { + let exporter = configure_exporter(SpanExporter::builder().with_tonic(), tracing_config) + .build() + .context(BuildSpanExporterSnafu)?; + + // The batch span processor manages its own background thread as of opentelemetry_sdk 0.32, so + // there is no longer an explicit runtime (previously `install_batch(runtime::Tokio)`). The span + // limits and sampler that used to live on `trace::Config` are now set directly on the builder. + let provider = SdkTracerProvider::builder() + .with_batch_exporter(exporter) + .with_sampler(Sampler::AlwaysOn) + .with_id_generator(RandomIdGenerator::default()) + .with_max_attributes_per_span(16) + .with_max_events_per_span(16) + .with_resource(Resource::builder().with_service_name("trino-lb").build()) + .build(); global::set_tracer_provider(provider.clone()); - Ok(provider.tracer("trino-lb")) + Ok(provider) } +/// Currently unused, see the TODO in [`init`] about shipping Prometheus and OTLP metrics at the +/// same time. Kept (and migrated to the opentelemetry 0.32 API) as a reference for the OTLP metrics +/// push path. fn _otel_meter(tracing_config: &TrinoLbTracingConfig) -> Result { - opentelemetry_otlp::new_pipeline() - .metrics(opentelemetry_sdk::runtime::Tokio) - .with_exporter(exporter(tracing_config)) - .with_resource(Resource::new(vec![KeyValue::new( - "service.name", - "trino-lb", - )])) - .with_period(Duration::from_secs(3)) + let exporter = configure_exporter(MetricExporter::builder().with_tonic(), tracing_config) + // `DefaultAggregationSelector`/`DefaultTemporalitySelector` were removed in 0.32. The + // default temporality (cumulative) matches the previous `DefaultTemporalitySelector`, and + // aggregation is now picked by the SDK/views instead of an exporter-level selector. + .with_temporality(Temporality::default()) .with_timeout(Duration::from_secs(10)) - .with_aggregation_selector(DefaultAggregationSelector::new()) - .with_temporality_selector(DefaultTemporalitySelector::new()) .build() - .context(CreateMetricsPipelineSnafu) + .context(BuildMetricExporterSnafu)?; + + let reader = PeriodicReader::builder(exporter) + .with_interval(Duration::from_secs(3)) + .build(); + + Ok(SdkMeterProvider::builder() + .with_reader(reader) + .with_resource(Resource::builder().with_service_name("trino-lb").build()) + .build()) } -fn exporter(tracing_config: &TrinoLbTracingConfig) -> TonicExporterBuilder { - let mut exporter = opentelemetry_otlp::new_exporter().tonic(); +/// Applies the configured endpoint, protocol and compression to an OTLP exporter builder. +/// +/// Works for both the span and metric tonic exporter builders, which both implement +/// [`WithExportConfig`] and [`WithTonicConfig`]. +fn configure_exporter( + mut builder: B, + tracing_config: &TrinoLbTracingConfig, +) -> B { if let Some(endpoint) = &tracing_config.otlp_endpoint { - exporter = exporter.with_endpoint(endpoint.as_str()); + builder = builder.with_endpoint(endpoint.as_str()); } if let Some(protocol) = tracing_config.otlp_protocol { - exporter = exporter.with_protocol(protocol); + builder = builder.with_protocol(protocol); } if let Some(compression) = tracing_config.otlp_compression { - exporter = exporter.with_compression(compression); + builder = builder.with_compression(compression); } // In case endpoint and protocol are not set here, they will still be read from the env vars // OTEL_EXPORTER_OTLP_ENDPOINT and OTEL_EXPORTER_OTLP_PROTOCOL - exporter + builder } fn setup_custom_metrics(i: &Instrument) -> Option { - if i.name == "query_queued_duration" { - Some( - Stream::new() - .name(i.name.clone()) - .description(i.description.clone()) - .unit(i.unit.clone()) - .aggregation(Aggregation::ExplicitBucketHistogram { - // Copied and adopted from https://github.com/open-telemetry/opentelemetry-rust/blob/7d0b80ea852eb3218504b722476484063802a9a4/opentelemetry-sdk/src/metrics/reader.rs#L151-L154 - boundaries: vec![ - 0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, - 2500.0, 5000.0, 7500.0, 10000.0, 25000.0, 50000.0, 75000.0, 100000.0, - 250000.0, 500000.0, 750000.0, 1000000.0, 2500000.0, - ], - record_min_max: true, - }), - ) + if i.name() == "query_queued_duration" { + // `Instrument` no longer exposes its description as a field/getter, so the view only sets + // the name and aggregation. Description and unit are inherited from the instrument when not + // overridden, so the resulting stream is identical to before. + Stream::builder() + .with_name(i.name().to_string()) + .with_aggregation(Aggregation::ExplicitBucketHistogram { + // Copied and adopted from https://github.com/open-telemetry/opentelemetry-rust/blob/7d0b80ea852eb3218504b722476484063802a9a4/opentelemetry-sdk/src/metrics/reader.rs#L151-L154 + boundaries: vec![ + 0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0, + 5000.0, 7500.0, 10000.0, 25000.0, 50000.0, 75000.0, 100000.0, 250000.0, + 500000.0, 750000.0, 1000000.0, 2500000.0, + ], + record_min_max: true, + }) + .build() + .ok() } else { None } From 2b2c53178b0421424ff40f4425faa51e8d66ad62 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Fri, 26 Jun 2026 13:56:03 +0200 Subject: [PATCH 5/9] refactor(metrics): Move query_queued_duration buckets out of a View and configure them directly where the metric is defined --- trino-lb/src/metrics.rs | 27 +++++++++++++++++++++++++++ trino-lb/src/tracing.rs | 26 +------------------------- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/trino-lb/src/metrics.rs b/trino-lb/src/metrics.rs index 2f1b145..48d6af5 100644 --- a/trino-lb/src/metrics.rs +++ b/trino-lb/src/metrics.rs @@ -24,6 +24,32 @@ use trino_lb_persistence::{Persistence, PersistenceImplementation}; use crate::trino_client::ClusterInfo; +/// Explicit histogram bucket boundaries (in milliseconds) for the `query_queued_duration` metric. +/// +/// The default boundaries top out at 10s, which is far too small for queue durations, hence the +/// custom buckets. As of opentelemetry 0.32 these can be set directly on the instrument via +/// [`with_boundaries`](opentelemetry::metrics::HistogramBuilder::with_boundaries) instead of a view. +/// +// Copied and adapted from https://github.com/open-telemetry/opentelemetry-rust/blob/7d0b80ea852eb3218504b722476484063802a9a4/opentelemetry-sdk/src/metrics/reader.rs#L151-L154 +const QUEUED_DURATION_BUCKETS: [f64; 24] = [ + 0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0, 5000.0, 7500.0, + 10000.0, 25000.0, 50000.0, 75000.0, 100000.0, 250000.0, 500000.0, 750000.0, 1000000.0, + 2500000.0, +]; + +// The SDK silently turns the histogram into a no-op if the boundaries are not strictly increasing, +// so verify the constant at compile time. Strictly increasing also rules out duplicates and NaN. +const _: () = { + let mut i = 1; + while i < QUEUED_DURATION_BUCKETS.len() { + assert!( + QUEUED_DURATION_BUCKETS[i - 1] < QUEUED_DURATION_BUCKETS[i], + "QUEUED_DURATION_BUCKETS must be strictly increasing", + ); + i += 1; + } +}; + pub struct Metrics { pub registry: Registry, pub http_counter: Counter, @@ -52,6 +78,7 @@ impl Metrics { .u64_histogram("query_queued_duration") .with_unit("ms") .with_description("The time queries where queued in trino-lb") + .with_boundaries(QUEUED_DURATION_BUCKETS.to_vec()) .build(); let cluster_infos = Arc::new(RwLock::new(HashMap::::new())); diff --git a/trino-lb/src/tracing.rs b/trino-lb/src/tracing.rs index c02cc19..46b513f 100644 --- a/trino-lb/src/tracing.rs +++ b/trino-lb/src/tracing.rs @@ -5,7 +5,7 @@ use opentelemetry_http::HeaderInjector; use opentelemetry_otlp::{MetricExporter, SpanExporter, WithExportConfig, WithTonicConfig}; use opentelemetry_sdk::{ Resource, - metrics::{Aggregation, Instrument, PeriodicReader, SdkMeterProvider, Stream, Temporality}, + metrics::{PeriodicReader, SdkMeterProvider, Temporality}, propagation::TraceContextPropagator, trace::{RandomIdGenerator, Sampler, SdkTracerProvider}, }; @@ -80,7 +80,6 @@ pub fn init( .context(CreateOpenTelemetryPrometheusExporterSnafu)?; let meter_provider = SdkMeterProvider::builder() - .with_view(setup_custom_metrics) .with_reader(exporter) .build(); @@ -163,29 +162,6 @@ fn configure_exporter( builder } -fn setup_custom_metrics(i: &Instrument) -> Option { - if i.name() == "query_queued_duration" { - // `Instrument` no longer exposes its description as a field/getter, so the view only sets - // the name and aggregation. Description and unit are inherited from the instrument when not - // overridden, so the resulting stream is identical to before. - Stream::builder() - .with_name(i.name().to_string()) - .with_aggregation(Aggregation::ExplicitBucketHistogram { - // Copied and adopted from https://github.com/open-telemetry/opentelemetry-rust/blob/7d0b80ea852eb3218504b722476484063802a9a4/opentelemetry-sdk/src/metrics/reader.rs#L151-L154 - boundaries: vec![ - 0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0, - 5000.0, 7500.0, 10000.0, 25000.0, 50000.0, 75000.0, 100000.0, 250000.0, - 500000.0, 750000.0, 1000000.0, 2500000.0, - ], - record_min_max: true, - }) - .build() - .ok() - } else { - None - } -} - pub fn add_current_context_to_client_request( context: Context, headers: &mut reqwest::header::HeaderMap, From bd6a24fba710f83f107d67ccbdec68ec0bc087b0 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Fri, 26 Jun 2026 14:00:13 +0200 Subject: [PATCH 6/9] chore: Formatting --- trino-lb/src/tracing.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/trino-lb/src/tracing.rs b/trino-lb/src/tracing.rs index 46b513f..6af82f2 100644 --- a/trino-lb/src/tracing.rs +++ b/trino-lb/src/tracing.rs @@ -79,9 +79,7 @@ pub fn init( .build() .context(CreateOpenTelemetryPrometheusExporterSnafu)?; - let meter_provider = SdkMeterProvider::builder() - .with_reader(exporter) - .build(); + let meter_provider = SdkMeterProvider::builder().with_reader(exporter).build(); tracing::subscriber::set_global_default(tracing_subscriber::registry().with(layers)) .context(SetGlobalTracingSubscriberSnafu)?; From 384adc1734afbae0afe651fc77b5deaa94adf7ea Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 29 Jun 2026 11:42:21 +0200 Subject: [PATCH 7/9] fix: Report problems starting the Prometheus exporter --- trino-lb/src/http_server/mod.rs | 34 +++++++++++++++++---------------- trino-lb/src/main.rs | 6 +++--- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/trino-lb/src/http_server/mod.rs b/trino-lb/src/http_server/mod.rs index 5dc6275..ec13e3c 100644 --- a/trino-lb/src/http_server/mod.rs +++ b/trino-lb/src/http_server/mod.rs @@ -42,6 +42,9 @@ pub enum Error { #[snafu(display("Failed start HTTP server"))] StartHttpServer { source: std::io::Error }, + #[snafu(display("Failed to start Prometheus metrics exporter"))] + StartMetricsExporter { source: std::io::Error }, + #[snafu(display( "In case https is used the `tls.certPemFile` and `tls.keyPemFile` options must be set" ))] @@ -84,16 +87,13 @@ pub async fn start_http_server( let handle = Handle::new(); tokio::spawn(graceful_shutdown(handle.clone())); - // TODO: Think about shutting down the whole trino-lb server when the Prometheus metrics exporter fails. - // This is the reason why we start the metrics exporter first on a new task, so we still fail when the main - // server fails. - let handle_clone = handle.clone(); - tokio::spawn(async move { - axum_server::bind(listen_addr) - .handle(handle_clone) - .serve(app.into_make_service()) - .await - }); + // The metrics exporter is run concurrently with the main server (see `try_join!` below) rather than + // on a detached task. This way a failure of either server (e.g. failing to bind the listen address) + // brings down the whole trino-lb server instead of being silently ignored. + let metrics_server = axum_server::bind(listen_addr) + .handle(handle.clone()) + .serve(app.into_make_service()) + .map(|result| result.context(StartMetricsExporterSnafu)); // Note that get routes will also be called for HEAD requests but will have the response body // removed. Make sure to add explicit HEAD routes afterwards. @@ -158,21 +158,23 @@ pub async fn start_http_server( key_pem_file, })?; - axum_server::bind_rustls(listen_addr, tls_config) + let main_server = axum_server::bind_rustls(listen_addr, tls_config) .handle(handle) .serve(app.into_make_service()) - .await - .context(StartHttpServerSnafu)?; + .map(|result| result.context(StartHttpServerSnafu)); + + tokio::try_join!(metrics_server, main_server)?; } else { // Start http server let listen_addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, ports_config.http)); info!(%listen_addr, "Starting server"); - axum_server::bind(listen_addr) + let main_server = axum_server::bind(listen_addr) .handle(handle) .serve(app.into_make_service()) - .await - .context(StartHttpServerSnafu)?; + .map(|result| result.context(StartHttpServerSnafu)); + + tokio::try_join!(metrics_server, main_server)?; } info!("Shut down"); diff --git a/trino-lb/src/main.rs b/trino-lb/src/main.rs index b02c35f..6e0a5a7 100644 --- a/trino-lb/src/main.rs +++ b/trino-lb/src/main.rs @@ -65,8 +65,8 @@ pub enum Error { #[snafu(display("Failed to start scaler"))] StartScaler { source: scaling::Error }, - #[snafu(display("Failed to start HTTP server"))] - StartHttpServer { source: http_server::Error }, + #[snafu(display("Failed to run HTTP servers"))] + RunHttpServers { source: http_server::Error }, } /// We can not use the `#[tokio::main]` macro, as we need at least 3 worker threads because of some magic happening @@ -187,7 +187,7 @@ async fn start() -> Result<(), MainError> { Arc::clone(&metrics), ) .await - .context(StartHttpServerSnafu)?; + .context(RunHttpServersSnafu)?; // The global `shutdown_tracer_provider` helper was removed in opentelemetry 0.32; shut down via // the provider handle returned by `tracing::init` instead so pending spans are flushed. From f39066da747720af45dd487fb1776f3d02daa109 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 29 Jun 2026 14:44:37 +0200 Subject: [PATCH 8/9] Apply suggestions from code review Co-authored-by: Sebastian Bernauer --- trino-lb/src/main.rs | 4 +--- trino-lb/src/metrics.rs | 3 +-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/trino-lb/src/main.rs b/trino-lb/src/main.rs index 6e0a5a7..aeebe70 100644 --- a/trino-lb/src/main.rs +++ b/trino-lb/src/main.rs @@ -189,12 +189,10 @@ async fn start() -> Result<(), MainError> { .await .context(RunHttpServersSnafu)?; - // The global `shutdown_tracer_provider` helper was removed in opentelemetry 0.32; shut down via - // the provider handle returned by `tracing::init` instead so pending spans are flushed. if let Some(tracer_provider) = tracer_provider && let Err(error) = tracer_provider.shutdown() { - ::tracing::warn!(%error, "Failed to shut down the OpenTelemetry tracer provider"); + warn!(%error, "Failed to shut down the OpenTelemetry tracer provider"); } Ok(()) diff --git a/trino-lb/src/metrics.rs b/trino-lb/src/metrics.rs index 48d6af5..546a8b2 100644 --- a/trino-lb/src/metrics.rs +++ b/trino-lb/src/metrics.rs @@ -27,8 +27,7 @@ use crate::trino_client::ClusterInfo; /// Explicit histogram bucket boundaries (in milliseconds) for the `query_queued_duration` metric. /// /// The default boundaries top out at 10s, which is far too small for queue durations, hence the -/// custom buckets. As of opentelemetry 0.32 these can be set directly on the instrument via -/// [`with_boundaries`](opentelemetry::metrics::HistogramBuilder::with_boundaries) instead of a view. +/// custom buckets. /// // Copied and adapted from https://github.com/open-telemetry/opentelemetry-rust/blob/7d0b80ea852eb3218504b722476484063802a9a4/opentelemetry-sdk/src/metrics/reader.rs#L151-L154 const QUEUED_DURATION_BUCKETS: [f64; 24] = [ From 2ff088d1df348b42b7efea86b0e4198672a0b040 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 29 Jun 2026 14:46:10 +0200 Subject: [PATCH 9/9] Add import --- trino-lb/src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/trino-lb/src/main.rs b/trino-lb/src/main.rs index aeebe70..e247847 100644 --- a/trino-lb/src/main.rs +++ b/trino-lb/src/main.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use ::tracing::warn; use clap::Parser; use cluster_group_manager::ClusterGroupManager; use main_error::MainError;