Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 86 additions & 17 deletions src/DynamicData.Tests/Cache/DeadlockTortureTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ public sealed class DeadlockTortureTest
{
private const int ItemCount = 200;
private const int Iterations = 50;
private const int TimeoutSeconds = 15;
private const int TimeoutSeconds = 60;

private static async Task<bool> RunBidirectionalDeadlockTest(
Func<IObservable<IChangeSet<Person, string>>, IObservable<IChangeSet<Person, string>>> pipeline,
Action? subjectPusher = null,
int iterations = Iterations)
{
for (var iter = 0; iter < iterations; iter++)
Expand All @@ -44,11 +45,13 @@ private static async Task<bool> RunBidirectionalDeadlockTest(
using var aToB = pipeline(sourceA.Connect().Filter(x => x.Name.StartsWith("A"))).PopulateInto(sourceB);
using var bToA = pipeline(sourceB.Connect().Filter(x => x.Name.StartsWith("B"))).PopulateInto(sourceA);

using var barrier = new Barrier(2);
var participants = subjectPusher is null ? 2 : 3;
using var barrier = new Barrier(participants);
var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); });
var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); });
var taskC = subjectPusher is null ? null : Task.Run(() => { barrier.SignalAndWait(); subjectPusher(); });

var completed = Task.WhenAll(taskA, taskB);
var completed = taskC is null ? Task.WhenAll(taskA, taskB) : Task.WhenAll(taskA, taskB, taskC);
if (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds))) != completed)
return false;
}
Expand All @@ -64,26 +67,68 @@ [Fact] public async Task AutoRefresh_DoesNotDeadlock() =>
[Fact] public async Task GroupOn_DoesNotDeadlock() =>
(await RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()))).Should().BeTrue();

[Fact] public async Task GroupWithImmutableState_DoesNotDeadlock() =>
(await RunBidirectionalDeadlockTest(s => s.GroupWithImmutableState(p => p.Age % 3).TransformMany(g => g.Items, p => p.UniqueKey))).Should().BeTrue();

[Fact] public async Task Page_DoesNotDeadlock()
{
using var req = new BehaviorSubject<IPageRequest>(new PageRequest(1, 50));
(await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Page(req))).Should().BeTrue();
(await RunBidirectionalDeadlockTest(
s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Page(req),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); })).Should().BeTrue();
}

[Fact] public async Task Virtualise_DoesNotDeadlock()
{
using var req = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, 50));
(await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Virtualise(req))).Should().BeTrue();
(await RunBidirectionalDeadlockTest(
s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Virtualise(req),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); })).Should().BeTrue();
}

[Fact] public async Task QueryWhenChanged_DoesNotDeadlock()
{
for (var iter = 0; iter < Iterations; iter++)
{
using var sourceA = new SourceCache<Person, string>(p => p.UniqueKey);
using var sourceB = new SourceCache<Person, string>(p => p.UniqueKey);

// QueryWhenChanged with an itemChangedTrigger exercises the Merge branch.
// A side-channel write into the other cache closes the same ABBA cycle that
// PopulateInto would close for changeset-shaped operators.
using var aToB = sourceA.Connect()
.Filter(p => p.Name.StartsWith("A"))
.QueryWhenChanged(p => p.WhenPropertyChanged(x => x.Age))
.Subscribe(_ => sourceB.AddOrUpdate(new Person("A-marker", 0)));
using var bToA = sourceB.Connect()
.Filter(p => p.Name.StartsWith("B"))
.QueryWhenChanged(p => p.WhenPropertyChanged(x => x.Age))
.Subscribe(_ => sourceA.AddOrUpdate(new Person("B-marker", 0)));

using var barrier = new Barrier(2);
var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); });
var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); });

var completed = Task.WhenAll(taskA, taskB);
(await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)))).Should().BeSameAs(completed, "iteration " + iter);
}
}

[Fact] public async Task TransformWithForce_DoesNotDeadlock()
{
using var force = new Subject<Func<Person, string, bool>>();
(await RunBidirectionalDeadlockTest(s => s.Transform((p, k) => new Person("T-" + p.Name, p.Age), force))).Should().BeTrue();
(await RunBidirectionalDeadlockTest(
s => s.Transform((p, k) => new Person("T-" + p.Name, p.Age), force),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) force.OnNext(static (p, _) => true); })).Should().BeTrue();
}

[Fact] public async Task BatchIf_DoesNotDeadlock() =>
(await RunBidirectionalDeadlockTest(s => s.BatchIf(new BehaviorSubject<bool>(false), false, (TimeSpan?)null))).Should().BeTrue();
[Fact] public async Task BatchIf_DoesNotDeadlock()
{
using var pause = new BehaviorSubject<bool>(false);
(await RunBidirectionalDeadlockTest(
s => s.BatchIf(pause, false, (TimeSpan?)null),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pause.OnNext(j % 2 == 0); })).Should().BeTrue();
}

[Fact] public async Task DisposeMany_DoesNotDeadlock() =>
(await RunBidirectionalDeadlockTest(s => s.DisposeMany())).Should().BeTrue();
Expand All @@ -94,31 +139,55 @@ [Fact] public async Task OnItemRemoved_DoesNotDeadlock() =>
[Fact] public async Task AllDangerous_Stacked_DoNotDeadlock()
{
using var pageReq = new BehaviorSubject<IPageRequest>(new PageRequest(1, 100));
using var virtReq = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, 100));
using var force = new Subject<Func<Person, string, bool>>();
(await RunBidirectionalDeadlockTest(
s => s.AutoRefresh(p => p.Age)
s => s.GroupWithImmutableState(p => p.Age % 3)
.TransformMany(g => g.Items, p => p.UniqueKey)
.AutoRefresh(p => p.Age)
.Filter(p => p.Age >= 0)
.Transform((p, k) => new Person("X-" + p.Name, p.Age), force)
.OnItemRemoved(_ => { })
.DisposeMany()
.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age))
.Virtualise(virtReq)
.Page(pageReq),
subjectPusher: () =>
{
for (var j = 0; j < ItemCount; j++)
{
force.OnNext(static (p, _) => true);
pageReq.OnNext(new PageRequest(1 + (j % 4), 50 + (j % 4) * 50));
virtReq.OnNext(new VirtualRequest(j * 5, 50 + (j % 4) * 50));
}
},
iterations: Iterations * 2)).Should().BeTrue();
}

[Fact] public async Task MultiplePairs_Simultaneous_NoDeadlock()
{
using var pageReq = new BehaviorSubject<IPageRequest>(new PageRequest(1, 50));
using var virtReq = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, 50));
using var pause = new BehaviorSubject<bool>(false);
var results = await Task.WhenAll(
RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)), 30),
RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age), 30),
RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()), 30),
RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }), 30),
RunBidirectionalDeadlockTest(s => s.DisposeMany(), 30),
RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Page(pageReq), 30),
RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Virtualise(virtReq), 30),
RunBidirectionalDeadlockTest(s => s.BatchIf(new BehaviorSubject<bool>(false), false, (TimeSpan?)null), 30));
RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)), iterations: 30),
RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age), iterations: 30),
RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()), iterations: 30),
RunBidirectionalDeadlockTest(s => s.GroupWithImmutableState(p => p.Age % 3).TransformMany(g => g.Items, p => p.UniqueKey), iterations: 30),
RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }), iterations: 30),
RunBidirectionalDeadlockTest(s => s.DisposeMany(), iterations: 30),
RunBidirectionalDeadlockTest(
s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Page(pageReq),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pageReq.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); },
iterations: 30),
RunBidirectionalDeadlockTest(
s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Virtualise(virtReq),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) virtReq.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); },
iterations: 30),
RunBidirectionalDeadlockTest(
s => s.BatchIf(pause, false, (TimeSpan?)null),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pause.OnNext(j % 2 == 0); },
iterations: 30));
results.Should().AllSatisfy(r => r.Should().BeTrue());
}

Expand Down
5 changes: 2 additions & 3 deletions src/DynamicData/Cache/Internal/AutoRefresh.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ public IObservable<IChangeSet<TObject, TKey>> Run() => Observable.Create<IChange
changes.Buffer(buffer.Value, _scheduler).Where(list => list.Count > 0).Select(items => new ChangeSet<TObject, TKey>(items));

// publish refreshes and underlying changes
var queue = new SharedDeliveryQueue();
var publisher = shared.SynchronizeSafe(queue).Merge(refreshChanges.SynchronizeSafe(queue)).SubscribeSafe(observer);
var publisher = shared.DeliveryQueueMerge(refreshChanges).SubscribeSafe(observer);

return new CompositeDisposable(publisher, shared.Connect(), queue);
return new CompositeDisposable(publisher, shared.Connect());
});
}
2 changes: 1 addition & 1 deletion src/DynamicData/Cache/Internal/GroupOnImmutable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public IObservable<IImmutableGroupChangeSet<TObject, TKey, TGroupKey>> Run() =>

var regroup = _regrouper.SynchronizeSafe(queue).Select(_ => grouper.Regroup()).Where(changes => changes.Count != 0);

return new CompositeDisposable(groups.Merge(regroup).SubscribeSafe(observer), queue);
return new CompositeDisposable(groups.UnsynchronizedMerge(regroup).SubscribeSafe(observer), queue);
});

private sealed class Grouper(Func<TObject, TGroupKey> groupSelectorKey)
Expand Down
2 changes: 1 addition & 1 deletion src/DynamicData/Cache/Internal/Page.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public IObservable<IPagedChangeSet<TObject, TKey>> Run() => Observable.Create<IP
var request = pageRequests.SynchronizeSafe(queue).Select(paginator.Paginate);
var dataChange = source.SynchronizeSafe(queue).Select(paginator.Update);

return new CompositeDisposable(request.Merge(dataChange)
return new CompositeDisposable(request.UnsynchronizedMerge(dataChange)
.Where(updates => updates is not null)
.Select(x => x!)
.SubscribeSafe(observer), queue);
Expand Down
2 changes: 1 addition & 1 deletion src/DynamicData/Cache/Internal/QueryWhenChanged.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public IObservable<IQuery<TObject, TKey>> Run()
return cache;
}).Select(list => new AnonymousQuery<TObject, TKey>(list));

return new CompositeDisposable(sourceChanged.Merge(inlineChange).SubscribeSafe(observer), shared.Connect(), queue);
return new CompositeDisposable(sourceChanged.UnsynchronizedMerge(inlineChange).SubscribeSafe(observer), shared.Connect(), queue);
});
}
}
2 changes: 1 addition & 1 deletion src/DynamicData/Cache/Internal/Sort.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public IObservable<ISortedChangeSet<TObject, TKey>> Run() => Observable.Create<I

var dataChanged = _source.SynchronizeSafe(queue).Select(sorter.Sort);

return new CompositeDisposable(comparerChanged.Merge(dataChanged).Merge(sortAgain).Where(result => result is not null).Select(x => x!).SubscribeSafe(observer), queue);
return new CompositeDisposable(comparerChanged.UnsynchronizedMerge(dataChanged, sortAgain).Where(result => result is not null).Select(x => x!).SubscribeSafe(observer), queue);
});

private sealed class Sorter(SortOptimisations optimisations, IComparer<TObject>? comparer = null, int resetThreshold = -1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public IObservable<IChangeSet<TDestination, TKey>> Run() => Observable.Create<IC
// create change set of items where force refresh is applied
var refresher = forceTransform.SynchronizeSafe(queue).Select(selector => CaptureChanges(cache, selector)).Select(changes => new ChangeSet<TSource, TKey>(changes)).NotEmpty();

var sourceAndRefreshes = shared.Merge(refresher);
var sourceAndRefreshes = shared.UnsynchronizedMerge(refresher);

// do raw transform
var transform = new Transform<TDestination, TSource, TKey>(sourceAndRefreshes, transformFactory, exceptionCallback, true).Run();
Expand Down
2 changes: 1 addition & 1 deletion src/DynamicData/Cache/Internal/Virtualise.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public IObservable<IVirtualChangeSet<TObject, TKey>> Run() => Observable.Create<

var request = _virtualRequests.SynchronizeSafe(queue).Select(virtualiser.Virtualise).Where(x => x is not null).Select(x => x!);
var dataChange = _source.SynchronizeSafe(queue).Select(virtualiser.Update).Where(x => x is not null).Select(x => x!);
return new CompositeDisposable(request.Merge(dataChange).Where(updates => updates is not null).SubscribeSafe(observer), queue);
return new CompositeDisposable(request.UnsynchronizedMerge(dataChange).Where(updates => updates is not null).SubscribeSafe(observer), queue);
});

private sealed class Virtualiser(VirtualRequest? request = null)
Expand Down
79 changes: 79 additions & 0 deletions src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace DynamicData.Internal;

/// <summary>
/// Provides <c>DeliveryQueueMerge</c>, an Rx extension method that serializes the
/// notifications of every input through a single <see cref="DeliveryQueue{T}"/>
/// and emits them on the downstream observer outside the queue's lock.
/// </summary>
/// <remarks>
/// <para>Drop-in alternative to <see cref="Observable.Merge{TSource}(IObservable{TSource}[])"/>
/// for cross-cache pipelines where the Rx Merge gate, held during downstream delivery,
/// would risk an ABBA cycle. <see cref="DeliveryQueue{T}"/> serializes enqueues across
/// concurrent producers but releases its gate before delivering, so a downstream
/// observer that walks into another cache's writer lock cannot deadlock with this
/// operator's serialization point.</para>
/// <para>Every input must share the same element type. When the inputs have different
/// element types or require operator-private projections invoked inside the queue's
/// drain, use <see cref="SynchronizeSafeExtensions.SynchronizeSafe{T}(IObservable{T}, SharedDeliveryQueue)"/>
/// with a <see cref="SharedDeliveryQueue"/> and finish with
/// <see cref="SynchronizeSafeExtensions.UnsynchronizedMerge{T}(IObservable{T}, IObservable{T}[])"/>.</para>
/// </remarks>
internal static class DeliveryQueueMergeExtensions
{
/// <summary>
/// Merges <paramref name="first"/> with <paramref name="others"/> by routing every
/// source through a single <see cref="DeliveryQueue{T}"/>. Functionally equivalent
/// to <see cref="Observable.Merge{TSource}(IObservable{TSource}[])"/>: completes
/// only after every source completes; the first error terminates; subscription
/// occurs in argument order.
/// </summary>
/// <typeparam name="T">The element type, common to every input.</typeparam>
/// <param name="first">The first input observable.</param>
/// <param name="others">Additional input observables.</param>
/// <returns>An observable that emits items from every input, serialized through the queue.</returns>
public static IObservable<T> DeliveryQueueMerge<T>(this IObservable<T> first, params IObservable<T>[] others) =>
Observable.Create<T>(observer =>
{
var queue = new DeliveryQueue<T>(observer);
var totalSources = others.Length + 1;
var subscriptions = new CompositeDisposable(totalSources + 1);
var pending = totalSources;

// Each source needs its own inner observer instance because Rx's ObserverBase
// sets a one-shot stopped flag on the first OnCompleted/OnError; a single shared
// observer would silently drop terminal notifications from every source after
// the first. OnNext and OnError forward straight to the queue (the queue's gate
// serializes concurrent calls); OnCompleted is counter-gated so only the last
// surviving source's completion terminates the merged stream.
IObserver<T> CreateInner() =>
Observer.Create<T>(
queue.OnNext,
queue.OnError,
() =>
{
if (Interlocked.Decrement(ref pending) == 0)
{
queue.OnCompleted();
}
});

subscriptions.Add(first.SubscribeSafe(CreateInner()));
foreach (var source in others)
{
subscriptions.Add(source.SubscribeSafe(CreateInner()));
}

// Subscription first so any terminal notification produced during Rx's disposal
// cascade still flows through the still-active queue. Queue last as cleanup.
subscriptions.Add(queue);
return subscriptions;
});
}
2 changes: 1 addition & 1 deletion src/DynamicData/Internal/SharedDeliveryQueue.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

Expand Down
Loading
Loading