Skip to content

Commit 438e6cf

Browse files
committed
Support complex subqueries
1 parent 45b43a9 commit 438e6cf

80 files changed

Lines changed: 11714 additions & 3487 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@core/sync-service': patch
3+
'@core/elixir-client': patch
4+
---
5+
6+
Introduce `active_conditions` wire format for DNF-based visibility tracking. The server now includes `active_conditions` in change headers for shapes with subqueries, and the Elixir client handles position-based tag indexing and disjunctive normal form (DNF) visibility evaluation. This is a backward-compatible protocol addition preparing for OR/NOT support in WHERE clauses.

.changeset/wild-walls-fly.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@core/sync-service': patch
3+
---
4+
5+
chore: improve Storage contract to have less coupling on snapshot appends

packages/elixir-client/lib/electric/client/message.ex

Lines changed: 86 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ defmodule Electric.Client.Message do
1313
txids: [],
1414
op_position: 0,
1515
tags: [],
16-
removed_tags: []
16+
removed_tags: [],
17+
active_conditions: []
1718
]
1819

1920
@type operation :: :insert | :update | :delete
@@ -29,7 +30,8 @@ defmodule Electric.Client.Message do
2930
txids: txids(),
3031
op_position: non_neg_integer(),
3132
tags: [tag()],
32-
removed_tags: [tag()]
33+
removed_tags: [tag()],
34+
active_conditions: [boolean()]
3335
}
3436

3537
@doc false
@@ -44,7 +46,8 @@ defmodule Electric.Client.Message do
4446
lsn: Map.get(msg, "lsn", nil),
4547
op_position: Map.get(msg, "op_position", 0),
4648
tags: Map.get(msg, "tags", []),
47-
removed_tags: Map.get(msg, "removed_tags", [])
49+
removed_tags: Map.get(msg, "removed_tags", []),
50+
active_conditions: Map.get(msg, "active_conditions", [])
4851
}
4952
end
5053

@@ -187,14 +190,22 @@ defmodule Electric.Client.Message do
187190

188191
@enforce_keys [:shape_handle, :offset, :schema]
189192

190-
defstruct [:shape_handle, :offset, :schema, tag_to_keys: %{}, key_data: %{}]
193+
defstruct [
194+
:shape_handle,
195+
:offset,
196+
:schema,
197+
tag_to_keys: %{},
198+
key_data: %{},
199+
disjunct_positions: nil
200+
]
191201

192202
@type t :: %__MODULE__{
193203
shape_handle: Client.shape_handle(),
194204
offset: Offset.t(),
195205
schema: Client.schema(),
196-
tag_to_keys: %{String.t() => MapSet.t(String.t())},
197-
key_data: %{String.t() => %{tags: MapSet.t(String.t()), msg: ChangeMessage.t()}}
206+
tag_to_keys: %{optional(term()) => MapSet.t(String.t())},
207+
key_data: %{optional(String.t()) => map()},
208+
disjunct_positions: [[non_neg_integer()]] | nil
198209
}
199210
end
200211

@@ -251,6 +262,57 @@ defmodule Electric.Client.Message do
251262
end
252263
end
253264

265+
defmodule MoveInMessage do
266+
@moduledoc """
267+
Represents a move-in event from the server.
268+
269+
Move-in events are sent when the server's subquery filter has changed and
270+
rows may now be included in the shape. The `patterns` field contains position
271+
and hash information that the client uses to update `active_conditions` on
272+
tracked rows.
273+
"""
274+
275+
defstruct [:patterns, :handle, :request_timestamp]
276+
277+
@type pattern :: %{pos: non_neg_integer(), value: String.t()}
278+
@type t :: %__MODULE__{
279+
patterns: [pattern()],
280+
handle: Client.shape_handle(),
281+
request_timestamp: DateTime.t()
282+
}
283+
284+
def from_message(
285+
%{"headers" => %{"event" => "move-in", "patterns" => patterns}},
286+
handle,
287+
request_timestamp
288+
) do
289+
%__MODULE__{
290+
patterns: normalize_patterns(patterns),
291+
handle: handle,
292+
request_timestamp: request_timestamp
293+
}
294+
end
295+
296+
def from_message(
297+
%{headers: %{event: "move-in", patterns: patterns}},
298+
handle,
299+
request_timestamp
300+
) do
301+
%__MODULE__{
302+
patterns: normalize_patterns(patterns),
303+
handle: handle,
304+
request_timestamp: request_timestamp
305+
}
306+
end
307+
308+
defp normalize_patterns(patterns) do
309+
Enum.map(patterns, fn
310+
%{"pos" => pos, "value" => value} -> %{pos: pos, value: value}
311+
%{pos: _, value: _} = pattern -> pattern
312+
end)
313+
end
314+
end
315+
254316
defguard is_insert(msg) when is_struct(msg, ChangeMessage) and msg.headers.operation == :insert
255317

256318
def parse(%{"value" => _} = msg, shape_handle, value_mapper_fun, request_timestamp) do
@@ -288,6 +350,24 @@ defmodule Electric.Client.Message do
288350
[MoveOutMessage.from_message(msg, shape_handle, request_timestamp)]
289351
end
290352

353+
def parse(
354+
%{"headers" => %{"event" => "move-in"}} = msg,
355+
shape_handle,
356+
_value_mapper_fun,
357+
request_timestamp
358+
) do
359+
[MoveInMessage.from_message(msg, shape_handle, request_timestamp)]
360+
end
361+
362+
def parse(
363+
%{headers: %{event: "move-in"}} = msg,
364+
shape_handle,
365+
_value_mapper_fun,
366+
request_timestamp
367+
) do
368+
[MoveInMessage.from_message(msg, shape_handle, request_timestamp)]
369+
end
370+
291371
def parse("", _handle, _value_mapper_fun, _request_timestamp) do
292372
[]
293373
end

packages/elixir-client/lib/electric/client/poll.ex

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,10 +234,21 @@ defmodule Electric.Client.Poll do
234234
end
235235

236236
defp handle_message(%Message.ChangeMessage{} = msg, state) do
237-
{tag_to_keys, key_data} =
238-
TagTracker.update_tag_index(state.tag_to_keys, state.key_data, msg)
237+
{tag_to_keys, key_data, disjunct_positions} =
238+
TagTracker.update_tag_index(
239+
state.tag_to_keys,
240+
state.key_data,
241+
state.disjunct_positions,
242+
msg
243+
)
239244

240-
{:message, msg, %{state | tag_to_keys: tag_to_keys, key_data: key_data}}
245+
{:message, msg,
246+
%{
247+
state
248+
| tag_to_keys: tag_to_keys,
249+
key_data: key_data,
250+
disjunct_positions: disjunct_positions
251+
}}
241252
end
242253

243254
defp handle_message(
@@ -248,13 +259,28 @@ defmodule Electric.Client.Poll do
248259
TagTracker.generate_synthetic_deletes(
249260
state.tag_to_keys,
250261
state.key_data,
262+
state.disjunct_positions,
251263
patterns,
252264
request_timestamp
253265
)
254266

255267
{:messages, synthetic_deletes, %{state | tag_to_keys: tag_to_keys, key_data: key_data}}
256268
end
257269

270+
defp handle_message(
271+
%Message.MoveInMessage{patterns: patterns},
272+
state
273+
) do
274+
{tag_to_keys, key_data} =
275+
TagTracker.handle_move_in(
276+
state.tag_to_keys,
277+
state.key_data,
278+
patterns
279+
)
280+
281+
{:skip, %{state | tag_to_keys: tag_to_keys, key_data: key_data}}
282+
end
283+
258284
defp handle_schema(%Fetch.Response{schema: schema}, client, %{value_mapper_fun: nil} = state)
259285
when is_map(schema) do
260286
{parser_module, parser_opts} = client.parser

packages/elixir-client/lib/electric/client/shape_state.ex

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ defmodule Electric.Client.ShapeState do
4646
tag_to_keys: %{},
4747
key_data: %{},
4848
stale_cache_retry_count: 0,
49+
disjunct_positions: nil,
4950
recent_requests: [],
5051
fast_loop_consecutive_count: 0
5152
]
@@ -59,6 +60,7 @@ defmodule Electric.Client.ShapeState do
5960
up_to_date?: boolean(),
6061
tag_to_keys: %{optional(term()) => MapSet.t()},
6162
key_data: %{optional(term()) => %{tags: MapSet.t(), msg: term()}},
63+
disjunct_positions: [[non_neg_integer()]] | nil,
6264
stale_cache_buster: String.t() | nil,
6365
stale_cache_retry_count: non_neg_integer(),
6466
recent_requests: [{integer(), Offset.t()}],
@@ -95,7 +97,8 @@ defmodule Electric.Client.ShapeState do
9597
schema: resume.schema,
9698
up_to_date?: true,
9799
tag_to_keys: Map.get(resume, :tag_to_keys, %{}),
98-
key_data: Map.get(resume, :key_data, %{})
100+
key_data: Map.get(resume, :key_data, %{}),
101+
disjunct_positions: Map.get(resume, :disjunct_positions)
99102
}
100103
end
101104

@@ -116,7 +119,8 @@ defmodule Electric.Client.ShapeState do
116119
tag_to_keys: %{},
117120
key_data: %{},
118121
recent_requests: [],
119-
fast_loop_consecutive_count: 0
122+
fast_loop_consecutive_count: 0,
123+
disjunct_positions: nil
120124
}
121125
end
122126

@@ -130,7 +134,8 @@ defmodule Electric.Client.ShapeState do
130134
offset: state.offset,
131135
schema: state.schema,
132136
tag_to_keys: state.tag_to_keys,
133-
key_data: state.key_data
137+
key_data: state.key_data,
138+
disjunct_positions: state.disjunct_positions
134139
}
135140
end
136141

0 commit comments

Comments
 (0)