Skip to content

Commit b3de307

Browse files
committed
Getting close to supporting HasThrough
1 parent e575bfa commit b3de307

4 files changed

Lines changed: 141 additions & 19 deletions

File tree

lib/ecto_sync/helpers.ex

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ defmodule EctoSync.Helpers do
6868
}) do
6969
preloads =
7070
Map.get(preloads || %{}, schema, [])
71-
|> IO.inspect(label: :preloads)
71+
|> IO.inspect(label: :get_from_cache_preloads)
7272
|> normalize_to_preloads()
7373
|> nested_sort()
7474

@@ -85,6 +85,18 @@ defmodule EctoSync.Helpers do
8585
value
8686
end
8787

88+
def kw_deep_merge([{k1, v1} | list1], [{k1, v2} | list2]) do
89+
[{k1, kw_deep_merge(v1, v2)} | kw_deep_merge(list1, list2)]
90+
end
91+
92+
def kw_deep_merge([{k1, v1} | list1], [{k2, v2} | list2]) do
93+
[{k1, v1}, {k2, v2} | kw_deep_merge(list1, list2)]
94+
end
95+
96+
def kw_deep_merge([], list), do: list
97+
def kw_deep_merge(list, []), do: list
98+
def kw_deep_merge(list, list), do: list
99+
88100
def primary_key(%{__struct__: schema_mod} = value) when is_struct(value) do
89101
primary_key(schema_mod)
90102
|> then(&Map.get(value, &1))

lib/ecto_sync/subscriber.ex

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ defmodule EctoSync.Subscriber do
4040
events
4141
end
4242
end)
43+
|> merge_assocs()
4344
|> Enum.map(fn {{watcher_identifier, id}, opts} ->
4445
do_subscribe(watcher_identifier, id, opts)
4546
end)
@@ -60,7 +61,12 @@ defmodule EctoSync.Subscriber do
6061

6162
if self() not in pids do
6263
Logger.debug("EventRegistry | #{inspect({watcher_identifier, id})}")
63-
Registry.register(EventRegistry, {encoded_identifier, id}, opts)
64+
65+
Registry.register(
66+
EventRegistry,
67+
{encoded_identifier, id},
68+
opts
69+
)
6470

6571
EctoWatch.subscribe(encoded_identifier, id)
6672
end
@@ -86,17 +92,20 @@ defmodule EctoSync.Subscriber do
8692
parent_id = primary_key(struct)
8793
assoc_field = {related_key, parent_id}
8894
assocs = Map.get(struct, field)
89-
90-
[{{schema, :inserted}, assoc_field} | subscribe_events(struct)] ++
91-
[Enum.map(assocs, &subscribe_events/1)]
95+
# | subscribe_events(struct)
96+
[{{schema, :inserted}, assoc_field}] ++ [Enum.map(assocs, &subscribe_events/1)]
9297
end
9398

94-
def subscribe_events(struct, %HasThrough{through: [k | through]}),
95-
do:
96-
subscribe_events_assocs(
97-
Map.get(struct, k),
98-
through
99-
)
99+
def subscribe_events(struct, %HasThrough{through: through} = assoc) do
100+
preloads =
101+
through
102+
|> Enum.reverse()
103+
|> Enum.reduce([], fn k, acc ->
104+
[{k, acc}]
105+
end)
106+
107+
subscribe_events_assocs(struct, preloads)
108+
end
100109

101110
def subscribe_events(struct, %ManyToMany{
102111
join_through: join_through,
@@ -264,12 +273,26 @@ defmodule EctoSync.Subscriber do
264273
subscribe_events(parent, assoc_info)
265274
|> add_opts(opts)
266275

267-
# |> IO.inspect(label: :events)
268-
269276
subscribe_events_assocs(value, nested, events ++ acc)
270277
end
271278
end
272279

273-
defp add_opts(list, opts) when is_list(list), do: List.flatten(list) |> Enum.map(&{&1, opts})
280+
defp add_opts(list, opts) when is_list(list),
281+
do: List.flatten(list) |> Enum.map(&add_opts(&1, opts))
282+
283+
defp add_opts({{{_, _}, _}, _} = tuple, _opts), do: tuple
274284
defp add_opts(tuple, opts), do: {tuple, opts}
285+
286+
defp merge_assocs(watchers) do
287+
watchers
288+
|> Enum.group_by(fn {identifier_id, _opts} -> identifier_id end, fn {_, opts} -> opts end)
289+
|> Enum.map(fn {watcher_identifier, opts} ->
290+
opts =
291+
Enum.reduce(opts, [], fn opt, acc ->
292+
kw_deep_merge(acc, opt)
293+
end)
294+
295+
{watcher_identifier, opts}
296+
end)
297+
end
275298
end

lib/ecto_sync/syncer.ex

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ defmodule EctoSync.Syncer do
22
@moduledoc false
33
alias EctoSync.PubSub
44
alias EctoSync.{Config, Subscriber}
5-
alias Ecto.Association.{BelongsTo, Has, ManyToMany}
5+
alias Ecto.Association.{BelongsTo, Has, HasThrough, ManyToMany}
66
import EctoSync.Helpers
77

88
def sync(from_cache_or_value, event, opts \\ [])
@@ -41,11 +41,27 @@ defmodule EctoSync.Syncer do
4141
|> Enum.concat(config.preloads[schema] || [])
4242
|> List.flatten()
4343

44-
new = get_preloaded(config.schema, config.id, preloads, config)
44+
config = %{
45+
config
46+
| preloads: Map.update(config.preloads, schema, preloads, &kw_deep_merge(&1, preloads))
47+
}
48+
49+
new =
50+
get_preloaded(config.schema, config.id, preloads, config)
51+
|> IO.inspect(label: :gotten)
4552

4653
Subscriber.subscribe(new, assocs: preloads)
4754

48-
do_sync(value_or_values, new, config)
55+
value_or_values =
56+
do_sync(value_or_values, new, config)
57+
|> IO.inspect(label: :after_sync)
58+
59+
# reduce_preloaded_assocs(new, value_or_values, fn key, struct, acc ->
60+
# struct
61+
# |> IO.inspect(label: :other)
62+
# |> List.wrap()
63+
# |> Enum.reduce(acc, &do_sync(&2, &1, config))
64+
# end)
4965
end
5066

5167
def sync(value_or_values, event, opts) do
@@ -148,8 +164,13 @@ defmodule EctoSync.Syncer do
148164
end)
149165
end
150166

167+
defp deep_update(value, {_key, []} = path, new, config) when is_list(new) do
168+
Enum.reduce(new, value, &deep_update(&2, path, &1, config))
169+
end
170+
151171
defp deep_update(value, {key, []}, new, %{schema: schema} = config) do
152172
value_schema = get_schema(value)
173+
153174
assoc_info = value_schema.__schema__(:association, key)
154175

155176
Map.update!(value, key, fn
@@ -158,6 +179,29 @@ defmodule EctoSync.Syncer do
158179

159180
assoc ->
160181
case {assoc_info, assoc} do
182+
{%HasThrough{through: through}, assocs} ->
183+
case config.event do
184+
:inserted ->
185+
cond do
186+
is_list(assocs) and is_nil(find_by_primary_key(assocs, new)) ->
187+
assocs ++ [new]
188+
189+
is_list(assocs) ->
190+
assocs
191+
192+
true ->
193+
new
194+
end
195+
196+
:updated ->
197+
if is_list(assocs) do
198+
possible_index = find_by_primary_key(assocs, new)
199+
List.replace_at(assocs, possible_index, new)
200+
else
201+
new
202+
end
203+
end
204+
161205
{%Has{field: key, where: where}, assocs} ->
162206
possible_index = find_by_primary_key(assocs, new)
163207
related_id = Map.get(new, assoc_info.related_key)

test/ecto_sync_test.exs

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,12 +238,12 @@ defmodule EctoSyncTest do
238238

239239
receive do
240240
{{Post, :inserted}, _} = sync_args ->
241-
synced = EctoSync.sync(post, sync_args, sync_opts)
241+
# synced = EctoSync.sync(post, sync_args, sync_opts)
242+
# assert synced == post
242243

243244
assert do_preload(person, posts: [person: :posts]) ==
244245
EctoSync.sync(person, sync_args, sync_opts)
245246

246-
assert synced == post
247247
assert [^post] = EctoSync.sync([], sync_args, sync_opts)
248248
assert [%Post{}, ^post] = EctoSync.sync([%Post{}], sync_args, sync_opts)
249249
assert ^post = EctoSync.sync(nil, sync_args, sync_opts)
@@ -669,6 +669,49 @@ defmodule EctoSyncTest do
669669
end
670670
end
671671

672+
describe "has with through clause" do
673+
@preloads [:posts, :all_tags]
674+
test "inserted", %{person: person} do
675+
person = do_preload(person, @preloads)
676+
677+
subscribe(person, assocs: @preloads)
678+
679+
{:ok, _post} =
680+
TestRepo.insert(%Post{person_id: person.id, name: "test", tags: [%{name: "test tag"}]})
681+
682+
# person = do_preload(person, @preloads)
683+
684+
# flush()
685+
# |> IO.inspect()
686+
687+
receive do
688+
{{Post, :inserted}, _} = sync_args ->
689+
synced = EctoSync.sync(person, sync_args)
690+
691+
assert do_preload(person, @preloads) == synced
692+
after
693+
500 -> raise "nothing POSTS"
694+
end
695+
end
696+
697+
test "deleted", %{person_with_posts_and_tags: person} do
698+
%{posts: [%{tags: [tag | _]} = post | _]} =
699+
person = do_preload(person, @preloads)
700+
701+
subscribe(person, assocs: @preloads)
702+
703+
{:ok, _} = TestRepo.delete(tag)
704+
705+
receive do
706+
{{Tag, :deleted}, _} = sync_args ->
707+
synced = EctoSync.sync(person, sync_args, preloads: %{Post => [:tags, :labels]})
708+
assert do_preload(person, @preloads) == synced
709+
after
710+
500 -> raise "nothing POSTS"
711+
end
712+
end
713+
end
714+
672715
describe "many to many with join_through module" do
673716
@preloads [:favourite_tags, posts: :tags]
674717
test "inserted", %{person_with_posts_and_tags: person} do

0 commit comments

Comments
 (0)