diff --git a/Cargo.lock b/Cargo.lock index bdcc1046..a8798faa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -70,7 +70,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -81,7 +81,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -1014,7 +1014,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -1096,6 +1096,8 @@ version = "0.2.0" dependencies = [ "clap", "fluss-rs", + "metrics", + "metrics-exporter-prometheus", "tikv-jemallocator", "tokio", ] @@ -1835,7 +1837,7 @@ dependencies = [ "portable-atomic-util", "serde_core", "wasm-bindgen", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -2084,6 +2086,26 @@ dependencies = [ "rapidhash", ] +[[package]] +name = "metrics-exporter-prometheus" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b166dea96003ee2531cf14833efedced545751d800f03535801d833313f8c15" +dependencies = [ + "base64 0.22.1", + "http-body-util", + "hyper", + "hyper-util", + "indexmap 2.13.1", + "ipnet", + "metrics", + "metrics-util", + "quanta", + "thiserror 2.0.18", + "tokio", + "tracing", +] + [[package]] name = "metrics-util" version = "0.20.3" @@ -3014,7 +3036,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -3373,7 +3395,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - 
"windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -3567,7 +3589,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -4269,7 +4291,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/README.md b/README.md index 63d6a9d3..4f4768d5 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,7 @@ Key concepts: | **Data Types** | Int, BigInt, String, Float, Double, Boolean, Bytes, Decimal, Date, Time, Timestamp, TimestampLTZ, Char, Binary | | **Config** | Batch sizing, buffering, retries, compression, timeouts, prefetch, concurrency | | **Storage** | Memory, Filesystem, S3, OSS (via [OpenDAL](https://opendal.apache.org/)) | +| **Observability** | Connection, writer, and scanner [metrics](https://clients.fluss.apache.org/docs/user-guide/rust/metrics/) via the [`metrics`](https://docs.rs/metrics) facade (Prometheus, StatsD, etc.) 
| | **WASM** | Compiles for `wasm32` target | ### Language Bindings @@ -83,7 +84,7 @@ fluss-rust/ │ │ ├── src/row/ # GenericRow, InternalRow, Arrow integration │ │ ├── src/rpc/ # gRPC transport layer │ │ └── src/config.rs # Client configuration -│ ├── examples/ # 5 runnable examples (log, KV, partitioned, prefix lookup) +│ ├── examples/ # runnable examples (log, KV, partitioned, prefix lookup, metrics) │ └── fluss-test-cluster/ # Test harness for integration tests ├── bindings/ │ ├── python/ # Python binding (PyO3) @@ -236,6 +237,7 @@ async fn main() -> Result<()> { | `example-partitioned-upsert-lookup` | KV table with partitions | | `example-prefix-lookup` | Prefix lookup on bucket keys | | `example-partitioned-prefix-lookup` | Prefix lookup on partitioned tables | +| `example-prometheus-metrics` | Expose client metrics on a Prometheus endpoint | Build and run any example: diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index 45f029ee..7203095b 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -27,6 +27,8 @@ version = { workspace = true } fluss = { workspace = true, features = ["storage-all"] } tokio = { workspace = true } clap = { workspace = true } +metrics = { workspace = true } +metrics-exporter-prometheus = { version = "0.17", default-features = false, features = ["http-listener"] } [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = "0.6" @@ -50,3 +52,7 @@ path = "src/example_prefix_lookup.rs" [[example]] name = "example-partitioned-prefix-lookup" path = "src/example_partitioned_prefix_lookup.rs" + +[[example]] +name = "example-prometheus-metrics" +path = "src/example_prometheus_metrics.rs" diff --git a/crates/examples/src/example_prometheus_metrics.rs b/crates/examples/src/example_prometheus_metrics.rs new file mode 100644 index 00000000..24aee2ce --- /dev/null +++ b/crates/examples/src/example_prometheus_metrics.rs @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) 
under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Exposes Fluss client metrics on a Prometheus scrape endpoint.
+//!
+//! Run a local cluster, then:
+//! ```shell
+//! cargo run -p fluss-examples --example example-prometheus-metrics
+//! curl http://localhost:9000/metrics
+//! ```
+//! The endpoint exposes `fluss_client_writer_*`, `fluss_client_scanner_*`, and
+//! `fluss_client_requests_*` series produced by the workload below.
+
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
+
+use clap::Parser;
+use fluss::client::FlussConnection;
+use fluss::config::Config;
+use fluss::error::Result;
+use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+use fluss::row::{DataGetters, GenericRow};
+use metrics_exporter_prometheus::PrometheusBuilder;
+use std::time::Duration;
+
+#[tokio::main]
+pub async fn main() -> Result<()> {
+    // Install the global Prometheus recorder BEFORE creating any connection,
+    // writer, or scanner: the client caches metric handles on first use and
+    // binds them to whichever recorder is installed at that moment.
+    //
+    // `build()` (rather than `install()`) hands back a `PrometheusHandle` so the
+    // example can read its own metrics back and self-verify; the returned
+    // exporter future runs the HTTP scrape endpoint.
+    let (recorder, exporter) = PrometheusBuilder::new()
+        .with_http_listener(([0, 0, 0, 0], 9000))
+        .build()
+        .expect("failed to build Prometheus recorder");
+    let metrics_handle = recorder.handle();
+    metrics::set_global_recorder(recorder).expect("failed to install global recorder");
+    tokio::spawn(exporter);
+    println!("Metrics exposed on http://localhost:9000/metrics");
+
+    let mut config = Config::parse();
+    config.bootstrap_servers = "127.0.0.1:9123".to_string();
+
+    let conn = FlussConnection::new(config).await?;
+    let admin = conn.get_admin()?;
+
+    let table_path = TablePath::new("fluss", "rust_prometheus_metrics");
+    let table_descriptor = TableDescriptor::builder()
+        .schema(
+            Schema::builder()
+                .column("id", DataTypes::int())
+                .column("message", DataTypes::string())
+                .build()?,
+        )
+        .build()?;
+    admin
+        .create_table(&table_path, &table_descriptor, true)
+        .await?;
+
+    let table = conn.get_table(&table_path).await?;
+    let append_writer = table.new_append()?.create_writer()?;
+    let log_scanner = table.new_scan().create_log_scanner()?;
+    log_scanner.subscribe(0, 0).await?;
+
+    // Continuously write and read so the metrics keep updating and can be
+    // observed over multiple Prometheus scrapes.
+    let rows_per_iter = 100;
+    let mut id = 0i32;
+    let mut verified = false;
+    loop {
+        for _ in 0..rows_per_iter {
+            let mut row = GenericRow::new(2);
+            row.set_field(0, id);
+            row.set_field(1, "metrics demo");
+            append_writer.append(&row)?;
+            id += 1;
+        }
+        append_writer.flush().await?;
+
+        let scan_records = log_scanner.poll(Duration::from_secs(1)).await?;
+        let mut count = 0;
+        for record in scan_records {
+            let row = record.row();
+            let _ = (row.get_int(0)?, row.get_string(1)?, record.offset());
+            count += 1;
+        }
+        println!(
+            "appended {rows_per_iter} rows, polled {count} rows; scrape /metrics to see counters"
+        );
+
+        // After the first flush every appended row has been acknowledged, so the
+        // writer counter must have advanced by at least the rows we sent (retries
+        // can push it higher). This proves the recorder is wired up correctly.
+        if !verified {
+            let rendered = metrics_handle.render();
+            let sent = counter_value(&rendered, "fluss_client_writer_records_send_total");
+            assert!(
+                sent.is_some_and(|v| v >= rows_per_iter as f64),
+                "expected fluss_client_writer_records_send_total >= {rows_per_iter}, got {sent:?}\n{rendered}"
+            );
+            println!("self-check OK: records_send_total = {}", sent.unwrap());
+            verified = true;
+        }
+
+        tokio::time::sleep(Duration::from_secs(1)).await;
+    }
+}
+
+/// Parse the value of an unlabeled counter/gauge sample from rendered Prometheus
+/// exposition text (`metric_name <value> [timestamp]`); `None` if absent/unparsable.
+fn counter_value(rendered: &str, name: &str) -> Option<f64> {
+    let prefix = format!("{name} ");
+    rendered
+        .lines()
+        .find_map(|line| line.strip_prefix(&prefix))
+        .and_then(|rest| rest.split_whitespace().next())
+        .and_then(|value| value.parse().ok())
+}
diff --git a/website/docs/user-guide/rust/metrics.md b/website/docs/user-guide/rust/metrics.md
new file mode 100644
index 00000000..c7fa1af6
--- /dev/null
+++ b/website/docs/user-guide/rust/metrics.md
@@ -0,0 +1,179 @@
+---
+sidebar_position: 5
+---
+# Metrics
+
+The Fluss Rust client is instrumented with client-side metrics for the connection
+layer, the write pipeline, and the read (scanner) pipeline. Metrics are emitted
+through the [`metrics`](https://docs.rs/metrics) crate facade, so collecting them is
+opt-in and costs nothing until you install a recorder.
+
+## How it works
+
+The client never decides *where* metrics go. It only emits them via the `metrics`
+facade. Your application installs a global **recorder** (for example a Prometheus
+exporter), and that recorder decides how to store and expose the values.
+
+- **No recorder installed** — every metric call is a zero-cost no-op. This is the
+  default, so the client adds no overhead unless you opt in.
+- **Recorder installed** — values flow to whatever backend the recorder represents
+  (Prometheus, StatsD, OpenTelemetry, a test recorder, etc.).
+
+This differs from the Fluss Java client, where metric reporters are configured
+server-side in `conf/server.yaml` (`metrics.reporters: jmx,prometheus`) and
+discovered through plugins. The Rust client instead follows the idiomatic Rust
+`metrics` ecosystem: the application owns recorder installation, and rate
+computation is left to the backend (e.g. PromQL `rate()`) instead of a built-in
+background rate thread.
+
+## Installing a recorder
+
+Use any [`metrics`-compatible exporter](https://docs.rs/metrics/latest/metrics/#related-crates).
+The example below uses `metrics-exporter-prometheus` to expose a scrape endpoint: + +```rust +use metrics_exporter_prometheus::PrometheusBuilder; + +PrometheusBuilder::new() + .with_http_listener(([0, 0, 0, 0], 9000)) + .install() + .expect("failed to install Prometheus recorder"); +``` + +A full, runnable program is available as the `example-prometheus-metrics` example +in the `fluss-examples` crate. + +:::warning Install the recorder before writing or scanning +The client caches metric handles the first time a writer or scanner is created, +binding them to whichever recorder is installed at that moment. Install your global +recorder **before** calling `FlussConnection::new` (ideally as the very first thing +in `main`). If you install it after creating a writer or scanner, those metrics will +be bound to the no-op recorder and never appear. +::: + +## Metric catalog + +All metric names use the `fluss.client.` prefix. A Prometheus exporter translates +`.` to `_`, so `fluss.client.writer.send_latency_ms` is scraped as +`fluss_client_writer_send_latency_ms`. + +### Connection / RPC + +Recorded per RPC for the four reportable API keys. Labeled with `api_key` +(`produce_log`, `fetch_log`, `put_kv`, `lookup`). + +| Metric | Type | Description | +| --- | --- | --- | +| `fluss.client.requests.total` | Counter | Requests sent to a server. | +| `fluss.client.responses.total` | Counter | Responses received from a server. | +| `fluss.client.bytes_sent.total` | Counter | Request body bytes sent (excludes protocol framing). | +| `fluss.client.bytes_received.total` | Counter | Response body bytes received (excludes protocol framing). | +| `fluss.client.request_latency_ms` | Histogram | Round-trip latency per request, in milliseconds. | +| `fluss.client.requests_in_flight` | Gauge | Requests currently awaiting a response. | + +### Writer + +Recorded in the write pipeline. 
These metrics are **unlabeled** (one series per +process), matching Java's `WriterMetricGroup`, which carries no table label. + +| Metric | Type | Description | +| --- | --- | --- | +| `fluss.client.writer.send_latency_ms` | Histogram | Round-trip latency of each write request (ProduceLog / PutKv). | +| `fluss.client.writer.batch_queue_time_ms` | Histogram | Time a batch spent queued in the accumulator before being drained. | +| `fluss.client.writer.records_send.total` | Counter | Records handed to the cluster across all sent batches. | +| `fluss.client.writer.bytes_send.total` | Counter | Serialized batch bytes sent. | +| `fluss.client.writer.records_retry.total` | Counter | Records re-enqueued for retry. | +| `fluss.client.writer.records_per_batch` | Histogram | Records per sent batch. | +| `fluss.client.writer.bytes_per_batch` | Histogram | Serialized bytes per sent batch. | +| `fluss.client.writer.buffer_total_bytes` | Gauge | Total writer buffer memory, in bytes. | +| `fluss.client.writer.buffer_available_bytes` | Gauge | Currently available writer buffer memory, in bytes. | +| `fluss.client.writer.buffer_waiting_threads` | Gauge | Producer threads blocked waiting for buffer memory (backpressure signal). | + +### Scanner + +Recorded in the read pipeline. Labeled with `database` and `table`, so each scanned +table gets its own series. + +| Metric | Type | Description | +| --- | --- | --- | +| `fluss.client.scanner.time_between_poll_ms` | Gauge | Milliseconds between the start of consecutive `poll()` calls. | +| `fluss.client.scanner.poll_idle_ratio` | Gauge | Fraction of wall-clock time spent inside `poll()` (near 1.0 means starved for data). | +| `fluss.client.scanner.last_poll_seconds_ago` | Gauge | Seconds since the most recent `poll()` started (stuck-consumer signal). | +| `fluss.client.scanner.fetch_latency_ms` | Histogram | Latency of each successful FetchLog RPC, in milliseconds. 
| +| `fluss.client.scanner.fetch_requests.total` | Counter | FetchLog RPC requests attempted. | +| `fluss.client.scanner.bytes_per_request` | Histogram | Serialized bytes per successful FetchLog response. | +| `fluss.client.scanner.remote_fetch_requests.total` | Counter | Remote log download attempts (includes per-segment retries). | +| `fluss.client.scanner.remote_fetch_bytes.total` | Counter | Bytes downloaded from remote log storage. | +| `fluss.client.scanner.remote_fetch_errors.total` | Counter | Remote log download failures (each retry counts). | + +## Differences from the Java client + +The Rust client records the same events at the same points as Java, but uses +metric types better suited to the `metrics` ecosystem: + +| Aspect | Java | Rust | +| --- | --- | --- | +| Latency metrics (`send_latency_ms`, `batch_queue_time_ms`, `fetch_latency_ms`) | Volatile-long gauge (latest value only) | Histogram (full p50/p95/p99 distribution) | +| Throughput / retry metrics | `MeterView` rate computed on a background thread | Raw counter; compute the rate with PromQL `rate()` | +| Writer table label | No table label | No table label (kept identical) | + +These are client-internal implementation choices; the values reported to the server +are unaffected. + +## Tuning histogram buckets + +The default buckets in `metrics-exporter-prometheus` may not give meaningful +percentiles for sub-millisecond or multi-second latencies. 
Configure per-metric
+buckets when installing the recorder:
+
+```rust
+use metrics_exporter_prometheus::PrometheusBuilder;
+use metrics::Unit;
+
+PrometheusBuilder::new()
+    .set_buckets_for_metric(
+        metrics_exporter_prometheus::Matcher::Suffix("_ms".to_string()),
+        &[0.5, 1.0, 5.0, 10.0, 50.0, 100.0, 500.0, 1000.0],
+    )
+    .expect("invalid bucket configuration")
+    .with_http_listener(([0, 0, 0, 0], 9000))
+    .install()
+    .expect("failed to install Prometheus recorder");
+// Optional: describe a metric (unit + help text) for richer exporter metadata.
+metrics::describe_histogram!("fluss.client.writer.send_latency_ms", Unit::Milliseconds, "Write request latency");
+```
+
+## Cardinality
+
+Scanner metrics carry `database` and `table` labels, so the number of scanner
+series scales with the number of tables a single client scans. This is normally
+small, but if your client scans many short-lived tables, the series count can grow
+over time. Connection metrics are bounded (four API keys) and writer metrics are
+unlabeled, so neither contributes meaningfully to cardinality.
+
+## Grafana / PromQL tips
+
+By default, `metrics-exporter-prometheus` emits histogram metrics as Prometheus
+**summaries** (no `_bucket` series). The `_bucket`-based `histogram_quantile`
+query only works if you enabled bucket mode via `set_buckets(...)` or
+`set_buckets_for_metric(...)` (see *Tuning histogram buckets* above).
+
+```promql
+# Write throughput (records/sec), Java-style rate from the raw counter
+rate(fluss_client_writer_records_send_total[1m])
+
+# p99 send latency (default summary mode)
+fluss_client_writer_send_latency_ms{quantile="0.99"}
+
+# p99 send latency (bucket mode enabled)
+histogram_quantile(0.99, sum(rate(fluss_client_writer_send_latency_ms_bucket[5m])) by (le))
+
+# Backpressure: producers blocked waiting for buffer memory
+fluss_client_writer_buffer_waiting_threads
+
+# Per-table fetch rate
+sum(rate(fluss_client_scanner_fetch_requests_total[1m])) by (database, table)
+
+# Stuck-consumer alert: no poll in the last 60s
+fluss_client_scanner_last_poll_seconds_ago > 60
+```