diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs index 37ca6b19..949ec35b 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs @@ -5,6 +5,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Reactive.Threading.Tasks; using System.Threading.Tasks; using Bogus; using DynamicData.Kernel; @@ -90,10 +91,11 @@ IObservable AddRemovePrices(Market market, int priceCount, int para .Parallelize(priceCount, parallel, obs => obs.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler)) .Finally(market.PricesCache.Dispose); - var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices); - using var priceResults = merged.AsAggregator(); - + var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices).Publish(); var adding = true; + var cacheCompleted = merged.LastOrDefaultAsync().ToTask(); + using var priceResults = merged.AsAggregator(); + using var connect = merged.Connect(); // Start asynchrononously modifying the parent list and the child lists using var addingSub = AddRemoveStress(marketCount, priceCount, Environment.ProcessorCount, TaskPoolScheduler.Default) @@ -119,6 +121,9 @@ IObservable AddRemovePrices(Market market, int priceCount, int para } while (adding); + // Wait for the source cache to finish delivering all notifications. + await cacheCompleted; + // Verify the results CheckResultContents(_marketCacheResults, priceResults); } diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs index f7a4aa2a..49f56690 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs @@ -5,6 +5,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Reactive.Threading.Tasks; using System.Threading.Tasks; using Bogus; using DynamicData.Kernel; @@ -86,9 +87,11 @@ IObservable AddRemoveAnimals(AnimalOwner owner, int animalCount, int par .Parallelize(animalCount, parallel, obs => obs.StressAddRemove(owner.Animals, _ => GetRemoveTime(), scheduler)) .Finally(owner.Animals.Dispose); - var mergeAnimals = _animalOwners.Connect().MergeManyChangeSets(owner => owner.Animals.Connect()); - + var mergeAnimals = _animalOwners.Connect().MergeManyChangeSets(owner => owner.Animals.Connect()).Publish(); var addingAnimals = true; + var cacheCompleted = mergeAnimals.LastOrDefaultAsync().ToTask(); + using var animalResults = mergeAnimals.AsAggregator(); + using var connect = mergeAnimals.Connect(); // Start asynchrononously modifying the parent list and the child lists using var addAnimals = AddRemoveAnimalsStress(ownerCount, animalCount, Environment.ProcessorCount, TaskPoolScheduler.Default) @@ -114,8 +117,12 @@ IObservable AddRemoveAnimals(AnimalOwner owner, int animalCount, int par } while (addingAnimals); - // Verify the results - CheckResultContents(); + // Wait for the source cache to finish delivering all notifications. + await cacheCompleted; + + // Verify the results against the aggregator wired into the same Publish chain + // that cacheCompleted observes. + CheckResultContents(_animalOwners.Items, _animalOwnerResults, animalResults); } [Fact]