diff --git a/.changeset/serve-shape-request-metrics.md b/.changeset/serve-shape-request-metrics.md new file mode 100644 index 0000000000..367e558fcb --- /dev/null +++ b/.changeset/serve-shape-request-metrics.md @@ -0,0 +1,17 @@ +--- +'@core/electric-telemetry': patch +'@core/sync-service': patch +--- + +Export a per-request `electric.plug.serve_shape.requests.count` metric tagged +by `status`, `known_error` and `live`. + +Existing `serve_shape` metrics drop live (long-poll) requests and are not +dimensioned by response status, so they can't answer "what's my request mix / +error rate right now". This counter intentionally counts every request +(including live) and is unsampled, making it a reliable request-health signal +that doesn't depend on trace sampling. Admission-control rejections show up +here as `status=503, known_error=true` (the conn is halted but still flows +through `emit_shape_telemetry/1`), so overload is visible alongside normal +traffic. The `known_error` tag mirrors the `electric-internal-known-error` +response header, so it matches the classification downstream consumers key on. diff --git a/packages/electric-telemetry/lib/electric/telemetry/stack_telemetry.ex b/packages/electric-telemetry/lib/electric/telemetry/stack_telemetry.ex index 9393adf765..105d1c0c35 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/stack_telemetry.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/stack_telemetry.ex @@ -93,6 +93,11 @@ defmodule ElectricTelemetry.StackTelemetry do unit: :byte, keep: fn metadata -> metadata[:live] != true end ), + counter("electric.plug.serve_shape.requests.count", + event_name: [:electric, :plug, :serve_shape], + measurement: :count, + tags: [:status, :known_error, :live] + ), distribution("electric.shape.response_size.bytes", unit: :byte, tags: [:root_table, :is_live, :stack_id] diff --git a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex index d7a67d64e4..2f3609e862 100644 --- a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex +++ b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex @@ -451,7 +451,8 @@ defmodule Electric.Plug.ServeShapePlug do shape_handle: get_handle(assigns) || conn.query_params["handle"], client_ip: conn.remote_ip, status: conn.status, - stack_id: stack_id + stack_id: stack_id, + known_error: Api.Response.conn_has_known_error?(conn) } ) diff --git a/packages/sync-service/lib/electric/shapes/api/response.ex b/packages/sync-service/lib/electric/shapes/api/response.ex index 121762e2ae..f7276e6cd7 100644 --- a/packages/sync-service/lib/electric/shapes/api/response.ex +++ b/packages/sync-service/lib/electric/shapes/api/response.ex @@ -390,6 +390,13 @@ defmodule Electric.Shapes.Api.Response do Plug.Conn.put_resp_header(conn, @electric_known_error_header, "#{known_error}") end + # keeping this function close to `put_known_error_header/2` above so that we know exactly + # which value to expect for a set known_error header: i.e. "true" or "false" (and absent when + # known_error is nil), as opposed to e.g. "1" etc. + def conn_has_known_error?(conn) do + Plug.Conn.get_resp_header(conn, @electric_known_error_header) == ["true"] + end + defp put_retry_after_header(conn, %__MODULE__{retry_after: nil}) do conn end diff --git a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs index 6aa840f8de..44cb0b4dc6 100644 --- a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs +++ b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs @@ -86,7 +86,8 @@ defmodule Electric.Plug.ServeShapePlugTest do sse_timeout: sse_timeout(ctx), max_age: max_age(ctx), stale_age: stale_age(ctx), - max_concurrent_requests: %{initial: 300, existing: 10_000} + max_concurrent_requests: + Access.get(ctx, :max_concurrent_requests, %{initial: 300, existing: 10_000}) ) end @@ -1044,6 +1045,101 @@ defmodule Electric.Plug.ServeShapePlugTest do :telemetry.detach(handler_id) end end + + test "[:electric, :plug, :serve_shape] tags a successful response with known_error: false", + ctx do + expect_shape_cache( + get_or_create_shape_handle: fn @test_shape, _stack_id, _opts -> + {@test_shape_handle, @test_offset} + end, + await_snapshot_start: fn @test_shape_handle, _ -> :started end + ) + + patch_shape_cache(has_shape?: fn @test_shape_handle, _opts -> true end) + + next_offset = LogOffset.increment(@first_offset) + + expect_storage( + get_chunk_end_log_offset: fn @before_all_offset, _ -> + @first_offset + end, + get_log_stream: fn @before_all_offset, @first_offset, @test_opts -> + [Jason.encode!(%{key: "log", value: "foo", headers: %{}, offset: next_offset})] + end + ) + + test_pid = self() + ref = make_ref() + handler_id = "test-serve-shape-known-error-false-#{inspect(ref)}" + + :telemetry.attach( + handler_id, + [:electric, :plug, :serve_shape], + fn event, measurements, metadata, _config -> + send(test_pid, {:telemetry_serve_shape, event, measurements, metadata}) + end, + nil + ) + + stack_id = ctx.stack_id + + try do + conn = + ctx + |> conn(:get, %{"table" => "public.users"}, "?offset=-1") + |> call_serve_shape_plug(ctx) + + assert conn.status == 200 + + assert_receive {:telemetry_serve_shape, [:electric, :plug, :serve_shape], _measurements, + %{stack_id: ^stack_id} = metadata} + + assert metadata.status == 200 + assert metadata.known_error == false + after + :telemetry.detach(handler_id) + end + end + + test "[:electric, :plug, :serve_shape] tags an admission-rejected 503 with known_error: true", + ctx do + # Force the load-shedding path: with a zero concurrency limit, check_admission + # rejects immediately with a 503 carrying the `electric-internal-known-error` + # header. + ctx = Map.put(ctx, :max_concurrent_requests, %{initial: 0, existing: 0}) + + test_pid = self() + ref = make_ref() + handler_id = "test-serve-shape-known-error-true-#{inspect(ref)}" + + :telemetry.attach( + handler_id, + [:electric, :plug, :serve_shape], + fn event, measurements, metadata, _config -> + send(test_pid, {:telemetry_serve_shape, event, measurements, metadata}) + end, + nil + ) + + stack_id = ctx.stack_id + + try do + conn = + ctx + |> conn(:get, %{"table" => "public.users"}, "?offset=-1") + |> call_serve_shape_plug(ctx) + + assert conn.status == 503 + + assert_receive {:telemetry_serve_shape, [:electric, :plug, :serve_shape], _measurements, + %{stack_id: ^stack_id} = metadata} + + assert metadata.status == 503 + assert metadata.known_error == true + after + :telemetry.detach(handler_id) + end + end end describe "serving shapes with sse mode" do