From 12ac9af92c72bcb87e472772d59559fd3a4fadfb Mon Sep 17 00:00:00 2001 From: JakenVeina Date: Sat, 21 Feb 2026 17:01:30 -0600 Subject: [PATCH] Re-wrote and optimized the static version of the list .Filter() operator, persuant to #1014, and #1060. This operator now properly supports Refresh changes, and preserves ordering of items, for downstream consumers. --- ...lter_List_Static_RandomizedBoundedEdits.cs | 215 ++++++ ...er_List_Static_RandomizedUnboundedEdits.cs | 212 ++++++ .../List/AutoRefreshFixture.cs | 4 +- .../List/FilterFixture.Base.cs | 413 ++++++++++++ .../List/FilterFixture.Static.cs | 638 ++++++++++++++++++ src/DynamicData.Tests/List/FilterFixture.cs | 247 +------ .../Utilities/ListChangeSetAssertions.cs | 19 + .../List/Internal/Filter.Dynamic.cs | 305 +++++++++ .../List/Internal/Filter.Static.cs | 268 ++++++++ src/DynamicData/List/Internal/Filter.cs | 302 --------- src/DynamicData/List/ObservableListEx.cs | 32 +- 11 files changed, 2107 insertions(+), 548 deletions(-) create mode 100644 src/DynamicData.Benchmarks/List/Filter_List_Static_RandomizedBoundedEdits.cs create mode 100644 src/DynamicData.Benchmarks/List/Filter_List_Static_RandomizedUnboundedEdits.cs create mode 100644 src/DynamicData.Tests/List/FilterFixture.Base.cs create mode 100644 src/DynamicData.Tests/List/FilterFixture.Static.cs create mode 100644 src/DynamicData.Tests/Utilities/ListChangeSetAssertions.cs create mode 100644 src/DynamicData/List/Internal/Filter.Dynamic.cs create mode 100644 src/DynamicData/List/Internal/Filter.Static.cs delete mode 100644 src/DynamicData/List/Internal/Filter.cs diff --git a/src/DynamicData.Benchmarks/List/Filter_List_Static_RandomizedBoundedEdits.cs b/src/DynamicData.Benchmarks/List/Filter_List_Static_RandomizedBoundedEdits.cs new file mode 100644 index 00000000..6e4ab39e --- /dev/null +++ b/src/DynamicData.Benchmarks/List/Filter_List_Static_RandomizedBoundedEdits.cs @@ -0,0 +1,215 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Reactive.Subjects; +using System.Reflection; +using BenchmarkDotNet.Attributes; + +using Bogus; + +namespace DynamicData.Benchmarks.List; + +[MemoryDiagnoser] +[MarkdownExporterAttribute.GitHub] +public class Filter_List_Static_RandomizedBoundedEdits +{ + public Filter_List_Static_RandomizedBoundedEdits() + { + var randomizer = new Randomizer(0x1234567); + + _changeSetsByInitialItemCount = GetType() + .GetProperty(nameof(InitialItemCount))! + .GetCustomAttribute()! + .Values + .Cast() + .ToDictionary( + keySelector: static initialItemCount => initialItemCount, + elementSelector: initialItemCount => GenerateChangeSets( + initialItemCount: initialItemCount, + editCount: 5_000, + maxChangeCount: 20, + maxRangeSize: 10, + randomizer: randomizer)); + } + + [Params( + 0, + 10, + 100, + 1_000, + 10_000)] + public int InitialItemCount { get; set; } + + [Benchmark(Baseline = true)] + public void CurrentImplementation() + { + using var source = new Subject>(); + + using var subscription = source + .Filter(Item.FilterByIsIncluded) + .Subscribe(); + + Run(source); + } + + private static ImmutableArray> GenerateChangeSets( + int initialItemCount, + int editCount, + int maxChangeCount, + int maxRangeSize, + Randomizer randomizer) + { + var changeReasons = new[] + { + ListChangeReason.Add, + ListChangeReason.AddRange, + ListChangeReason.Moved, + ListChangeReason.Refresh, + ListChangeReason.Remove, + ListChangeReason.RemoveRange, + ListChangeReason.Replace + }; + + // Weights are chosen to make the cache size likely to stay the same, over time. + // Also, to prevent bogus operations (E.G. you can't remove an item from an empty cache). + var changeReasonWeightsWhenCountIs0 = new[] + { + 0.5f, // Add + 0.5f, // AddRange + 0.0f, // Moved + 0.0f, // Refresh + 0.0f, // Remove + 0.0f, // RemoveRange + 0.0f // Replace + }; + + var changeReasonWeightsWhenCountIs1 = new[] + { + 0.25f, // Add + 0.25f, // AddRange + 0.00f, // Moved + 0.00f, // Refresh + 0.50f, // Remove + 0.00f, // RemoveRange + 0.00f // Replace + }; + + var changeReasonWeightsOtherwise = new[] + { + 0.15f, // Add + 0.15f, // AddRange + 0.15f, // Moved + 0.10f, // Refresh + 0.15f, // Remove + 0.15f, // RemoveRange + 0.15f // Replace + }; + + var nextItemId = 1; + + var changeSets = ImmutableArray.CreateBuilder>(initialCapacity: editCount); + + var items = new ChangeAwareList(); + + items.AddRange(Enumerable.Repeat(0, initialItemCount) + .Select(_ => new Item() + { + Id = nextItemId++, + IsIncluded = randomizer.Bool() + }) + .ToArray()); + changeSets.Add(items.CaptureChanges()); + + while (changeSets.Count < changeSets.Capacity) + { + var changeCount = randomizer.Int(1, maxChangeCount); + for (var i = 0; i < changeCount; ++i) + { + var changeReason = randomizer.WeightedRandom(changeReasons, items.Count switch + { + 0 => changeReasonWeightsWhenCountIs0, + 1 => changeReasonWeightsWhenCountIs1, + _ => changeReasonWeightsOtherwise + }); + + switch (changeReason) + { + case ListChangeReason.Add: + items.Add(new Item() + { + Id = nextItemId++, + IsIncluded = randomizer.Bool() + }); + break; + + case ListChangeReason.AddRange: + items.AddRange(Enumerable.Repeat(0, randomizer.Int(1, maxRangeSize)) + .Select(_ => new Item() + { + Id = nextItemId++, + IsIncluded = randomizer.Bool() + })); + break; + + case ListChangeReason.Moved: + items.Move( + original: randomizer.Int(0, items.Count - 1), + destination: randomizer.Int(0, items.Count - 1)); + break; + + case ListChangeReason.Refresh: + items.RefreshAt(randomizer.Int(0, items.Count - 1)); + break; + + case ListChangeReason.Remove: + items.RemoveAt(randomizer.Int(0, items.Count - 1)); + break; + + case ListChangeReason.RemoveRange: + { + var rangeStartIndex = randomizer.Int(0, items.Count - 1); + + items.RemoveRange( + index: rangeStartIndex, + count: Math.Min(items.Count - rangeStartIndex, randomizer.Int(1, maxRangeSize))); + } + break; + + case ListChangeReason.Replace: + items[randomizer.Int(0, items.Count - 1)] = new Item() + { + Id = nextItemId++, + IsIncluded = randomizer.Bool() + }; + break; + } + } + + changeSets.Add(items.CaptureChanges()); + } + + return changeSets.MoveToImmutable(); + } + + private void Run(IObserver> source) + { + foreach (var changeSet in _changeSetsByInitialItemCount[InitialItemCount]) + source.OnNext(changeSet); + } + + private readonly Dictionary>> _changeSetsByInitialItemCount; + + public class Item + { + public static bool FilterByIsIncluded(Item item) + => item.IsIncluded; + + public required int Id { get; init; } + + public bool IsIncluded { get; set; } + + public override string ToString() + => $"{{ Id = {Id}, IsIncluded = {IsIncluded} }}"; + } +} diff --git a/src/DynamicData.Benchmarks/List/Filter_List_Static_RandomizedUnboundedEdits.cs b/src/DynamicData.Benchmarks/List/Filter_List_Static_RandomizedUnboundedEdits.cs new file mode 100644 index 00000000..e9065ef1 --- /dev/null +++ b/src/DynamicData.Benchmarks/List/Filter_List_Static_RandomizedUnboundedEdits.cs @@ -0,0 +1,212 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Reactive.Subjects; +using System.Reflection; +using BenchmarkDotNet.Attributes; + +using Bogus; + +namespace DynamicData.Benchmarks.List; + +[MemoryDiagnoser] +[MarkdownExporterAttribute.GitHub] +public class Filter_List_Static_RandomizedUnboundedEdits +{ + public Filter_List_Static_RandomizedUnboundedEdits() + { + var randomizer = new Randomizer(0x1234567); + + _changeSetsByEditCount = GetType() + .GetProperty(nameof(EditCount))! + .GetCustomAttribute()! + .Values + .Cast() + .ToDictionary( + keySelector: static editCount => editCount, + elementSelector: editCount => GenerateChangeSets( + editCount: editCount, + maxChangeCount: 20, + maxRangeSize: 10, + randomizer: randomizer)); + } + + [Params( + 100, + 1_000, + 10_000, + 100_000)] + public int EditCount { get; set; } + + [Benchmark(Baseline = true)] + public void CurrentImplementation() + { + using var source = new Subject>(); + + using var subscription = source + .Filter(Item.FilterByIsIncluded) + .Subscribe(); + + Run(source); + } + + private static ImmutableArray> GenerateChangeSets( + int editCount, + int maxChangeCount, + int maxRangeSize, + Randomizer randomizer) + { + var changeReasons = new[] + { + ListChangeReason.Add, + ListChangeReason.AddRange, + ListChangeReason.Clear, + ListChangeReason.Moved, + ListChangeReason.Refresh, + ListChangeReason.Remove, + ListChangeReason.RemoveRange, + ListChangeReason.Replace + }; + + // Weights are chosen to make the cache size likely to grow over time, + // exerting more pressure on the system the longer the benchmark runs. + // Also, to prevent bogus operations (E.G. you can't remove an item from an empty cache). + var changeReasonWeightsWhenCountIs0 = new[] + { + 0.5f, // Add + 0.5f, // AddRange + 0.0f, // Clear + 0.0f, // Moved + 0.0f, // Refresh + 0.0f, // Remove + 0.0f, // RemoveRange + 0.0f // Replace + }; + + var changeReasonWeightsWhenCountIs1 = new[] + { + 0.400f, // Add + 0.400f, // AddRange + 0.001f, // Clear + 0.000f, // Moved + 0.000f, // Refresh + 0.199f, // Remove + 0.000f, // RemoveRange + 0.000f // Replace + }; + + var changeReasonWeightsOtherwise = new[] + { + 0.250f, // Add + 0.250f, // AddRange + 0.001f, // Clear + 0.100f, // Moved + 0.099f, // Refresh + 0.100f, // Remove + 0.100f, // RemoveRange + 0.200f // Replace + }; + + var nextItemId = 1; + + var changeSets = ImmutableArray.CreateBuilder>(initialCapacity: editCount); + + var items = new ChangeAwareList(); + + while (changeSets.Count < changeSets.Capacity) + { + var changeCount = randomizer.Int(1, maxChangeCount); + for (var i = 0; i < changeCount; ++i) + { + var changeReason = randomizer.WeightedRandom(changeReasons, items.Count switch + { + 0 => changeReasonWeightsWhenCountIs0, + 1 => changeReasonWeightsWhenCountIs1, + _ => changeReasonWeightsOtherwise + }); + + switch (changeReason) + { + case ListChangeReason.Add: + items.Add(new Item() + { + Id = nextItemId++, + IsIncluded = randomizer.Bool() + }); + break; + + case ListChangeReason.AddRange: + items.AddRange(Enumerable.Repeat(0, randomizer.Int(1, maxRangeSize)) + .Select(_ => new Item() + { + Id = nextItemId++, + IsIncluded = randomizer.Bool() + })); + break; + + case ListChangeReason.Clear: + items.Clear(); + break; + + case ListChangeReason.Moved: + items.Move( + original: randomizer.Int(0, items.Count - 1), + destination: randomizer.Int(0, items.Count - 1)); + break; + + case ListChangeReason.Refresh: + items.RefreshAt(randomizer.Int(0, items.Count - 1)); + break; + + case ListChangeReason.Remove: + items.RemoveAt(randomizer.Int(0, items.Count - 1)); + break; + + case ListChangeReason.RemoveRange: + { + var rangeStartIndex = randomizer.Int(0, items.Count - 1); + + items.RemoveRange( + index: rangeStartIndex, + count: Math.Min(items.Count - rangeStartIndex, randomizer.Int(1, maxRangeSize))); + } + break; + + case ListChangeReason.Replace: + items[randomizer.Int(0, items.Count - 1)] = new Item() + { + Id = nextItemId++, + IsIncluded = randomizer.Bool() + }; + break; + } + } + + changeSets.Add(items.CaptureChanges()); + } + + return changeSets.MoveToImmutable(); + } + + private void Run(IObserver> source) + { + foreach (var changeSet in _changeSetsByEditCount[EditCount]) + source.OnNext(changeSet); + } + + private readonly Dictionary>> _changeSetsByEditCount; + + public class Item + { + public static bool FilterByIsIncluded(Item item) + => item.IsIncluded; + + public required int Id { get; init; } + + public bool IsIncluded { get; init; } + + public override string ToString() + => $"{{ Id = {Id}, IsIncluded = {IsIncluded} }}"; + } +} diff --git a/src/DynamicData.Tests/List/AutoRefreshFixture.cs b/src/DynamicData.Tests/List/AutoRefreshFixture.cs index 2f860216..e6e88aa7 100644 --- a/src/DynamicData.Tests/List/AutoRefreshFixture.cs +++ b/src/DynamicData.Tests/List/AutoRefreshFixture.cs @@ -126,7 +126,7 @@ public void AutoRefreshFilter() items[60].Age = 160; results.Data.Count.Should().Be(51); results.Messages.Count.Should().Be(5); - results.Messages.Last().First().Reason.Should().Be(ListChangeReason.Replace); + results.Messages.Last().First().Reason.Should().Be(ListChangeReason.Refresh); //remove an item and check no change is fired var toRemove = items[65]; @@ -142,7 +142,7 @@ public void AutoRefreshFilter() toRemove.Age = 101; results.Messages.Count.Should().Be(8); - results.Messages.Last().First().Reason.Should().Be(ListChangeReason.Replace); + results.Messages.Last().First().Reason.Should().Be(ListChangeReason.Refresh); } [Fact] diff --git a/src/DynamicData.Tests/List/FilterFixture.Base.cs b/src/DynamicData.Tests/List/FilterFixture.Base.cs new file mode 100644 index 00000000..3282e1fe --- /dev/null +++ b/src/DynamicData.Tests/List/FilterFixture.Base.cs @@ -0,0 +1,413 @@ +using System; +using System.Linq; + +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; + +namespace DynamicData.Tests.List; + +public static partial class FilterFixture +{ + public abstract class Base + { + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void ExcludedItemsAreRemoved_NoChangesAreMade(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceList(); + + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Connect(), + predicate: Item.FilterByIsIncluded, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all matching items should have been added", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.RemoveMany(source.Items.Where(static item => !item.IsIncluded).ToArray()); + + results.Error.Should().BeNull(); + if (emptyChangesetPolicy is EmptyChangesetPolicy.IncludeEmptyChangesets) + { + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "only excluded items were manipulated", + config: options => options.WithStrictOrdering()); + } + else + { + results.RecordedChangeSets.Skip(1).Should().BeEmpty("empty changesets should be suppressed"); + } + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void ItemsAreAdded_MatchingItemsPropagate(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceList(); + + + // UUT Intialization + using var subscription = BuildUut( + source: source.Connect(), + predicate: Item.FilterByIsIncluded, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItems.Should().BeEmpty("no items have been added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all matching items should have been added", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void ItemsAreMoved_MatchingMovementsPropagate(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceList(); + + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Connect(), + predicate: Item.FilterByIsIncluded, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItems.Should().BeEquivalentTo(source.Items, + because: "all matching items should have been added", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action: Moves for matching items, + source.Edit(items => + { + items.Move(2, 0); + items.Move(1, 5); + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source opreation was performed"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all matching items should have been moved, accordingly", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action: Moves for excluded items + source.Edit(items => + { + items.Move(4, 1); + items.Move(3, 5); + }); + + results.Error.Should().BeNull(); + if (emptyChangesetPolicy is EmptyChangesetPolicy.IncludeEmptyChangesets) + { + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "1 source opreation was performed"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "no matching items were moved", + config: options => options.WithStrictOrdering()); + } + else + { + results.RecordedChangeSets.Skip(2).Should().BeEmpty("empty changesets should be suppressed"); + } + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void ItemsAreRefreshed_ItemsAreReFilteredOrRefreshed(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceList(); + + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Connect(), + predicate: Item.FilterByIsIncluded, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all matching items should have propagated"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (add items) + foreach (var item in source.Items) + item.IsIncluded = true; + + source.Refresh(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedChangeSets.ElementAt(1).ShouldHaveRefreshed(source.Items.Take(3), "all unchanged items should have been refreshed"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all newly-matching items should have been added"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (remove items) + foreach (var item in source.Items.Take(3)) + item.IsIncluded = false; + + source.Refresh(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedChangeSets.ElementAt(2).ShouldHaveRefreshed(source.Items.Skip(3), "all unchanged items should have been refreshed"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all newly-excluded items should have been removed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Theory] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + public void ItemsAreReplaced_ItemsAreReFiltered(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceList(); + + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Intialization + using var subscription = BuildUut( + source: source.Connect(), + predicate: Item.FilterByIsIncluded, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all matching items should have propagated", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (add and replace items) + source.Edit(items => + { + source.ReplaceAt(0, new Item() { Id = 1, IsIncluded = true }); + source.ReplaceAt(1, new Item() { Id = 2, IsIncluded = true }); + source.ReplaceAt(2, new Item() { Id = 3, IsIncluded = true }); + source.ReplaceAt(3, new Item() { Id = 4, IsIncluded = true }); + source.ReplaceAt(4, new Item() { Id = 5, IsIncluded = true }); + source.ReplaceAt(5, new Item() { Id = 6, IsIncluded = true }); + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all newly-matching items should have been added", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (remove and replace items) + source.Edit(items => + { + source.ReplaceAt(0, new Item() { Id = 1, IsIncluded = false }); + source.ReplaceAt(1, new Item() { Id = 2, IsIncluded = false }); + source.ReplaceAt(2, new Item() { Id = 3, IsIncluded = false }); + source.ReplaceAt(3, new Item() { Id = 4, IsIncluded = true }); + source.ReplaceAt(4, new Item() { Id = 5, IsIncluded = true }); + source.ReplaceAt(5, new Item() { Id = 6, IsIncluded = true }); + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all newly-excluded items should have been removed", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void MatchingItemsAreRemoved_RemovalsPropagate(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceList(); + + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Connect(), + predicate: Item.FilterByIsIncluded, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all matching items should have propagated", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.RemoveMany(source.Items.Where(Item.FilterByIsIncluded).ToArray()); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItems.Should().BeEmpty("all matching items were removed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Theory] + [InlineData(SourceType.Asynchronous)] + [InlineData(SourceType.Immediate)] + public void SourceFails_ErrorPropagates(SourceType sourceType) + { + using var source = new TestSourceList(); + + var error = new Exception("Test"); + + if (sourceType is SourceType.Immediate) + source.SetError(error); + + using var subscription = BuildUut( + source: source.Connect(), + predicate: Item.FilterByIsIncluded, + suppressEmptyChangeSets: true) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + if (sourceType is SourceType.Asynchronous) + source.SetError(error); + + results.Error.Should().Be(error); + if (sourceType is SourceType.Asynchronous) + { + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItems.Should().BeEmpty("no source items were added"); + } + else + { + results.RecordedChangeSets.Should().BeEmpty("an error occurred during initialization"); + } + } + + [Fact] + public void SourceIsNull_ThrowsException() + => FluentActions.Invoking(() => BuildUut( + source: null!, + predicate: Item.FilterByIsIncluded, + suppressEmptyChangeSets: true)) + .Should() + .Throw(); + + protected abstract IObservable> BuildUut( + IObservable> source, + Func predicate, + bool suppressEmptyChangeSets); + } +} diff --git a/src/DynamicData.Tests/List/FilterFixture.Static.cs b/src/DynamicData.Tests/List/FilterFixture.Static.cs new file mode 100644 index 00000000..927deae5 --- /dev/null +++ b/src/DynamicData.Tests/List/FilterFixture.Static.cs @@ -0,0 +1,638 @@ +using System; +using System.Linq; +using System.Reactive.Linq; +using System.Reactive.Subjects; + +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; + +namespace DynamicData.Tests.List; + +public static partial class FilterFixture +{ + public class Static + { + [Fact] + public void DuplicateItemsAreAdded_ItemsAreTrackedSeparately() + { + // Setup + using var source = new TestSourceList(); + + source.AddRange(new[] + { + 1, + 2, + 3, + 4, + 3, + 2 + }); + + + // UUT Initialization + using var subscription = source.Connect() + .Filter(static item => (item % 2) is 0) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "there were initial items to publish"); + results.RecordedItems.Should().BeEquivalentTo(new[] { 2, 4, 2 }, + because: "all matching items should have been added", + config: options => options.WithoutStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.Remove(2); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().ContainSingle("an operation was performed upon an included item"); + results.RecordedItems.Should().BeEquivalentTo(new[] { 4, 2 }, + because: "only one of the duplicate items was removed", + config: options => options.WithoutStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.Remove(3); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Should().BeEmpty("an operation was performed upon an excluded item"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.Remove(2); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Should().ContainSingle("an operation was performed upon an included item"); + results.RecordedItems.Should().BeEquivalentTo(new[] { 4 }, + because: "the second duplicate item was removed", + config: options => options.WithoutStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void ExcludedItemIsAdded_NoChangesAreMade() + { + // Setup + using var source = new TestSourceList(); + + + // UUT Initialization + using var subscription = source.Connect() + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("there were no initial items to publish"); + results.RecordedItems.Should().BeEmpty("no items have been added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.Add(new Item() { Id = 1, IsIncluded = false }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("empty changesets should be suppressed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void ExcludedItemIsRemoved_NoChangesAreMade() + { + // Setup + using var source = new TestSourceList(); + + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = source.Connect() + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all matching items should have been added", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.RemoveAt(5); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("empty changesets should be suppressed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void ExcludedItemsAreRemoved_NoChangesAreMade() + { + // Setup + using var source = new TestSourceList(); + + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = source.Connect() + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all matching items should have been added", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.RemoveMany(source.Items.Where(static item => !item.IsIncluded).ToArray()); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("empty changesets should be suppressed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void ItemsAreAdded_MatchingItemsPropagate() + { + // Setup + using var source = new TestSourceList(); + + + // UUT Initialization + using var subscription = source.Connect() + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("there were no initial items to publish"); + results.RecordedItems.Should().BeEmpty("no items have been added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all matching items should have been added", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void ItemsAreMoved_MatchingMovementsPropagate() + { + // Setup + using var source = new TestSourceList(); + + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = source.Connect() + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all matching items should have been added", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action: Moves for matching items, + source.Edit(items => + { + items.Move(2, 0); + items.Move(1, 5); + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all matching items should have been moved, accordingly", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action: Moves for excluded items + source.Edit(items => + { + items.Move(4, 1); + items.Move(3, 5); + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Should().BeEmpty("empty changesets should be suppressed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void ItemsAreRefreshed_ItemsAreReFilteredOrRefreshed() + { + // Setup + using var source = new TestSourceList(); + + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = source.Connect() + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all matching items should have propagated"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (add items) + foreach (var item in source.Items) + item.IsIncluded = true; + + source.Refresh(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedChangeSets.ElementAt(1).ShouldHaveRefreshed(source.Items.Take(3), "all unchanged items should have been refreshed"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all newly-matching items should have been added"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (remove items) + foreach (var item in source.Items.Take(3)) + item.IsIncluded = false; + + source.Refresh(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedChangeSets.ElementAt(2).ShouldHaveRefreshed(source.Items.Skip(3), "all unchanged items should have been refreshed"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all newly-excluded items should have been removed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void ItemsAreReplaced_ItemsAreReFiltered() + { + // Setup + using var source = new TestSourceList(); + + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = source.Connect() + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all matching items should have propagated", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (add and replace items) + source.Edit(items => + { + items[0] = new Item() { Id = 1, IsIncluded = true }; + items[1] = new Item() { Id = 2, IsIncluded = true }; + items[2] = new Item() { Id = 3, IsIncluded = true }; + items[3] = new Item() { Id = 4, IsIncluded = true }; + items[4] = new Item() { Id = 5, IsIncluded = true }; + items[5] = new Item() { Id = 6, IsIncluded = true }; + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all newly-matching items should have been added", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (remove and replace items) + source.Edit(items => + { + items[0] = new Item() { Id = 1, IsIncluded = false }; + items[1] = new Item() { Id = 2, IsIncluded = false }; + items[2] = new Item() { Id = 3, IsIncluded = false }; + items[3] = new Item() { Id = 4, IsIncluded = true }; + items[4] = new Item() { Id = 5, IsIncluded = true }; + items[5] = new Item() { Id = 6, IsIncluded = true }; + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all newly-excluded items should have been removed", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void MatchingItemIsAdded_ItemPropagates() + { + // Setup + using var source = new TestSourceList(); + + + // UUT Initialization + using var subscription = source.Connect() + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("there were no initial items to publish"); + results.RecordedItems.Should().BeEmpty("no items have been added to the source"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.Add(new Item() { Id = 1, IsIncluded = true }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItems.Should().BeEquivalentTo(source.Items, "the matching item should have been added"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void MatchingItemIsRemoved_RemovalPropagates() + { + // Setup + using var source = new TestSourceList(); + + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = source.Connect() + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all matching items should have been added", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + var removedItem = source.Items[2]; + source.RemoveAt(2); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().ContainSingle("1 source operation was performed"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "a matching item was removed", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void MatchingItemsAreRemoved_RemovalsPropagate() + { + // Setup + using var source = new TestSourceList(); + + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = source.Connect() + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial changeset should propagate"); + results.RecordedItems.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), + because: "all matching items should have propagated", + config: options => options.WithStrictOrdering()); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.RemoveMany(source.Items.Where(Item.FilterByIsIncluded).ToArray()); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItems.Should().BeEmpty("all matching items were removed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void PredicateIsNull_ThrowsException() + => FluentActions.Invoking(static () => ObservableListEx.Filter( + source: Observable.Empty>(), + predicate: null!)) + .Should() + .Throw(); + + [Theory] + [InlineData(SourceType.Asynchronous)] + [InlineData(SourceType.Immediate)] + public void SourceCompletes_CompletionPropagates(SourceType sourceType) + { + // Setup + using var source = new TestSourceList(); + + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true } + }); + + if (sourceType is SourceType.Immediate) + source.Complete(); + + + // UUT Initialization & Action + using var subscription = source.Connect() + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + if (sourceType is SourceType.Asynchronous) + source.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "the initial item should have been published"); + results.RecordedItems.Should().BeEquivalentTo(source.Items, "the initial item should have been published"); + results.HasCompleted.Should().BeTrue("the source has completed"); + } + + [Theory] + [InlineData(SourceType.Asynchronous)] + [InlineData(SourceType.Immediate)] + public void SourceFails_ErrorPropagates(SourceType sourceType) + { + using var source = new TestSourceList(); + + source.AddRange(new[] + { + new Item() { Id = 1, IsIncluded = true } + }); + + var error = new Exception("Test"); + + if (sourceType is SourceType.Immediate) + source.SetError(error); + + using var subscription = source.Connect() + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + if (sourceType is SourceType.Asynchronous) + source.SetError(error); + + results.Error.Should().Be(error); + if (sourceType is SourceType.Asynchronous) + { + results.RecordedChangeSets.Count.Should().Be(1, "the initial item should have been published"); + results.RecordedItems.Should().BeEquivalentTo(source.Items, "the initial item should have been published"); + } + else + { + results.RecordedChangeSets.Should().BeEmpty("an error occurred during initialization"); + } + } + + [Fact] + public void SourceIsNull_ThrowsException() + => FluentActions.Invoking(static () => ObservableListEx.Filter( + source: null!, + predicate: Item.FilterByIsIncluded)) + .Should() + .Throw(); + + [Fact] + public void SubscriptionIsDisposed_SubscriptionDisposalPropagates() + { + // Setup + using var source = new Subject>(); + + + // UUT Initialization + using var subscription = source + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no initial changeset occurred"); + results.RecordedItems.Should().BeEmpty("the source has not initialized"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + subscription.Dispose(); + + source.HasObservers.Should().BeFalse("subscription disposal should propagate upstream"); + } + } +} diff --git a/src/DynamicData.Tests/List/FilterFixture.cs b/src/DynamicData.Tests/List/FilterFixture.cs index 17ce6dc0..e5fe3b77 100644 --- a/src/DynamicData.Tests/List/FilterFixture.cs +++ b/src/DynamicData.Tests/List/FilterFixture.cs @@ -1,243 +1,34 @@ -using System; -using System.Linq; +namespace DynamicData.Tests.List; -using DynamicData.Tests.Domain; - -using FluentAssertions; - -using Xunit; - -namespace DynamicData.Tests.List; - -public partial class FilterFixture : IDisposable +public static partial class FilterFixture { - private readonly ChangeSetAggregator _results; - - private readonly ISourceList _source; - - public FilterFixture() - { - _source = new SourceList(); - _results = _source.Connect(p => p.Age > 20).AsAggregator(); - } - - [Fact] - public void AddMatched() - { - var person = new Person("Adult1", 50); - _source.Add(person); - - _results.Messages.Count.Should().Be(1, "Should be 1 updates"); - _results.Data.Count.Should().Be(1, "Should be 1 item in the cache"); - _results.Data.Items[0].Should().Be(person, "Should be same person"); - } - - [Fact] - public void AddNotMatched() - { - var person = new Person("Adult1", 10); - _source.Add(person); - - _results.Messages.Count.Should().Be(0, "Should have no item updates"); - _results.Data.Count.Should().Be(0, "Cache should have no items"); - } - - [Fact] - public void AddNotMatchedAndUpdateMatched() - { - const string key = "Adult1"; - var notmatched = new Person(key, 19); - var matched = new Person(key, 21); - - _source.Edit( - list => - { - list.Add(notmatched); - list.Add(matched); - }); - - _results.Messages.Count.Should().Be(1, "Should be 1 updates"); - _results.Messages[0].First().Range.First().Should().Be(matched, "Should be same person"); - _results.Data.Items[0].Should().Be(matched, "Should be same person"); - } - - [Fact] - public void AddRange() - { - var itemstoadd = Enumerable.Range(1, 100).Select(i => new Person("P" + i, i)).ToList(); - - _source.AddRange(itemstoadd); - - _results.Messages.Count.Should().Be(1, "Should be 1 updates"); - _results.Messages[0].First().Reason.Should().Be(ListChangeReason.AddRange, "Should be 1 updates"); - _results.Data.Count.Should().Be(80, "Should be 50 item in the cache"); - } - - [Fact] - public void AddSubscribeRemove() - { - var people = Enumerable.Range(1, 100).Select(l => new Person("Name" + l, l)).ToArray(); - var source = new SourceList(); - source.AddRange(people); - - var results = source.Connect(x => x.Age > 20).AsAggregator(); - source.RemoveMany(people.Where(x => x.Age % 2 == 0)); - - results.Data.Count.Should().Be(40, "Should be 40 cached"); - } - - [Fact] - public void AttemptedRemovalOfANonExistentKeyWillBeIgnored() - { - _source.Remove(new Person("anyone", 1)); - _results.Messages.Count.Should().Be(0, "Should be 0 updates"); - } - - [Fact] - public void BatchOfUniqueUpdates() - { - var people = Enumerable.Range(1, 100).Select(i => new Person("Name" + i, i)).ToArray(); - - _source.AddRange(people); - _results.Messages.Count.Should().Be(1, "Should be 1 updates"); - _results.Messages[0].Adds.Should().Be(80, "Should return 80 adds"); - - var filtered = people.Where(p => p.Age > 20).OrderBy(p => p.Age).ToArray(); - _results.Data.Items.OrderBy(p => p.Age).Should().BeEquivalentTo(filtered, "Incorrect Filter result"); - } - - [Fact] - public void BatchRemoves() - { - var people = Enumerable.Range(1, 100).Select(l => new Person("Name" + l, l)).ToArray(); - _source.AddRange(people); - _source.Clear(); - - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(80, "Should be 80 addes"); - _results.Messages[1].Removes.Should().Be(80, "Should be 80 removes"); - _results.Data.Count.Should().Be(0, "Should be nothing cached"); - } - - [Fact] - public void BatchSuccessiveUpdates() - { - var people = Enumerable.Range(1, 100).Select(l => new Person("Name" + l, l)).ToArray(); - foreach (var person in people) - { - var person1 = person; - _source.Add(person1); - } - - _results.Messages.Count.Should().Be(80, "Should be 80 updates"); - _results.Data.Count.Should().Be(80, "Should be 100 in the cache"); - var filtered = people.Where(p => p.Age > 20).OrderBy(p => p.Age).ToArray(); - _results.Data.Items.OrderBy(p => p.Age).Should().BeEquivalentTo(filtered, "Incorrect Filter result"); - } - - [Fact] - public void Clear() - { - var itemstoadd = Enumerable.Range(1, 100).Select(i => new Person("P" + i, i)).ToList(); - - _source.AddRange(itemstoadd); - _source.Clear(); - - _results.Messages.Count.Should().Be(2, "Should be 1 updates"); - _results.Messages[0].First().Reason.Should().Be(ListChangeReason.AddRange, "First reason should be add range"); - _results.Messages[1].First().Reason.Should().Be(ListChangeReason.Clear, "Second reason should be clear"); - _results.Data.Count.Should().Be(0, "Should be 50 item in the cache"); - } - - [Fact] - public void Clear1() - { - var people = Enumerable.Range(1, 100).Select(l => new Person("Name" + l, l)).ToArray(); - _source.AddRange(people); - _source.Clear(); - - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(80, "Should be 80 addes"); - _results.Messages[1].Removes.Should().Be(80, "Should be 80 removes"); - _results.Data.Count.Should().Be(0, "Should be nothing cached"); - } - - public void Dispose() - { - _source.Dispose(); - _results.Dispose(); - } - - [Fact] - public void Remove() - { - const string key = "Adult1"; - var person = new Person(key, 50); - - _source.Add(person); - _source.Remove(person); - - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(1, "Should be 80 addes"); - _results.Messages[1].Removes.Should().Be(1, "Should be 80 removes"); - _results.Data.Count.Should().Be(0, "Should be nothing cached"); - } - - [Fact] - public void ReplaceWithMatch() + public enum SourceType { - var itemstoadd = Enumerable.Range(1, 100).Select(i => new Person("P" + i, i)).ToList(); - _source.AddRange(itemstoadd); - - _source.ReplaceAt(0, new Person("Adult1", 50)); - - _results.Data.Count.Should().Be(81); + Immediate, + Asynchronous } - [Fact] - public void ReplaceWithNonMatch() + public enum EmptyChangesetPolicy { - var itemstoadd = Enumerable.Range(1, 100).Select(i => new Person("P" + i, i)).ToList(); - _source.AddRange(itemstoadd); - - _source.ReplaceAt(50, new Person("Adult1", 1)); - - _results.Data.Count.Should().Be(79); + SuppressEmptyChangesets, + IncludeEmptyChangesets } - [Fact] - public void SameKeyChanges() + public record Item { - const string key = "Adult1"; - - var toaddandremove = new Person(key, 53); - _source.Edit( - updater => - { - updater.Add(new Person(key, 50)); - updater.Add(new Person(key, 52)); - updater.Add(toaddandremove); - updater.Remove(toaddandremove); - }); + public static bool FilterByEvenId(Item item) + => (item.Id % 2) == 0; - _results.Messages.Count.Should().Be(1, "Should be 1 updates"); - _results.Messages[0].Adds.Should().Be(3, "Should be 3 adds"); - _results.Messages[0].Replaced.Should().Be(0, "Should be 0 updates"); - _results.Messages[0].Removes.Should().Be(1, "Should be 1 remove"); - } + public static bool FilterByIsIncluded(Item item) + => item.IsIncluded; - [Fact] - public void UpdateNotMatched() - { - const string key = "Adult1"; - var newperson = new Person(key, 10); - var updated = new Person(key, 11); + public static bool FilterByIdInclusionMask( + int idInclusionMask, + Item item) + => ((item.Id & idInclusionMask) == 0) && item.IsIncluded; - _source.Add(newperson); - _source.Add(updated); + public required int Id { get; init; } - _results.Messages.Count.Should().Be(0, "Should be no updates"); - _results.Data.Count.Should().Be(0, "Should nothing cached"); + public bool IsIncluded { get; set; } } } diff --git a/src/DynamicData.Tests/Utilities/ListChangeSetAssertions.cs b/src/DynamicData.Tests/Utilities/ListChangeSetAssertions.cs new file mode 100644 index 00000000..d0b336e1 --- /dev/null +++ b/src/DynamicData.Tests/Utilities/ListChangeSetAssertions.cs @@ -0,0 +1,19 @@ +using System.Collections.Generic; +using System.Linq; + +using FluentAssertions; + +namespace DynamicData.Tests.Utilities; + +public static class ListChangeSetAssertions +{ + public static void ShouldHaveRefreshed( + this IChangeSet changeSet, + IEnumerable expectedItems, + string because = "") + where T : notnull + => changeSet + .Where(static change => change.Reason is ListChangeReason.Refresh) + .Select(static change => change.Item.Current) + .Should().BeEquivalentTo(expectedItems, because); +} diff --git a/src/DynamicData/List/Internal/Filter.Dynamic.cs b/src/DynamicData/List/Internal/Filter.Dynamic.cs new file mode 100644 index 00000000..8d9b5e8d --- /dev/null +++ b/src/DynamicData/List/Internal/Filter.Dynamic.cs @@ -0,0 +1,305 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive.Linq; + +namespace DynamicData.List.Internal; + +internal static partial class Filter +{ + internal sealed class Dynamic + where T : notnull + { + private readonly ListFilterPolicy _policy; + + private readonly Func? _predicate; + + private readonly IObservable>? _predicates; + + private readonly IObservable> _source; + + public Dynamic(IObservable> source, IObservable> predicates, ListFilterPolicy policy = ListFilterPolicy.CalculateDiff) + { + _policy = policy; + _source = source ?? throw new ArgumentNullException(nameof(source)); + _predicates = predicates ?? throw new ArgumentNullException(nameof(predicates)); + } + + public Dynamic(IObservable> source, Func predicate, ListFilterPolicy policy = ListFilterPolicy.CalculateDiff) + { + _policy = policy; + _source = source ?? throw new ArgumentNullException(nameof(source)); + _predicate = predicate ?? throw new ArgumentNullException(nameof(predicate)); + } + + public IObservable> Run() => Observable.Create>( + observer => + { + var locker = InternalEx.NewLock(); + + Func predicate = _ => false; + var all = new List(); + var filtered = new ChangeAwareList(); + var immutableFilter = _predicate is not null; + + IObservable> predicateChanged; + + if (immutableFilter) + { + predicateChanged = Observable.Never>(); + predicate = _predicate ?? predicate; + } + else + { + if (_predicates is null) + { + throw new InvalidOperationException("The predicates is not set and the change is not a immutableFilter."); + } + + predicateChanged = _predicates.Synchronize(locker).Select( + newPredicate => + { + predicate = newPredicate; + return Requery(predicate, all, filtered); + }); + } + + /* + * Apply the transform operator so 'IsMatch' state can be evaluated and captured one time only + * This is to eliminate the need to re-apply the predicate when determining whether an item was previously matched, + * which is essential when we have mutable state + */ + + // Need to get item by index and store it in the transform + var filteredResult = _source.Synchronize(locker).Transform( + (t, previous) => + { + var wasMatch = previous.ConvertOr(p => p!.IsMatch, () => false); + return new ItemWithMatch(t, predicate(t), wasMatch); + }, + true) + .Select(changes => + { + // keep track of all changes if filtering on an observable + if (!immutableFilter) + { + all.Clone(changes); + } + + return Process(filtered, changes); + }); + + return predicateChanged.Merge(filteredResult).NotEmpty() + .Select(changes => changes.Transform(iwm => iwm.Item)) // use convert, not transform + .SubscribeSafe(observer); + }); + + private static IChangeSet Process(ChangeAwareList filtered, IChangeSet changes) + { + // Maintain all items as well as filtered list. This enables us to a) re-query when the predicate changes b) check the previous state when Refresh is called + foreach (var item in changes) + { + switch (item.Reason) + { + case ListChangeReason.Add: + { + var change = item.Item; + if (change.Current.IsMatch) + { + filtered.Add(change.Current); + } + + break; + } + + case ListChangeReason.AddRange: + { + var matches = item.Range.Where(t => t.IsMatch).ToList(); + filtered.AddRange(matches); + break; + } + + case ListChangeReason.Replace: + { + var change = item.Item; + var match = change.Current.IsMatch; + var wasMatch = item.Item.Current.WasMatch; + if (match) + { + if (wasMatch) + { + // an update, so get the latest index and pass the index up the chain + var previous = filtered.Select(x => x.Item).IndexOfOptional(change.Previous.Value.Item).ValueOrThrow(() => new InvalidOperationException($"Cannot find index of {typeof(T).Name} -> {change.Previous.Value}. Expected to be in the list")); + + // replace inline + filtered[previous.Index] = change.Current; + } + else + { + filtered.Add(change.Current); + } + } + else if (wasMatch) + { + filtered.Remove(change.Previous.Value); + } + + break; + } + + case ListChangeReason.Refresh: + { + var change = item.Item; + var match = change.Current.IsMatch; + var wasMatch = item.Item.Current.WasMatch; + if (match) + { + if (wasMatch) + { + // an update, so get the latest index and pass the index up the chain + var previous = filtered.Select(x => x.Item).IndexOfOptional(change.Current.Item).ValueOrThrow(() => new InvalidOperationException($"Cannot find index of {typeof(T).Name} -> {change.Previous.Value}. Expected to be in the list")); + + filtered.RefreshAt(previous.Index); + } + else + { + filtered.Add(change.Current); + } + } + else if (wasMatch) + { + filtered.Remove(change.Current); + } + + break; + } + + case ListChangeReason.Remove: + { + filtered.Remove(item.Item.Current); + break; + } + + case ListChangeReason.RemoveRange: + { + filtered.RemoveMany(item.Range); + break; + } + + case ListChangeReason.Clear: + { + filtered.ClearOrRemoveMany(item); + break; + } + } + } + + return filtered.CaptureChanges(); + } + + private IChangeSet Requery(Func predicate, List all, ChangeAwareList filtered) + { + if (all.Count == 0) + { + return ChangeSet.Empty; + } + + if (_policy == ListFilterPolicy.ClearAndReplace) + { + var itemsWithMatch = all.ConvertAll(iwm => new ItemWithMatch(iwm.Item, predicate(iwm.Item), iwm.IsMatch)); + + // mark items as matched? + filtered.Clear(); + filtered.AddRange(itemsWithMatch.Where(iwm => iwm.IsMatch)); + + // reset state for all items + all.Clear(); + all.AddRange(itemsWithMatch); + return filtered.CaptureChanges(); + } + + var toAdd = new List(all.Count); + var toRemove = new List(all.Count); + + for (var i = 0; i < all.Count; i++) + { + var original = all[i]; + + var newItem = new ItemWithMatch(original.Item, predicate(original.Item), original.IsMatch); + + var current = all[i]; + current.IsMatch = newItem.IsMatch; + current.WasMatch = newItem.WasMatch; + + if (newItem.IsMatch && !newItem.WasMatch) + { + toAdd.Add(newItem); + } + else if (!newItem.IsMatch && newItem.WasMatch) + { + toRemove.Add(newItem); + } + } + + filtered.RemoveMany(toRemove); + filtered.AddRange(toAdd); + + return filtered.CaptureChanges(); + } + + private sealed class ItemWithMatch(T item, bool isMatch, bool wasMatch = false) : IEquatable + { + public T Item { get; } = item; + + public bool IsMatch { get; set; } = isMatch; + + public bool WasMatch { get; set; } = wasMatch; + + public static bool operator ==(ItemWithMatch? left, ItemWithMatch? right) => + Equals(left, right); + + public static bool operator !=(ItemWithMatch? left, ItemWithMatch? right) => + !Equals(left, right); + + public bool Equals(ItemWithMatch? other) + { + if (other is null) + { + return false; + } + + if (ReferenceEquals(this, other)) + { + return true; + } + + return EqualityComparer.Default.Equals(Item, other.Item); + } + + public override bool Equals(object? obj) + { + if (obj is null) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + if (obj.GetType() != GetType()) + { + return false; + } + + return Equals((ItemWithMatch)obj); + } + + public override int GetHashCode() => EqualityComparer.Default.GetHashCode(Item!); + + public override string ToString() => $"{Item}, (was {IsMatch} is {WasMatch}"; + } + } +} diff --git a/src/DynamicData/List/Internal/Filter.Static.cs b/src/DynamicData/List/Internal/Filter.Static.cs new file mode 100644 index 00000000..36a49c5e --- /dev/null +++ b/src/DynamicData/List/Internal/Filter.Static.cs @@ -0,0 +1,268 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive.Linq; + +namespace DynamicData.List.Internal; + +internal static partial class Filter +{ + public static class Static + where T : notnull + { + public static IObservable> Create( + IObservable> source, + Func predicate, + bool suppressEmptyChangesets) + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + predicate.ThrowArgumentNullExceptionIfNull(nameof(predicate)); + + return Observable.Create>(downstreamObserver => + { + var upstreamItemsStates = new List<(T item, bool isIncluded)>(); + var itemStatesBuffer = new List<(T item, bool isIncluded)>(); + + var downstreamItems = new ChangeAwareList(); + var itemsBuffer = new List(); + + var downstream = source.Select(upstreamChanges => + { + foreach (var change in upstreamChanges) + { + switch (change.Reason) + { + case ListChangeReason.Add: + { + var isIncluded = predicate.Invoke(change.Item.Current); + + if (change.Item.CurrentIndex < 0) + { + upstreamItemsStates.Add(( + item: change.Item.Current, + isIncluded: isIncluded)); + + if (isIncluded) + downstreamItems.Add(change.Item.Current); + } + else + { + upstreamItemsStates.Insert( + index: change.Item.CurrentIndex, + item: ( + item: change.Item.Current, + isIncluded: isIncluded)); + + if (isIncluded) + { + downstreamItems.Insert( + index: change.Item.CurrentIndex - CountExcludedItemsBefore(change.Item.CurrentIndex), + item: change.Item.Current); + } + } + } + break; + + case ListChangeReason.AddRange: + upstreamItemsStates.EnsureCapacity(upstreamItemsStates.Count + change.Range.Count); + + if ((downstreamItems.Capacity - downstreamItems.Count) < change.Range.Count) + downstreamItems.Capacity = downstreamItems.Count + change.Range.Count; + + if ((change.Range.Index < 0) || (change.Range.Index == upstreamItemsStates.Count)) + { + foreach (var item in change.Range) + { + var isIncluded = predicate.Invoke(item); + upstreamItemsStates.Add(( + item: item, + isIncluded: isIncluded)); + + if (isIncluded) + downstreamItems.Add(item); + } + } + else + { + upstreamItemsStates.EnsureCapacity(change.Range.Count); + itemsBuffer.EnsureCapacity(change.Range.Count); + + foreach (var item in change.Range) + { + var isIncluded = predicate.Invoke(item); + itemStatesBuffer.Add(( + item: item, + isIncluded: isIncluded)); + + if (isIncluded) + itemsBuffer.Add(item); + } + + upstreamItemsStates.InsertRange( + index: change.Range.Index, + collection: itemStatesBuffer); + itemStatesBuffer.Clear(); + + downstreamItems.InsertRange( + collection: itemsBuffer.ToList(), // .InsertRange() does not perform a defensive copy, so we need to. + index: change.Range.Index - CountExcludedItemsBefore(change.Range.Index)); + itemsBuffer.Clear(); + } + break; + + case ListChangeReason.Clear: + upstreamItemsStates.Clear(); + downstreamItems.Clear(); + break; + + case ListChangeReason.Moved: + { + var itemState = upstreamItemsStates[change.Item.PreviousIndex]; + + var downstreamPreviousIndex = change.Item.PreviousIndex - CountExcludedItemsBefore(change.Item.PreviousIndex); + + upstreamItemsStates.RemoveAt(change.Item.PreviousIndex); + upstreamItemsStates.Insert( + index: change.Item.CurrentIndex, + item: itemState); + + if (itemState.isIncluded) + { + var downstreamCurrentIndex = change.Item.CurrentIndex - CountExcludedItemsBefore(change.Item.CurrentIndex); + + if (downstreamPreviousIndex != downstreamCurrentIndex) + { + downstreamItems.Move( + original: downstreamPreviousIndex, + destination: downstreamCurrentIndex); + } + } + } + break; + + case ListChangeReason.Refresh: + { + var isIncluded = predicate.Invoke(change.Item.Current); + + var itemState = upstreamItemsStates[change.Item.CurrentIndex]; + upstreamItemsStates[change.Item.CurrentIndex] = itemState with + { + isIncluded = isIncluded + }; + + var downstreamIndex = (isIncluded || itemState.isIncluded) + ? change.Item.CurrentIndex - CountExcludedItemsBefore(change.Item.CurrentIndex) + : -1; + + switch (itemState.isIncluded, isIncluded) + { + case (true, true): + downstreamItems.Refresh( + item: change.Item.Current, + index: downstreamIndex); + break; + + case (false, true): + downstreamItems.Insert( + index: downstreamIndex, + item: change.Item.Current); + break; + + case (true, false): + downstreamItems.RemoveAt(downstreamIndex); + break; + } + } + break; + + case ListChangeReason.Remove: + if (upstreamItemsStates[change.Item.CurrentIndex].isIncluded) + downstreamItems.RemoveAt(change.Item.CurrentIndex - CountExcludedItemsBefore(change.Item.CurrentIndex)); + + upstreamItemsStates.RemoveAt(change.Item.CurrentIndex); + break; + + case ListChangeReason.RemoveRange: + { + var downstreamIndex = change.Range.Index - CountExcludedItemsBefore(change.Range.Index); + + var downstreamCount = 0; + var rangeEnd = change.Range.Index + change.Range.Count; + for (var i = change.Range.Index; i < rangeEnd; ++i) + { + if (upstreamItemsStates[i].isIncluded) + ++downstreamCount; + } + + if (downstreamCount is not 0) + { + downstreamItems.RemoveRange( + index: downstreamIndex, + count: downstreamCount); + } + + upstreamItemsStates.RemoveRange( + index: change.Range.Index, + count: change.Range.Count); + } + break; + + case ListChangeReason.Replace: + { + var isIncluded = predicate.Invoke(change.Item.Current); + + var itemState = upstreamItemsStates[change.Item.CurrentIndex]; + upstreamItemsStates[change.Item.CurrentIndex] = ( + item: change.Item.Current, + isIncluded: isIncluded); + + var downstreamIndex = (isIncluded || itemState.isIncluded) + ? change.Item.CurrentIndex - CountExcludedItemsBefore(change.Item.CurrentIndex) + : -1; + + switch (itemState.isIncluded, isIncluded) + { + case (true, true): + downstreamItems[downstreamIndex] = change.Item.Current; + break; + + case (true, false): + downstreamItems.RemoveAt(downstreamIndex); + break; + + case (false, true): + downstreamItems.Insert( + index: downstreamIndex, + item: change.Item.Current); + break; + } + } + break; + } + } + + return downstreamItems.CaptureChanges(); + }); + + if (suppressEmptyChangesets) + downstream = downstream.Where(changes => changes.Count is not 0); + + return downstream.SubscribeSafe(downstreamObserver); + + // This is how we implement order preservation, downstream: each time we do an indexed operation, we + // count how many excluded items there are before that index, and offset the index by that amount. + int CountExcludedItemsBefore(int index) + { + var result = 0; + for (var i = 0; i < index; ++i) + { + if (!upstreamItemsStates[i].isIncluded) + ++result; + } + return result; + } + }); + } + } +} diff --git a/src/DynamicData/List/Internal/Filter.cs b/src/DynamicData/List/Internal/Filter.cs deleted file mode 100644 index f1cbd56d..00000000 --- a/src/DynamicData/List/Internal/Filter.cs +++ /dev/null @@ -1,302 +0,0 @@ -// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. -// Roland Pheasant licenses this file to you under the MIT license. -// See the LICENSE file in the project root for full license information. - -using System.Reactive.Linq; - -namespace DynamicData.List.Internal; - -internal sealed class Filter - where T : notnull -{ - private readonly ListFilterPolicy _policy; - - private readonly Func? _predicate; - - private readonly IObservable>? _predicates; - - private readonly IObservable> _source; - - public Filter(IObservable> source, IObservable> predicates, ListFilterPolicy policy = ListFilterPolicy.CalculateDiff) - { - _policy = policy; - _source = source ?? throw new ArgumentNullException(nameof(source)); - _predicates = predicates ?? throw new ArgumentNullException(nameof(predicates)); - } - - public Filter(IObservable> source, Func predicate, ListFilterPolicy policy = ListFilterPolicy.CalculateDiff) - { - _policy = policy; - _source = source ?? throw new ArgumentNullException(nameof(source)); - _predicate = predicate ?? throw new ArgumentNullException(nameof(predicate)); - } - - public IObservable> Run() => Observable.Create>( - observer => - { - var locker = InternalEx.NewLock(); - - Func predicate = _ => false; - var all = new List(); - var filtered = new ChangeAwareList(); - var immutableFilter = _predicate is not null; - - IObservable> predicateChanged; - - if (immutableFilter) - { - predicateChanged = Observable.Never>(); - predicate = _predicate ?? predicate; - } - else - { - if (_predicates is null) - { - throw new InvalidOperationException("The predicates is not set and the change is not a immutableFilter."); - } - - predicateChanged = _predicates.Synchronize(locker).Select( - newPredicate => - { - predicate = newPredicate; - return Requery(predicate, all, filtered); - }); - } - - /* - * Apply the transform operator so 'IsMatch' state can be evaluated and captured one time only - * This is to eliminate the need to re-apply the predicate when determining whether an item was previously matched, - * which is essential when we have mutable state - */ - - // Need to get item by index and store it in the transform - var filteredResult = _source.Synchronize(locker).Transform( - (t, previous) => - { - var wasMatch = previous.ConvertOr(p => p!.IsMatch, () => false); - return new ItemWithMatch(t, predicate(t), wasMatch); - }, - true) - .Select(changes => - { - // keep track of all changes if filtering on an observable - if (!immutableFilter) - { - all.Clone(changes); - } - - return Process(filtered, changes); - }); - - return predicateChanged.Merge(filteredResult).NotEmpty() - .Select(changes => changes.Transform(iwm => iwm.Item)) // use convert, not transform - .SubscribeSafe(observer); - }); - - private static IChangeSet Process(ChangeAwareList filtered, IChangeSet changes) - { - // Maintain all items as well as filtered list. This enables us to a) re-query when the predicate changes b) check the previous state when Refresh is called - foreach (var item in changes) - { - switch (item.Reason) - { - case ListChangeReason.Add: - { - var change = item.Item; - if (change.Current.IsMatch) - { - filtered.Add(change.Current); - } - - break; - } - - case ListChangeReason.AddRange: - { - var matches = item.Range.Where(t => t.IsMatch).ToList(); - filtered.AddRange(matches); - break; - } - - case ListChangeReason.Replace: - { - var change = item.Item; - var match = change.Current.IsMatch; - var wasMatch = item.Item.Current.WasMatch; - if (match) - { - if (wasMatch) - { - // an update, so get the latest index and pass the index up the chain - var previous = filtered.Select(x => x.Item).IndexOfOptional(change.Previous.Value.Item).ValueOrThrow(() => new InvalidOperationException($"Cannot find index of {typeof(T).Name} -> {change.Previous.Value}. Expected to be in the list")); - - // replace inline - filtered[previous.Index] = change.Current; - } - else - { - filtered.Add(change.Current); - } - } - else if (wasMatch) - { - filtered.Remove(change.Previous.Value); - } - - break; - } - - case ListChangeReason.Refresh: - { - var change = item.Item; - var match = change.Current.IsMatch; - var wasMatch = item.Item.Current.WasMatch; - if (match) - { - if (wasMatch) - { - // an update, so get the latest index and pass the index up the chain - var previous = filtered.Select(x => x.Item).IndexOfOptional(change.Current.Item).ValueOrThrow(() => new InvalidOperationException($"Cannot find index of {typeof(T).Name} -> {change.Previous.Value}. Expected to be in the list")); - - filtered.RefreshAt(previous.Index); - } - else - { - filtered.Add(change.Current); - } - } - else if (wasMatch) - { - filtered.Remove(change.Current); - } - - break; - } - - case ListChangeReason.Remove: - { - filtered.Remove(item.Item.Current); - break; - } - - case ListChangeReason.RemoveRange: - { - filtered.RemoveMany(item.Range); - break; - } - - case ListChangeReason.Clear: - { - filtered.ClearOrRemoveMany(item); - break; - } - } - } - - return filtered.CaptureChanges(); - } - - private IChangeSet Requery(Func predicate, List all, ChangeAwareList filtered) - { - if (all.Count == 0) - { - return ChangeSet.Empty; - } - - if (_policy == ListFilterPolicy.ClearAndReplace) - { - var itemsWithMatch = all.ConvertAll(iwm => new ItemWithMatch(iwm.Item, predicate(iwm.Item), iwm.IsMatch)); - - // mark items as matched? - filtered.Clear(); - filtered.AddRange(itemsWithMatch.Where(iwm => iwm.IsMatch)); - - // reset state for all items - all.Clear(); - all.AddRange(itemsWithMatch); - return filtered.CaptureChanges(); - } - - var toAdd = new List(all.Count); - var toRemove = new List(all.Count); - - for (var i = 0; i < all.Count; i++) - { - var original = all[i]; - - var newItem = new ItemWithMatch(original.Item, predicate(original.Item), original.IsMatch); - - var current = all[i]; - current.IsMatch = newItem.IsMatch; - current.WasMatch = newItem.WasMatch; - - if (newItem.IsMatch && !newItem.WasMatch) - { - toAdd.Add(newItem); - } - else if (!newItem.IsMatch && newItem.WasMatch) - { - toRemove.Add(newItem); - } - } - - filtered.RemoveMany(toRemove); - filtered.AddRange(toAdd); - - return filtered.CaptureChanges(); - } - - private sealed class ItemWithMatch(T item, bool isMatch, bool wasMatch = false) : IEquatable - { - public T Item { get; } = item; - - public bool IsMatch { get; set; } = isMatch; - - public bool WasMatch { get; set; } = wasMatch; - - public static bool operator ==(ItemWithMatch? left, ItemWithMatch? right) => - Equals(left, right); - - public static bool operator !=(ItemWithMatch? left, ItemWithMatch? right) => - !Equals(left, right); - - public bool Equals(ItemWithMatch? other) - { - if (other is null) - { - return false; - } - - if (ReferenceEquals(this, other)) - { - return true; - } - - return EqualityComparer.Default.Equals(Item, other.Item); - } - - public override bool Equals(object? obj) - { - if (obj is null) - { - return false; - } - - if (ReferenceEquals(this, obj)) - { - return true; - } - - if (obj.GetType() != GetType()) - { - return false; - } - - return Equals((ItemWithMatch)obj); - } - - public override int GetHashCode() => EqualityComparer.Default.GetHashCode(Item!); - - public override string ToString() => $"{Item}, (was {IsMatch} is {WasMatch}"; - } -} diff --git a/src/DynamicData/List/ObservableListEx.cs b/src/DynamicData/List/ObservableListEx.cs index dc86ef69..4566652f 100644 --- a/src/DynamicData/List/ObservableListEx.cs +++ b/src/DynamicData/List/ObservableListEx.cs @@ -672,22 +672,22 @@ public static IObservable> ExpireAfter( scheduler: scheduler); /// - /// Filters the source using the specified valueSelector. + /// Filters items, statically, in a list stream, based on a given predicate. /// - /// The type of the item. - /// The source. - /// The valueSelector. - /// An observable which emits the change set. - /// source. - public static IObservable> Filter(this IObservable> source, Func predicate) - where T : notnull - { - source.ThrowArgumentNullExceptionIfNull(nameof(source)); - - predicate.ThrowArgumentNullExceptionIfNull(nameof(predicate)); - - return new Filter(source, predicate).Run(); - } + /// The type of items in the list. + /// The list stream whose items are to be filtered. + /// A static predicate to be used to determine which items should be included or excluded by the filter. + /// A list stream, containing only the items matched by . + /// Throws for and . + /// Note that, unlike some other overloads of this operator, ordering of items is preserved. + public static IObservable> Filter( + this IObservable> source, + Func predicate) + where T : notnull + => List.Internal.Filter.Static.Create( + source: source, + predicate: predicate, + suppressEmptyChangesets: true); /// /// Filters source using the specified filter observable predicate. @@ -707,7 +707,7 @@ public static IObservable> Filter(this IObservable(source, predicate, filterPolicy).Run(); + return new List.Internal.Filter.Dynamic(source, predicate, filterPolicy).Run(); } ///