Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/green-ladybugs-deliver.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@core/elixir-client': minor
'@core/sync-service': minor
---

Add Move-in/out support for subqueries combined using `AND`, `OR`, `NOT`, and other compound `WHERE` expressions. Previously these shapes would return `409` on a subquery move, forcing clients to discard the shape and resync it from scratch. The sync service now reconciles those changes in-stream.

This release also changes the wire protocol. Older `@core/elixir-client` versions are not compatible with the sync service from this release. TanStack DB clients need `@tanstack/db >= 0.6.2` and `@tanstack/electric-db-collection >= 0.3.0`.
92 changes: 86 additions & 6 deletions packages/elixir-client/lib/electric/client/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ defmodule Electric.Client.Message do
txids: [],
op_position: 0,
tags: [],
removed_tags: []
removed_tags: [],
active_conditions: []
]

@type operation :: :insert | :update | :delete
Expand All @@ -29,7 +30,8 @@ defmodule Electric.Client.Message do
txids: txids(),
op_position: non_neg_integer(),
tags: [tag()],
removed_tags: [tag()]
removed_tags: [tag()],
active_conditions: [boolean()]
}

@doc false
Expand All @@ -44,7 +46,8 @@ defmodule Electric.Client.Message do
lsn: Map.get(msg, "lsn", nil),
op_position: Map.get(msg, "op_position", 0),
tags: Map.get(msg, "tags", []),
removed_tags: Map.get(msg, "removed_tags", [])
removed_tags: Map.get(msg, "removed_tags", []),
active_conditions: Map.get(msg, "active_conditions", [])
}
end

Expand Down Expand Up @@ -187,14 +190,22 @@ defmodule Electric.Client.Message do

@enforce_keys [:shape_handle, :offset, :schema]

defstruct [:shape_handle, :offset, :schema, tag_to_keys: %{}, key_data: %{}]
defstruct [
:shape_handle,
:offset,
:schema,
tag_to_keys: %{},
key_data: %{},
disjunct_positions: nil
]

@type t :: %__MODULE__{
shape_handle: Client.shape_handle(),
offset: Offset.t(),
schema: Client.schema(),
tag_to_keys: %{String.t() => MapSet.t(String.t())},
key_data: %{String.t() => %{tags: MapSet.t(String.t()), msg: ChangeMessage.t()}}
tag_to_keys: %{optional(term()) => MapSet.t(String.t())},
key_data: %{optional(String.t()) => map()},
disjunct_positions: [[non_neg_integer()]] | nil
}
end

Expand Down Expand Up @@ -251,6 +262,57 @@ defmodule Electric.Client.Message do
end
end

defmodule MoveInMessage do
@moduledoc """
Represents a move-in event from the server.

Move-in events are sent when the server's subquery filter has changed and
rows may now be included in the shape. The `patterns` field contains position
and hash information that the client uses to update `active_conditions` on
tracked rows.
"""

defstruct [:patterns, :handle, :request_timestamp]

@type pattern :: %{pos: non_neg_integer(), value: String.t()}
@type t :: %__MODULE__{
patterns: [pattern()],
handle: Client.shape_handle(),
request_timestamp: DateTime.t()
}

def from_message(
%{"headers" => %{"event" => "move-in", "patterns" => patterns}},
handle,
request_timestamp
) do
%__MODULE__{
patterns: normalize_patterns(patterns),
handle: handle,
request_timestamp: request_timestamp
}
end

def from_message(
%{headers: %{event: "move-in", patterns: patterns}},
handle,
request_timestamp
) do
%__MODULE__{
patterns: normalize_patterns(patterns),
handle: handle,
request_timestamp: request_timestamp
}
end

defp normalize_patterns(patterns) do
Enum.map(patterns, fn
%{"pos" => pos, "value" => value} -> %{pos: pos, value: value}
%{pos: _, value: _} = pattern -> pattern
end)
end
end

defguard is_insert(msg) when is_struct(msg, ChangeMessage) and msg.headers.operation == :insert

def parse(%{"value" => _} = msg, shape_handle, value_mapper_fun, request_timestamp) do
Expand Down Expand Up @@ -288,6 +350,24 @@ defmodule Electric.Client.Message do
[MoveOutMessage.from_message(msg, shape_handle, request_timestamp)]
end

def parse(
%{"headers" => %{"event" => "move-in"}} = msg,
shape_handle,
_value_mapper_fun,
request_timestamp
) do
[MoveInMessage.from_message(msg, shape_handle, request_timestamp)]
end

def parse(
%{headers: %{event: "move-in"}} = msg,
shape_handle,
_value_mapper_fun,
request_timestamp
) do
[MoveInMessage.from_message(msg, shape_handle, request_timestamp)]
end

def parse("", _handle, _value_mapper_fun, _request_timestamp) do
[]
end
Expand Down
32 changes: 29 additions & 3 deletions packages/elixir-client/lib/electric/client/poll.ex
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,21 @@ defmodule Electric.Client.Poll do
end

defp handle_message(%Message.ChangeMessage{} = msg, state) do
{tag_to_keys, key_data} =
TagTracker.update_tag_index(state.tag_to_keys, state.key_data, msg)
{tag_to_keys, key_data, disjunct_positions} =
TagTracker.update_tag_index(
state.tag_to_keys,
state.key_data,
state.disjunct_positions,
msg
)

{:message, msg, %{state | tag_to_keys: tag_to_keys, key_data: key_data}}
{:message, msg,
%{
state
| tag_to_keys: tag_to_keys,
key_data: key_data,
disjunct_positions: disjunct_positions
}}
end

defp handle_message(
Expand All @@ -248,13 +259,28 @@ defmodule Electric.Client.Poll do
TagTracker.generate_synthetic_deletes(
state.tag_to_keys,
state.key_data,
state.disjunct_positions,
patterns,
request_timestamp
)

{:messages, synthetic_deletes, %{state | tag_to_keys: tag_to_keys, key_data: key_data}}
end

defp handle_message(
%Message.MoveInMessage{patterns: patterns},
state
) do
{tag_to_keys, key_data} =
TagTracker.handle_move_in(
state.tag_to_keys,
state.key_data,
patterns
)

{:skip, %{state | tag_to_keys: tag_to_keys, key_data: key_data}}
end

defp handle_schema(%Fetch.Response{schema: schema}, client, %{value_mapper_fun: nil} = state)
when is_map(schema) do
{parser_module, parser_opts} = client.parser
Expand Down
11 changes: 8 additions & 3 deletions packages/elixir-client/lib/electric/client/shape_state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ defmodule Electric.Client.ShapeState do
tag_to_keys: %{},
key_data: %{},
stale_cache_retry_count: 0,
disjunct_positions: nil,
recent_requests: [],
fast_loop_consecutive_count: 0
]
Expand All @@ -59,6 +60,7 @@ defmodule Electric.Client.ShapeState do
up_to_date?: boolean(),
tag_to_keys: %{optional(term()) => MapSet.t()},
key_data: %{optional(term()) => %{tags: MapSet.t(), msg: term()}},
disjunct_positions: [[non_neg_integer()]] | nil,
stale_cache_buster: String.t() | nil,
stale_cache_retry_count: non_neg_integer(),
recent_requests: [{integer(), Offset.t()}],
Expand Down Expand Up @@ -95,7 +97,8 @@ defmodule Electric.Client.ShapeState do
schema: resume.schema,
up_to_date?: true,
tag_to_keys: Map.get(resume, :tag_to_keys, %{}),
key_data: Map.get(resume, :key_data, %{})
key_data: Map.get(resume, :key_data, %{}),
disjunct_positions: Map.get(resume, :disjunct_positions)
}
end

Expand All @@ -116,7 +119,8 @@ defmodule Electric.Client.ShapeState do
tag_to_keys: %{},
key_data: %{},
recent_requests: [],
fast_loop_consecutive_count: 0
fast_loop_consecutive_count: 0,
disjunct_positions: nil
}
end

Expand All @@ -130,7 +134,8 @@ defmodule Electric.Client.ShapeState do
offset: state.offset,
schema: state.schema,
tag_to_keys: state.tag_to_keys,
key_data: state.key_data
key_data: state.key_data,
disjunct_positions: state.disjunct_positions
}
end

Expand Down
Loading
Loading