Skip to content

Commit 5c3f5fc

Browse files
samwilliskevin-dp
andauthored
WIP feat(client): experimental support for streaming SSE in live mode (#2546)
Co-authored-by: Kevin <kevin@electric-sql.com>
1 parent 4d2c23a commit 5c3f5fc

16 files changed

Lines changed: 1986 additions & 1495 deletions

File tree

package.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,10 @@
2020
},
2121
"devDependencies": {
2222
"glob": "^10.3.10"
23+
},
24+
"pnpm": {
25+
"patchedDependencies": {
26+
"@microsoft/fetch-event-source": "patches/@microsoft__fetch-event-source.patch"
27+
}
2328
}
2429
}

packages/sync-service/lib/electric/shapes/api.ex

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -317,8 +317,18 @@ defmodule Electric.Shapes.Api do
317317

318318
# TODO: discuss returning a 307 redirect rather than a 409, the client
319319
# will have to detect this and throw out old data
320+
321+
# In SSE mode we send the must refetch object as an event
322+
# instead of a singleton array containing that object
323+
must_refetch =
324+
if request.params.experimental_live_sse do
325+
hd(@must_refetch)
326+
else
327+
@must_refetch
328+
end
329+
320330
{:error,
321-
Response.error(request, @must_refetch,
331+
Response.error(request, must_refetch,
322332
handle: active_shape_handle,
323333
status: 409
324334
)}
@@ -610,10 +620,9 @@ defmodule Electric.Shapes.Api do
610620
last_message_time: last_message_time,
611621
request:
612622
%{
613-
api:
614-
%{
615-
keepalive_interval: keepalive_interval
616-
} = api,
623+
api: %{
624+
keepalive_interval: keepalive_interval
625+
},
617626
handle: shape_handle,
618627
new_changes_ref: ref
619628
} = request,
@@ -664,7 +673,7 @@ defmodule Electric.Shapes.Api do
664673

665674
{^ref, :shape_rotation} ->
666675
must_refetch = %{headers: %{control: "must-refetch"}}
667-
message = encode_message(api, must_refetch)
676+
message = encode_message(request, must_refetch)
668677

669678
{message, %{state | mode: :done}}
670679

@@ -793,11 +802,19 @@ defmodule Electric.Shapes.Api do
793802
encode(api, :log, stream)
794803
end
795804

796-
@spec encode_message(Api.t() | Request.t(), term()) :: Enum.t()
797-
def encode_message(%Api{} = api, message) do
805+
# Error messages are encoded normally, even when using SSE
806+
# because they are returned on the original fetch request
807+
# with a status code that is not 2xx.
808+
@spec encode_error_message(Api.t() | Request.t(), term()) :: Enum.t()
809+
def encode_error_message(%Api{} = api, message) do
810+
encode(api, :message, message)
811+
end
812+
813+
def encode_error_message(%Request{api: api}, message) do
798814
encode(api, :message, message)
799815
end
800816

817+
@spec encode_message(Request.t(), term()) :: Enum.t()
801818
def encode_message(
802819
%Request{api: api, params: %{live: true, experimental_live_sse: true}},
803820
message

packages/sync-service/lib/electric/shapes/api/response.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ defmodule Electric.Shapes.Api.Response do
8686
message
8787
end
8888

89-
Api.encode_message(api_or_request, body)
89+
Api.encode_error_message(api_or_request, body)
9090
end
9191

9292
@spec send(Plug.Conn.t(), t()) :: Plug.Conn.t()

packages/typescript-client/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
"bugs": {
77
"url": "https://github.com/electric-sql/electric/issues"
88
},
9-
"dependencies": {},
9+
"dependencies": {
10+
"@microsoft/fetch-event-source": "^2.0.1"
11+
},
1012
"devDependencies": {
1113
"@types/pg": "^8.11.6",
1214
"@types/uuid": "^10.0.0",

0 commit comments

Comments
 (0)