diff --git a/Cargo.lock b/Cargo.lock index 4e4979c..2840c98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,15 +123,15 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.102" +version = "1.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +checksum = "2a4385e2e34eb35d6b3efe798b9eb88096925d87726c0798709bf56d9ed84af3" [[package]] name = "arc-swap" -version = "1.9.1" +version = "1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a3a1fd6f75306b68087b831f025c712524bcb19aad54e557b1129cfa0a2b207" +checksum = "c049c0be4daef0b145cb3555416b3b8ef5b7888a38aea1a3a155801fe7b0810b" dependencies = [ "rustversion", ] @@ -312,40 +312,13 @@ dependencies = [ "fs_extra", ] -[[package]] -name = "axum" -version = "0.7.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" -dependencies = [ - "async-trait", - "axum-core 0.4.5", - "bytes", - "futures-util", - "http 1.4.2", - "http-body 1.0.1", - "http-body-util", - "itoa", - "matchit 0.7.3", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "rustversion", - "serde", - "sync_wrapper 1.0.2", - "tower 0.5.3", - "tower-layer", - "tower-service", -] - [[package]] name = "axum" version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90" dependencies = [ - "axum-core 0.5.6", + "axum-core", "bytes", "form_urlencoded", "futures-util", @@ -355,7 +328,7 @@ dependencies = [ "hyper 1.10.1", "hyper-util", "itoa", - "matchit 0.8.4", + "matchit", "memchr", "mime", "percent-encoding", @@ -366,32 +339,12 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.2", "tokio", - "tower 0.5.3", + "tower", "tower-layer", "tower-service", "tracing", ] -[[package]] -name = "axum-core" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" -dependencies = [ - "async-trait", - "bytes", - "futures-util", - "http 1.4.2", - "http-body 1.0.1", - "http-body-util", - "mime", - "pin-project-lite", - "rustversion", - "sync_wrapper 1.0.2", - "tower-layer", - "tower-service", -] - [[package]] name = "axum-core" version = "0.5.6" @@ -417,8 +370,8 @@ version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be44683b41ccb9ab2d23a5230015c9c3c55be97a25e4428366de8873103f7970" dependencies = [ - "axum 0.8.9", - "axum-core 0.5.6", + "axum", + "axum-core", "bytes", "futures-core", "futures-util", @@ -1100,18 +1053,18 @@ dependencies = [ [[package]] name = "enum-ordinalize" -version = "4.3.2" +version = "4.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a1091a7bb1f8f2c4b28f1fe2cef4980ca2d410a3d727d67ecc3178c9b0800f0" +checksum = "07f808d588c10e464ea6f7d3eaed500049eff30aaac103460f61828c2d65b3eb" dependencies = [ "enum-ordinalize-derive", ] [[package]] name = "enum-ordinalize-derive" -version = "4.3.2" +version = "4.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ca9601fb2d62598ee17836250842873a413586e5d7ed88b356e38ddbb0ec631" +checksum = "42e528e2d34ba8a67a1a650b86beae8ef69fc5fdb638016f386b973226590432" dependencies = [ "proc-macro2", "quote", @@ -1441,7 +1394,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.14.0", + "indexmap", "slab", "tokio", "tokio-util", @@ -1460,19 +1413,13 @@ dependencies = [ "futures-core", "futures-sink", "http 1.4.2", - "indexmap 2.14.0", + "indexmap", "slab", "tokio", "tokio-util", "tracing", ] -[[package]] -name = "hashbrown" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" - [[package]] name = "hashbrown" version = "0.16.1" @@ -1665,9 +1612,9 @@ dependencies = [ [[package]] name = "hybrid-array" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9155a582abd142abc056962c29e3ce5ff2ad5469f4246b537ed42c5deba857da" +checksum = "818356c5132c1fede50f837ca96afbe78ff42413047f4abb886217845e1b6c8c" dependencies = [ "typenum", ] @@ -1912,16 +1859,6 @@ dependencies = [ "icu_properties", ] -[[package]] -name = "indexmap" -version = "1.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" -dependencies = [ - "autocfg", - "hashbrown 0.12.3", -] - [[package]] name = "indexmap" version = "2.14.0" @@ -2183,7 +2120,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tokio-util", - "tower 0.5.3", + "tower", "tower-http", "tracing", ] @@ -2303,12 +2240,6 @@ dependencies = [ "regex-automata", ] -[[package]] -name = "matchit" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" - [[package]] name = "matchit" version = "0.8.4" @@ -2443,23 +2374,23 @@ checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" [[package]] name = "opentelemetry" -version = "0.24.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c365a63eec4f55b7efeceb724f1336f26a9cf3427b70e59e2cd2a5b947fba96" +checksum = "b0142c63252a9e054e68a4c61a5778f7b14f576274d593f8ce883d191a099682" dependencies = [ "futures-core", "futures-sink", "js-sys", - "once_cell", "pin-project-lite", - "thiserror 1.0.69", + "thiserror 2.0.18", + "tracing", ] [[package]] name = "opentelemetry-http" -version = "0.13.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad31e9de44ee3538fb9d64fe3376c1362f406162434609e79aea2a41a0af78ab" +checksum = "5683015d09e2df236ef005b17f6f196f0d5f6313c4fa43a7b6a53b52776e4331" dependencies = [ "async-trait", "bytes", @@ -2469,12 +2400,10 @@ dependencies = [ [[package]] name = "opentelemetry-otlp" -version = "0.17.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b925a602ffb916fb7421276b86756027b37ee708f9dce2dbdcc51739f07e727" +checksum = "9966929966d17620d7c316c643ba62631826e10021409357772d5eea84f62c35" dependencies = [ - "async-trait", - "futures-core", "http 1.4.2", "opentelemetry", "opentelemetry-proto", @@ -2482,55 +2411,52 @@ dependencies = [ "prost", "serde", "serde_json", - "thiserror 1.0.69", + "thiserror 2.0.18", "tokio", "tonic", + "tonic-types", ] [[package]] name = "opentelemetry-prometheus" -version = "0.17.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc4191ce34aa274621861a7a9d68dbcf618d5b6c66b10081631b61fd81fbc015" +checksum = "2c0359983e7f79cf33c9abd89e5d7ddf67c46c419d0148598022d70e70c01aba" dependencies = [ "once_cell", "opentelemetry", "opentelemetry_sdk", "prometheus", - "protobuf", + "tracing", ] [[package]] name = "opentelemetry-proto" -version = "0.7.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ee9f20bff9c984511a02f082dc8ede839e4a9bf15cc2487c8d6fea5ad850d9" +checksum = "56d658ba1faf63f7b9c492cfbe6e0ec365440a16132d3270c1065f7b33f1b638" dependencies = [ "opentelemetry", "opentelemetry_sdk", "prost", "tonic", + "tonic-prost", ] [[package]] name = "opentelemetry_sdk" -version = "0.24.1" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "692eac490ec80f24a17828d49b40b60f5aeaccdfe6a503f939713afd22bc28df" +checksum = "9b59f80e1ac4d5ff7a2db8fb6c80badb7f0f3f858211fba08dd9aaec750894f9" dependencies = [ - "async-trait", "futures-channel", "futures-executor", "futures-util", - "glob", - "once_cell", "opentelemetry", "percent-encoding", - "rand 0.8.6", - "serde_json", - "thiserror 1.0.69", - "tokio", - "tokio-stream", + "portable-atomic", + "rand 0.9.4", + "thiserror 2.0.18", ] [[package]] @@ -2796,9 +2722,9 @@ dependencies = [ [[package]] name = "prometheus" -version = "0.13.4" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a" dependencies = [ "cfg-if", "fnv", @@ -2806,14 +2732,14 @@ dependencies = [ "memchr", "parking_lot", "protobuf", - "thiserror 1.0.69", + "thiserror 2.0.18", ] [[package]] name = "prost" -version = "0.13.5" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +checksum = "528ac67416ff8646872a3c02cad9cc4ee5dc9f9540c9b10771855c95cb2e5ae1" dependencies = [ "bytes", "prost-derive", @@ -2821,9 +2747,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.13.5" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +checksum = "b570b25f7617e43d59005d0990ccb79e950a423952cea19671b7a876da390adf" dependencies = [ "anyhow", "itertools 0.14.0", @@ -2832,11 +2758,34 @@ dependencies = [ "syn", ] +[[package]] +name = "prost-types" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f94967dc7688f3054c7fac87473ffae4cc4c3904800e2d9f5b857246d8963b0a" +dependencies = [ + "prost", +] + [[package]] name = "protobuf" -version = "2.28.0" +version = "3.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-support" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror 1.0.69", +] [[package]] name = "prusto" @@ -3031,8 +2980,6 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ca0ecfa931c29007047d1bc58e623ab12e5590e8c7cc53200d5202b69266d8a" dependencies = [ - "libc", - "rand_chacha 0.3.1", "rand_core 0.6.4", ] @@ -3042,7 +2989,7 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44c5af06bb1b7d3216d91932aed5265164bf384dc89cd6ba05cf59a35f5f76ea" dependencies = [ - "rand_chacha 0.9.0", + "rand_chacha", "rand_core 0.9.5", ] @@ -3057,16 +3004,6 @@ dependencies = [ "rand_core 0.10.1", ] -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core 0.6.4", -] - [[package]] name = "rand_chacha" version = "0.9.0" @@ -3082,9 +3019,6 @@ name = "rand_core" version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" -dependencies = [ - "getrandom 0.2.17", -] [[package]] name = "rand_core" @@ -3103,9 +3037,9 @@ checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" [[package]] name = "redis" -version = "1.2.4" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bae41a63fd0b8a5372f82b21e810e09a316f5dd7efd96bf08e678fb240fc1918" +checksum = "2fa6f8e4b491d7a8ef3a9550a4d71969bd0064f46e32b8dbbcc7fc60dad94fed" dependencies = [ "arc-swap", "arcstr", @@ -3249,7 +3183,7 @@ dependencies = [ "sync_wrapper 1.0.2", "tokio", "tokio-rustls 0.26.4", - "tower 0.5.3", + "tower", "tower-http", "tower-service", "url", @@ -3609,7 +3543,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.14.0", + "indexmap", "itoa", "ryu", "serde", @@ -3817,7 +3751,7 @@ dependencies = [ "futures-util", "hashbrown 0.16.1", "hashlink", - "indexmap 2.14.0", + "indexmap", "log", "memchr", "percent-encoding", @@ -4290,7 +4224,7 @@ version = "0.25.12+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2153edc6955a6c354fad8f5efd38b6a8769bdccf9fe50f8e1329f81b0baa5d7" dependencies = [ - "indexmap 2.14.0", + "indexmap", "toml_datetime", "toml_parser", "winnow", @@ -4307,17 +4241,14 @@ dependencies = [ [[package]] name = "tonic" -version = "0.12.3" +version = "0.14.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +checksum = "ac2a5518c70fa84342385732db33fb3f44bc4cc748936eb5833d2df34d6445ef" dependencies = [ - "async-stream", "async-trait", - "axum 0.7.9", "base64 0.22.1", "bytes", "flate2", - "h2 0.4.15", "http 1.4.2", "http-body 1.0.1", "http-body-util", @@ -4326,34 +4257,35 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", - "socket2 0.5.10", + "sync_wrapper 1.0.2", "tokio", "tokio-stream", - "tower 0.4.13", + "tower", "tower-layer", "tower-service", "tracing", ] [[package]] -name = "tower" -version = "0.4.13" +name = "tonic-prost" +version = "0.14.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +checksum = "50849f68853be452acf590cde0b146665b8d507b3b8af17261df47e02c209ea0" dependencies = [ - "futures-core", - "futures-util", - "indexmap 1.9.3", - "pin-project", - "pin-project-lite", - "rand 0.8.6", - "slab", - "tokio", - "tokio-util", - "tower-layer", - "tower-service", - "tracing", + "bytes", + "prost", + "tonic", +] + +[[package]] +name = "tonic-types" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73ab1b02061f83d519bba3caa167f88f261ef05720ab8ebc954ade70de3348e8" +dependencies = [ + "prost", + "prost-types", + "tonic", ] [[package]] @@ -4364,7 +4296,9 @@ checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" dependencies = [ "futures-core", "futures-util", + "indexmap", "pin-project-lite", + "slab", "sync_wrapper 1.0.2", "tokio", "tokio-util", @@ -4392,7 +4326,7 @@ dependencies = [ "pin-project-lite", "tokio", "tokio-util", - "tower 0.5.3", + "tower", "tower-layer", "tower-service", "tracing", @@ -4457,14 +4391,12 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.25.0" +version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9784ed4da7d921bc8df6963f8c80a0e4ce34ba6ba76668acadd3edbd985ff3b" +checksum = "adbc64cba7137545b8044cb1fe9814f7aacf3c6b5f9b45be8bb5db538befdb26" dependencies = [ "js-sys", - "once_cell", "opentelemetry", - "opentelemetry_sdk", "smallvec", "tracing", "tracing-core", @@ -4507,7 +4439,7 @@ name = "trino-lb" version = "0.6.0" dependencies = [ "askama", - "axum 0.8.9", + "axum", "axum-extra", "axum-server", "chrono", @@ -4541,7 +4473,7 @@ dependencies = [ "snafu", "strum", "tokio", - "tower 0.5.3", + "tower", "tower-http", "tracing", "tracing-opentelemetry", diff --git a/Cargo.toml b/Cargo.toml index b0647a7..eb50dac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,12 +40,23 @@ kube = { version = "4.0", default-features = false, features = [ main_error = "0.1" num_cpus = "1.16" number_prefix = "0.4" -opentelemetry = "0.24" -opentelemetry_sdk = { version = "0.24", features = ["rt-tokio"] } -opentelemetry-http = "0.13" -opentelemetry-otlp = { version = "0.17", features = ["serialize", "gzip-tonic"] } -opentelemetry-prometheus = "0.17" -prometheus = "0.13" +opentelemetry = "0.32" +opentelemetry_sdk = "0.32" +opentelemetry-http = "0.32" +# `default-features = false` because the 0.32 defaults switched from tonic/gRPC to an http-proto +# (reqwest) exporter we don't use; we re-list only the tonic-based features we need. +opentelemetry-otlp = { version = "0.32", default-features = false, features = [ + "grpc-tonic", + "gzip-tonic", + "serialize", + "trace", + "metrics", + # On by default in 0.32; re-listed because `default-features = false` would otherwise drop it. + # Lets the exporter report its own failures (e.g. collector unreachable) via `tracing`. + "internal-logs", +] } +opentelemetry-prometheus = "0.32" +prometheus = "0.14" prusto = "0.5" pyo3 = { version = "0.29", features = ["auto-initialize"] } rand = "0.10" @@ -80,11 +91,11 @@ sqlx = { version = "0.9", features = [ "postgres", ] } strum = { version = "0.28", features = ["derive"] } -tokio = "1.39" +tokio = "1.52" tower = "0.5" tower-http = { version = "0.6", features = ["compression-full", "tracing"] } tracing = "0.1" -tracing-opentelemetry = "0.25" +tracing-opentelemetry = "0.33" tracing-subscriber = { version = "0.3", features = ["env-filter"] } trait-variant = "0.1" url = { version = "2.5", features = ["serde"] } diff --git a/trino-lb/src/http_server/mod.rs b/trino-lb/src/http_server/mod.rs index 5dc6275..ec13e3c 100644 --- a/trino-lb/src/http_server/mod.rs +++ b/trino-lb/src/http_server/mod.rs @@ -42,6 +42,9 @@ pub enum Error { #[snafu(display("Failed start HTTP server"))] StartHttpServer { source: std::io::Error }, + #[snafu(display("Failed to start Prometheus metrics exporter"))] + StartMetricsExporter { source: std::io::Error }, + #[snafu(display( "In case https is used the `tls.certPemFile` and `tls.keyPemFile` options must be set" ))] @@ -84,16 +87,13 @@ pub async fn start_http_server( let handle = Handle::new(); tokio::spawn(graceful_shutdown(handle.clone())); - // TODO: Think about shutting down the whole trino-lb server when the Prometheus metrics exporter fails. - // This is the reason why we start the metrics exporter first on a new task, so we still fail when the main - // server fails. - let handle_clone = handle.clone(); - tokio::spawn(async move { - axum_server::bind(listen_addr) - .handle(handle_clone) - .serve(app.into_make_service()) - .await - }); + // The metrics exporter is run concurrently with the main server (see `try_join!` below) rather than + // on a detached task. This way a failure of either server (e.g. failing to bind the listen address) + // brings down the whole trino-lb server instead of being silently ignored. + let metrics_server = axum_server::bind(listen_addr) + .handle(handle.clone()) + .serve(app.into_make_service()) + .map(|result| result.context(StartMetricsExporterSnafu)); // Note that get routes will also be called for HEAD requests but will have the response body // removed. Make sure to add explicit HEAD routes afterwards. @@ -158,21 +158,23 @@ pub async fn start_http_server( key_pem_file, })?; - axum_server::bind_rustls(listen_addr, tls_config) + let main_server = axum_server::bind_rustls(listen_addr, tls_config) .handle(handle) .serve(app.into_make_service()) - .await - .context(StartHttpServerSnafu)?; + .map(|result| result.context(StartHttpServerSnafu)); + + tokio::try_join!(metrics_server, main_server)?; } else { // Start http server let listen_addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, ports_config.http)); info!(%listen_addr, "Starting server"); - axum_server::bind(listen_addr) + let main_server = axum_server::bind(listen_addr) .handle(handle) .serve(app.into_make_service()) - .await - .context(StartHttpServerSnafu)?; + .map(|result| result.context(StartHttpServerSnafu)); + + tokio::try_join!(metrics_server, main_server)?; } info!("Shut down"); diff --git a/trino-lb/src/main.rs b/trino-lb/src/main.rs index 4722183..e247847 100644 --- a/trino-lb/src/main.rs +++ b/trino-lb/src/main.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use ::tracing::warn; use clap::Parser; use cluster_group_manager::ClusterGroupManager; use main_error::MainError; @@ -7,7 +8,6 @@ use maintenance::{ leftover_queries::LeftoverQueryDetector, query_count_fetcher, query_count_fetcher::QueryCountFetcher, }; -use opentelemetry::global::shutdown_tracer_provider; use routing::Router; use scaling::Scaler; use snafu::{ResultExt, Snafu}; @@ -66,8 +66,8 @@ pub enum Error { #[snafu(display("Failed to start scaler"))] StartScaler { source: scaling::Error }, - #[snafu(display("Failed to start HTTP server"))] - StartHttpServer { source: http_server::Error }, + #[snafu(display("Failed to run HTTP servers"))] + RunHttpServers { source: http_server::Error }, } /// We can not use the `#[tokio::main]` macro, as we need at least 3 worker threads because of some magic happening @@ -146,14 +146,13 @@ async fn start() -> Result<(), MainError> { } }); - let metrics = Arc::new( - tracing::init( - config.trino_lb.tracing.as_ref(), - Arc::clone(&persistence), - &config, - ) - .context(SetUpTracingSnafu)?, - ); + let (metrics, tracer_provider) = tracing::init( + config.trino_lb.tracing.as_ref(), + Arc::clone(&persistence), + &config, + ) + .context(SetUpTracingSnafu)?; + let metrics = Arc::new(metrics); let cluster_group_manager = ClusterGroupManager::new( Arc::clone(&persistence), @@ -189,9 +188,13 @@ async fn start() -> Result<(), MainError> { Arc::clone(&metrics), ) .await - .context(StartHttpServerSnafu)?; + .context(RunHttpServersSnafu)?; - shutdown_tracer_provider(); + if let Some(tracer_provider) = tracer_provider + && let Err(error) = tracer_provider.shutdown() + { + warn!(%error, "Failed to shut down the OpenTelemetry tracer provider"); + } Ok(()) } diff --git a/trino-lb/src/metrics.rs b/trino-lb/src/metrics.rs index a711755..546a8b2 100644 --- a/trino-lb/src/metrics.rs +++ b/trino-lb/src/metrics.rs @@ -7,10 +7,9 @@ use std::{ use futures::future::try_join_all; use opentelemetry::{ KeyValue, - metrics::{Counter, Histogram, MetricsError}, + metrics::{Counter, Histogram}, }; use prometheus::Registry; -use snafu::{ResultExt, Snafu}; use tokio::{ runtime::Builder, sync::mpsc::{UnboundedReceiver, UnboundedSender}, @@ -25,11 +24,30 @@ use trino_lb_persistence::{Persistence, PersistenceImplementation}; use crate::trino_client::ClusterInfo; -#[derive(Snafu, Debug)] -pub enum Error { - #[snafu(display("Failed to register metrics callback"))] - RegisterMetricsCallback { source: MetricsError }, -} +/// Explicit histogram bucket boundaries (in milliseconds) for the `query_queued_duration` metric. +/// +/// The default boundaries top out at 10s, which is far too small for queue durations, hence the +/// custom buckets. +/// +// Copied and adapted from https://github.com/open-telemetry/opentelemetry-rust/blob/7d0b80ea852eb3218504b722476484063802a9a4/opentelemetry-sdk/src/metrics/reader.rs#L151-L154 +const QUEUED_DURATION_BUCKETS: [f64; 24] = [ + 0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0, 5000.0, 7500.0, + 10000.0, 25000.0, 50000.0, 75000.0, 100000.0, 250000.0, 500000.0, 750000.0, 1000000.0, + 2500000.0, +]; + +// The SDK silently turns the histogram into a no-op if the boundaries are not strictly increasing, +// so verify the constant at compile time. Strictly increasing also rules out duplicates and NaN. +const _: () = { + let mut i = 1; + while i < QUEUED_DURATION_BUCKETS.len() { + assert!( + QUEUED_DURATION_BUCKETS[i - 1] < QUEUED_DURATION_BUCKETS[i], + "QUEUED_DURATION_BUCKETS must be strictly increasing", + ); + i += 1; + } +}; pub struct Metrics { pub registry: Registry, @@ -46,79 +64,62 @@ impl Metrics { registry: Registry, persistence: Arc, config: &Config, - ) -> Result { + ) -> Self { let meter = opentelemetry::global::meter("trino-lb"); let http_counter = meter .u64_counter("http_requests_total") .with_unit("requests") .with_description("Total number of HTTP requests made.") - .init(); + .build(); let queued_time = meter .u64_histogram("query_queued_duration") .with_unit("ms") .with_description("The time queries where queued in trino-lb") - .init(); + .with_boundaries(QUEUED_DURATION_BUCKETS.to_vec()) + .build(); let cluster_infos = Arc::new(RwLock::new(HashMap::::new())); - let cluster_counts_per_state_metric = meter - .u64_observable_gauge("cluster_counts_per_state") - .with_unit("clusters") - .with_description("The number of active or inactive clusters for each cluster group") - .init(); - - let cluster_queries_metric = meter + // As of opentelemetry 0.32 the callback is attached to the instrument builder via + // `with_callback` instead of the removed `Meter::register_callback`. The built instrument + // handle is not needed afterwards: the callback is registered with the SDK on `build()`. + let cluster_infos_for_callback = Arc::clone(&cluster_infos); + let _cluster_queries_metric = meter .u64_observable_gauge("cluster_queries") .with_unit("queries") .with_description( "The number of running, queued or blocked queries on a specific Trino cluster", ) - .init(); - - let queued_queries_metric = meter - .u64_observable_gauge("queued_queries") - .with_unit("queries") - .with_description("The number of queries queued across all trino-lb instances") - .init(); - - let cluster_infos_for_callback = Arc::clone(&cluster_infos); - meter - .register_callback(&[cluster_queries_metric.as_any()], move |observer| { + .with_callback(move |observer| { if let Ok(cluster_query_counters) = cluster_infos_for_callback.read() { for (cluster, counter) in cluster_query_counters.deref() { - observer.observe_u64( - &cluster_queries_metric, + observer.observe( counter.running_queries, - [ + &[ KeyValue::new("cluster", cluster.to_string()), KeyValue::new("state", "running"), - ] - .as_ref(), + ], ); - observer.observe_u64( - &cluster_queries_metric, + observer.observe( counter.queued_queries, - [ + &[ KeyValue::new("cluster", cluster.to_string()), KeyValue::new("state", "queued"), - ] - .as_ref(), + ], ); - observer.observe_u64( - &cluster_queries_metric, + observer.observe( counter.blocked_queries, - [ + &[ KeyValue::new("cluster", cluster.to_string()), KeyValue::new("state", "blocked"), - ] - .as_ref(), + ], ); } } }) - .context(RegisterMetricsCallbackSnafu)?; + .build(); // All of this mess can be removed once https://github.com/open-telemetry/opentelemetry-rust/issues/1376 is supported. let (ping_sender, ping_receiver) = tokio::sync::mpsc::unbounded_channel::<()>(); @@ -140,8 +141,11 @@ impl Metrics { )) }); - meter - .register_callback(&[queued_queries_metric.as_any()], move |observer| { + let _queued_queries_metric = meter + .u64_observable_gauge("queued_queries") + .with_unit("queries") + .with_description("The number of queries queued across all trino-lb instances") + .with_callback(move |observer| { if let Err(err) = ping_sender.send(()) { error!(error = ?err, "Failed to send ping for queued_queries metric"); return; @@ -172,15 +176,14 @@ impl Metrics { if let Some(queued_queries) = queued_queries { for (cluster_group, queued) in queued_queries { - observer.observe_u64( - &queued_queries_metric, + observer.observe( queued, - [KeyValue::new("cluster-group", cluster_group)].as_ref(), + &[KeyValue::new("cluster-group", cluster_group)], ); } } }) - .context(RegisterMetricsCallbackSnafu)?; + .build(); // All of this mess can be removed once https://github.com/open-telemetry/opentelemetry-rust/issues/1376 is supported. let (ping_sender, ping_receiver) = tokio::sync::mpsc::unbounded_channel::<()>(); @@ -202,63 +205,61 @@ impl Metrics { )) }); - meter - .register_callback( - &[cluster_counts_per_state_metric.as_any()], - move |observer| { - if let Err(err) = ping_sender.send(()) { - error!(error = ?err, "Failed to send ping for cluster_counts_per_state metric"); - return; - } - let cluster_counts = std::thread::scope(|s| { - s.spawn(|| { - let mut receiver = match metrics_receiver.write() { - Ok(r) => r, - Err(err) => { - error!(error = %err, "Failed to acquire write lock for cluster_counts_per_state metric"); - return None; - } - }; - match receiver.blocking_recv() { - Some(v) => Some(v), - None => { - error!("cluster_counts_per_state metrics channel closed"); - None - } + let _cluster_counts_per_state_metric = meter + .u64_observable_gauge("cluster_counts_per_state") + .with_unit("clusters") + .with_description("The number of active or inactive clusters for each cluster group") + .with_callback(move |observer| { + if let Err(err) = ping_sender.send(()) { + error!(error = ?err, "Failed to send ping for cluster_counts_per_state metric"); + return; + } + let cluster_counts = std::thread::scope(|s| { + s.spawn(|| { + let mut receiver = match metrics_receiver.write() { + Ok(r) => r, + Err(err) => { + error!(error = %err, "Failed to acquire write lock for cluster_counts_per_state metric"); + return None; } - }) - .join() - .unwrap_or_else(|err| { - error!(error = ?err, "cluster_counts_per_state metrics thread panicked"); - None - }) - }); - - if let Some(cluster_counts) = cluster_counts { - for (cluster_group, counts) in cluster_counts { - for (state, count) in counts { - observer.observe_u64( - &cluster_counts_per_state_metric, - count, - [ - KeyValue::new("cluster-group", cluster_group.clone()), - KeyValue::new::<_, &str>("state", state.into()), - ] - .as_ref(), - ); + }; + match receiver.blocking_recv() { + Some(v) => Some(v), + None => { + error!("cluster_counts_per_state metrics channel closed"); + None } } + }) + .join() + .unwrap_or_else(|err| { + error!(error = ?err, "cluster_counts_per_state metrics thread panicked"); + None + }) + }); + + if let Some(cluster_counts) = cluster_counts { + for (cluster_group, counts) in cluster_counts { + for (state, count) in counts { + observer.observe( + count, + &[ + KeyValue::new("cluster-group", cluster_group.clone()), + KeyValue::new::<_, &str>("state", state.into()), + ], + ); + } } - }, - ) - .context(RegisterMetricsCallbackSnafu)?; + } + }) + .build(); - Ok(Self { + Self { registry, http_counter, queued_time, cluster_infos, - }) + } } } diff --git a/trino-lb/src/tracing.rs b/trino-lb/src/tracing.rs index 31d4a56..6af82f2 100644 --- a/trino-lb/src/tracing.rs +++ b/trino-lb/src/tracing.rs @@ -1,20 +1,13 @@ use std::{sync::Arc, time::Duration}; -use opentelemetry::{ - Context, KeyValue, global, - metrics::MetricsError, - trace::{TraceError, TracerProvider}, -}; +use opentelemetry::{Context, global, trace::TracerProvider}; use opentelemetry_http::HeaderInjector; -use opentelemetry_otlp::{TonicExporterBuilder, WithExportConfig}; +use opentelemetry_otlp::{MetricExporter, SpanExporter, WithExportConfig, WithTonicConfig}; use opentelemetry_sdk::{ Resource, - metrics::{ - Aggregation, Instrument, SdkMeterProvider, Stream, - reader::{DefaultAggregationSelector, DefaultTemporalitySelector}, - }, + metrics::{PeriodicReader, SdkMeterProvider, Temporality}, propagation::TraceContextPropagator, - trace::{self, RandomIdGenerator, Sampler}, + trace::{RandomIdGenerator, Sampler, SdkTracerProvider}, }; use snafu::{ResultExt, Snafu}; use tracing::{level_filters::LevelFilter, subscriber::SetGlobalDefaultError}; @@ -22,50 +15,62 @@ use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt}; use trino_lb_core::config::{Config, TrinoLbTracingConfig}; use trino_lb_persistence::PersistenceImplementation; -use crate::metrics::{self, Metrics}; +use crate::metrics::Metrics; #[derive(Snafu, Debug)] pub enum Error { - #[snafu(display("Failed to install tokio batch runtime"))] - InstallTokioBatchRuntime { source: TraceError }, + #[snafu(display("Failed to build the OTLP span exporter"))] + BuildSpanExporter { + source: opentelemetry_otlp::ExporterBuildError, + }, - #[snafu(display("Failed to create metrics pipeline"))] - CreateMetricsPipeline { source: MetricsError }, + #[snafu(display("Failed to build the OTLP metric exporter"))] + BuildMetricExporter { + source: opentelemetry_otlp::ExporterBuildError, + }, #[snafu(display("Failed to create OpenTelemetry Prometheus exporter"))] - CreateOpenTelemetryPrometheusExporter { source: MetricsError }, + CreateOpenTelemetryPrometheusExporter { + source: opentelemetry_sdk::error::OTelSdkError, + }, #[snafu(display("Failed to set global tracing subscriber"))] SetGlobalTracingSubscriber { source: SetGlobalDefaultError }, - - #[snafu(display("Failed to set up metrics"))] - SetUpMetrics { source: metrics::Error }, } +/// Sets up tracing and metrics. +/// +/// Returns the [`Metrics`] handle and, if OTLP tracing is enabled, the [`SdkTracerProvider`] so the +/// caller can flush and shut it down on exit (the global `shutdown_tracer_provider` helper was +/// removed in opentelemetry 0.32). pub fn init( tracing_config: Option<&TrinoLbTracingConfig>, persistence: Arc, config: &Config, -) -> Result { +) -> Result<(Metrics, Option), Error> { let env_filter_layer = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) .from_env_lossy(); let console_output_layer = tracing_subscriber::fmt::layer().with_filter(env_filter_layer); let mut layers = vec![console_output_layer.boxed()]; + let mut tracer_provider = None; if let Some(tracing_config) = tracing_config && tracing_config.enabled { let env_filter_layer = EnvFilter::builder() .with_default_directive(LevelFilter::DEBUG.into()) .from_env_lossy(); + + let provider = otel_tracer_provider(tracing_config)?; layers.push( tracing_opentelemetry::layer() .with_error_records_to_exceptions(true) - .with_tracer(otel_tracer(tracing_config)?) + .with_tracer(provider.tracer("trino-lb")) .with_filter(env_filter_layer) .boxed(), ); + tracer_provider = Some(provider); } let registry = prometheus::Registry::new(); @@ -74,10 +79,7 @@ pub fn init( .build() .context(CreateOpenTelemetryPrometheusExporterSnafu)?; - let meter_provider = SdkMeterProvider::builder() - .with_view(setup_custom_metrics) - .with_reader(exporter) - .build(); + let meter_provider = SdkMeterProvider::builder().with_reader(exporter).build(); tracing::subscriber::set_global_default(tracing_subscriber::registry().with(layers)) .context(SetGlobalTracingSubscriberSnafu)?; @@ -85,87 +87,77 @@ pub fn init( opentelemetry::global::set_meter_provider(meter_provider); opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); - let metrics = Metrics::new(registry, persistence, config).context(SetUpMetricsSnafu)?; + let metrics = Metrics::new(registry, persistence, config); - Ok(metrics) + Ok((metrics, tracer_provider)) } -fn otel_tracer(tracing_config: &TrinoLbTracingConfig) -> Result { - let provider = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(exporter(tracing_config)) - .with_trace_config( - trace::Config::default() - .with_sampler(Sampler::AlwaysOn) - .with_id_generator(RandomIdGenerator::default()) - .with_max_attributes_per_span(16) - .with_max_events_per_span(16) - .with_resource(Resource::new(vec![KeyValue::new( - "service.name", - "trino-lb", - )])), - ) - .install_batch(opentelemetry_sdk::runtime::Tokio) - .context(InstallTokioBatchRuntimeSnafu)?; +fn otel_tracer_provider(tracing_config: &TrinoLbTracingConfig) -> Result { + let exporter = configure_exporter(SpanExporter::builder().with_tonic(), tracing_config) + .build() + .context(BuildSpanExporterSnafu)?; + + // The batch span processor manages its own background thread as of opentelemetry_sdk 0.32, so + // there is no longer an explicit runtime (previously `install_batch(runtime::Tokio)`). The span + // limits and sampler that used to live on `trace::Config` are now set directly on the builder. + let provider = SdkTracerProvider::builder() + .with_batch_exporter(exporter) + .with_sampler(Sampler::AlwaysOn) + .with_id_generator(RandomIdGenerator::default()) + .with_max_attributes_per_span(16) + .with_max_events_per_span(16) + .with_resource(Resource::builder().with_service_name("trino-lb").build()) + .build(); global::set_tracer_provider(provider.clone()); - Ok(provider.tracer("trino-lb")) + Ok(provider) } +/// Currently unused, see the TODO in [`init`] about shipping Prometheus and OTLP metrics at the +/// same time. Kept (and migrated to the opentelemetry 0.32 API) as a reference for the OTLP metrics +/// push path. fn _otel_meter(tracing_config: &TrinoLbTracingConfig) -> Result { - opentelemetry_otlp::new_pipeline() - .metrics(opentelemetry_sdk::runtime::Tokio) - .with_exporter(exporter(tracing_config)) - .with_resource(Resource::new(vec![KeyValue::new( - "service.name", - "trino-lb", - )])) - .with_period(Duration::from_secs(3)) + let exporter = configure_exporter(MetricExporter::builder().with_tonic(), tracing_config) + // `DefaultAggregationSelector`/`DefaultTemporalitySelector` were removed in 0.32. The + // default temporality (cumulative) matches the previous `DefaultTemporalitySelector`, and + // aggregation is now picked by the SDK/views instead of an exporter-level selector. + .with_temporality(Temporality::default()) .with_timeout(Duration::from_secs(10)) - .with_aggregation_selector(DefaultAggregationSelector::new()) - .with_temporality_selector(DefaultTemporalitySelector::new()) .build() - .context(CreateMetricsPipelineSnafu) + .context(BuildMetricExporterSnafu)?; + + let reader = PeriodicReader::builder(exporter) + .with_interval(Duration::from_secs(3)) + .build(); + + Ok(SdkMeterProvider::builder() + .with_reader(reader) + .with_resource(Resource::builder().with_service_name("trino-lb").build()) + .build()) } -fn exporter(tracing_config: &TrinoLbTracingConfig) -> TonicExporterBuilder { - let mut exporter = opentelemetry_otlp::new_exporter().tonic(); +/// Applies the configured endpoint, protocol and compression to an OTLP exporter builder. +/// +/// Works for both the span and metric tonic exporter builders, which both implement +/// [`WithExportConfig`] and [`WithTonicConfig`]. +fn configure_exporter( + mut builder: B, + tracing_config: &TrinoLbTracingConfig, +) -> B { if let Some(endpoint) = &tracing_config.otlp_endpoint { - exporter = exporter.with_endpoint(endpoint.as_str()); + builder = builder.with_endpoint(endpoint.as_str()); } if let Some(protocol) = tracing_config.otlp_protocol { - exporter = exporter.with_protocol(protocol); + builder = builder.with_protocol(protocol); } if let Some(compression) = tracing_config.otlp_compression { - exporter = exporter.with_compression(compression); + builder = builder.with_compression(compression); } // In case endpoint and protocol are not set here, they will still be read from the env vars // OTEL_EXPORTER_OTLP_ENDPOINT and OTEL_EXPORTER_OTLP_PROTOCOL - exporter -} - -fn setup_custom_metrics(i: &Instrument) -> Option { - if i.name == "query_queued_duration" { - Some( - Stream::new() - .name(i.name.clone()) - .description(i.description.clone()) - .unit(i.unit.clone()) - .aggregation(Aggregation::ExplicitBucketHistogram { - // Copied and adopted from https://github.com/open-telemetry/opentelemetry-rust/blob/7d0b80ea852eb3218504b722476484063802a9a4/opentelemetry-sdk/src/metrics/reader.rs#L151-L154 - boundaries: vec![ - 0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, - 2500.0, 5000.0, 7500.0, 10000.0, 25000.0, 50000.0, 75000.0, 100000.0, - 250000.0, 500000.0, 750000.0, 1000000.0, 2500000.0, - ], - record_min_max: true, - }), - ) - } else { - None - } + builder } pub fn add_current_context_to_client_request(