From ef204b51e60baffa306b774a31f8c88ce20766a1 Mon Sep 17 00:00:00 2001 From: Jake Meiergerd Date: Fri, 29 May 2026 19:24:57 -0500 Subject: [PATCH] Rewrote testing for the cache variant of the AutoRefresh() and AutoRefreshOnObservable() operators, in accordance with #1014, and in a preliminary effort to resolve #1099. --- .../Cache/AutoRefreshFixture.Base.cs | 617 ++++++++++++++ ...AutoRefreshFixture.WithPropertyAccessor.cs | 71 ++ ...oRefreshFixture.WithoutPropertyAccessor.cs | 64 ++ .../Cache/AutoRefreshFixture.cs | 157 ++-- .../AutoRefreshOnObservableFixture.Base.cs | 777 ++++++++++++++++++ .../AutoRefreshOnObservableFixture.WithKey.cs | 34 + ...toRefreshOnObservableFixture.WithoutKey.cs | 34 + .../Cache/AutoRefreshOnObservableFixture.cs | 79 ++ 8 files changed, 1724 insertions(+), 109 deletions(-) create mode 100644 src/DynamicData.Tests/Cache/AutoRefreshFixture.Base.cs create mode 100644 src/DynamicData.Tests/Cache/AutoRefreshFixture.WithPropertyAccessor.cs create mode 100644 src/DynamicData.Tests/Cache/AutoRefreshFixture.WithoutPropertyAccessor.cs create mode 100644 src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.Base.cs create mode 100644 src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.WithKey.cs create mode 100644 src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.WithoutKey.cs create mode 100644 src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.cs diff --git a/src/DynamicData.Tests/Cache/AutoRefreshFixture.Base.cs b/src/DynamicData.Tests/Cache/AutoRefreshFixture.Base.cs new file mode 100644 index 000000000..ebdc26950 --- /dev/null +++ b/src/DynamicData.Tests/Cache/AutoRefreshFixture.Base.cs @@ -0,0 +1,617 @@ +using System; +using System.Linq; +using System.Reactive.Concurrency; +using System.Reactive.Linq; +using System.Reactive.Subjects; + +using Microsoft.Reactive.Testing; + +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; + +namespace DynamicData.Tests.Cache; + +public static partial class AutoRefreshFixture +{ + public abstract class Base + { + [Fact] + public void ChangeSetBufferIsGiven_PropertyChangedNotificationsAreBufferedOnScheduler() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + var scheduler = new TestScheduler(); + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Connect(), + changeSetBuffer: TimeSpan.FromSeconds(10), + scheduler: scheduler) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (publish property change notification) + ++item2.Value; + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("the property change notification should have been buffered"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (advance time, within buffer window) + scheduler.AdvanceTo(TimeSpan.FromSeconds(5).Ticks); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("the buffer window has not yet ended"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (advance time, to buffer window) + scheduler.AdvanceTo(TimeSpan.FromSeconds(10).Ticks); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "a buffer window expired"); + results.RecordedChangeSets.Skip(1).First().Count.Should().Be(1, "1 item published a property change notification"); + results.RecordedChangeSets.Skip(1).First().Refreshes.Should().Be(1, "1 item published a property change notification"); + results.RecordedChangeSets.Skip(1).First().First().Current.Should().Be(item2, "item #2 published a property change notification"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no items should have changed, within the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (publish property change notification) + ++item1.Value; + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Should().BeEmpty("the property change notification should have been buffered"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (advance time, within buffer window) + scheduler.AdvanceTo(TimeSpan.FromSeconds(15).Ticks); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Should().BeEmpty("the buffer window has not yet ended"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (publish additional property change notification) + ++item3.Value; + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Should().BeEmpty("the property change notification should have been buffered"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (advance time, to buffer window) + scheduler.AdvanceTo(TimeSpan.FromSeconds(20).Ticks); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "a buffer window expired"); + results.RecordedChangeSets.Skip(2).First().Count.Should().Be(2, "2 items published a property change notification"); + results.RecordedChangeSets.Skip(2).First().Refreshes.Should().Be(2, "2 items published a property change notification"); + results.RecordedChangeSets.Skip(2).First().Select(change => change.Current).Should().BeEquivalentTo(new[] { item1, item3 }, "items #2 and #3 published property change notification"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no items should have changed, within the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (normal refresh) + source.Refresh(item2); + + // Normal refreshes should not be buffered + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(3).Count().Should().Be(1, "one source operation was performed"); + results.RecordedChangeSets.Skip(3).First().Count.Should().Be(1, "1 item was refreshed, within the source"); + results.RecordedChangeSets.Skip(3).First().Refreshes.Should().Be(1, "1 item was refreshed, within the source"); + results.RecordedChangeSets.Skip(3).First().First().Current.Should().Be(item2, "item #2 was refreshed, within the source"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no items should have changed, within the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void ItemIsAdded_SubscribesToPropertyChanged() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + // UUT Initialization + using var subscription = BuildUut(source.Connect()) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + item1.HasSubscriptions.Should().BeTrue("the PropertyChanged event should be subscribed to, for each added item"); + item2.HasSubscriptions.Should().BeTrue("the PropertyChanged event should be subscribed to, for each added item"); + item3.HasSubscriptions.Should().BeTrue("the PropertyChanged event should be subscribed to, for each added item"); + } + + [Fact] + public void ItemIsMoved_NotificationPropagates() + { + // Setup + using var source = new Subject>(); + + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + + var items = new [] { item1, item2, item3 }; + + var initialChangeset = new ChangeSet() + { + new Change(reason: ChangeReason.Add, key: item1.Id, current: item1, index: 0), + new Change(reason: ChangeReason.Add, key: item2.Id, current: item2, index: 1), + new Change(reason: ChangeReason.Add, key: item3.Id, current: item3, index: 2) + }; + + // UUT Initialization + using var subscription = BuildUut(source.Prepend(initialChangeset)) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(items, "3 items were added to the source"); + results.RecordedItemsSorted.Should().BeEquivalentTo( + items, + options => options.WithStrictOrdering(), + "item indexes should propagate"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.OnNext(new ChangeSet() + { + new Change( + key: item3.Id, + current: item3, + currentIndex: 0, + previousIndex: 2) + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "one source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(items, "an item was moved within the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + results.RecordedItemsSorted.Should().BeEquivalentTo( + new[] { item3, item1, item2 }, + options => options.WithStrictOrdering(), + "an item was moved within the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void ItemIsRefreshed_NotificationPropagates() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + + // UUT Initialization + using var subscription = BuildUut(source.Connect()) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + // UUT Action + source.Refresh(item2); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "one source operation was performed"); + results.RecordedChangeSets.Skip(1).First().Count.Should().Be(1, "1 item was refreshed within the source"); + results.RecordedChangeSets.Skip(1).First().Refreshes.Should().Be(1, "1 item was refreshed within the source"); + results.RecordedChangeSets.Skip(1).First().First().Current.Should().Be(item2, "item #2 was refreshed within the source"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no items were changed, within the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void ItemIsRemoved_UnsubscribesFromPropertyChanged() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + + // UUT Initialization + using var subscription = BuildUut(source.Connect()) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.Remove(item2); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "one source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "1 item was removed from the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + item2.HasSubscriptions.Should().BeFalse("removing an item should trigger unsubscription from its reevaluator"); + item1.HasSubscriptions.Should().BeTrue("the item was not removed from the source"); + item3.HasSubscriptions.Should().BeTrue("the item was not removed from the source"); + } + + [Fact] + public void ItemIsUpdated_ReSubscribesToPropertyChanged() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + + // UUT Initialization + using var subscription = BuildUut(source.Connect()) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + var item4 = new Item() { Id = 2 }; + source.AddOrUpdate(item4); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "one source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "1 item was replaced within the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + item2.HasSubscriptions.Should().BeFalse("replacing an item should trigger unsubscription from its reevaluator"); + item4.HasSubscriptions.Should().BeTrue("adding an item should invoke its reevaluator and subscribe to it"); + item1.HasSubscriptions.Should().BeTrue("the item was not removed from the source"); + item3.HasSubscriptions.Should().BeTrue("the item was not removed from the source"); + } + + [Fact] + public void PropertyChangedOccurs_ItemRefreshes() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + + // UUT Initialization + using var subscription = BuildUut(source.Connect()) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + ++item2.Value; + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 item published a property change notification"); + results.RecordedChangeSets.Skip(1).First().Count.Should().Be(1, "1 item published a property change notification"); + results.RecordedChangeSets.Skip(1).First().Refreshes.Should().Be(1, "1 item published a property change notification"); + results.RecordedChangeSets.Skip(1).First().First().Current.Should().Be(item2, "item #2 published a property change notification"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no source operations were performed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void PropertyChangeThrottleIsGiven_PropertyChangedNotificationsAreThrottledByScheduler() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + var scheduler = new TestScheduler(); + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Connect(), + propertyChangeThrottle: TimeSpan.FromSeconds(10), + scheduler: scheduler) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (publish property change notification) + ++item2.Value; + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("the throttle window has not yet ended"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (publish additional property change notification, immediately) + ++item2.Value; + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("the throttle window has not yet ended"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (advance time to end of throttle window) + scheduler.AdvanceTo(TimeSpan.FromSeconds(10).Ticks); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "the throttle window ended"); + results.RecordedChangeSets.Skip(1).First().Count.Should().Be(1, "1 item published property change notifications"); + results.RecordedChangeSets.Skip(1).First().Refreshes.Should().Be(1, "1 item published property change notifications"); + results.RecordedChangeSets.Skip(1).First().First().Current.Should().Be(item2, "item #2 published property change notifications"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no items should have changed, within the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (publish property change notification) + ++item2.Value; + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Should().BeEmpty("the throttle window has not yet ended"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (publish additional property change notification, within throttle window) + scheduler.AdvanceTo(TimeSpan.FromSeconds(15).Ticks); + ++item2.Value; + scheduler.AdvanceBy(1); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Should().BeEmpty("the throttle window has not yet ended"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (advance time to end of original throttle window) + scheduler.AdvanceTo(TimeSpan.FromSeconds(20).Ticks); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Should().BeEmpty("the throttle window should have been extended"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (advance time to end of throttle window) + scheduler.AdvanceTo(TimeSpan.FromSeconds(25).Ticks); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "the throttle window ended"); + results.RecordedChangeSets.Skip(2).First().Count.Should().Be(1, "1 item published property change notifications"); + results.RecordedChangeSets.Skip(2).First().Refreshes.Should().Be(1, "1 item published property change notifications"); + results.RecordedChangeSets.Skip(2).First().First().Current.Should().Be(item2, "item #2 published property change notifications"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no items should have changed, within the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Theory] + [InlineData(NotificationStrategy.Immediate)] + [InlineData(NotificationStrategy.Asynchronous)] + public void SourceCompletesWhenEmpty_CompletionPropagates(NotificationStrategy notificationStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + + // UUT Initialization & Action + if (notificationStrategy is NotificationStrategy.Immediate) + source.Complete(); + + using var subscription = BuildUut(source.Connect()) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (notificationStrategy is NotificationStrategy.Asynchronous) + source.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeTrue("all notification sources have completed"); + } + + [Theory] + [InlineData(NotificationStrategy.Immediate)] + [InlineData(NotificationStrategy.Asynchronous)] + public void SourceCompletesWhenNotEmpty_CompletionDoesNotPropagate(NotificationStrategy notificationStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + + // UUT Initialization & Action (source completion) + if (notificationStrategy is NotificationStrategy.Immediate) + source.Complete(); + + using var subscription = BuildUut(source.Connect()) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (notificationStrategy is NotificationStrategy.Asynchronous) + source.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("PropertyChanged events can still publish notifications"); + } + + [Theory] + [InlineData(NotificationStrategy.Immediate)] + [InlineData(NotificationStrategy.Asynchronous)] + public void SourceFails_ErrorPropagates(NotificationStrategy notificationStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + var error = new Exception("Test"); + + + // UUT Initialization & Action + if (notificationStrategy is NotificationStrategy.Immediate) + source.SetError(error); + + using var subscription = BuildUut(source.Connect()) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (notificationStrategy is NotificationStrategy.Asynchronous) + source.SetError(error); + + results.Error.Should().Be(error, "upstream errors should propagate downstream"); + if (notificationStrategy is NotificationStrategy.Immediate) + results.RecordedChangeSets.Should().BeEmpty("an error occurred before the initial changeset"); + else + { + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + } + } + + [Fact] + public void SourceIsNull_ThrowsException() + => FluentActions.Invoking(() => BuildUut(source: null!)) + .Should() + .Throw(); + + [Fact] + public void SubscriptionIsDisposed_SubscriptionDisposalPropagates() + { + // Setup + using var source = new Subject>(); + + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + + var initialChangeset = new ChangeSet() + { + new Change(reason: ChangeReason.Add, key: item1.Id, current: item1), + new Change(reason: ChangeReason.Add, key: item2.Id, current: item2), + new Change(reason: ChangeReason.Add, key: item3.Id, current: item3) + }; + + + // UUT Initialization + using var subscription = BuildUut(source.Prepend(initialChangeset)) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + subscription.Dispose(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + source.HasObservers.Should().BeFalse("subscription disposal should propagate"); + item1.HasSubscriptions.Should().BeFalse("subscription disposal should propagate"); + item2.HasSubscriptions.Should().BeFalse("subscription disposal should propagate"); + item3.HasSubscriptions.Should().BeFalse("subscription disposal should propagate"); + } + + protected abstract IObservable> BuildUut( + IObservable> source, + TimeSpan? changeSetBuffer = null, + TimeSpan? propertyChangeThrottle = null, + IScheduler? scheduler = null); + } +} diff --git a/src/DynamicData.Tests/Cache/AutoRefreshFixture.WithPropertyAccessor.cs b/src/DynamicData.Tests/Cache/AutoRefreshFixture.WithPropertyAccessor.cs new file mode 100644 index 000000000..c948aedb8 --- /dev/null +++ b/src/DynamicData.Tests/Cache/AutoRefreshFixture.WithPropertyAccessor.cs @@ -0,0 +1,71 @@ +using System; +using System.Linq; +using System.Linq.Expressions; +using System.Reactive.Concurrency; +using System.Reactive.Linq; + +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; + +namespace DynamicData.Tests.Cache; + +public static partial class AutoRefreshFixture +{ + public class WithPropertyAccessor + : Base + { + [Fact(Skip = "Existing defect: propertyAccessor is not null checked, throws NRE on first notification, instead")] + public void PropertyAccessorIsNull_ThrowsException() + => FluentActions.Invoking(() => ObservableCacheEx.AutoRefresh( + source: Observable.Never>(), + propertyAccessor: (null as Expression>)!)) + .Should() + .Throw(); + + [Fact] + public void PropertyChangedNotificationDoesNotMatchPropertyAccessor_IgnoresNotification() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + + // UUT Initialization + using var subscription = BuildUut(source.Connect()) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + ++item2.OtherValue; + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("the property change notification should have been ignored"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + protected override IObservable> BuildUut( + IObservable> source, + TimeSpan? changeSetBuffer = null, + TimeSpan? propertyChangeThrottle = null, + IScheduler? scheduler = null) + => source.AutoRefresh( + propertyAccessor: static item => item.Value, + changeSetBuffer: changeSetBuffer, + propertyChangeThrottle: propertyChangeThrottle, + scheduler: scheduler); + } +} diff --git a/src/DynamicData.Tests/Cache/AutoRefreshFixture.WithoutPropertyAccessor.cs b/src/DynamicData.Tests/Cache/AutoRefreshFixture.WithoutPropertyAccessor.cs new file mode 100644 index 000000000..402d175ff --- /dev/null +++ b/src/DynamicData.Tests/Cache/AutoRefreshFixture.WithoutPropertyAccessor.cs @@ -0,0 +1,64 @@ +using System; +using System.Linq; +using System.Reactive.Concurrency; + +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; + +namespace DynamicData.Tests.Cache; + +public static partial class AutoRefreshFixture +{ + public class WithoutPropertyAccessor + : Base + { + [Fact] + public void PropertyChangedNotificationDoesNotSpecifyPropertyName_ItemRefreshes() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + + // UUT Initialization + using var subscription = BuildUut(source.Connect()) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + item2.RaiseAllPropertiesChanged(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 item published a property change notification"); + results.RecordedChangeSets.Skip(1).First().Count.Should().Be(1, "1 item published a property change notification"); + results.RecordedChangeSets.Skip(1).First().Refreshes.Should().Be(1, "1 item published a property change notification"); + results.RecordedChangeSets.Skip(1).First().First().Current.Should().Be(item2, "item #2 published a property change notification"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no source operations were performed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + protected override IObservable> BuildUut( + IObservable> source, + TimeSpan? changeSetBuffer = null, + TimeSpan? propertyChangeThrottle = null, + IScheduler? scheduler = null) + => source.AutoRefresh( + changeSetBuffer: changeSetBuffer, + propertyChangeThrottle: propertyChangeThrottle, + scheduler: scheduler); + } +} diff --git a/src/DynamicData.Tests/Cache/AutoRefreshFixture.cs b/src/DynamicData.Tests/Cache/AutoRefreshFixture.cs index b3d36aa41..be5b936a3 100644 --- a/src/DynamicData.Tests/Cache/AutoRefreshFixture.cs +++ b/src/DynamicData.Tests/Cache/AutoRefreshFixture.cs @@ -1,127 +1,66 @@ -using System; -using System.Linq; -using System.Reactive.Linq; - -using DynamicData.Binding; -using DynamicData.Tests.Domain; - -using FluentAssertions; - -using Xunit; +using System.ComponentModel; namespace DynamicData.Tests.Cache; -public class AutoRefreshFixture +public static partial class AutoRefreshFixture { - [Fact] - public void AutoRefresh() + public enum NotificationStrategy { - var items = Enumerable.Range(1, 100).Select(i => new Person("Person" + i, 1)).ToArray(); - - //result should only be true when all items are set to true - using var cache = new SourceCache(m => m.Name); - using var results = cache.Connect().AutoRefresh(p => p.Age).AsAggregator(); - cache.AddOrUpdate(items); - - results.Data.Count.Should().Be(100); - results.Messages.Count.Should().Be(1); - - items[0].Age = 10; - results.Data.Count.Should().Be(100); - results.Messages.Count.Should().Be(2); - - results.Messages[1].First().Reason.Should().Be(ChangeReason.Refresh); - - //remove an item and check no change is fired - var toRemove = items[1]; - cache.Remove(toRemove); - results.Data.Count.Should().Be(99); - results.Messages.Count.Should().Be(3); - toRemove.Age = 100; - results.Messages.Count.Should().Be(3); - - //add it back in and check it updates - cache.AddOrUpdate(toRemove); - results.Messages.Count.Should().Be(4); - toRemove.Age = 101; - results.Messages.Count.Should().Be(5); - - results.Messages.Last().First().Reason.Should().Be(ChangeReason.Refresh); - } - - [Fact] - public void AutoRefreshFromObservable() - { - var items = Enumerable.Range(1, 100).Select(i => new Person("Person" + i, 1)).ToArray(); - - //result should only be true when all items are set to true - using var cache = new SourceCache(m => m.Name); - using var results = cache.Connect().AutoRefreshOnObservable(p => p.WhenAnyPropertyChanged()).AsAggregator(); - cache.AddOrUpdate(items); - - results.Data.Count.Should().Be(100); - results.Messages.Count.Should().Be(1); - - items[0].Age = 10; - results.Data.Count.Should().Be(100); - results.Messages.Count.Should().Be(2); - - results.Messages[1].First().Reason.Should().Be(ChangeReason.Refresh); - - //remove an item and check no change is fired - var toRemove = items[1]; - cache.Remove(toRemove); - results.Data.Count.Should().Be(99); - results.Messages.Count.Should().Be(3); - toRemove.Age = 100; - results.Messages.Count.Should().Be(3); - - //add it back in and check it updates - cache.AddOrUpdate(toRemove); - results.Messages.Count.Should().Be(4); - toRemove.Age = 101; - results.Messages.Count.Should().Be(5); - - results.Messages.Last().First().Reason.Should().Be(ChangeReason.Refresh); + Immediate, + Asynchronous } - [Fact] - public void MakeSelectMagicWorkWithObservable() + public class Item + : INotifyPropertyChanged { - var initialItem = new IntHolder(1, "Initial Description"); - - var sourceList = new SourceList(); - sourceList.Add(initialItem); - - var descriptionStream = sourceList.Connect().AutoRefresh(intHolder => intHolder!.Description).Transform(intHolder => intHolder!.Description, true).Do(x => { }) // <--- Add break point here to check the overload fixes it - .Bind(out var resultCollection); - - using (descriptionStream.Subscribe()) + public static int SelectId(Item item) + => item.Id; + + public required int Id { - var newDescription = "New Description"; - initialItem.Description = newDescription; - - newDescription.Should().Be(resultCollection[0]); - //Assert.AreEqual(newDescription, resultCollection[0]); + get => _id; + init => _id = value; } - } - - public class IntHolder(int value, string description) : AbstractNotifyPropertyChanged - { - public string _description_ = description; - - public int _value = value; - - public string Description + + public bool HasSubscriptions + => PropertyChanged is not null; + + public int OtherValue { - get => _description_; - set => SetAndRaise(ref _description_, value); + get => _otherValue; + set + { + if (_otherValue == value) + return; + + _otherValue = value; + + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(OtherValue))); + } } - + public int Value { get => _value; - set => SetAndRaise(ref _value, value); + set + { + if (_value == value) + return; + + _value = value; + + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Value))); + } } + + public event PropertyChangedEventHandler? PropertyChanged; + + public void RaiseAllPropertiesChanged() + => PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(string.Empty)); + + private readonly int _id; + + private int _otherValue; + private int _value; } } diff --git a/src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.Base.cs b/src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.Base.cs new file mode 100644 index 000000000..a09e0ea9c --- /dev/null +++ b/src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.Base.cs @@ -0,0 +1,777 @@ +using System; +using System.Linq; +using System.Reactive; +using System.Reactive.Concurrency; +using System.Reactive.Linq; +using System.Reactive.Subjects; + +using Microsoft.Reactive.Testing; + +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; + +namespace DynamicData.Tests.Cache; + +public static partial class AutoRefreshOnObservableFixture +{ + public abstract class Base + { + [Fact] + public void ChangeSetBufferIsGiven_ReevaluatorNotificationsAreBufferedOnScheduler() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + using var item1 = new Item() { Id = 1 }; + using var item2 = new Item() { Id = 2 }; + using var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + var scheduler = new TestScheduler(); + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Connect(), + reevaluator: Item.ObserveValueChanged, + changeSetBuffer: TimeSpan.FromSeconds(10), + scheduler: scheduler) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (publish reevaluator notification) + ++item2.Value; + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("the reevaluator notification should have been buffered"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (advance time, within buffer window) + scheduler.AdvanceTo(TimeSpan.FromSeconds(5).Ticks); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("the buffer window has not yet ended"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (advance time, to buffer window) + scheduler.AdvanceTo(TimeSpan.FromSeconds(10).Ticks); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "a buffer window expired"); + results.RecordedChangeSets.Skip(1).First().Count.Should().Be(1, "1 item published a reevaluator notification"); + results.RecordedChangeSets.Skip(1).First().Refreshes.Should().Be(1, "1 item published a reevaluator notification"); + results.RecordedChangeSets.Skip(1).First().First().Current.Should().Be(item2, "item #2 published a reevaluator notification"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no items should have changed, within the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (publish reevaluator notification) + ++item1.Value; + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Should().BeEmpty("the reevaluator notification should have been buffered"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (advance time, within buffer window) + scheduler.AdvanceTo(TimeSpan.FromSeconds(15).Ticks); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Should().BeEmpty("the buffer window has not yet ended"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (publish additional reevaluator notification) + ++item3.Value; + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Should().BeEmpty("the reevaluator notification should have been buffered"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (advance time, to buffer window) + scheduler.AdvanceTo(TimeSpan.FromSeconds(20).Ticks); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "a buffer window expired"); + results.RecordedChangeSets.Skip(2).First().Count.Should().Be(2, "2 items published a reevaluator notification"); + results.RecordedChangeSets.Skip(2).First().Refreshes.Should().Be(2, "2 items published a reevaluator notification"); + results.RecordedChangeSets.Skip(2).First().Select(change => change.Current).Should().BeEquivalentTo(new[] { item1, item3 }, "items #2 and #3 published reevaluator notification"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no items should have changed, within the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (normal refresh) + source.Refresh(item2); + + // Normal refreshes should not be buffered + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(3).Count().Should().Be(1, "one source operation was performed"); + results.RecordedChangeSets.Skip(3).First().Count.Should().Be(1, "1 item was refreshed, within the source"); + results.RecordedChangeSets.Skip(3).First().Refreshes.Should().Be(1, "1 item was refreshed, within the source"); + results.RecordedChangeSets.Skip(3).First().First().Current.Should().Be(item2, "item #2 was refreshed, within the source"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no items should have changed, within the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void ItemIsAdded_SubscribesToReevaluator() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + using var item1 = new Item() { Id = 1 }; + using var item2 = new Item() { Id = 2 }; + using var item3 = new Item() { Id = 3 }; + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Connect(), + reevaluator: Item.ObserveValueChanged) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.AddOrUpdate(new[] { item1, item2, item3 }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + item1.HasObservers.Should().BeTrue("the reevaluator should be invoked and subscribed to, for each added item"); + item2.HasObservers.Should().BeTrue("the reevaluator should be invoked and subscribed to, for each added item"); + item3.HasObservers.Should().BeTrue("the reevaluator should be invoked and subscribed to, for each added item"); + } + + [Fact] + public void ItemIsMoved_NotificationPropagates() + { + // Setup + using var source = new Subject>(); + + using var item1 = new Item() { Id = 1 }; + using var item2 = new Item() { Id = 2 }; + using var item3 = new Item() { Id = 3 }; + + var items = new[] { item1, item2, item3 }; + + var initialChangeset = new ChangeSet() + { + new Change(reason: ChangeReason.Add, key: item1.Id, current: item1, index: 0), + new Change(reason: ChangeReason.Add, key: item2.Id, current: item2, index: 1), + new Change(reason: ChangeReason.Add, key: item3.Id, current: item3, index: 2) + }; + + // UUT Initialization + using var subscription = BuildUut( + source: source.Prepend(initialChangeset), + reevaluator: Item.ObserveValueChanged) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(items, "3 items were added to the source"); + results.RecordedItemsSorted.Should().BeEquivalentTo( + items, + options => options.WithStrictOrdering(), + "item indexes should propagate"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.OnNext(new ChangeSet() + { + new Change( + key: item3.Id, + current: item3, + currentIndex: 0, + previousIndex: 2) + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "one source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(items, "an item was moved within the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + results.RecordedItemsSorted.Should().BeEquivalentTo( + new[] { item3, item1, item2 }, + options => options.WithStrictOrdering(), + "an item was moved within the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void ItemIsRefreshed_NotificationPropagates() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + using var item1 = new Item() { Id = 1 }; + using var item2 = new Item() { Id = 2 }; + using var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + var reevaluatorInvocationCount = 0; + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Connect(), + reevaluator: item => + { + ++reevaluatorInvocationCount; + return Item.ObserveValueChanged(item); + }) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + reevaluatorInvocationCount.Should().Be(3, "the reevaluator should be invoked and subscribed to, for each added item"); + + // UUT Action + source.Refresh(item2); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "one source operation was performed"); + results.RecordedChangeSets.Skip(1).First().Count.Should().Be(1, "1 item was refreshed within the source"); + results.RecordedChangeSets.Skip(1).First().Refreshes.Should().Be(1, "1 item was refreshed within the source"); + results.RecordedChangeSets.Skip(1).First().First().Current.Should().Be(item2, "item #2 was refreshed within the source"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no items were changed, within the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + reevaluatorInvocationCount.Should().Be(3, "the reevaluator should only be invoked for items being added to the collection."); + } + + [Fact] + public void ItemIsRemoved_UnsubscribesFromReevaluator() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + using var item1 = new Item() { Id = 1 }; + using var item2 = new Item() { Id = 2 }; + using var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Connect(), + reevaluator: Item.ObserveValueChanged) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.Remove(item2); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "one source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "1 item was removed from the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + item2.HasObservers.Should().BeFalse("removing an item should trigger unsubscription from its reevaluator"); + item1.HasObservers.Should().BeTrue("the item was not removed from the source"); + item3.HasObservers.Should().BeTrue("the item was not removed from the source"); + } + + [Fact] + public void ItemIsUpdated_ReInvokesReevaluator() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + using var item1 = new Item() { Id = 1 }; + using var item2 = new Item() { Id = 2 }; + using var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Connect(), + reevaluator: Item.ObserveValueChanged) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + using var item4 = new Item() { Id = 2 }; + source.AddOrUpdate(item4); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "one source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "1 item was replaced within the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + item2.HasObservers.Should().BeFalse("replacing an item should trigger unsubscription from its reevaluator"); + item4.HasObservers.Should().BeTrue("adding an item should invoke its reevaluator and subscribe to it"); + item1.HasObservers.Should().BeTrue("the item was not removed from the source"); + item3.HasObservers.Should().BeTrue("the item was not removed from the source"); + + + // UUT Action (updated item publishes reevaluator notification) + ++item4.Value; + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "1 item published a reevaluation notification"); + results.RecordedChangeSets.Skip(2).First().Count.Should().Be(1, "1 item published a reevaluation notification"); + results.RecordedChangeSets.Skip(2).First().Refreshes.Should().Be(1, "1 item published a reevaluation notification"); + results.RecordedChangeSets.Skip(2).First().First().Current.Should().Be(item4, "item #4 published a reevaluation notification"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no source operations were performed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Theory] + [InlineData(NotificationStrategy.Immediate)] + [InlineData(NotificationStrategy.Asynchronous)] + public void ReevaluatorCompletesWhenNotOnlyItemInSource_CompletionWaitsForSourceCompletionAndOtherReevaluatorCompletions(NotificationStrategy notificationStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + using var item1 = new Item() { Id = 1 }; + using var item2 = new Item() { Id = 2 }; + using var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + + // UUT Initialization & Action (initial completion) + if (notificationStrategy is NotificationStrategy.Immediate) + item2.Complete(); + + using var subscription = BuildUut( + source: source.Connect(), + reevaluator: Item.ObserveValueChanged) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (notificationStrategy is NotificationStrategy.Asynchronous) + item2.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("not all notification sources have completed"); + + + // UUT Action (remaining reevaluator completions) + item1.Complete(); + item3.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (source completion) + source.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeTrue("all notification sources have completed"); + } + + [Theory] + [InlineData(NotificationStrategy.Immediate)] + [InlineData(NotificationStrategy.Asynchronous)] + public void ReevaluatorCompletesWhenOnlyItemInSource_CompletionWaitsForSourceCompletion(NotificationStrategy notificationStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + using var item = new Item() { Id = 1 }; + + source.AddOrUpdate(item); + + + // UUT Initialization & Action (reevaluator completion) + if (notificationStrategy is NotificationStrategy.Immediate) + item.Complete(); + + using var subscription = BuildUut( + source: source.Connect(), + reevaluator: Item.ObserveValueChanged) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (notificationStrategy is NotificationStrategy.Asynchronous) + item.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "1 item was added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (source completion) + source.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeTrue("all notification sources have completed"); + } + + [Fact] + public void ReevaluatorEmitsAsynchronously_ItemRefreshes() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + using var item1 = new Item() { Id = 1 }; + using var item2 = new Item() { Id = 2 }; + using var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Connect(), + reevaluator: Item.ObserveValueChanged) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + ++item2.Value; + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 item published a reevaluation notification"); + results.RecordedChangeSets.Skip(1).First().Count.Should().Be(1, "1 item published a reevaluation notification"); + results.RecordedChangeSets.Skip(1).First().Refreshes.Should().Be(1, "1 item published a reevaluation notification"); + results.RecordedChangeSets.Skip(1).First().First().Current.Should().Be(item2, "item #2 published a reevaluation notification"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no source operations were performed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact(Skip = "Existing defect, #1099")] + public void ReevaluatorEmitsImmediately_ItemDoesNotRefresh() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + using var item1 = new Item() { Id = 1 }; + using var item2 = new Item() { Id = 2 }; + using var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + + // UUT Initialization & Action + using var subscription = BuildUut( + source: source.Connect(), + reevaluator: Item.ObserveValue) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedChangeSets[0].Refreshes.Should().Be(0, "re-evaluation notifications should be ignored within the initial subscription frame"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Theory(Skip = "Existing defect. Docs say that ignoring reevaluator exceptions is intentional, but it shouldn't be. Basic RX philosophy is that exceptions should basically always propagate.")] + [InlineData(NotificationStrategy.Immediate)] + [InlineData(NotificationStrategy.Asynchronous)] + public void ReevaluatorFails_ErrorPropagates(NotificationStrategy notificationStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + using var item1 = new Item() { Id = 1 }; + using var item2 = new Item() { Id = 2 }; + using var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + var error = new Exception("Test"); + + + // UUT Initialization & Action + if (notificationStrategy is NotificationStrategy.Immediate) + item2.SetError(error); + + using var subscription = BuildUut( + source: source.Connect(), + reevaluator: Item.ObserveValueChanged) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (notificationStrategy is NotificationStrategy.Asynchronous) + item2.SetError(error); + + results.Error.Should().Be(error, "upstream errors should propagate downstream"); + if (notificationStrategy is NotificationStrategy.Immediate) + results.RecordedChangeSets.Should().BeEmpty("an error occurred during processing of the initial changeset"); + else + { + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + } + } + + [Fact(Skip = "Existing defect. Docs say that ignoring reevaluator exceptions is intentional, but it shouldn't be. Basic RX philosophy is that exceptions should basically always propagate.")] + public void ReevaluatorThrows_ExceptionPropagates() + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var error = new Exception("Test"); + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Connect(), + reevaluator: _ => throw error) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no initial changesets were published"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + using var item = new Item() { Id = 1 }; + source.AddOrUpdate(item); + + results.Error.Should().Be(error, "upstream errors should propagate downstream"); + results.RecordedChangeSets.Should().BeEmpty("an error occurred during processing of the initial changeset"); + } + + [Theory] + [InlineData(NotificationStrategy.Immediate)] + [InlineData(NotificationStrategy.Asynchronous)] + public void SourceCompletesWhenEmpty_CompletionPropagates(NotificationStrategy notificationStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + + // UUT Initialization & Action + if (notificationStrategy is NotificationStrategy.Immediate) + source.Complete(); + + using var subscription = BuildUut( + source: source.Connect(), + reevaluator: Item.ObserveValueChanged) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (notificationStrategy is NotificationStrategy.Asynchronous) + source.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeTrue("all notification sources have completed"); + } + + [Theory] + [InlineData(NotificationStrategy.Immediate)] + [InlineData(NotificationStrategy.Asynchronous)] + public void SourceCompletesWhenNotEmpty_CompletionWaitsForReevaluatorCompletions(NotificationStrategy notificationStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + using var item1 = new Item() { Id = 1 }; + using var item2 = new Item() { Id = 2 }; + using var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + + // UUT Initialization & Action (source completion) + if (notificationStrategy is NotificationStrategy.Immediate) + source.Complete(); + + using var subscription = BuildUut( + source: source.Connect(), + reevaluator: Item.ObserveValueChanged) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (notificationStrategy is NotificationStrategy.Asynchronous) + source.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("not all notification sources have completed"); + + + // UUT Action (initial reevaluator completion) + item2.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeFalse("not all notification sources have completed"); + + + // UUT Action (remaining reevaluator completions) + item1.Complete(); + item3.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeTrue("all notification sources have completed"); + } + + [Theory] + [InlineData(NotificationStrategy.Immediate)] + [InlineData(NotificationStrategy.Asynchronous)] + public void SourceFails_ErrorPropagates(NotificationStrategy notificationStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + using var item1 = new Item() { Id = 1 }; + using var item2 = new Item() { Id = 2 }; + using var item3 = new Item() { Id = 3 }; + + source.AddOrUpdate(new[] { item1, item2, item3 }); + + var error = new Exception("Test"); + + + // UUT Initialization & Action + if (notificationStrategy is NotificationStrategy.Immediate) + source.SetError(error); + + using var subscription = BuildUut( + source: source.Connect(), + reevaluator: Item.ObserveValueChanged) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (notificationStrategy is NotificationStrategy.Asynchronous) + source.SetError(error); + + results.Error.Should().Be(error, "upstream errors should propagate downstream"); + if (notificationStrategy is NotificationStrategy.Immediate) + results.RecordedChangeSets.Should().BeEmpty("an error occurred before the initial changeset"); + else + { + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added to the source"); + } + } + + [Fact] + public void SourceIsNull_ThrowsException() + => FluentActions.Invoking(() => BuildUut( + source: null!, + reevaluator: Item.ObserveValueChanged)) + .Should() + .Throw(); + + [Fact] + public void SubscriptionIsDisposed_SubscriptionDisposalPropagates() + { + // Setup + using var source = new Subject>(); + + using var item1 = new Item() { Id = 1 }; + using var item2 = new Item() { Id = 2 }; + using var item3 = new Item() { Id = 3 }; + + var initialChangeset = new ChangeSet() + { + new Change(reason: ChangeReason.Add, key: item1.Id, current: item1), + new Change(reason: ChangeReason.Add, key: item2.Id, current: item2), + new Change(reason: ChangeReason.Add, key: item3.Id, current: item3) + }; + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Prepend(initialChangeset), + reevaluator: Item.ObserveValueChanged) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + subscription.Dispose(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + source.HasObservers.Should().BeFalse("subscription disposal should propagate"); + item1.HasObservers.Should().BeFalse("subscription disposal should propagate"); + item2.HasObservers.Should().BeFalse("subscription disposal should propagate"); + item3.HasObservers.Should().BeFalse("subscription disposal should propagate"); + } + + protected abstract IObservable> BuildUut( + IObservable> source, + Func> reevaluator, + TimeSpan? changeSetBuffer = null, + IScheduler? scheduler = null); + } +} diff --git a/src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.WithKey.cs b/src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.WithKey.cs new file mode 100644 index 000000000..4fa7ec9d7 --- /dev/null +++ b/src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.WithKey.cs @@ -0,0 +1,34 @@ +using System; +using System.Reactive; +using System.Reactive.Concurrency; +using System.Reactive.Linq; + +using FluentAssertions; +using Xunit; + +namespace DynamicData.Tests.Cache; + +public static partial class AutoRefreshOnObservableFixture +{ + public class WithKey + : Base + { + [Fact] + public void ReevaluatorIsNull_ThrowsException() + => FluentActions.Invoking(() => ObservableCacheEx.AutoRefreshOnObservable( + source: Observable.Never>(), + reevaluator: (null as Func>)!)) + .Should() + .Throw(); + + protected override IObservable> BuildUut( + IObservable> source, + Func> reevaluator, + TimeSpan? changeSetBuffer = null, + IScheduler? scheduler = null) + => source.AutoRefreshOnObservable( + reevaluator: (item, _) => reevaluator.Invoke(item), + changeSetBuffer: changeSetBuffer, + scheduler: scheduler); + } +} diff --git a/src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.WithoutKey.cs b/src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.WithoutKey.cs new file mode 100644 index 000000000..3e96d6e96 --- /dev/null +++ b/src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.WithoutKey.cs @@ -0,0 +1,34 @@ +using System; +using System.Reactive; +using System.Reactive.Concurrency; +using System.Reactive.Linq; + +using FluentAssertions; +using Xunit; + +namespace DynamicData.Tests.Cache; + +public static partial class AutoRefreshOnObservableFixture +{ + public class WithoutKey + : Base + { + [Fact(Skip = "Existing defect: reevaluator is not null checked, throws NRW on first notification, instead")] + public void ReevaluatorIsNull_ThrowsException() + => FluentActions.Invoking(() => ObservableCacheEx.AutoRefreshOnObservable( + source: Observable.Never>(), + reevaluator: (null as Func>)!)) + .Should() + .Throw(); + + protected override IObservable> BuildUut( + IObservable> source, + Func> reevaluator, + TimeSpan? changeSetBuffer = null, + IScheduler? scheduler = null) + => source.AutoRefreshOnObservable( + reevaluator: reevaluator, + changeSetBuffer: changeSetBuffer, + scheduler: scheduler); + } +} diff --git a/src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.cs b/src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.cs new file mode 100644 index 000000000..4e6f974a2 --- /dev/null +++ b/src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.cs @@ -0,0 +1,79 @@ +using System; +using System.Reactive; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading; + +namespace DynamicData.Tests.Cache; + +public static partial class AutoRefreshOnObservableFixture +{ + public enum NotificationStrategy + { + Immediate, + Asynchronous + } + + public sealed class Item + : IDisposable + { + public static IObservable ObserveValue(Item item) + => Observable.Create(observer => + { + observer.OnNext(item._value); + return item._valueChanged.SubscribeSafe(observer); + }); + + public static IObservable ObserveValueChanged(Item item) + => item._valueChanged.Select(static _ => Unit.Default); + + public static int SelectId(Item item) + => item.Id; + + public Item() + => _valueChanged = new(); + + public required int Id + { + get => _id; + init => _id = value; + } + + public bool HasObservers + => _valueChanged.HasObservers; + + public int Value + { + get => _value; + set + { + if (_value == value) + return; + + _value = value; + _valueChanged.OnNext(value); + } + } + + public void Complete() + => _valueChanged.OnCompleted(); + + public void Dispose() + { + if (Interlocked.Exchange(ref _hasDisposed, true)) + return; + + _valueChanged.OnCompleted(); + _valueChanged.Dispose(); + } + + public void SetError(Exception error) + => _valueChanged.OnError(error); + + private readonly int _id; + private readonly Subject _valueChanged; + + private bool _hasDisposed; + private int _value; + } +}