From 50ad68bbd4aa74245d8a32d48d706171cfdcdf7a Mon Sep 17 00:00:00 2001 From: Darrin Cullop Date: Tue, 26 May 2026 17:20:49 -0700 Subject: [PATCH 1/9] Eliminate Rx Merge gate in queue-serialized operators #1079 moved cross-cache operators from Synchronize(lock) to SynchronizeSafe, which routes deliveries through a SharedDeliveryQueue that releases the lock before invoking downstream observers. The intent was to make the lock no longer held across cross-cache calls, so two operators on a bidirectional pipeline could not form an ABBA cycle. Six operators (Page, Virtualise, AutoRefresh, Sort, GroupOnImmutable, and QueryWhenChanged) routed every input through the queue but then combined the inputs with Observable.Merge before delivery. Rx's Merge installs its own private gate and holds it for the full duration of every downstream OnNext. When downstream delivery walks into another cache's writer lock, the two Merge gates on the two operators reconstruct the ABBA cycle that the queue- drain design was supposed to eliminate. DeadlockTortureTest.Page_DoesNotDeadlock caught this for Page; the other five had the same latent bug. This adds IObservable.UnsynchronizedMerge, a drop-in alternative to Observable.Merge that performs no synchronization of its own. It is safe to use only when every input is already serialized (in this library, by routing through the same SharedDeliveryQueue). All six operators now use it. Sort's three-source case becomes a single UnsynchronizedMerge call instead of nested .Merge().Merge(), removing one of the two gates that the chained form created. FullJoin uses the same Merge syntax but its two inputs come from independently materialized AsObservableCache().Connect() streams that share no queue. The Merge gate is the only thing serializing them; this PR leaves FullJoin alone. DeadlockTortureTest grows three new cases (GroupWithImmutableState, QueryWhenChanged, and Virtualise added to the stacked + multi-pair scenarios) so a future regression in any of the six operators is caught by the existing torture fixture. Verified: 14/14 DeadlockTortureTest pass at MaxParallelThreads=16 across 10 iterations; 422/422 Sort/Virtualise/Page/AutoRefresh/Group/QueryWhenChanged unit tests pass; full Cache + List suite passes (2321 passed, 1 skipped). --- .../Cache/DeadlockTortureTest.cs | 38 ++++++++++- src/DynamicData/Cache/Internal/AutoRefresh.cs | 2 +- .../Cache/Internal/GroupOnImmutable.cs | 2 +- src/DynamicData/Cache/Internal/Page.cs | 2 +- .../Cache/Internal/QueryWhenChanged.cs | 2 +- src/DynamicData/Cache/Internal/Sort.cs | 2 +- src/DynamicData/Cache/Internal/Virtualise.cs | 2 +- .../Internal/SynchronizeSafeExtensions.cs | 66 +++++++++++++++++++ 8 files changed, 109 insertions(+), 7 deletions(-) diff --git a/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs b/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs index 4f4ddf9a..8576d34c 100644 --- a/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs +++ b/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs @@ -64,6 +64,9 @@ [Fact] public async Task AutoRefresh_DoesNotDeadlock() => [Fact] public async Task GroupOn_DoesNotDeadlock() => (await RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()))).Should().BeTrue(); + [Fact] public async Task GroupWithImmutableState_DoesNotDeadlock() => + (await RunBidirectionalDeadlockTest(s => s.GroupWithImmutableState(p => p.Age % 3).TransformMany(g => g.Items, p => p.UniqueKey))).Should().BeTrue(); + [Fact] public async Task Page_DoesNotDeadlock() { using var req = new BehaviorSubject(new PageRequest(1, 50)); @@ -76,6 +79,34 @@ [Fact] public async Task Virtualise_DoesNotDeadlock() (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(req))).Should().BeTrue(); } + [Fact] public async Task QueryWhenChanged_DoesNotDeadlock() + { + for (var iter = 0; iter < Iterations; iter++) + { + using var sourceA = new SourceCache(p => p.UniqueKey); + using var sourceB = new SourceCache(p => p.UniqueKey); + + // QueryWhenChanged with an itemChangedTrigger exercises the Merge branch. + // A side-channel write into the other cache closes the same ABBA cycle that + // PopulateInto would close for changeset-shaped operators. + using var aToB = sourceA.Connect() + .Filter(p => p.Name.StartsWith("A")) + .QueryWhenChanged(p => p.WhenPropertyChanged(x => x.Age)) + .Subscribe(_ => sourceB.AddOrUpdate(new Person("A-marker", 0))); + using var bToA = sourceB.Connect() + .Filter(p => p.Name.StartsWith("B")) + .QueryWhenChanged(p => p.WhenPropertyChanged(x => x.Age)) + .Subscribe(_ => sourceA.AddOrUpdate(new Person("B-marker", 0))); + + using var barrier = new Barrier(2); + var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); }); + var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); }); + + var completed = Task.WhenAll(taskA, taskB); + (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)))).Should().BeSameAs(completed, "iteration " + iter); + } + } + [Fact] public async Task TransformWithForce_DoesNotDeadlock() { using var force = new Subject>(); @@ -94,14 +125,18 @@ [Fact] public async Task OnItemRemoved_DoesNotDeadlock() => [Fact] public async Task AllDangerous_Stacked_DoNotDeadlock() { using var pageReq = new BehaviorSubject(new PageRequest(1, 100)); + using var virtReq = new BehaviorSubject(new VirtualRequest(0, 100)); using var force = new Subject>(); (await RunBidirectionalDeadlockTest( - s => s.AutoRefresh(p => p.Age) + s => s.GroupWithImmutableState(p => p.Age % 3) + .TransformMany(g => g.Items, p => p.UniqueKey) + .AutoRefresh(p => p.Age) .Filter(p => p.Age >= 0) .Transform((p, k) => new Person("X-" + p.Name, p.Age), force) .OnItemRemoved(_ => { }) .DisposeMany() .Sort(SortExpressionComparer.Ascending(p => p.Age)) + .Virtualise(virtReq) .Page(pageReq), iterations: Iterations * 2)).Should().BeTrue(); } @@ -114,6 +149,7 @@ [Fact] public async Task MultiplePairs_Simultaneous_NoDeadlock() RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)), 30), RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age), 30), RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()), 30), + RunBidirectionalDeadlockTest(s => s.GroupWithImmutableState(p => p.Age % 3).TransformMany(g => g.Items, p => p.UniqueKey), 30), RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }), 30), RunBidirectionalDeadlockTest(s => s.DisposeMany(), 30), RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(pageReq), 30), diff --git a/src/DynamicData/Cache/Internal/AutoRefresh.cs b/src/DynamicData/Cache/Internal/AutoRefresh.cs index ee81fe58..7edd3f01 100644 --- a/src/DynamicData/Cache/Internal/AutoRefresh.cs +++ b/src/DynamicData/Cache/Internal/AutoRefresh.cs @@ -33,7 +33,7 @@ public IObservable> Run() => Observable.Create> Run() => var regroup = _regrouper.SynchronizeSafe(queue).Select(_ => grouper.Regroup()).Where(changes => changes.Count != 0); - return new CompositeDisposable(groups.Merge(regroup).SubscribeSafe(observer), queue); + return new CompositeDisposable(groups.UnsynchronizedMerge(regroup).SubscribeSafe(observer), queue); }); private sealed class Grouper(Func groupSelectorKey) diff --git a/src/DynamicData/Cache/Internal/Page.cs b/src/DynamicData/Cache/Internal/Page.cs index e4159587..2d91ac29 100644 --- a/src/DynamicData/Cache/Internal/Page.cs +++ b/src/DynamicData/Cache/Internal/Page.cs @@ -19,7 +19,7 @@ public IObservable> Run() => Observable.Create updates is not null) .Select(x => x!) .SubscribeSafe(observer), queue); diff --git a/src/DynamicData/Cache/Internal/QueryWhenChanged.cs b/src/DynamicData/Cache/Internal/QueryWhenChanged.cs index 01828e85..7e976424 100644 --- a/src/DynamicData/Cache/Internal/QueryWhenChanged.cs +++ b/src/DynamicData/Cache/Internal/QueryWhenChanged.cs @@ -49,7 +49,7 @@ public IObservable> Run() return cache; }).Select(list => new AnonymousQuery(list)); - return new CompositeDisposable(sourceChanged.Merge(inlineChange).SubscribeSafe(observer), shared.Connect(), queue); + return new CompositeDisposable(sourceChanged.UnsynchronizedMerge(inlineChange).SubscribeSafe(observer), shared.Connect(), queue); }); } } diff --git a/src/DynamicData/Cache/Internal/Sort.cs b/src/DynamicData/Cache/Internal/Sort.cs index 11b39035..6afb9632 100644 --- a/src/DynamicData/Cache/Internal/Sort.cs +++ b/src/DynamicData/Cache/Internal/Sort.cs @@ -57,7 +57,7 @@ public IObservable> Run() => Observable.Create result is not null).Select(x => x!).SubscribeSafe(observer), queue); + return new CompositeDisposable(comparerChanged.UnsynchronizedMerge(dataChanged, sortAgain).Where(result => result is not null).Select(x => x!).SubscribeSafe(observer), queue); }); private sealed class Sorter(SortOptimisations optimisations, IComparer? comparer = null, int resetThreshold = -1) diff --git a/src/DynamicData/Cache/Internal/Virtualise.cs b/src/DynamicData/Cache/Internal/Virtualise.cs index 9a5fefbc..9e858f5d 100644 --- a/src/DynamicData/Cache/Internal/Virtualise.cs +++ b/src/DynamicData/Cache/Internal/Virtualise.cs @@ -23,7 +23,7 @@ public IObservable> Run() => Observable.Create< var request = _virtualRequests.SynchronizeSafe(queue).Select(virtualiser.Virtualise).Where(x => x is not null).Select(x => x!); var dataChange = _source.SynchronizeSafe(queue).Select(virtualiser.Update).Where(x => x is not null).Select(x => x!); - return new CompositeDisposable(request.Merge(dataChange).Where(updates => updates is not null).SubscribeSafe(observer), queue); + return new CompositeDisposable(request.UnsynchronizedMerge(dataChange).Where(updates => updates is not null).SubscribeSafe(observer), queue); }); private sealed class Virtualiser(VirtualRequest? request = null) diff --git a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs index ace8ed4a..577bc64a 100644 --- a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs +++ b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs @@ -2,6 +2,7 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Reactive; using System.Reactive.Disposables; using System.Reactive.Linq; @@ -78,4 +79,69 @@ public static IObservable SynchronizeSafe(this IObservable source) => // Queue first: ensures in-flight deliveries complete before teardown side effects run return new CompositeDisposable(queue, source.SubscribeSafe(queue)); }); + + /// + /// Merges with into a single observable + /// without taking any synchronization gate. Functionally equivalent to + /// : completes only after + /// every source completes; the first error terminates; subscription occurs in argument order. + /// + /// + /// The caller MUST ensure that delivery from every source is already serialized. + /// In this library the precondition is satisfied by routing every source through the + /// same via + /// . The shared + /// queue's drain loop guarantees that at most one notification is in flight to the + /// downstream observer at a time, so the additional gate that Observable.Merge + /// would install is redundant. + /// Removing that gate matters in cross-cache pipelines: Observable.Merge + /// holds its private _gate for the entire duration of downstream delivery, and + /// when downstream delivery walks into another cache's writer lock, two such gates on + /// two operators form an ABBA cycle that the queue-drain design is meant to prevent. + /// Without the external serialization precondition, concurrent OnNext + /// calls into the shared observer will race. Do not use as a general-purpose + /// Observable.Merge replacement. + /// + public static IObservable UnsynchronizedMerge(this IObservable first, params IObservable[] others) => + Observable.Create(observer => + { + var totalSources = others.Length + 1; + var subscriptions = new CompositeDisposable(totalSources); + var pending = totalSources; + var terminated = 0; + + void OnNextSafe(T value) + { + if (Volatile.Read(ref terminated) == 0) + { + observer.OnNext(value); + } + } + + void OnErrorSafe(Exception error) + { + if (Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnError(error); + } + } + + void OnCompletedSafe() + { + if (Interlocked.Decrement(ref pending) == 0 && + Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnCompleted(); + } + } + + var fanOut = Observer.Create(OnNextSafe, OnErrorSafe, OnCompletedSafe); + subscriptions.Add(first.SubscribeSafe(fanOut)); + foreach (var source in others) + { + subscriptions.Add(source.SubscribeSafe(fanOut)); + } + + return subscriptions; + }); } From 7bfe343acfc59477fb8170ccd14003ca4509c2b5 Mon Sep 17 00:00:00 2001 From: Darrin Cullop Date: Wed, 27 May 2026 00:22:15 -0700 Subject: [PATCH 2/9] Fix UnsynchronizedMerge dropping terminal notifications Initial implementation subscribed every source to a single shared Observer.Create instance. The instance is an AnonymousObserver, which derives from ObserverBase and tracks a one-shot _isStopped flag inside its OnCompleted/OnError. Once any source's terminal notification flips that flag, every subsequent OnCompleted from the remaining sources is silently dropped before reaching the pending counter, so the merged observable never emits OnCompleted downstream. CrossCacheDeadlockStressTest.AllOperators_CrossCache_NoDeadlock_CorrectResults caught this consistently in CI: the sourceB.Sort.Virtualise pipeline received OnCompleted from virtBRequests (its first source), but the matching OnCompleted from sourceB.Dispose arrived at a stopped observer and was discarded, leaving LastOrDefaultAsync waiting forever. Each source now subscribes through its own Observer.Create instance. The OnNextSafe/OnErrorSafe/OnCompletedSafe actions close over the same shared pending and terminated counters, so the all-must-complete and first-error-wins semantics are unchanged; only the per-observer one-shot state is now isolated per source. This matches the per-InnerObserver pattern that Rx's own Observable.Merge uses internally. Also apply UnsynchronizedMerge to TransformWithForcedTransform, which was missed in the original survey. Its shared.Merge(refresher) routed both inputs through the same SharedDeliveryQueue but kept Rx's gate, giving the same latent ABBA exposure that DeadlockTortureTest.TransformWithForce_DoesNotDeadlock flagged in CI. Verified: CrossCacheDeadlockStressTest plus the full DeadlockTortureTest fixture pass 10/10 at xUnit.MaxParallelThreads=16; full test suite passes 2323/2323 at xUnit.MaxParallelThreads=4. --- .../Cache/Internal/TransformWithForcedTransform.cs | 2 +- src/DynamicData/Internal/SharedDeliveryQueue.cs | 2 +- .../Internal/SynchronizeSafeExtensions.cs | 12 +++++++++--- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs b/src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs index c7c95aa6..4bce97ac 100644 --- a/src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs +++ b/src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs @@ -25,7 +25,7 @@ public IObservable> Run() => Observable.Create CaptureChanges(cache, selector)).Select(changes => new ChangeSet(changes)).NotEmpty(); - var sourceAndRefreshes = shared.Merge(refresher); + var sourceAndRefreshes = shared.UnsynchronizedMerge(refresher); // do raw transform var transform = new Transform(sourceAndRefreshes, transformFactory, exceptionCallback, true).Run(); diff --git a/src/DynamicData/Internal/SharedDeliveryQueue.cs b/src/DynamicData/Internal/SharedDeliveryQueue.cs index 6eab2889..9ec57a8e 100644 --- a/src/DynamicData/Internal/SharedDeliveryQueue.cs +++ b/src/DynamicData/Internal/SharedDeliveryQueue.cs @@ -1,4 +1,4 @@ -// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. diff --git a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs index 577bc64a..e20131e1 100644 --- a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs +++ b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs @@ -98,6 +98,11 @@ public static IObservable SynchronizeSafe(this IObservable source) => /// holds its private _gate for the entire duration of downstream delivery, and /// when downstream delivery walks into another cache's writer lock, two such gates on /// two operators form an ABBA cycle that the queue-drain design is meant to prevent. + /// Each source is subscribed through its own + /// instance. The actions close over shared pending and terminated counters, but + /// the observer instances must be distinct because Rx's ObserverBase sets a one-shot + /// stopped flag on the first OnCompleted/OnError; a single shared observer + /// would silently drop terminal notifications from every source after the first. /// Without the external serialization precondition, concurrent OnNext /// calls into the shared observer will race. Do not use as a general-purpose /// Observable.Merge replacement. @@ -135,11 +140,12 @@ void OnCompletedSafe() } } - var fanOut = Observer.Create(OnNextSafe, OnErrorSafe, OnCompletedSafe); - subscriptions.Add(first.SubscribeSafe(fanOut)); + IObserver CreateInner() => Observer.Create(OnNextSafe, OnErrorSafe, OnCompletedSafe); + + subscriptions.Add(first.SubscribeSafe(CreateInner())); foreach (var source in others) { - subscriptions.Add(source.SubscribeSafe(fanOut)); + subscriptions.Add(source.SubscribeSafe(CreateInner())); } return subscriptions; From 860adb5d90b6a4070d1a868be18ee11cdc16a60b Mon Sep 17 00:00:00 2001 From: Darrin Cullop Date: Wed, 27 May 2026 00:49:09 -0700 Subject: [PATCH 3/9] Introduce DeliveryQueueMerge to combine SDQ routing and gate-free merge Six of the operators changed in this branch followed the same shape: var queue = new SharedDeliveryQueue(); var s1 = source1.SynchronizeSafe(queue).Select(projection1); var s2 = source2.SynchronizeSafe(queue).Select(projection2); return new CompositeDisposable(s1.UnsynchronizedMerge(s2)... , queue); Every site allocates its own queue, threads it through each input, and unwinds it in the disposable. The pattern is mechanical and easy to get wrong: the queue must outlive the subscription, every input must be serialized through the same queue, and the merge must skip Rx's gate. DeliveryQueueMerge wraps that pattern as one operator. Each overload owns its own SharedDeliveryQueue, routes every input through it via SynchronizeSafe(queue), and combines the serialized streams with UnsynchronizedMerge. The returned disposable tears down the merge before the queue so terminal notifications still flow through the still-active queue. Two flavours: DeliveryQueueMerge(IObservable, params IObservable[]) same-type merge, no projection (AutoRefresh) DeliveryQueueMerge(IObservable, Func, IObservable, Func) heterogeneous two-source merge with projections invoked inside the drain (Page, Virtualise, GroupOnImmutable, QueryWhenChanged) DeliveryQueueMerge(IObservable, ..., IObservable, ..., IObservable, ...) three-source heterogeneous merge (Sort, non-early-return branch) TransformWithForcedTransform keeps its current shape: its queue is shared with a Publish()/cacheLoader subscription that lives outside the merge, so the queue cannot be encapsulated by a merge operator. UnsynchronizedMerge remains the helper there. Verified locally: 437/437 unit tests across the six affected operators pass; DeadlockTortureTest plus CrossCacheDeadlockStressTest pass 10/10 at xUnit.MaxParallelThreads=16; full test suite passes at MaxParallelThreads=4. --- src/DynamicData/Cache/Internal/AutoRefresh.cs | 5 +- .../Cache/Internal/GroupOnImmutable.cs | 12 +- src/DynamicData/Cache/Internal/Page.cs | 10 +- .../Cache/Internal/QueryWhenChanged.cs | 19 ++- src/DynamicData/Cache/Internal/Sort.cs | 17 +-- src/DynamicData/Cache/Internal/Virtualise.cs | 11 +- .../Internal/DeliveryQueueMergeExtensions.cs | 122 ++++++++++++++++++ 7 files changed, 157 insertions(+), 39 deletions(-) create mode 100644 src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs diff --git a/src/DynamicData/Cache/Internal/AutoRefresh.cs b/src/DynamicData/Cache/Internal/AutoRefresh.cs index 7edd3f01..f1f17d67 100644 --- a/src/DynamicData/Cache/Internal/AutoRefresh.cs +++ b/src/DynamicData/Cache/Internal/AutoRefresh.cs @@ -32,9 +32,8 @@ public IObservable> Run() => Observable.Create list.Count > 0).Select(items => new ChangeSet(items)); // publish refreshes and underlying changes - var queue = new SharedDeliveryQueue(); - var publisher = shared.SynchronizeSafe(queue).UnsynchronizedMerge(refreshChanges.SynchronizeSafe(queue)).SubscribeSafe(observer); + var publisher = shared.DeliveryQueueMerge(refreshChanges).SubscribeSafe(observer); - return new CompositeDisposable(publisher, shared.Connect(), queue); + return new CompositeDisposable(publisher, shared.Connect()); }); } diff --git a/src/DynamicData/Cache/Internal/GroupOnImmutable.cs b/src/DynamicData/Cache/Internal/GroupOnImmutable.cs index fc6f3b16..c9b776b1 100644 --- a/src/DynamicData/Cache/Internal/GroupOnImmutable.cs +++ b/src/DynamicData/Cache/Internal/GroupOnImmutable.cs @@ -22,14 +22,12 @@ internal sealed class GroupOnImmutable(IObservable> Run() => Observable.Create>( observer => { - var queue = new SharedDeliveryQueue(); var grouper = new Grouper(_groupSelectorKey); - - var groups = _source.SynchronizeSafe(queue).Select(grouper.Update).Where(changes => changes.Count != 0); - - var regroup = _regrouper.SynchronizeSafe(queue).Select(_ => grouper.Regroup()).Where(changes => changes.Count != 0); - - return new CompositeDisposable(groups.UnsynchronizedMerge(regroup).SubscribeSafe(observer), queue); + return DeliveryQueueMergeExtensions.DeliveryQueueMerge, Unit, IImmutableGroupChangeSet>( + _source, grouper.Update, + _regrouper, _ => grouper.Regroup()) + .Where(changes => changes.Count != 0) + .SubscribeSafe(observer); }); private sealed class Grouper(Func groupSelectorKey) diff --git a/src/DynamicData/Cache/Internal/Page.cs b/src/DynamicData/Cache/Internal/Page.cs index 2d91ac29..f707b657 100644 --- a/src/DynamicData/Cache/Internal/Page.cs +++ b/src/DynamicData/Cache/Internal/Page.cs @@ -14,15 +14,13 @@ internal sealed class Page(IObservable> Run() => Observable.Create>( observer => { - var queue = new SharedDeliveryQueue(); var paginator = new Paginator(); - var request = pageRequests.SynchronizeSafe(queue).Select(paginator.Paginate); - var dataChange = source.SynchronizeSafe(queue).Select(paginator.Update); - - return new CompositeDisposable(request.UnsynchronizedMerge(dataChange) + return DeliveryQueueMergeExtensions.DeliveryQueueMerge, IPagedChangeSet?>( + pageRequests, paginator.Paginate, + source, paginator.Update) .Where(updates => updates is not null) .Select(x => x!) - .SubscribeSafe(observer), queue); + .SubscribeSafe(observer); }); private sealed class Paginator diff --git a/src/DynamicData/Cache/Internal/QueryWhenChanged.cs b/src/DynamicData/Cache/Internal/QueryWhenChanged.cs index 7e976424..11dc5bdf 100644 --- a/src/DynamicData/Cache/Internal/QueryWhenChanged.cs +++ b/src/DynamicData/Cache/Internal/QueryWhenChanged.cs @@ -34,22 +34,21 @@ public IObservable> Run() return Observable.Create>(observer => { - var queue = new SharedDeliveryQueue(); var state = new Cache(); var shared = _source.Publish(); - var inlineChange = shared.MergeMany(itemChangedTrigger).SynchronizeSafe(queue).Select(_ => new AnonymousQuery(state)); - - var sourceChanged = shared.SynchronizeSafe(queue).Scan( - state, - (cache, changes) => + var merged = DeliveryQueueMergeExtensions.DeliveryQueueMerge, TValue, IQuery>( + shared, + changes => { - cache.Clone(changes); - return cache; - }).Select(list => new AnonymousQuery(list)); + state.Clone(changes); + return new AnonymousQuery(state); + }, + shared.MergeMany(itemChangedTrigger), + _ => new AnonymousQuery(state)); - return new CompositeDisposable(sourceChanged.UnsynchronizedMerge(inlineChange).SubscribeSafe(observer), shared.Connect(), queue); + return new CompositeDisposable(merged.SubscribeSafe(observer), shared.Connect()); }); } } diff --git a/src/DynamicData/Cache/Internal/Sort.cs b/src/DynamicData/Cache/Internal/Sort.cs index 6afb9632..4d3c2495 100644 --- a/src/DynamicData/Cache/Internal/Sort.cs +++ b/src/DynamicData/Cache/Internal/Sort.cs @@ -43,7 +43,6 @@ public IObservable> Run() => Observable.Create { var sorter = new Sorter(_sortOptimisations, _comparer, _resetThreshold); - var queue = new SharedDeliveryQueue(); // check for nulls so we can prevent a lock when not required if (_comparerChangedObservable is null && _resorter is null) @@ -51,13 +50,15 @@ public IObservable> Run() => Observable.Create result is not null).Select(x => x!).SubscribeSafe(observer); } - var comparerChanged = (_comparerChangedObservable ?? Observable.Never>()).SynchronizeSafe(queue).Select(sorter.Sort); - - var sortAgain = (_resorter ?? Observable.Never()).SynchronizeSafe(queue).Select(_ => sorter.Sort()); - - var dataChanged = _source.SynchronizeSafe(queue).Select(sorter.Sort); - - return new CompositeDisposable(comparerChanged.UnsynchronizedMerge(dataChanged, sortAgain).Where(result => result is not null).Select(x => x!).SubscribeSafe(observer), queue); + var comparerSource = _comparerChangedObservable ?? Observable.Never>(); + var resorterSource = _resorter ?? Observable.Never(); + return DeliveryQueueMergeExtensions.DeliveryQueueMerge, IChangeSet, Unit, ISortedChangeSet?>( + comparerSource, sorter.Sort, + _source, sorter.Sort, + resorterSource, _ => sorter.Sort()) + .Where(result => result is not null) + .Select(x => x!) + .SubscribeSafe(observer); }); private sealed class Sorter(SortOptimisations optimisations, IComparer? comparer = null, int resetThreshold = -1) diff --git a/src/DynamicData/Cache/Internal/Virtualise.cs b/src/DynamicData/Cache/Internal/Virtualise.cs index 9e858f5d..3ed1e79c 100644 --- a/src/DynamicData/Cache/Internal/Virtualise.cs +++ b/src/DynamicData/Cache/Internal/Virtualise.cs @@ -19,11 +19,12 @@ public IObservable> Run() => Observable.Create< observer => { var virtualiser = new Virtualiser(); - var queue = new SharedDeliveryQueue(); - - var request = _virtualRequests.SynchronizeSafe(queue).Select(virtualiser.Virtualise).Where(x => x is not null).Select(x => x!); - var dataChange = _source.SynchronizeSafe(queue).Select(virtualiser.Update).Where(x => x is not null).Select(x => x!); - return new CompositeDisposable(request.UnsynchronizedMerge(dataChange).Where(updates => updates is not null).SubscribeSafe(observer), queue); + return DeliveryQueueMergeExtensions.DeliveryQueueMerge, IVirtualChangeSet?>( + _virtualRequests, virtualiser.Virtualise, + _source, virtualiser.Update) + .Where(updates => updates is not null) + .Select(x => x!) + .SubscribeSafe(observer); }); private sealed class Virtualiser(VirtualRequest? request = null) diff --git a/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs new file mode 100644 index 00000000..6783d3bc --- /dev/null +++ b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs @@ -0,0 +1,122 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive.Disposables; +using System.Reactive.Linq; + +namespace DynamicData.Internal; + +/// +/// Provides DeliveryQueueMerge extension methods, which combine the +/// serialization step and the gate-free merge step +/// from into a single +/// operator. +/// +/// +/// The motivation is the cross-cache deadlock that operators like Page, +/// Virtualise, AutoRefresh, Sort, GroupWithImmutableState, and +/// QueryWhenChanged all needed to solve: route every input through a single +/// so downstream delivery is serialized without holding +/// any operator-level lock, then merge the serialized inputs without re-introducing an +/// Observable.Merge gate that would reconstruct the ABBA cycle. +/// Each overload creates and owns its own , wraps +/// every source through , +/// and finally combines the wrapped streams with +/// . +/// The returned tears down the merge subscription before +/// the queue, so any final terminal notification still flows through the still-active queue. +/// The heterogeneous overloads accept a projection per input. Each projection is +/// applied inside the queue's drain (via SynchronizeSafe(queue).Select(projection)), +/// so stateful projections (paginators, sorters, virtualisers, group trackers) see a single, +/// serialized stream of invocations and may safely mutate operator-private state without +/// additional locking. +/// +internal static class DeliveryQueueMergeExtensions +{ + /// + /// Merges with after routing every + /// source through a single . + /// + /// The element type, common to every input. + /// The first input observable. + /// Additional input observables. + /// An observable that emits items from every input, serialized through a shared queue. + public static IObservable DeliveryQueueMerge(this IObservable first, params IObservable[] others) => + Observable.Create(observer => + { + var queue = new SharedDeliveryQueue(); + var firstSync = first.SynchronizeSafe(queue); + var othersSync = new IObservable[others.Length]; + for (var i = 0; i < others.Length; i++) + { + othersSync[i] = others[i].SynchronizeSafe(queue); + } + + return new CompositeDisposable( + firstSync.UnsynchronizedMerge(othersSync).SubscribeSafe(observer), + queue); + }); + + /// + /// Merges two inputs of different element types into a single observable of + /// by routing each input through a single shared and applying its + /// projection inside the drain. + /// + /// Element type of the first input. + /// Element type of the second input. + /// Common output element type produced by both projections. + /// The first input observable. + /// Projection applied to each element from . + /// The second input observable. + /// Projection applied to each element from . + /// An observable emitting the projected items from both inputs, serialized through a shared queue. + public static IObservable DeliveryQueueMerge( + IObservable first, + Func firstProjection, + IObservable second, + Func secondProjection) => + Observable.Create(observer => + { + var queue = new SharedDeliveryQueue(); + var firstProjected = first.SynchronizeSafe(queue).Select(firstProjection); + var secondProjected = second.SynchronizeSafe(queue).Select(secondProjection); + + return new CompositeDisposable( + firstProjected.UnsynchronizedMerge(secondProjected).SubscribeSafe(observer), + queue); + }); + + /// + /// Three-input variant of the heterogeneous overload. + /// + /// Element type of the first input. + /// Element type of the second input. + /// Element type of the third input. + /// Common output element type produced by all projections. + /// The first input observable. + /// Projection applied to each element from . + /// The second input observable. + /// Projection applied to each element from . + /// The third input observable. + /// Projection applied to each element from . + /// An observable emitting the projected items from all three inputs, serialized through a shared queue. + public static IObservable DeliveryQueueMerge( + IObservable first, + Func firstProjection, + IObservable second, + Func secondProjection, + IObservable third, + Func thirdProjection) => + Observable.Create(observer => + { + var queue = new SharedDeliveryQueue(); + var firstProjected = first.SynchronizeSafe(queue).Select(firstProjection); + var secondProjected = second.SynchronizeSafe(queue).Select(secondProjection); + var thirdProjected = third.SynchronizeSafe(queue).Select(thirdProjection); + + return new CompositeDisposable( + firstProjected.UnsynchronizedMerge(secondProjected, thirdProjected).SubscribeSafe(observer), + queue); + }); +} \ No newline at end of file From f4e983ad5d853a2e2bf2dc94cbdad893d88d9eda Mon Sep 17 00:00:00 2001 From: Darrin Cullop Date: Wed, 27 May 2026 10:49:57 -0700 Subject: [PATCH 4/9] Scope DeliveryQueueMerge to AutoRefresh's same-type case The heterogeneous DeliveryQueueMerge overloads pushed too much into each call site to read like idiomatic Rx, and at five of the six operators the projections had to run inside the shared delivery queue to preserve Rx semantics, which the operator-level signature could not express without exposing the queue type to the caller. Keep the same-type extension overload only: public static IObservable DeliveryQueueMerge( this IObservable first, params IObservable[] others) This reads as a drop-in for Observable.Merge at AutoRefresh's call site, which is the only place all inputs are already the same type and need no per-input projection inside the drain. Page, Virtualise, Sort, GroupOnImmutable, and QueryWhenChanged keep the explicit SharedDeliveryQueue + SynchronizeSafe(queue) + UnsynchronizedMerge shape introduced earlier in this branch. Each call site shows the queue plumbing because the projections must execute inside the drain; making that visible matches the rest of the code in the file. --- .../Cache/Internal/GroupOnImmutable.cs | 12 ++- src/DynamicData/Cache/Internal/Page.cs | 10 +- .../Cache/Internal/QueryWhenChanged.cs | 19 ++-- src/DynamicData/Cache/Internal/Sort.cs | 17 ++-- src/DynamicData/Cache/Internal/Virtualise.cs | 11 +-- .../Internal/DeliveryQueueMergeExtensions.cs | 98 +++---------------- 6 files changed, 50 insertions(+), 117 deletions(-) diff --git a/src/DynamicData/Cache/Internal/GroupOnImmutable.cs b/src/DynamicData/Cache/Internal/GroupOnImmutable.cs index c9b776b1..fc6f3b16 100644 --- a/src/DynamicData/Cache/Internal/GroupOnImmutable.cs +++ b/src/DynamicData/Cache/Internal/GroupOnImmutable.cs @@ -22,12 +22,14 @@ internal sealed class GroupOnImmutable(IObservable> Run() => Observable.Create>( observer => { + var queue = new SharedDeliveryQueue(); var grouper = new Grouper(_groupSelectorKey); - return DeliveryQueueMergeExtensions.DeliveryQueueMerge, Unit, IImmutableGroupChangeSet>( - _source, grouper.Update, - _regrouper, _ => grouper.Regroup()) - .Where(changes => changes.Count != 0) - .SubscribeSafe(observer); + + var groups = _source.SynchronizeSafe(queue).Select(grouper.Update).Where(changes => changes.Count != 0); + + var regroup = _regrouper.SynchronizeSafe(queue).Select(_ => grouper.Regroup()).Where(changes => changes.Count != 0); + + return new CompositeDisposable(groups.UnsynchronizedMerge(regroup).SubscribeSafe(observer), queue); }); private sealed class Grouper(Func groupSelectorKey) diff --git a/src/DynamicData/Cache/Internal/Page.cs b/src/DynamicData/Cache/Internal/Page.cs index f707b657..2d91ac29 100644 --- a/src/DynamicData/Cache/Internal/Page.cs +++ b/src/DynamicData/Cache/Internal/Page.cs @@ -14,13 +14,15 @@ internal sealed class Page(IObservable> Run() => Observable.Create>( observer => { + var queue = new SharedDeliveryQueue(); var paginator = new Paginator(); - return DeliveryQueueMergeExtensions.DeliveryQueueMerge, IPagedChangeSet?>( - pageRequests, paginator.Paginate, - source, paginator.Update) + var request = pageRequests.SynchronizeSafe(queue).Select(paginator.Paginate); + var dataChange = source.SynchronizeSafe(queue).Select(paginator.Update); + + return new CompositeDisposable(request.UnsynchronizedMerge(dataChange) .Where(updates => updates is not null) .Select(x => x!) - .SubscribeSafe(observer); + .SubscribeSafe(observer), queue); }); private sealed class Paginator diff --git a/src/DynamicData/Cache/Internal/QueryWhenChanged.cs b/src/DynamicData/Cache/Internal/QueryWhenChanged.cs index 11dc5bdf..7e976424 100644 --- a/src/DynamicData/Cache/Internal/QueryWhenChanged.cs +++ b/src/DynamicData/Cache/Internal/QueryWhenChanged.cs @@ -34,21 +34,22 @@ public IObservable> Run() return Observable.Create>(observer => { + var queue = new SharedDeliveryQueue(); var state = new Cache(); var shared = _source.Publish(); - var merged = DeliveryQueueMergeExtensions.DeliveryQueueMerge, TValue, IQuery>( - shared, - changes => + var inlineChange = shared.MergeMany(itemChangedTrigger).SynchronizeSafe(queue).Select(_ => new AnonymousQuery(state)); + + var sourceChanged = shared.SynchronizeSafe(queue).Scan( + state, + (cache, changes) => { - state.Clone(changes); - return new AnonymousQuery(state); - }, - shared.MergeMany(itemChangedTrigger), - _ => new AnonymousQuery(state)); + cache.Clone(changes); + return cache; + }).Select(list => new AnonymousQuery(list)); - return new CompositeDisposable(merged.SubscribeSafe(observer), shared.Connect()); + return new CompositeDisposable(sourceChanged.UnsynchronizedMerge(inlineChange).SubscribeSafe(observer), shared.Connect(), queue); }); } } diff --git a/src/DynamicData/Cache/Internal/Sort.cs b/src/DynamicData/Cache/Internal/Sort.cs index 4d3c2495..6afb9632 100644 --- a/src/DynamicData/Cache/Internal/Sort.cs +++ b/src/DynamicData/Cache/Internal/Sort.cs @@ -43,6 +43,7 @@ public IObservable> Run() => Observable.Create { var sorter = new Sorter(_sortOptimisations, _comparer, _resetThreshold); + var queue = new SharedDeliveryQueue(); // check for nulls so we can prevent a lock when not required if (_comparerChangedObservable is null && _resorter is null) @@ -50,15 +51,13 @@ public IObservable> Run() => Observable.Create result is not null).Select(x => x!).SubscribeSafe(observer); } - var comparerSource = _comparerChangedObservable ?? Observable.Never>(); - var resorterSource = _resorter ?? Observable.Never(); - return DeliveryQueueMergeExtensions.DeliveryQueueMerge, IChangeSet, Unit, ISortedChangeSet?>( - comparerSource, sorter.Sort, - _source, sorter.Sort, - resorterSource, _ => sorter.Sort()) - .Where(result => result is not null) - .Select(x => x!) - .SubscribeSafe(observer); + var comparerChanged = (_comparerChangedObservable ?? Observable.Never>()).SynchronizeSafe(queue).Select(sorter.Sort); + + var sortAgain = (_resorter ?? Observable.Never()).SynchronizeSafe(queue).Select(_ => sorter.Sort()); + + var dataChanged = _source.SynchronizeSafe(queue).Select(sorter.Sort); + + return new CompositeDisposable(comparerChanged.UnsynchronizedMerge(dataChanged, sortAgain).Where(result => result is not null).Select(x => x!).SubscribeSafe(observer), queue); }); private sealed class Sorter(SortOptimisations optimisations, IComparer? comparer = null, int resetThreshold = -1) diff --git a/src/DynamicData/Cache/Internal/Virtualise.cs b/src/DynamicData/Cache/Internal/Virtualise.cs index 3ed1e79c..9e858f5d 100644 --- a/src/DynamicData/Cache/Internal/Virtualise.cs +++ b/src/DynamicData/Cache/Internal/Virtualise.cs @@ -19,12 +19,11 @@ public IObservable> Run() => Observable.Create< observer => { var virtualiser = new Virtualiser(); - return DeliveryQueueMergeExtensions.DeliveryQueueMerge, IVirtualChangeSet?>( - _virtualRequests, virtualiser.Virtualise, - _source, virtualiser.Update) - .Where(updates => updates is not null) - .Select(x => x!) - .SubscribeSafe(observer); + var queue = new SharedDeliveryQueue(); + + var request = _virtualRequests.SynchronizeSafe(queue).Select(virtualiser.Virtualise).Where(x => x is not null).Select(x => x!); + var dataChange = _source.SynchronizeSafe(queue).Select(virtualiser.Update).Where(x => x is not null).Select(x => x!); + return new CompositeDisposable(request.UnsynchronizedMerge(dataChange).Where(updates => updates is not null).SubscribeSafe(observer), queue); }); private sealed class Virtualiser(VirtualRequest? request = null) diff --git a/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs index 6783d3bc..720189f8 100644 --- a/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs +++ b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs @@ -8,35 +8,27 @@ namespace DynamicData.Internal; /// -/// Provides DeliveryQueueMerge extension methods, which combine the -/// serialization step and the gate-free merge step -/// from into a single -/// operator. +/// Provides DeliveryQueueMerge, an Rx extension method that combines +/// the serialization step and the gate-free +/// merge step from +/// into a single operator. /// /// -/// The motivation is the cross-cache deadlock that operators like Page, -/// Virtualise, AutoRefresh, Sort, GroupWithImmutableState, and -/// QueryWhenChanged all needed to solve: route every input through a single -/// so downstream delivery is serialized without holding -/// any operator-level lock, then merge the serialized inputs without re-introducing an -/// Observable.Merge gate that would reconstruct the ABBA cycle. -/// Each overload creates and owns its own , wraps -/// every source through , -/// and finally combines the wrapped streams with -/// . -/// The returned tears down the merge subscription before -/// the queue, so any final terminal notification still flows through the still-active queue. -/// The heterogeneous overloads accept a projection per input. Each projection is -/// applied inside the queue's drain (via SynchronizeSafe(queue).Select(projection)), -/// so stateful projections (paginators, sorters, virtualisers, group trackers) see a single, -/// serialized stream of invocations and may safely mutate operator-private state without -/// additional locking. +/// Use this when every input is already of the same element type and no per-input +/// projection is needed before the merge; the operator owns the queue lifecycle so the +/// call site reads like an ordinary . +/// When the inputs have different element types or require operator-private projections +/// invoked inside the queue's drain, use +/// and directly so the +/// projections sit inside the serialized section. /// internal static class DeliveryQueueMergeExtensions { /// /// Merges with after routing every - /// source through a single . + /// source through a single . Drop-in alternative to + /// for cross-cache + /// pipelines where the Rx Merge gate would risk an ABBA cycle. /// /// The element type, common to every input. /// The first input observable. @@ -57,66 +49,4 @@ public static IObservable DeliveryQueueMerge(this IObservable first, pa firstSync.UnsynchronizedMerge(othersSync).SubscribeSafe(observer), queue); }); - - /// - /// Merges two inputs of different element types into a single observable of - /// by routing each input through a single shared and applying its - /// projection inside the drain. - /// - /// Element type of the first input. - /// Element type of the second input. - /// Common output element type produced by both projections. - /// The first input observable. - /// Projection applied to each element from . - /// The second input observable. - /// Projection applied to each element from . - /// An observable emitting the projected items from both inputs, serialized through a shared queue. - public static IObservable DeliveryQueueMerge( - IObservable first, - Func firstProjection, - IObservable second, - Func secondProjection) => - Observable.Create(observer => - { - var queue = new SharedDeliveryQueue(); - var firstProjected = first.SynchronizeSafe(queue).Select(firstProjection); - var secondProjected = second.SynchronizeSafe(queue).Select(secondProjection); - - return new CompositeDisposable( - firstProjected.UnsynchronizedMerge(secondProjected).SubscribeSafe(observer), - queue); - }); - - /// - /// Three-input variant of the heterogeneous overload. - /// - /// Element type of the first input. - /// Element type of the second input. - /// Element type of the third input. - /// Common output element type produced by all projections. - /// The first input observable. - /// Projection applied to each element from . - /// The second input observable. - /// Projection applied to each element from . - /// The third input observable. - /// Projection applied to each element from . - /// An observable emitting the projected items from all three inputs, serialized through a shared queue. - public static IObservable DeliveryQueueMerge( - IObservable first, - Func firstProjection, - IObservable second, - Func secondProjection, - IObservable third, - Func thirdProjection) => - Observable.Create(observer => - { - var queue = new SharedDeliveryQueue(); - var firstProjected = first.SynchronizeSafe(queue).Select(firstProjection); - var secondProjected = second.SynchronizeSafe(queue).Select(secondProjection); - var thirdProjected = third.SynchronizeSafe(queue).Select(thirdProjection); - - return new CompositeDisposable( - firstProjected.UnsynchronizedMerge(secondProjected, thirdProjected).SubscribeSafe(observer), - queue); - }); } \ No newline at end of file From 0cab88e42e0d03f69b8e48ab81d8db0bd3e3afbd Mon Sep 17 00:00:00 2001 From: Darrin Cullop Date: Wed, 27 May 2026 11:25:43 -0700 Subject: [PATCH 5/9] Exercise subject-driven branches in DeadlockTortureTest Tests with subject inputs (Page, Virtualise, BatchIf, TransformWithForce, AllDangerous_Stacked, MultiplePairs) created the subject but nothing ever called OnNext on it. The bidirectional source writes still flowed through the operator's Merge gate, so the original deadlock was triggered, but the operator's subject-driven branch (refresher, request changes, pause toggle) was never invoked during the race. A regression that broke only that branch would not be caught. Add an optional subjectPusher callback to RunBidirectionalDeadlockTest that runs on a third worker thread, gated by the same Barrier as the two writer threads, and have each subject-bearing test push its own pattern on the subject while sources are writing. For the Page/Virtualise/BatchIf inline subjects in MultiplePairs, lift them to named locals so they can be referenced from the pusher closure. Also collapse the vertical layout introduced in the previous commits for DeliveryQueueMerge's CompositeDisposable construction and the UnsynchronizedMerge OnCompleted predicate. --- .../Cache/DeadlockTortureTest.cs | 65 ++++++++++++++----- .../Internal/DeliveryQueueMergeExtensions.cs | 6 +- .../Internal/SynchronizeSafeExtensions.cs | 3 +- 3 files changed, 52 insertions(+), 22 deletions(-) diff --git a/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs b/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs index 8576d34c..f71971b4 100644 --- a/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs +++ b/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs @@ -34,6 +34,7 @@ public sealed class DeadlockTortureTest private static async Task RunBidirectionalDeadlockTest( Func>, IObservable>> pipeline, + Action? subjectPusher = null, int iterations = Iterations) { for (var iter = 0; iter < iterations; iter++) @@ -44,11 +45,13 @@ private static async Task RunBidirectionalDeadlockTest( using var aToB = pipeline(sourceA.Connect().Filter(x => x.Name.StartsWith("A"))).PopulateInto(sourceB); using var bToA = pipeline(sourceB.Connect().Filter(x => x.Name.StartsWith("B"))).PopulateInto(sourceA); - using var barrier = new Barrier(2); + var participants = subjectPusher is null ? 2 : 3; + using var barrier = new Barrier(participants); var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); }); var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); }); + var taskC = subjectPusher is null ? null : Task.Run(() => { barrier.SignalAndWait(); subjectPusher(); }); - var completed = Task.WhenAll(taskA, taskB); + var completed = taskC is null ? Task.WhenAll(taskA, taskB) : Task.WhenAll(taskA, taskB, taskC); if (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds))) != completed) return false; } @@ -70,13 +73,17 @@ [Fact] public async Task GroupWithImmutableState_DoesNotDeadlock() => [Fact] public async Task Page_DoesNotDeadlock() { using var req = new BehaviorSubject(new PageRequest(1, 50)); - (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(req))).Should().BeTrue(); + (await RunBidirectionalDeadlockTest( + s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(req), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); })).Should().BeTrue(); } [Fact] public async Task Virtualise_DoesNotDeadlock() { using var req = new BehaviorSubject(new VirtualRequest(0, 50)); - (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(req))).Should().BeTrue(); + (await RunBidirectionalDeadlockTest( + s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(req), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); })).Should().BeTrue(); } [Fact] public async Task QueryWhenChanged_DoesNotDeadlock() @@ -110,11 +117,18 @@ [Fact] public async Task QueryWhenChanged_DoesNotDeadlock() [Fact] public async Task TransformWithForce_DoesNotDeadlock() { using var force = new Subject>(); - (await RunBidirectionalDeadlockTest(s => s.Transform((p, k) => new Person("T-" + p.Name, p.Age), force))).Should().BeTrue(); + (await RunBidirectionalDeadlockTest( + s => s.Transform((p, k) => new Person("T-" + p.Name, p.Age), force), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) force.OnNext(static (p, _) => true); })).Should().BeTrue(); } - [Fact] public async Task BatchIf_DoesNotDeadlock() => - (await RunBidirectionalDeadlockTest(s => s.BatchIf(new BehaviorSubject(false), false, (TimeSpan?)null))).Should().BeTrue(); + [Fact] public async Task BatchIf_DoesNotDeadlock() + { + using var pause = new BehaviorSubject(false); + (await RunBidirectionalDeadlockTest( + s => s.BatchIf(pause, false, (TimeSpan?)null), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pause.OnNext(j % 2 == 0); })).Should().BeTrue(); + } [Fact] public async Task DisposeMany_DoesNotDeadlock() => (await RunBidirectionalDeadlockTest(s => s.DisposeMany())).Should().BeTrue(); @@ -138,6 +152,15 @@ [Fact] public async Task AllDangerous_Stacked_DoNotDeadlock() .Sort(SortExpressionComparer.Ascending(p => p.Age)) .Virtualise(virtReq) .Page(pageReq), + subjectPusher: () => + { + for (var j = 0; j < ItemCount; j++) + { + force.OnNext(static (p, _) => true); + pageReq.OnNext(new PageRequest(1 + (j % 4), 50 + (j % 4) * 50)); + virtReq.OnNext(new VirtualRequest(j * 5, 50 + (j % 4) * 50)); + } + }, iterations: Iterations * 2)).Should().BeTrue(); } @@ -145,16 +168,26 @@ [Fact] public async Task MultiplePairs_Simultaneous_NoDeadlock() { using var pageReq = new BehaviorSubject(new PageRequest(1, 50)); using var virtReq = new BehaviorSubject(new VirtualRequest(0, 50)); + using var pause = new BehaviorSubject(false); var results = await Task.WhenAll( - RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)), 30), - RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age), 30), - RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()), 30), - RunBidirectionalDeadlockTest(s => s.GroupWithImmutableState(p => p.Age % 3).TransformMany(g => g.Items, p => p.UniqueKey), 30), - RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }), 30), - RunBidirectionalDeadlockTest(s => s.DisposeMany(), 30), - RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(pageReq), 30), - RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(virtReq), 30), - RunBidirectionalDeadlockTest(s => s.BatchIf(new BehaviorSubject(false), false, (TimeSpan?)null), 30)); + RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)), iterations: 30), + RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age), iterations: 30), + RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()), iterations: 30), + RunBidirectionalDeadlockTest(s => s.GroupWithImmutableState(p => p.Age % 3).TransformMany(g => g.Items, p => p.UniqueKey), iterations: 30), + RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }), iterations: 30), + RunBidirectionalDeadlockTest(s => s.DisposeMany(), iterations: 30), + RunBidirectionalDeadlockTest( + s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(pageReq), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pageReq.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); }, + iterations: 30), + RunBidirectionalDeadlockTest( + s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(virtReq), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) virtReq.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); }, + iterations: 30), + RunBidirectionalDeadlockTest( + s => s.BatchIf(pause, false, (TimeSpan?)null), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pause.OnNext(j % 2 == 0); }, + iterations: 30)); results.Should().AllSatisfy(r => r.Should().BeTrue()); } diff --git a/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs index 720189f8..34e649ea 100644 --- a/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs +++ b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs @@ -45,8 +45,6 @@ public static IObservable DeliveryQueueMerge(this IObservable first, pa othersSync[i] = others[i].SynchronizeSafe(queue); } - return new CompositeDisposable( - firstSync.UnsynchronizedMerge(othersSync).SubscribeSafe(observer), - queue); + return new CompositeDisposable(firstSync.UnsynchronizedMerge(othersSync).SubscribeSafe(observer), queue); }); -} \ No newline at end of file +} diff --git a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs index e20131e1..283dfff5 100644 --- a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs +++ b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs @@ -133,8 +133,7 @@ void OnErrorSafe(Exception error) void OnCompletedSafe() { - if (Interlocked.Decrement(ref pending) == 0 && - Interlocked.Exchange(ref terminated, 1) == 0) + if (Interlocked.Decrement(ref pending) == 0 && Interlocked.Exchange(ref terminated, 1) == 0) { observer.OnCompleted(); } From 9d526a36da808f1b6e0dd2ebb933189ea0b92dbd Mon Sep 17 00:00:00 2001 From: Darrin Cullop Date: Wed, 27 May 2026 13:20:31 -0700 Subject: [PATCH 6/9] Use typed DeliveryQueue for DeliveryQueueMerge Every input has the same element type T, so the type-erased SharedDeliveryQueue with its per-source DeliverySubQueue wrappers was carrying machinery (bitset, sub-queue list, type-erased StageNext/ DeliverStaged dispatch) that the same-type merge never used. Replace the implementation with one DeliveryQueue and per-source Observer.Create instances: - OnNext: forwarded directly to queue.OnNext. The queue's gate serializes concurrent calls from multiple producers; the drain delivers items in arrival order outside the lock, so a downstream observer that walks into another cache's writer lock cannot deadlock with this serialization point. - OnError: forwarded directly to queue.OnError. The queue marks itself terminated at the first error reaching the drain, so a second concurrent error from another source is dropped at enqueue and the downstream observer sees OnError exactly once. - OnCompleted: counter-gated; only the last surviving source's completion calls queue.OnCompleted, matching Observable.Merge's all-must-complete semantic. If a source has already errored, the queue is terminated and the eventual OnCompleted at the counter's floor is dropped at enqueue. The per-source Observer.Create instance is required for the same reason it is in UnsynchronizedMerge: Rx's ObserverBase sets a one-shot stopped flag on the first OnCompleted/OnError, and a single shared observer would silently drop terminal notifications from every source after the first. AutoRefresh is the only consumer of DeliveryQueueMerge. All tests across AutoRefresh, DeadlockTortureTest, and CrossCacheDeadlockStressTest pass; deadlock fixture passes 5/5 at xUnit.MaxParallelThreads=16. --- .../Internal/DeliveryQueueMergeExtensions.cs | 73 +++++++++++++------ 1 file changed, 51 insertions(+), 22 deletions(-) diff --git a/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs index 34e649ea..9adefaeb 100644 --- a/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs +++ b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs @@ -2,49 +2,78 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Reactive; using System.Reactive.Disposables; using System.Reactive.Linq; namespace DynamicData.Internal; /// -/// Provides DeliveryQueueMerge, an Rx extension method that combines -/// the serialization step and the gate-free -/// merge step from -/// into a single operator. +/// Provides DeliveryQueueMerge, an Rx extension method that serializes the +/// notifications of every input through a single +/// and emits them on the downstream observer outside the queue's lock. /// /// -/// Use this when every input is already of the same element type and no per-input -/// projection is needed before the merge; the operator owns the queue lifecycle so the -/// call site reads like an ordinary . -/// When the inputs have different element types or require operator-private projections -/// invoked inside the queue's drain, use -/// and directly so the -/// projections sit inside the serialized section. +/// Drop-in alternative to +/// for cross-cache pipelines where the Rx Merge gate, held during downstream delivery, +/// would risk an ABBA cycle. serializes enqueues across +/// concurrent producers but releases its gate before delivering, so a downstream +/// observer that walks into another cache's writer lock cannot deadlock with this +/// operator's serialization point. +/// Every input must share the same element type. When the inputs have different +/// element types or require operator-private projections invoked inside the queue's +/// drain, use +/// with a and finish with +/// . /// internal static class DeliveryQueueMergeExtensions { /// - /// Merges with after routing every - /// source through a single . Drop-in alternative to - /// for cross-cache - /// pipelines where the Rx Merge gate would risk an ABBA cycle. + /// Merges with by routing every + /// source through a single . Functionally equivalent + /// to : completes + /// only after every source completes; the first error terminates; subscription + /// occurs in argument order. /// /// The element type, common to every input. /// The first input observable. /// Additional input observables. - /// An observable that emits items from every input, serialized through a shared queue. + /// An observable that emits items from every input, serialized through the queue. public static IObservable DeliveryQueueMerge(this IObservable first, params IObservable[] others) => Observable.Create(observer => { - var queue = new SharedDeliveryQueue(); - var firstSync = first.SynchronizeSafe(queue); - var othersSync = new IObservable[others.Length]; - for (var i = 0; i < others.Length; i++) + var queue = new DeliveryQueue(observer); + var totalSources = others.Length + 1; + var subscriptions = new CompositeDisposable(totalSources + 1); + var pending = totalSources; + + // Each source needs its own inner observer instance because Rx's ObserverBase + // sets a one-shot stopped flag on the first OnCompleted/OnError; a single shared + // observer would silently drop terminal notifications from every source after + // the first. OnNext and OnError forward straight to the queue (the queue's gate + // serializes concurrent calls); OnCompleted is counter-gated so only the last + // surviving source's completion terminates the merged stream. + IObserver CreateInner() => + Observer.Create( + queue.OnNext, + queue.OnError, + () => + { + if (Interlocked.Decrement(ref pending) == 0) + { + queue.OnCompleted(); + } + }); + + subscriptions.Add(first.SubscribeSafe(CreateInner())); + foreach (var source in others) { - othersSync[i] = others[i].SynchronizeSafe(queue); + subscriptions.Add(source.SubscribeSafe(CreateInner())); } - return new CompositeDisposable(firstSync.UnsynchronizedMerge(othersSync).SubscribeSafe(observer), queue); + // Subscription first so any terminal notification produced during Rx's disposal + // cascade still flows through the still-active queue. Queue last as cleanup. + subscriptions.Add(queue); + return subscriptions; }); } From b362db91cb136fca762b039e1fc51161754463cc Mon Sep 17 00:00:00 2001 From: Darrin Cullop Date: Wed, 27 May 2026 13:43:43 -0700 Subject: [PATCH 7/9] Trim AllDangerous_Stacked pusher load to fit per-iteration timeout PR build failed AllDangerous_Stacked_DoNotDeadlock after 27s on a single iteration (the per-iteration TimeoutSeconds=15 budget was exceeded, then RunBidirectionalDeadlockTest returned false). It was not a deadlock; the pipeline was just doing too much work. Each force.OnNext in this test triggers TransformWithForcedTransform's refresher, which scans cache.KeyValues and emits a refresh changeset that flows through the full 9-operator stack (GroupWithImmutableState, TransformMany, AutoRefresh, Filter, Transform, OnItemRemoved, DisposeMany, Sort, Virtualise, Page). At ItemCount=200 pusher iterations with three subjects pushed per iteration (force, pageReq, virtReq), the pusher thread did ~600 push operations per iteration on top of the two writer threads' 200 source AddOrUpdates each. The other torture tests have a single-operator pipeline and one pusher and fit well within the budget; only the stacked case combines a heavy pipeline with three concurrent pushers. Reduce StackedPushCount to ItemCount/4 = 50, three subjects each. That keeps the subject branches under contention (still 150 pushes per iteration, still well above source-write rate) while bringing each iteration's worst case comfortably under TimeoutSeconds. The other subject-bearing tests are unchanged. --- src/DynamicData.Tests/Cache/DeadlockTortureTest.cs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs b/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs index f71971b4..854ce050 100644 --- a/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs +++ b/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs @@ -141,6 +141,11 @@ [Fact] public async Task AllDangerous_Stacked_DoNotDeadlock() using var pageReq = new BehaviorSubject(new PageRequest(1, 100)); using var virtReq = new BehaviorSubject(new VirtualRequest(0, 100)); using var force = new Subject>(); + + // Stacked pipeline is heavy per notification; keep the per-subject pusher loop + // short so each iteration stays under TimeoutSeconds. + const int StackedPushCount = ItemCount / 4; + (await RunBidirectionalDeadlockTest( s => s.GroupWithImmutableState(p => p.Age % 3) .TransformMany(g => g.Items, p => p.UniqueKey) @@ -154,7 +159,7 @@ [Fact] public async Task AllDangerous_Stacked_DoNotDeadlock() .Page(pageReq), subjectPusher: () => { - for (var j = 0; j < ItemCount; j++) + for (var j = 0; j < StackedPushCount; j++) { force.OnNext(static (p, _) => true); pageReq.OnNext(new PageRequest(1 + (j % 4), 50 + (j % 4) * 50)); From 07e202f8aac2e88e7a59427d68f34ec56fd4856b Mon Sep 17 00:00:00 2001 From: Darrin Cullop Date: Wed, 27 May 2026 13:48:12 -0700 Subject: [PATCH 8/9] Raise DeadlockTortureTest per-iteration timeout to 60s Previous commit reduced the AllDangerous_Stacked pusher load to fit the 15s per-iteration budget on the CI runner. That was the wrong trade: the test is a torture test, and shaving load to match the slowest hardware costs coverage. The CI runners are deliberately stripped down; the test budget should account for them. Raise TimeoutSeconds from 15 to 60 across the fixture and restore the full ItemCount pusher loop in AllDangerous_Stacked. The timeout still catches an actual deadlock (which hangs forever, not 60s), and the extra budget covers worst-case scheduling on a small VM. --- src/DynamicData.Tests/Cache/DeadlockTortureTest.cs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs b/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs index 854ce050..7d90302f 100644 --- a/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs +++ b/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs @@ -30,7 +30,7 @@ public sealed class DeadlockTortureTest { private const int ItemCount = 200; private const int Iterations = 50; - private const int TimeoutSeconds = 15; + private const int TimeoutSeconds = 60; private static async Task RunBidirectionalDeadlockTest( Func>, IObservable>> pipeline, @@ -141,11 +141,6 @@ [Fact] public async Task AllDangerous_Stacked_DoNotDeadlock() using var pageReq = new BehaviorSubject(new PageRequest(1, 100)); using var virtReq = new BehaviorSubject(new VirtualRequest(0, 100)); using var force = new Subject>(); - - // Stacked pipeline is heavy per notification; keep the per-subject pusher loop - // short so each iteration stays under TimeoutSeconds. - const int StackedPushCount = ItemCount / 4; - (await RunBidirectionalDeadlockTest( s => s.GroupWithImmutableState(p => p.Age % 3) .TransformMany(g => g.Items, p => p.UniqueKey) @@ -159,7 +154,7 @@ [Fact] public async Task AllDangerous_Stacked_DoNotDeadlock() .Page(pageReq), subjectPusher: () => { - for (var j = 0; j < StackedPushCount; j++) + for (var j = 0; j < ItemCount; j++) { force.OnNext(static (p, _) => true); pageReq.OnNext(new PageRequest(1 + (j % 4), 50 + (j % 4) * 50)); From 1b3a03db40282eb9f3ef184cda1388397e2d5f6a Mon Sep 17 00:00:00 2001 From: Darrin Cullop Date: Thu, 28 May 2026 09:32:38 -0700 Subject: [PATCH 9/9] Apply UnsynchronizedMerge to SortAndPage/SortAndVirtualize and add focused helper coverage Address reviewer feedback on #1097. SortAndPage and SortAndVirtualize had the same shape that motivated the rest of this PR: three queue-serialized inputs combined with Observable.Merge, which reinstates the gate we removed elsewhere. Replace the Merge with UnsynchronizedMerge at both sites. SortAndPage drops the static Observable.Merge form for the extension-method form; SortAndVirtualize collapses chained .Merge().Merge() into a single UnsynchronizedMerge call, removing the second redundant gate too. DeadlockTortureTest now covers both new operators alongside the older Sort().Page() and Sort().Virtualise() forms. Each test pushes on its own subject during the race so the request branch of the merge fires under contention. MultiplePairs_Simultaneous_NoDeadlock gains two more parallel lanes (SortAndPage, SortAndVirtualize) wired through separate BehaviorSubjects so all four request streams are pushed concurrently. Add focused unit tests for the two helpers: UnsynchronizedMergeFixture covers the Rx Merge-compatible contract: arrival-order forwarding, all-must-complete OnCompleted, first-error-wins, late-terminal-after-error suppression, argument-order subscription, synchronous Empty/Throw sources at subscribe, and the no-others fallback. DeliveryQueueMergeFixture covers the same behavioural contract for the queue-backed variant plus a serialization check: two producers race 1000 items each through the merged stream while the observer asserts a max of one in-flight OnNext, with the full bag delivered exactly once. Verification: - 36/36 helper + DeadlockTortureTest pass in a single run. - DeadlockTortureTest 16/16 pass 5/5 consecutive runs at xUnit.MaxParallelThreads=16. - 422/422 affected operator tests pass. --- .../Cache/DeadlockTortureTest.cs | 26 ++ .../Internal/DeliveryQueueMergeFixture.cs | 232 ++++++++++++++++++ .../Internal/UnsynchronizedMergeFixture.cs | 175 +++++++++++++ src/DynamicData/Cache/Internal/SortAndPage.cs | 8 +- .../Cache/Internal/SortAndVirtualize.cs | 3 +- 5 files changed, 438 insertions(+), 6 deletions(-) create mode 100644 src/DynamicData.Tests/Internal/DeliveryQueueMergeFixture.cs create mode 100644 src/DynamicData.Tests/Internal/UnsynchronizedMergeFixture.cs diff --git a/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs b/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs index 7d90302f..30d2dc70 100644 --- a/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs +++ b/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs @@ -78,6 +78,14 @@ [Fact] public async Task Page_DoesNotDeadlock() subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); })).Should().BeTrue(); } + [Fact] public async Task SortAndPage_DoesNotDeadlock() + { + using var req = new BehaviorSubject(new PageRequest(1, 50)); + (await RunBidirectionalDeadlockTest( + s => s.SortAndPage(SortExpressionComparer.Ascending(p => p.Age), req), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); })).Should().BeTrue(); + } + [Fact] public async Task Virtualise_DoesNotDeadlock() { using var req = new BehaviorSubject(new VirtualRequest(0, 50)); @@ -86,6 +94,14 @@ [Fact] public async Task Virtualise_DoesNotDeadlock() subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); })).Should().BeTrue(); } + [Fact] public async Task SortAndVirtualize_DoesNotDeadlock() + { + using var req = new BehaviorSubject(new VirtualRequest(0, 50)); + (await RunBidirectionalDeadlockTest( + s => s.SortAndVirtualize(SortExpressionComparer.Ascending(p => p.Age), req), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); })).Should().BeTrue(); + } + [Fact] public async Task QueryWhenChanged_DoesNotDeadlock() { for (var iter = 0; iter < Iterations; iter++) @@ -167,7 +183,9 @@ [Fact] public async Task AllDangerous_Stacked_DoNotDeadlock() [Fact] public async Task MultiplePairs_Simultaneous_NoDeadlock() { using var pageReq = new BehaviorSubject(new PageRequest(1, 50)); + using var pageReq2 = new BehaviorSubject(new PageRequest(1, 50)); using var virtReq = new BehaviorSubject(new VirtualRequest(0, 50)); + using var virtReq2 = new BehaviorSubject(new VirtualRequest(0, 50)); using var pause = new BehaviorSubject(false); var results = await Task.WhenAll( RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)), iterations: 30), @@ -180,10 +198,18 @@ [Fact] public async Task MultiplePairs_Simultaneous_NoDeadlock() s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(pageReq), subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pageReq.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); }, iterations: 30), + RunBidirectionalDeadlockTest( + s => s.SortAndPage(SortExpressionComparer.Ascending(p => p.Age), pageReq2), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pageReq2.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); }, + iterations: 30), RunBidirectionalDeadlockTest( s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(virtReq), subjectPusher: () => { for (var j = 0; j < ItemCount; j++) virtReq.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); }, iterations: 30), + RunBidirectionalDeadlockTest( + s => s.SortAndVirtualize(SortExpressionComparer.Ascending(p => p.Age), virtReq2), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) virtReq2.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); }, + iterations: 30), RunBidirectionalDeadlockTest( s => s.BatchIf(pause, false, (TimeSpan?)null), subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pause.OnNext(j % 2 == 0); }, diff --git a/src/DynamicData.Tests/Internal/DeliveryQueueMergeFixture.cs b/src/DynamicData.Tests/Internal/DeliveryQueueMergeFixture.cs new file mode 100644 index 00000000..fe67508a --- /dev/null +++ b/src/DynamicData.Tests/Internal/DeliveryQueueMergeFixture.cs @@ -0,0 +1,232 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading; +using System.Threading.Tasks; + +using DynamicData.Internal; + +using FluentAssertions; + +using Xunit; + +namespace DynamicData.Tests.Internal; + +/// +/// Focused behavioural tests for . +/// Verifies the Rx Merge-compatible terminal semantics and the queue's serialization guarantee +/// for concurrent producers. +/// +public sealed class DeliveryQueueMergeFixture +{ + [Fact] + public void OnNext_FromAllSources_IsForwardedInArrivalOrder() + { + using var a = new Subject(); + using var b = new Subject(); + using var c = new Subject(); + + var received = new List(); + using var sub = a.DeliveryQueueMerge(b, c).Subscribe(received.Add); + + a.OnNext(1); + b.OnNext(2); + c.OnNext(3); + a.OnNext(4); + + received.Should().Equal(1, 2, 3, 4); + } + + [Fact] + public void OnCompleted_FiresOnlyAfterEverySourceCompletes() + { + using var a = new Subject(); + using var b = new Subject(); + using var c = new Subject(); + + var completed = false; + using var sub = a.DeliveryQueueMerge(b, c).Subscribe(_ => { }, () => completed = true); + + a.OnCompleted(); + completed.Should().BeFalse(); + + b.OnCompleted(); + completed.Should().BeFalse(); + + c.OnCompleted(); + completed.Should().BeTrue(); + } + + [Fact] + public void OnError_FromAnySource_TerminatesImmediately() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.DeliveryQueueMerge(b).Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException(); + a.OnError(error); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse(); + } + + [Fact] + public void OnError_AfterFirstError_IsDroppedByQueue() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + using var sub = a.DeliveryQueueMerge(b).Subscribe(_ => { }, e => captured = e, () => { }); + + var first = new InvalidOperationException("first"); + var second = new InvalidOperationException("second"); + a.OnError(first); + b.OnError(second); + + captured.Should().BeSameAs(first); + } + + [Fact] + public void OnCompleted_AfterError_IsDroppedByQueue() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.DeliveryQueueMerge(b).Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException(); + a.OnError(error); + b.OnCompleted(); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse(); + } + + [Fact] + public void SynchronousTerminal_AtSubscribe_IsCountedTowardCompletion() + { + var immediate = Observable.Empty(); + using var live = new Subject(); + + var completed = false; + using var sub = immediate.DeliveryQueueMerge(live).Subscribe(_ => { }, () => completed = true); + + completed.Should().BeFalse(); + live.OnCompleted(); + completed.Should().BeTrue(); + } + + [Fact] + public void SynchronousError_AtSubscribe_PropagatesImmediately() + { + var error = new InvalidOperationException(); + var immediate = Observable.Throw(error); + using var live = new Subject(); + + Exception? captured = null; + using var sub = immediate.DeliveryQueueMerge(live).Subscribe(_ => { }, e => captured = e); + + captured.Should().BeSameAs(error); + } + + [Fact] + public async Task ConcurrentOnNext_FromManyProducers_IsSerializedToObserver() + { + // The queue's contract is that the downstream observer never sees concurrent OnNext calls, + // regardless of how many producers are racing on the inputs. Subscribe to two sources via + // two concurrent tasks, push interleaved items, and verify that no two OnNext calls overlap + // and every item is delivered exactly once. + const int itemsPerProducer = 1_000; + + using var a = new Subject(); + using var b = new Subject(); + + var inFlight = 0; + var maxInFlight = 0; + var received = new ConcurrentQueue(); + + using var sub = a.DeliveryQueueMerge(b).Subscribe(v => + { + var now = Interlocked.Increment(ref inFlight); + var prev = Volatile.Read(ref maxInFlight); + while (now > prev && Interlocked.CompareExchange(ref maxInFlight, now, prev) != prev) + { + prev = Volatile.Read(ref maxInFlight); + } + received.Enqueue(v); + Interlocked.Decrement(ref inFlight); + }); + + using var barrier = new Barrier(2); + var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < itemsPerProducer; i++) a.OnNext(i); }); + var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < itemsPerProducer; i++) b.OnNext(itemsPerProducer + i); }); + + await Task.WhenAll(taskA, taskB); + + received.Count.Should().Be(itemsPerProducer * 2); + maxInFlight.Should().Be(1, "concurrent OnNext to the observer must be serialized by the queue"); + + var expected = Enumerable.Range(0, itemsPerProducer * 2).ToHashSet(); + received.Should().BeEquivalentTo(expected); + } + + [Fact] + public void Subscription_OccursInArgumentOrder() + { + var subscribed = new List(); + var first = Observable.Create(o => { subscribed.Add(0); return () => { }; }); + var second = Observable.Create(o => { subscribed.Add(1); return () => { }; }); + var third = Observable.Create(o => { subscribed.Add(2); return () => { }; }); + + using var sub = first.DeliveryQueueMerge(second, third).Subscribe(_ => { }); + + subscribed.Should().Equal(0, 1, 2); + } + + [Fact] + public void Dispose_StopsForwardingFromAnySource() + { + using var a = new Subject(); + using var b = new Subject(); + + var received = new List(); + var sub = a.DeliveryQueueMerge(b).Subscribe(received.Add); + + a.OnNext(1); + sub.Dispose(); + a.OnNext(2); + b.OnNext(3); + + received.Should().Equal(1); + } + + [Fact] + public void NoOthers_FallsBackToFirstAlone() + { + using var a = new Subject(); + var received = new List(); + var completed = false; + using var sub = a.DeliveryQueueMerge().Subscribe(received.Add, () => completed = true); + + a.OnNext(7); + a.OnNext(11); + a.OnCompleted(); + + received.Should().Equal(7, 11); + completed.Should().BeTrue(); + } +} \ No newline at end of file diff --git a/src/DynamicData.Tests/Internal/UnsynchronizedMergeFixture.cs b/src/DynamicData.Tests/Internal/UnsynchronizedMergeFixture.cs new file mode 100644 index 00000000..85b95b8e --- /dev/null +++ b/src/DynamicData.Tests/Internal/UnsynchronizedMergeFixture.cs @@ -0,0 +1,175 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Reactive.Subjects; + +using DynamicData.Internal; + +using FluentAssertions; + +using Xunit; + +namespace DynamicData.Tests.Internal; + +/// +/// Focused behavioural tests for . +/// Covers the contract the helper has to honour as a drop-in +/// replacement: subscription order, all-must-complete OnCompleted, first-error-wins OnError, and synchronous terminal +/// notifications. +/// +public sealed class UnsynchronizedMergeFixture +{ + [Fact] + public void OnNext_FromBothSources_IsForwardedInArrivalOrder() + { + using var a = new Subject(); + using var b = new Subject(); + + var received = new List(); + using var sub = a.UnsynchronizedMerge(b).Subscribe(received.Add); + + a.OnNext(1); + b.OnNext(2); + a.OnNext(3); + b.OnNext(4); + + received.Should().Equal(1, 2, 3, 4); + } + + [Fact] + public void OnCompleted_FiresOnlyAfterAllSourcesComplete() + { + using var a = new Subject(); + using var b = new Subject(); + using var c = new Subject(); + + var completed = false; + using var sub = a.UnsynchronizedMerge(b, c).Subscribe(_ => { }, () => completed = true); + + a.OnCompleted(); + completed.Should().BeFalse("a single source completion must not terminate the merged stream"); + + b.OnCompleted(); + completed.Should().BeFalse("two of three completions still leave one source live"); + + c.OnCompleted(); + completed.Should().BeTrue("after every source has completed the merged stream must emit OnCompleted"); + } + + [Fact] + public void OnError_FromAnySource_TerminatesImmediately() + { + using var a = new Subject(); + using var b = new Subject(); + using var c = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.UnsynchronizedMerge(b, c).Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException("first"); + b.OnError(error); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse("OnCompleted must not fire after OnError"); + } + + [Fact] + public void OnError_AfterFirstError_IsIgnored() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + using var sub = a.UnsynchronizedMerge(b).Subscribe(_ => { }, e => captured = e, () => { }); + + var first = new InvalidOperationException("first"); + var second = new InvalidOperationException("second"); + a.OnError(first); + b.OnError(second); + + captured.Should().BeSameAs(first, "first error wins; subsequent errors from other sources must be dropped"); + } + + [Fact] + public void OnCompleted_AfterError_IsIgnored() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.UnsynchronizedMerge(b).Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException(); + a.OnError(error); + b.OnCompleted(); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse("a late OnCompleted from a surviving source must not arrive after OnError has fired"); + } + + [Fact] + public void Subscription_OccursInArgumentOrder() + { + var subscribed = new List(); + var first = System.Reactive.Linq.Observable.Create(o => { subscribed.Add(0); return () => { }; }); + var second = System.Reactive.Linq.Observable.Create(o => { subscribed.Add(1); return () => { }; }); + var third = System.Reactive.Linq.Observable.Create(o => { subscribed.Add(2); return () => { }; }); + + using var sub = first.UnsynchronizedMerge(second, third).Subscribe(_ => { }); + + subscribed.Should().Equal(0, 1, 2); + } + + [Fact] + public void SynchronousTerminal_BeforeOtherSourcesSubscribe_IsHandled() + { + // A source that completes synchronously at subscribe time decrements the pending counter immediately. + // If the helper miscounted, the merged stream would either complete prematurely or never complete. + var immediate = System.Reactive.Linq.Observable.Empty(); + using var live = new Subject(); + + var completed = false; + using var sub = immediate.UnsynchronizedMerge(live).Subscribe(_ => { }, () => completed = true); + + completed.Should().BeFalse("the live source has not completed yet"); + + live.OnCompleted(); + + completed.Should().BeTrue(); + } + + [Fact] + public void SynchronousError_BeforeOtherSourcesSubscribe_TerminatesImmediately() + { + var error = new InvalidOperationException(); + var immediate = System.Reactive.Linq.Observable.Throw(error); + using var live = new Subject(); + + Exception? captured = null; + using var sub = immediate.UnsynchronizedMerge(live).Subscribe(_ => { }, e => captured = e); + + captured.Should().BeSameAs(error); + } + + [Fact] + public void NoOthers_FallsBackToFirstAlone() + { + // Boundary: zero entries in the params array. Behaviour must mirror Observable.Merge over a single source. + using var a = new Subject(); + var received = new List(); + var completed = false; + using var sub = a.UnsynchronizedMerge().Subscribe(received.Add, () => completed = true); + + a.OnNext(7); + a.OnNext(11); + a.OnCompleted(); + + received.Should().Equal(7, 11); + completed.Should().BeTrue(); + } +} \ No newline at end of file diff --git a/src/DynamicData/Cache/Internal/SortAndPage.cs b/src/DynamicData/Cache/Internal/SortAndPage.cs index 2d612a9b..02ef996d 100644 --- a/src/DynamicData/Cache/Internal/SortAndPage.cs +++ b/src/DynamicData/Cache/Internal/SortAndPage.cs @@ -111,10 +111,10 @@ public IObservable>> Run() => return ApplyPagedChanges(changes); }); - return new CompositeDisposable(Observable.Merge( - comparerChanged.Skip(1), - paramsChanged.Where(changes => changes.Count is not 0), - dataChange.Where(changes => changes.Count is not 0)) + return new CompositeDisposable(comparerChanged.Skip(1) + .UnsynchronizedMerge( + paramsChanged.Where(changes => changes.Count is not 0), + dataChange.Where(changes => changes.Count is not 0)) .SubscribeSafe(observer), queue); ChangeSet> ApplyPagedChanges(IChangeSet? changeSet = null) diff --git a/src/DynamicData/Cache/Internal/SortAndVirtualize.cs b/src/DynamicData/Cache/Internal/SortAndVirtualize.cs index 44c6d3dc..0e6ae550 100644 --- a/src/DynamicData/Cache/Internal/SortAndVirtualize.cs +++ b/src/DynamicData/Cache/Internal/SortAndVirtualize.cs @@ -113,8 +113,7 @@ public IObservable>> Run() => return new CompositeDisposable( comparerChanged - .Merge(paramsChanged) - .Merge(dataChange) + .UnsynchronizedMerge(paramsChanged, dataChange) .Where(changes => changes.Count is not 0) .SubscribeSafe(observer), queue);