Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .changeset/serve-shape-request-metrics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
'@core/electric-telemetry': patch
'@core/sync-service': patch
---

Export a per-request `electric.plug.serve_shape.requests.count` metric tagged
by `status`, `known_error` and `live`.

Existing `serve_shape` metrics drop live (long-poll) requests and are not
dimensioned by response status, so they can't answer "what's my request mix /
error rate right now". This counter intentionally counts every request
(including live) and is unsampled, making it a reliable request-health signal
that doesn't depend on trace sampling. Admission-control rejections show up
here as `status=503, known_error=true` (the conn is halted but still flows
through `emit_shape_telemetry/1`), so overload is visible alongside normal
traffic. The `known_error` tag mirrors the `electric-internal-known-error`
response header, so it matches the classification downstream consumers key on.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ defmodule ElectricTelemetry.StackTelemetry do
unit: :byte,
keep: fn metadata -> metadata[:live] != true end
),
counter("electric.plug.serve_shape.requests.count",
event_name: [:electric, :plug, :serve_shape],
measurement: :count,
tags: [:status, :known_error, :live]
),
distribution("electric.shape.response_size.bytes",
unit: :byte,
tags: [:root_table, :is_live, :stack_id]
Expand Down
3 changes: 2 additions & 1 deletion packages/sync-service/lib/electric/plug/serve_shape_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,8 @@ defmodule Electric.Plug.ServeShapePlug do
shape_handle: get_handle(assigns) || conn.query_params["handle"],
client_ip: conn.remote_ip,
status: conn.status,
stack_id: stack_id
stack_id: stack_id,
known_error: Api.Response.conn_has_known_error?(conn)
}
)

Expand Down
7 changes: 7 additions & 0 deletions packages/sync-service/lib/electric/shapes/api/response.ex
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,13 @@ defmodule Electric.Shapes.Api.Response do
Plug.Conn.put_resp_header(conn, @electric_known_error_header, "#{known_error}")
end

# keeping this function close to `put_known_error_header/2` above so that we know exactly
# which value to expect for a set known_error header: i.e. "true" or "false" (and absent when
# known_error is nil), as opposed to e.g. "1" etc.
def conn_has_known_error?(conn) do
Plug.Conn.get_resp_header(conn, @electric_known_error_header) == ["true"]
end

defp put_retry_after_header(conn, %__MODULE__{retry_after: nil}) do
conn
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ defmodule Electric.Plug.ServeShapePlugTest do
sse_timeout: sse_timeout(ctx),
max_age: max_age(ctx),
stale_age: stale_age(ctx),
max_concurrent_requests: %{initial: 300, existing: 10_000}
max_concurrent_requests:
Access.get(ctx, :max_concurrent_requests, %{initial: 300, existing: 10_000})
)
end

Expand Down Expand Up @@ -1044,6 +1045,101 @@ defmodule Electric.Plug.ServeShapePlugTest do
:telemetry.detach(handler_id)
end
end

test "[:electric, :plug, :serve_shape] tags a successful response with known_error: false",
ctx do
expect_shape_cache(
get_or_create_shape_handle: fn @test_shape, _stack_id, _opts ->
{@test_shape_handle, @test_offset}
end,
await_snapshot_start: fn @test_shape_handle, _ -> :started end
)

patch_shape_cache(has_shape?: fn @test_shape_handle, _opts -> true end)

next_offset = LogOffset.increment(@first_offset)

expect_storage(
get_chunk_end_log_offset: fn @before_all_offset, _ ->
@first_offset
end,
get_log_stream: fn @before_all_offset, @first_offset, @test_opts ->
[Jason.encode!(%{key: "log", value: "foo", headers: %{}, offset: next_offset})]
end
)

test_pid = self()
ref = make_ref()
handler_id = "test-serve-shape-known-error-false-#{inspect(ref)}"

:telemetry.attach(
handler_id,
[:electric, :plug, :serve_shape],
fn event, measurements, metadata, _config ->
send(test_pid, {:telemetry_serve_shape, event, measurements, metadata})
end,
nil
)

stack_id = ctx.stack_id

try do
conn =
ctx
|> conn(:get, %{"table" => "public.users"}, "?offset=-1")
|> call_serve_shape_plug(ctx)

assert conn.status == 200

assert_receive {:telemetry_serve_shape, [:electric, :plug, :serve_shape], _measurements,
%{stack_id: ^stack_id} = metadata}

assert metadata.status == 200
assert metadata.known_error == false
after
:telemetry.detach(handler_id)
end
end

test "[:electric, :plug, :serve_shape] tags an admission-rejected 503 with known_error: true",
ctx do
# Force the load-shedding path: with a zero concurrency limit, check_admission
# rejects immediately with a 503 carrying the `electric-internal-known-error`
# header.
ctx = Map.put(ctx, :max_concurrent_requests, %{initial: 0, existing: 0})

test_pid = self()
ref = make_ref()
handler_id = "test-serve-shape-known-error-true-#{inspect(ref)}"

:telemetry.attach(
handler_id,
[:electric, :plug, :serve_shape],
fn event, measurements, metadata, _config ->
send(test_pid, {:telemetry_serve_shape, event, measurements, metadata})
end,
nil
)

stack_id = ctx.stack_id

try do
conn =
ctx
|> conn(:get, %{"table" => "public.users"}, "?offset=-1")
|> call_serve_shape_plug(ctx)

assert conn.status == 503

assert_receive {:telemetry_serve_shape, [:electric, :plug, :serve_shape], _measurements,
%{stack_id: ^stack_id} = metadata}

assert metadata.status == 503
assert metadata.known_error == true
after
:telemetry.detach(handler_id)
end
end
end

describe "serving shapes with sse mode" do
Expand Down
Loading