Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/nice-bugs-think.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Add sync-service telemetry for indexed vs unindexed shape counts, backed by maintained in-memory counters so periodic metrics stay O(1) even on very large stacks.
9 changes: 9 additions & 0 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ defmodule Electric.ShapeCache do
ArgumentError -> :error
end

@doc """
Returns the shape counters for `stack_id`.

Delegates to `ShapeStatus.shape_counts/1`. The result is a map with `:total`,
`:indexed` and `:unindexed` keys, or `:error`. An `ArgumentError` raised while
reading the counters is translated to `:error` as well.
"""
@spec shape_counts(stack_id()) :: ShapeStatus.shape_counts() | :error
def shape_counts(stack_id) when is_stack_id(stack_id) do
  ShapeStatus.shape_counts(stack_id)
rescue
  ArgumentError -> :error
end

@spec clean_shape(shape_handle(), stack_id()) :: :ok
def clean_shape(shape_handle, stack_id)
when is_shape_handle(shape_handle) and is_stack_id(stack_id) do
Expand Down
136 changes: 128 additions & 8 deletions packages/sync-service/lib/electric/shape_cache/shape_status.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
to access the data in the ETS from anywhere, so there's an internal api,
using the full state and an external api using just the table name.
"""
alias Electric.Shapes.Filter
alias Electric.Shapes.Shape
alias Electric.ShapeCache.ShapeStatus.ShapeDb
alias Electric.Telemetry.OpenTelemetry
Expand All @@ -25,12 +26,18 @@ defmodule Electric.ShapeCache.ShapeStatus do

@type stack_id() :: Electric.stack_id()
@type shape_handle() :: Electric.shape_handle()
@type shape_counts() :: %{
total: non_neg_integer(),
indexed: non_neg_integer(),
unindexed: non_neg_integer()
}

# MUST be updated when Shape.comparable/1 changes.
@version 8

# Tuple format: {handle, hash, snapshot_started, last_read_time, generation}
@shape_last_used_time_pos 4
@shape_counts_key :counts

@spec version() :: pos_integer()
def version, do: @version
Expand Down Expand Up @@ -66,7 +73,8 @@ defmodule Electric.ShapeCache.ShapeStatus do
Electric.ShapeCache.ShapeCleaner.remove_shape_storage_async(
stack_id,
invalid_handles
) do
),
:ok <- rebuild_shape_routing_state(stack_id) do
populate_shape_meta_table(stack_id, 0)
end
end
Expand All @@ -90,7 +98,8 @@ defmodule Electric.ShapeCache.ShapeStatus do
Electric.ShapeCache.ShapeCleaner.remove_shape_storage_async(
stack_id,
invalid_handles
) do
),
:ok <- rebuild_shape_routing_state(stack_id) do
# Use a generation counter to avoid clearing the table (which would race
# with concurrent readers). Upsert all current shapes with a new generation,
# then delete any entries still on the old generation.
Expand All @@ -109,6 +118,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
def add_shape(stack_id, shape) when is_stack_id(stack_id) do
OpenTelemetry.with_child_span("shape_status.add_shape", [], stack_id, fn ->
{_, shape_handle} = Shape.generate_id(shape)
indexed? = Filter.indexed_shape?(shape)

# Add the lookup last as it is the one that enables clients to find the shape
with {:ok, shape_hash} <- ShapeDb.add_shape(stack_id, shape, shape_handle) do
Expand All @@ -119,6 +129,8 @@ defmodule Electric.ShapeCache.ShapeStatus do
# They are sequentially ordered by the Connection.Manager state machine.
{shape_handle, shape_hash, false, nil, 0}
) do
:ets.insert(shape_indexability_table(stack_id), {shape_handle, indexed?})
increment_shape_counts(stack_id, indexed?)
{:ok, shape_handle}
else
{:error, "duplicate shape #{inspect(shape_handle)}: #{inspect(shape)}"}
Expand Down Expand Up @@ -158,7 +170,23 @@ defmodule Electric.ShapeCache.ShapeStatus do

@doc """
Returns the total number of shapes for the stack.

The total is read from `shape_counts/1`; when that returns `:error` the count
is taken from `ShapeDb.count_shapes!/1` instead.
"""
@spec count_shapes(stack_id()) :: non_neg_integer()
def count_shapes(stack_id) when is_stack_id(stack_id) do
  case shape_counts(stack_id) do
    :error -> ShapeDb.count_shapes!(stack_id)
    %{total: total_shapes} -> total_shapes
  end
end

@doc """
Reads the total/indexed/unindexed shape counters for the stack.

Returns `:error` when the counts table holds no counter row, or when the
lookup raises `ArgumentError`.
"""
@spec shape_counts(stack_id()) :: shape_counts() | :error
def shape_counts(stack_id) when is_stack_id(stack_id) do
  stack_id
  |> shape_counts_table()
  |> :ets.lookup(@shape_counts_key)
  |> case do
    [] ->
      :error

    [{@shape_counts_key, total, indexed, unindexed}] ->
      %{total: total, indexed: indexed, unindexed: unindexed}
  end
rescue
  ArgumentError -> :error
end

@spec list_shape_handles_for_relations(stack_id(), [Electric.oid_relation()]) :: [
Expand All @@ -179,6 +207,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
def remove_shape(stack_id, shape_handle) when is_stack_id(stack_id) do
  # The ETS state is only touched once ShapeDb has confirmed the removal; any
  # other result is handed back to the caller unchanged.
  case ShapeDb.remove_shape(stack_id, shape_handle) do
    :ok ->
      :ets.delete(shape_meta_table(stack_id), shape_handle)

      # Reading the cached flag also takes the handle's entry out of the
      # indexability table; `nil` (no entry) leaves the counters alone.
      indexed? = shape_cached_as_indexed?(stack_id, shape_handle)
      decrement_shape_counts(stack_id, indexed?)

      :ok

    other ->
      other
  end
end
Expand All @@ -187,6 +216,8 @@ defmodule Electric.ShapeCache.ShapeStatus do
def reset(stack_id) when is_stack_id(stack_id) do
  # Reset ShapeDb first; a non-:ok result crashes here before any ETS state
  # is cleared.
  :ok = ShapeDb.reset(stack_id)

  # Empty the per-shape tables (meta first, then indexability)...
  Enum.each(
    [shape_meta_table(stack_id), shape_indexability_table(stack_id)],
    &:ets.delete_all_objects/1
  )

  # ...and bring the counters back to zero to match.
  put_shape_counts(stack_id, empty_shape_counts())

  :ok
end

Expand Down Expand Up @@ -366,14 +397,33 @@ defmodule Electric.ShapeCache.ShapeStatus do
# Builds the atom naming the shape meta ETS table for the given stack.
defp shape_meta_table(stack_id) do
  :"shape_meta_table:#{stack_id}"
end

# Builds the atom naming the ETS table that stores `{shape_handle, indexed?}`
# entries for the given stack.
@spec shape_indexability_table(stack_id()) :: atom()
defp shape_indexability_table(stack_id) do
  :"shape_indexability_table:#{stack_id}"
end

# Builds the atom naming the ETS table that stores the shape counter row for
# the given stack.
@spec shape_counts_table(stack_id()) :: atom()
defp shape_counts_table(stack_id) do
  :"shape_counts_table:#{stack_id}"
end

defp create_shape_meta_table(stack_id) do
:ets.new(shape_meta_table(stack_id), [
:named_table,
:public,
:set,
ensure_state_table(shape_meta_table(stack_id),
read_concurrency: true,
write_concurrency: :auto
)

ensure_state_table(shape_indexability_table(stack_id),
read_concurrency: true,
write_concurrency: :auto
])
)

ensure_state_table(shape_counts_table(stack_id),
read_concurrency: true,
write_concurrency: true
)

:ets.delete_all_objects(shape_meta_table(stack_id))
:ets.delete_all_objects(shape_indexability_table(stack_id))
put_shape_counts(stack_id, empty_shape_counts())
end

defp populate_shape_meta_table(stack_id, generation) do
Expand All @@ -393,4 +443,74 @@ defmodule Electric.ShapeCache.ShapeStatus do

:ok
end

defp rebuild_shape_routing_state(stack_id) do
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the performance of going through all shapes in shapedb? Is it trivial enough for, say, 100k shapes, such that we don't need to add a tracing span around it for visibility?

case ShapeDb.reduce_shapes(stack_id, {empty_shape_counts(), []}, fn {shape_handle, shape},
{counts, entries} ->
indexed? = Filter.indexed_shape?(shape)
{update_shape_counts(counts, indexed?, 1), [{shape_handle, indexed?} | entries]}
end) do
{:error, _reason} = error ->
error

{counts, entries} ->
:ets.delete_all_objects(shape_indexability_table(stack_id))

if entries != [] do
true = :ets.insert(shape_indexability_table(stack_id), entries)
end

put_shape_counts(stack_id, counts)
:ok
end
end

# Ensures the named ETS table exists, creating it as a public, named `:set`
# table (plus the extra `opts`) when it is missing. A table that already
# exists is left as it is: neither its contents nor its options are changed.
#
# Always returns `table_name`, whether the table was just created or was
# already present, so the result has a single shape instead of `nil` in one
# branch and the table name in the other.
defp ensure_state_table(table_name, opts) do
  case :ets.whereis(table_name) do
    :undefined -> :ets.new(table_name, [:named_table, :public, :set] ++ opts)
    _tid -> table_name
  end
end

# The all-zero counters map, used as the starting value for the counters.
defp empty_shape_counts, do: %{total: 0, indexed: 0, unindexed: 0}

# Overwrites the stack's counter row with the values from the given counts
# map. The row is stored as `{key, total, indexed, unindexed}`.
defp put_shape_counts(stack_id, %{total: total, indexed: indexed, unindexed: unindexed}) do
  counts_row = {@shape_counts_key, total, indexed, unindexed}

  stack_id
  |> shape_counts_table()
  |> :ets.insert(counts_row)
end

# Adds one shape to the counter row: the total (element 2) always goes up by
# one, and exactly one of indexed (element 3) / unindexed (element 4) does too.
# If the row is missing, it is first created as all zeros.
defp increment_shape_counts(stack_id, indexed?) do
  {indexed_delta, unindexed_delta} = if indexed?, do: {1, 0}, else: {0, 1}

  :ets.update_counter(
    shape_counts_table(stack_id),
    @shape_counts_key,
    [{2, 1}, {3, indexed_delta}, {4, unindexed_delta}],
    {@shape_counts_key, 0, 0, 0}
  )
end

# Removes one shape from the counter row.
#
# A `nil` flag means the shape had no cached indexability entry, so there is
# nothing to subtract and the counters are left untouched.
defp decrement_shape_counts(_stack_id, nil), do: :ok

# Otherwise the total (element 2) goes down by one together with either the
# indexed (element 3) or the unindexed (element 4) counter. If the row is
# missing, it is first created as all zeros.
defp decrement_shape_counts(stack_id, indexed?) do
  {indexed_delta, unindexed_delta} = if indexed?, do: {-1, 0}, else: {0, -1}

  :ets.update_counter(
    shape_counts_table(stack_id),
    @shape_counts_key,
    [{2, -1}, {3, indexed_delta}, {4, unindexed_delta}],
    {@shape_counts_key, 0, 0, 0}
  )
end

# Fetches the cached `indexed?` flag for a shape handle, or `nil` when the
# handle has no entry. This uses `:ets.take/2`, so the entry is removed from
# the indexability table as part of the read.
defp shape_cached_as_indexed?(stack_id, shape_handle) do
  stack_id
  |> shape_indexability_table()
  |> :ets.take(shape_handle)
  |> case do
    [] -> nil
    [{^shape_handle, indexed?}] -> indexed?
  end
end

# Returns a new counts map with `delta` applied to the total and to either the
# indexed counter (`true`) or the unindexed counter (`false`).
defp update_shape_counts(
       %{total: total, indexed: indexed, unindexed: unindexed},
       indexed?,
       delta
     )
     when is_boolean(indexed?) do
  bumped = %{total: total + delta, indexed: indexed, unindexed: unindexed}

  if indexed? do
    %{bumped | indexed: indexed + delta}
  else
    %{bumped | unindexed: unindexed + delta}
  end
end
end
14 changes: 14 additions & 0 deletions packages/sync-service/lib/electric/shapes/filter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ defmodule Electric.Shapes.Filter do
:ets.select(table, [{{:"$1", :_}, [], [:"$1"]}])
end

@doc """
Tells whether ShapeLogCollector can route the shape through one of its indexes
rather than depending solely on `other_shapes` scans.

A shape counts as indexed when either:

  * its WHERE clause can use the primary equality/inclusion indexes, or
  * its dependency handles are known and the WHERE clause yields at least one
    sublink field for the sublink inverted index used for dependency-driven
    subquery routing.
"""
@spec indexed_shape?(Shape.t()) :: boolean()
def indexed_shape?(%Shape{where: where} = shape) do
  if WhereCondition.indexed_where?(where) do
    true
  else
    Shape.dependency_handles_known?(shape) and
      map_size(extract_sublink_fields(where)) > 0
  end
end

@doc """
Add a shape for the filter to track.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ defmodule Electric.Shapes.Filter.WhereCondition do
:ets.insert(table, {condition_id, {MapSet.new(), %{}}})
end

@doc """
Reports whether the WHERE clause can be served by the primary
equality/inclusion indexes maintained by the filter.

The clause counts as indexed whenever `optimise_where/1` returns anything
other than `:not_optimised`.
"""
@spec indexed_where?(Expr.t() | nil) :: boolean()
def indexed_where?(where_clause) do
  case optimise_where(where_clause) do
    :not_optimised -> false
    _optimised -> true
  end
end

def add_shape(%Filter{where_cond_table: table} = filter, condition_id, shape_id, where_clause) do
case optimise_where(where_clause) do
:not_optimised ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ defmodule Electric.StackSupervisor.Telemetry do
def count_shapes(stack_id, _telemetry_opts) do
# Telemetry is started before everything else in the stack, so we need to handle
# the case where the shape cache is not started yet.
with num_shapes when is_integer(num_shapes) <- Electric.ShapeCache.count_shapes(stack_id) do
with %{total: num_shapes, indexed: indexed_shapes, unindexed: unindexed_shapes} <-
Electric.ShapeCache.shape_counts(stack_id) do
Electric.Telemetry.OpenTelemetry.execute(
[:electric, :shapes, :total_shapes],
%{count: num_shapes},
%{count: num_shapes, count_indexed: indexed_shapes, count_unindexed: unindexed_shapes},
%{stack_id: stack_id}
)
end
Expand Down Expand Up @@ -178,6 +179,8 @@ defmodule Electric.StackSupervisor.Telemetry do
defp default_metrics_from_periodic_measurements do
[
Telemetry.Metrics.last_value("electric.shapes.total_shapes.count"),
Telemetry.Metrics.last_value("electric.shapes.total_shapes.count_indexed"),
Telemetry.Metrics.last_value("electric.shapes.total_shapes.count_unindexed"),
Telemetry.Metrics.last_value("electric.shapes.active_shapes.count"),
Telemetry.Metrics.last_value("electric.shape_db.write_buffer.pending_writes.count"),
Telemetry.Metrics.last_value("electric.postgres.replication.pg_wal_offset"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ defmodule Electric.ShapeCache.ShapeStatusTest do
assert [{^shape_handle, ^shape}] = ShapeStatus.list_shapes(state)
end

test "can count shapes by indexability", ctx do
  {:ok, state, []} = new_state(ctx)

  # Register two shapes that the assertions below expect to land in different
  # buckets (one indexed, one unindexed).
  assert {:ok, indexed_handle} = ShapeStatus.add_shape(state, shape!("indexed"))
  assert {:ok, _unindexed_handle} = ShapeStatus.add_shape(state, shape2!())

  counts_after_add = ShapeStatus.shape_counts(state)
  assert %{total: 2, indexed: 1, unindexed: 1} = counts_after_add

  # Removing the indexed shape must lower the total and the indexed bucket only.
  assert :ok = ShapeStatus.remove_shape(state, indexed_handle)

  counts_after_remove = ShapeStatus.shape_counts(state)
  assert %{total: 1, indexed: 0, unindexed: 1} = counts_after_remove
end

test "can delete shape instances", ctx do
{:ok, state, []} = new_state(ctx)
shape_1 = shape!()
Expand Down
41 changes: 41 additions & 0 deletions packages/sync-service/test/electric/shapes/filter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,42 @@ defmodule Electric.Shapes.FilterTest do
]
)

describe "indexed_shape?/1" do
  test "returns true for equality-indexed shapes" do
    equality_shape = Shape.new!("t1", where: "id = 7", inspector: @inspector)

    assert Filter.indexed_shape?(equality_shape)
  end

  test "returns false for shapes without an indexable where clause" do
    # No `where:` option at all.
    unfiltered_shape = Shape.new!("t1", inspector: @inspector)

    refute Filter.indexed_shape?(unfiltered_shape)
  end

  test "returns true for non-optimisable subquery shapes with sublink fields" do
    subquery_shape =
      "t1"
      |> Shape.new!(
        where: "id = 1 OR id IN (SELECT id FROM t2)",
        inspector: @inspector
      )
      |> with_known_dependency_handles()

    assert Filter.indexed_shape?(subquery_shape)
  end

  test "returns false for row-expression subquery shapes with no indexable fields" do
    row_expression_shape =
      "t1"
      |> Shape.new!(
        where: "(id, number) IN (SELECT id, number FROM t2)",
        inspector: @inspector
      )
      |> with_known_dependency_handles()

    refute Filter.indexed_shape?(row_expression_shape)
  end
end

describe "affected_shapes/2" do
test "returns shapes affected by insert" do
filter =
Expand Down Expand Up @@ -778,6 +814,11 @@ defmodule Electric.Shapes.FilterTest do
}
end

# Test helper: fills `shape_dependencies_handles` with one synthetic handle
# ("dep-0", "dep-1", ...) per entry in `shape_dependencies`.
defp with_known_dependency_handles(%Shape{shape_dependencies: deps} = shape) do
  handles = for {_dep, position} <- Enum.with_index(deps), do: "dep-#{position}"

  %{shape | shape_dependencies_handles: handles}
end

describe "refs_fun threading through indexes" do
import Support.DbSetup
import Support.DbStructureSetup
Expand Down
Loading
Loading