From 6bc94ae532d70fb35d96330b87083e61a97af208 Mon Sep 17 00:00:00 2001 From: Darrin Cullop Date: Tue, 26 May 2026 22:57:32 -0700 Subject: [PATCH 1/2] Make MergeManyChangeSetsCacheSourceCompare stress test deterministic MultiThreadedStressTest(10, 50) fails intermittently in CI with two prices present in market.PricesCache.Items but missing from the live aggregator. The two affected prices have the latest timestamps in the batch, which is the signature of a race during high-contention production. Bogus.Randomizer wraps System.Random. When constructed with a seed, the randomizer stores the random in a protected localSeed field and bypasses its internal Locker on every generator call. The test shares one seeded Randomizer across many parallel producer threads: - Directly via _randomizer.Number / .Bool / .TimeSpan / .Interval - Indirectly via _marketFaker.WithSeed(_randomizer), since every Faker.Generate call routes through the same randomizer Concurrent calls into the underlying System.Random corrupt its internal state, producing values inconsistent with what a serialized run would produce. That is sufficient to explain the observed asymmetry between the post-hoc PricesCache snapshot and the live aggregator stream. Introduce SynchronizedRandomizer, a Randomizer subclass that replaces the protected localSeed field with a LockedRandom (a Random subclass that serializes every virtual method on an internal lock). The seed and method contracts are unchanged; the wrapper only adds synchronization. Apply it to the failing fixture. Other Randomizer uses across the test project remain unchanged for now; they are either single-threaded or have not exhibited flake symptoms. Verified: 20 consecutive runs of the fixture pass at MaxParallelThreads=16, zero failures. --- ...ManyChangeSetsCacheSourceCompareFixture.cs | 2 +- .../Utilities/SynchronizedRandomizer.cs | 57 +++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 src/DynamicData.Tests/Utilities/SynchronizedRandomizer.cs diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs index ce6cc89fd..806ed3d54 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs @@ -45,7 +45,7 @@ public sealed class MergeManyChangeSetsCacheSourceCompareFixture : IDisposable public MergeManyChangeSetsCacheSourceCompareFixture() { - _randomizer = new(0x10012022); + _randomizer = new SynchronizedRandomizer(0x10012022); _marketFaker = Fakers.Market.RuleFor(m => m.Rating, faker => faker.Random.Double(0, 5)).WithSeed(_randomizer); _marketCacheResults = _marketCache.Connect().AsAggregator(); } diff --git a/src/DynamicData.Tests/Utilities/SynchronizedRandomizer.cs b/src/DynamicData.Tests/Utilities/SynchronizedRandomizer.cs new file mode 100644 index 000000000..641215c83 --- /dev/null +++ b/src/DynamicData.Tests/Utilities/SynchronizedRandomizer.cs @@ -0,0 +1,57 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Reflection; + +using Bogus; + +namespace DynamicData.Tests.Utilities; + +/// +/// A whose underlying source is safe for +/// concurrent access. Multi-threaded stress tests share a single seeded randomizer across +/// many producer threads (directly and via Faker<T>.WithSeed); the default +/// skips its internal locker when a localSeed is supplied, so a +/// shared instance races internal state and produces non-deterministic +/// failures. Replacing the protected localSeed field with a +/// serializes every generator call without changing the deterministic seed. +/// +internal sealed class SynchronizedRandomizer : Randomizer +{ + private static readonly FieldInfo LocalSeedField = + typeof(Randomizer).GetField("localSeed", BindingFlags.Instance | BindingFlags.NonPublic) + ?? throw new InvalidOperationException("Bogus.Randomizer.localSeed field not found; library shape changed."); + + public SynchronizedRandomizer(int seed) + : base(seed) => + LocalSeedField.SetValue(this, new LockedRandom(seed)); + + private sealed class LockedRandom(int seed) : Random(seed) + { + private readonly object _gate = new(); + + public override int Next() { lock (_gate) { return base.Next(); } } + + public override int Next(int maxValue) { lock (_gate) { return base.Next(maxValue); } } + + public override int Next(int minValue, int maxValue) { lock (_gate) { return base.Next(minValue, maxValue); } } + + public override double NextDouble() { lock (_gate) { return base.NextDouble(); } } + + public override void NextBytes(byte[] buffer) { lock (_gate) { base.NextBytes(buffer); } } + + public override void NextBytes(Span buffer) { lock (_gate) { base.NextBytes(buffer); } } + + public override long NextInt64() { lock (_gate) { return base.NextInt64(); } } + + public override long NextInt64(long maxValue) { lock (_gate) { return base.NextInt64(maxValue); } } + + public override long NextInt64(long minValue, long maxValue) { lock (_gate) { return base.NextInt64(minValue, maxValue); } } + + public override float NextSingle() { lock (_gate) { return base.NextSingle(); } } + + protected override double Sample() { lock (_gate) { return base.Sample(); } } + } +} From c8c5a2a50a9cec8206bacbec60860a78ddf53eb6 Mon Sep 17 00:00:00 2001 From: Darrin Cullop Date: Wed, 27 May 2026 17:57:19 -0700 Subject: [PATCH 2/2] Wait for quiescence in MergeManyChangeSets stress tests The post-#1079 cache delivery model decouples mutation from notification: AddOrUpdate enqueues a notification and returns; the actual delivery to subscribers runs later on whichever thread wins the drain. That removed the cross-cache deadlock the old Synchronize(lock) shape produced, but it opened a small window between mutation and observed delivery. Tests that compare a live aggregator's view against the cache's current Items at assert time can see disagreement during that window. The source-compare fixture already adopted the right shape: var merged = source.MergeManyChangeSets(...).Publish(); var cacheCompleted = merged.LastOrDefaultAsync().ToTask(); using var local = merged.AsAggregator(); using var connect = merged.Connect(); ... await cacheCompleted; CheckResultContents(..., local); Port the same pattern to the cache and list MergeManyChangeSets stress fixtures. The local aggregator now sits on the Publish chain so it shares the completion task; the await before CheckResultContents pins the quiescence point. Also delete the SynchronizedRandomizer change made earlier on this branch. Bogus.Randomizer takes a process-wide lock on Locker.Value for every generator call regardless of whether localSeed is set, so the wrapper was addressing a non-problem. --- .../Cache/MergeManyChangeSetsCacheFixture.cs | 11 +++- ...ManyChangeSetsCacheSourceCompareFixture.cs | 2 +- .../Cache/MergeManyChangeSetsListFixture.cs | 15 +++-- .../Utilities/SynchronizedRandomizer.cs | 57 ------------------- 4 files changed, 20 insertions(+), 65 deletions(-) delete mode 100644 src/DynamicData.Tests/Utilities/SynchronizedRandomizer.cs diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs index 37ca6b198..949ec35bf 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs @@ -5,6 +5,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Reactive.Threading.Tasks; using System.Threading.Tasks; using Bogus; using DynamicData.Kernel; @@ -90,10 +91,11 @@ IObservable AddRemovePrices(Market market, int priceCount, int para .Parallelize(priceCount, parallel, obs => obs.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler)) .Finally(market.PricesCache.Dispose); - var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices); - using var priceResults = merged.AsAggregator(); - + var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices).Publish(); var adding = true; + var cacheCompleted = merged.LastOrDefaultAsync().ToTask(); + using var priceResults = merged.AsAggregator(); + using var connect = merged.Connect(); // Start asynchrononously modifying the parent list and the child lists using var addingSub = AddRemoveStress(marketCount, priceCount, Environment.ProcessorCount, TaskPoolScheduler.Default) @@ -119,6 +121,9 @@ IObservable AddRemovePrices(Market market, int priceCount, int para } while (adding); + // Wait for the source cache to finish delivering all notifications. + await cacheCompleted; + // Verify the results CheckResultContents(_marketCacheResults, priceResults); } diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs index 806ed3d54..ce6cc89fd 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs @@ -45,7 +45,7 @@ public sealed class MergeManyChangeSetsCacheSourceCompareFixture : IDisposable public MergeManyChangeSetsCacheSourceCompareFixture() { - _randomizer = new SynchronizedRandomizer(0x10012022); + _randomizer = new(0x10012022); _marketFaker = Fakers.Market.RuleFor(m => m.Rating, faker => faker.Random.Double(0, 5)).WithSeed(_randomizer); _marketCacheResults = _marketCache.Connect().AsAggregator(); } diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs index f7a4aa2a5..49f566907 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs @@ -5,6 +5,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Reactive.Threading.Tasks; using System.Threading.Tasks; using Bogus; using DynamicData.Kernel; @@ -86,9 +87,11 @@ IObservable AddRemoveAnimals(AnimalOwner owner, int animalCount, int par .Parallelize(animalCount, parallel, obs => obs.StressAddRemove(owner.Animals, _ => GetRemoveTime(), scheduler)) .Finally(owner.Animals.Dispose); - var mergeAnimals = _animalOwners.Connect().MergeManyChangeSets(owner => owner.Animals.Connect()); - + var mergeAnimals = _animalOwners.Connect().MergeManyChangeSets(owner => owner.Animals.Connect()).Publish(); var addingAnimals = true; + var cacheCompleted = mergeAnimals.LastOrDefaultAsync().ToTask(); + using var animalResults = mergeAnimals.AsAggregator(); + using var connect = mergeAnimals.Connect(); // Start asynchrononously modifying the parent list and the child lists using var addAnimals = AddRemoveAnimalsStress(ownerCount, animalCount, Environment.ProcessorCount, TaskPoolScheduler.Default) @@ -114,8 +117,12 @@ IObservable AddRemoveAnimals(AnimalOwner owner, int animalCount, int par } while (addingAnimals); - // Verify the results - CheckResultContents(); + // Wait for the source cache to finish delivering all notifications. + await cacheCompleted; + + // Verify the results against the aggregator wired into the same Publish chain + // that cacheCompleted observes. + CheckResultContents(_animalOwners.Items, _animalOwnerResults, animalResults); } [Fact] diff --git a/src/DynamicData.Tests/Utilities/SynchronizedRandomizer.cs b/src/DynamicData.Tests/Utilities/SynchronizedRandomizer.cs deleted file mode 100644 index 641215c83..000000000 --- a/src/DynamicData.Tests/Utilities/SynchronizedRandomizer.cs +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. -// Roland Pheasant licenses this file to you under the MIT license. -// See the LICENSE file in the project root for full license information. - -using System; -using System.Reflection; - -using Bogus; - -namespace DynamicData.Tests.Utilities; - -/// -/// A whose underlying source is safe for -/// concurrent access. Multi-threaded stress tests share a single seeded randomizer across -/// many producer threads (directly and via Faker<T>.WithSeed); the default -/// skips its internal locker when a localSeed is supplied, so a -/// shared instance races internal state and produces non-deterministic -/// failures. Replacing the protected localSeed field with a -/// serializes every generator call without changing the deterministic seed. -/// -internal sealed class SynchronizedRandomizer : Randomizer -{ - private static readonly FieldInfo LocalSeedField = - typeof(Randomizer).GetField("localSeed", BindingFlags.Instance | BindingFlags.NonPublic) - ?? throw new InvalidOperationException("Bogus.Randomizer.localSeed field not found; library shape changed."); - - public SynchronizedRandomizer(int seed) - : base(seed) => - LocalSeedField.SetValue(this, new LockedRandom(seed)); - - private sealed class LockedRandom(int seed) : Random(seed) - { - private readonly object _gate = new(); - - public override int Next() { lock (_gate) { return base.Next(); } } - - public override int Next(int maxValue) { lock (_gate) { return base.Next(maxValue); } } - - public override int Next(int minValue, int maxValue) { lock (_gate) { return base.Next(minValue, maxValue); } } - - public override double NextDouble() { lock (_gate) { return base.NextDouble(); } } - - public override void NextBytes(byte[] buffer) { lock (_gate) { base.NextBytes(buffer); } } - - public override void NextBytes(Span buffer) { lock (_gate) { base.NextBytes(buffer); } } - - public override long NextInt64() { lock (_gate) { return base.NextInt64(); } } - - public override long NextInt64(long maxValue) { lock (_gate) { return base.NextInt64(maxValue); } } - - public override long NextInt64(long minValue, long maxValue) { lock (_gate) { return base.NextInt64(minValue, maxValue); } } - - public override float NextSingle() { lock (_gate) { return base.NextSingle(); } } - - protected override double Sample() { lock (_gate) { return base.Sample(); } } - } -}