diff --git a/src/ReactiveUI.Primitives.R3Bridge.Generator/R3BridgeGenerator.cs b/src/ReactiveUI.Primitives.R3Bridge.Generator/R3BridgeGenerator.cs
index 350f9f8..9a09610 100644
--- a/src/ReactiveUI.Primitives.R3Bridge.Generator/R3BridgeGenerator.cs
+++ b/src/ReactiveUI.Primitives.R3Bridge.Generator/R3BridgeGenerator.cs
@@ -14,6 +14,9 @@ namespace ReactiveUI.Primitives.R3Bridge.Generator;
[Generator(LanguageNames.CSharp)]
public sealed class R3BridgeGenerator : IIncrementalGenerator
{
+ ///
+ /// Attribute source emitted during post initialization so consumers can identify generated bridge output.
+ ///
private const string MarkerSource = """
//
namespace ReactiveUI.Primitives.R3Bridge.Generated;
@@ -27,6 +30,9 @@ internal sealed class PrimitivesR3BridgeGeneratedAttribute : global::System.Attr
}
""";
+ ///
+ /// Bridge extension source emitted when the consumer compilation references R3.
+ ///
private const string BridgeSource = """
//
#nullable enable
@@ -75,10 +81,12 @@ public void Initialize(IncrementalGeneratorInitializationContext context)
context.RegisterSourceOutput(context.CompilationProvider, static (output, compilation) =>
{
- if (compilation.GetTypeByMetadataName("R3.Observable`1") != null)
+ if (compilation.GetTypeByMetadataName("R3.Observable`1") == null)
{
- output.AddSource("R3SignalBridge.g.cs", SourceText.From(BridgeSource, Encoding.UTF8));
+ return;
}
+
+ output.AddSource("R3SignalBridge.g.cs", SourceText.From(BridgeSource, Encoding.UTF8));
});
}
}
diff --git a/src/ReactiveUI.Primitives.SystemReactiveBridge.Generator/SystemReactiveBridgeGenerator.cs b/src/ReactiveUI.Primitives.SystemReactiveBridge.Generator/SystemReactiveBridgeGenerator.cs
index b2d002c..3ac235c 100644
--- a/src/ReactiveUI.Primitives.SystemReactiveBridge.Generator/SystemReactiveBridgeGenerator.cs
+++ b/src/ReactiveUI.Primitives.SystemReactiveBridge.Generator/SystemReactiveBridgeGenerator.cs
@@ -14,6 +14,9 @@ namespace ReactiveUI.Primitives.SystemReactiveBridge.Generator;
[Generator(LanguageNames.CSharp)]
public sealed class SystemReactiveBridgeGenerator : IIncrementalGenerator
{
+ ///
+ /// Attribute source emitted during post initialization so consumers can identify generated bridge output.
+ ///
private const string MarkerSource = """
//
namespace ReactiveUI.Primitives.SystemReactiveBridge.Generated;
@@ -27,6 +30,9 @@ internal sealed class PrimitivesSystemReactiveBridgeGeneratedAttribute : global:
}
""";
+ ///
+ /// Bridge extension source emitted when the consumer compilation references System.Reactive.
+ ///
private const string BridgeSource = """
//
#nullable enable
@@ -75,10 +81,12 @@ public void Initialize(IncrementalGeneratorInitializationContext context)
context.RegisterSourceOutput(context.CompilationProvider, static (output, compilation) =>
{
- if (compilation.GetTypeByMetadataName("System.Reactive.Linq.Observable") != null)
+ if (compilation.GetTypeByMetadataName("System.Reactive.Linq.Observable") == null)
{
- output.AddSource("SystemReactiveSignalBridge.g.cs", SourceText.From(BridgeSource, Encoding.UTF8));
+ return;
}
+
+ output.AddSource("SystemReactiveSignalBridge.g.cs", SourceText.From(BridgeSource, Encoding.UTF8));
});
}
}
diff --git a/src/ReactiveUI.Primitives/Concurrency/CurrentThreadSequencer.cs b/src/ReactiveUI.Primitives/Concurrency/CurrentThreadSequencer.cs
index 917efa0..2bb0ef9 100644
--- a/src/ReactiveUI.Primitives/Concurrency/CurrentThreadSequencer.cs
+++ b/src/ReactiveUI.Primitives/Concurrency/CurrentThreadSequencer.cs
@@ -14,17 +14,32 @@ namespace ReactiveUI.Primitives.Concurrency;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public sealed class CurrentThreadSequencer : ISequencer
{
+ ///
+ /// Singleton holder for the current-thread sequencer.
+ ///
private static readonly Lazy StaticInstance = new(() => new CurrentThreadSequencer());
+ ///
+ /// Tracks whether the current thread is running scheduled work.
+ ///
[ThreadStatic]
private static bool _running;
+ ///
+ /// Holds recursive work queued for the current thread.
+ ///
[ThreadStatic]
private static SequencerQueue? _threadLocalQueue;
+ ///
+ /// Measures relative due times for the current thread.
+ ///
[ThreadStatic]
private static Stopwatch? clock;
+ ///
+ /// Initializes a new instance of the class.
+ ///
private CurrentThreadSequencer()
{
}
@@ -45,8 +60,11 @@ private CurrentThreadSequencer()
///
/// Gets the scheduler's notion of current time.
///
- public DateTimeOffset Now => DateTimeOffset.UtcNow;
+ public DateTimeOffset Now => Sequencer.Now;
+ ///
+ /// Gets elapsed time on the current thread.
+ ///
private static TimeSpan Time
{
get
@@ -100,7 +118,7 @@ public IDisposable Schedule(TState state, TimeSpan dueTime, Func TimeSpan.Zero)
{
@@ -116,7 +134,7 @@ public IDisposable Schedule(TState state, TimeSpan dueTime, Func(TState state, TimeSpan dueTime, Func(TState state, DateTimeOffset dueTime, Func
+ /// Gets the queued recursive work for the current thread.
+ ///
+ /// The current thread queue, if one exists.
private static SequencerQueue? GetQueue() => _threadLocalQueue;
+ ///
+ /// Sets the queued recursive work for the current thread.
+ ///
+ /// The queue to assign.
private static void SetQueue(SequencerQueue? newQueue) => _threadLocalQueue = newQueue;
+ ///
+ /// Sets the current-thread running marker.
+ ///
+ /// Value indicating whether work is running.
+ private static void SetRunning(bool running) => _running = running;
+
+ ///
+ /// Runs queued current-thread work.
+ ///
private static class Trampoline
{
+ ///
+ /// Runs all work currently in the queue.
+ ///
+ /// Queue to drain.
public static void Run(SequencerQueue queue)
{
while (queue.Count > 0)
diff --git a/src/ReactiveUI.Primitives/Concurrency/DispatcherSequencer.cs b/src/ReactiveUI.Primitives/Concurrency/DispatcherSequencer.cs
index 483e447..e26e8b1 100644
--- a/src/ReactiveUI.Primitives/Concurrency/DispatcherSequencer.cs
+++ b/src/ReactiveUI.Primitives/Concurrency/DispatcherSequencer.cs
@@ -89,7 +89,7 @@ public IDisposable Schedule(TState state, TimeSpan dueTime, Func
+ timer.Tick += (_, _) =>
{
timer?.Stop();
timer = null;
diff --git a/src/ReactiveUI.Primitives/Concurrency/IScheduledItem.cs b/src/ReactiveUI.Primitives/Concurrency/IScheduledItem.cs
index 4e6c862..3a359f8 100644
--- a/src/ReactiveUI.Primitives/Concurrency/IScheduledItem.cs
+++ b/src/ReactiveUI.Primitives/Concurrency/IScheduledItem.cs
@@ -8,7 +8,7 @@ namespace ReactiveUI.Primitives.Concurrency;
/// Represents a work item that has been scheduled.
///
/// Absolute time representation type.
-public interface IScheduledItem
+public interface IScheduledItem
{
///
/// Gets the absolute time at which the item is due for invocation.
diff --git a/src/ReactiveUI.Primitives/Concurrency/ImmediateSequencer.cs b/src/ReactiveUI.Primitives/Concurrency/ImmediateSequencer.cs
index fb00ce3..38435ea 100644
--- a/src/ReactiveUI.Primitives/Concurrency/ImmediateSequencer.cs
+++ b/src/ReactiveUI.Primitives/Concurrency/ImmediateSequencer.cs
@@ -11,8 +11,14 @@ namespace ReactiveUI.Primitives.Concurrency;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public sealed class ImmediateSequencer : ISequencer
{
+ ///
+ /// Singleton holder for the immediate sequencer.
+ ///
private static readonly Lazy StaticInstance = new(static () => new ImmediateSequencer());
+ ///
+ /// Initializes a new instance of the class.
+ ///
private ImmediateSequencer()
{
}
@@ -25,7 +31,7 @@ private ImmediateSequencer()
///
/// Gets the scheduler's notion of current time.
///
- public DateTimeOffset Now => DateTimeOffset.UtcNow;
+ public DateTimeOffset Now => Sequencer.Now;
///
/// Schedules the specified state.
diff --git a/src/ReactiveUI.Primitives/Concurrency/ScheduledItem.cs b/src/ReactiveUI.Primitives/Concurrency/ScheduledItem.cs
index e89bde5..4499e45 100644
--- a/src/ReactiveUI.Primitives/Concurrency/ScheduledItem.cs
+++ b/src/ReactiveUI.Primitives/Concurrency/ScheduledItem.cs
@@ -2,6 +2,7 @@
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.
+using System.Runtime.CompilerServices;
using ReactiveUI.Primitives.Disposables;
namespace ReactiveUI.Primitives.Concurrency;
@@ -11,11 +12,22 @@ namespace ReactiveUI.Primitives.Concurrency;
///
/// Absolute time representation type.
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
-public abstract class ScheduledItem : IScheduledItem, IComparable>, IsDisposed
+public abstract class ScheduledItem : IScheduledItem, IComparable>, IsDisposed, IComparable
where TAbsolute : IComparable
{
+ ///
+ /// Compares scheduled items by due time.
+ ///
private readonly IComparer _comparer;
+
+ ///
+ /// Holds the disposable returned by the scheduled action.
+ ///
private IDisposable? _disposable;
+
+ ///
+ /// Tracks cancellation without taking a lock.
+ ///
private int _isDisposed;
///
@@ -106,7 +118,11 @@ protected ScheduledItem(TAbsolute dueTime, IComparer comparer)
///
/// Work item to compare the current work item to.
/// Relative ordering between this and the specified work item.
- /// The inequality operators are overloaded to provide results consistent with the implementation. Equality operators implement traditional reference equality semantics.
+ ///
+ /// The inequality operators are overloaded to provide results consistent with the
+ /// implementation. Equality operators implement traditional
+ /// reference equality semantics.
+ ///
public int CompareTo(ScheduledItem? other)
{
// MSDN: By definition, any object compares greater than null, and two null references compare equal to each other.
@@ -118,6 +134,26 @@ public int CompareTo(ScheduledItem? other)
return _comparer.Compare(DueTime, other.DueTime);
}
+ ///
+ /// Compares the current instance with another object of the same type and returns an integer that indicates relative ordering.
+ ///
+ /// An object to compare with this instance.
+ /// A value that indicates the relative order of the objects being compared.
+ public int CompareTo(object? obj)
+ {
+ if (obj == null)
+ {
+ return 1;
+ }
+
+ if (obj is ScheduledItem x)
+ {
+ return CompareTo(x);
+ }
+
+ throw new ArgumentException("Object must be a compatible scheduled item.", nameof(obj));
+ }
+
///
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
///
@@ -131,14 +167,18 @@ public void Dispose()
/// Determines whether a object is equal to the specified object.
///
/// The object to compare to the current object.
- /// true if the obj parameter is a object and is equal to the current object; otherwise, false.
+ ///
+ /// true if the obj parameter is a
+ /// object and is equal to the current
+ /// object; otherwise, false.
+ ///
public override bool Equals(object? obj) => ReferenceEquals(this, obj);
///
/// Returns the hash code for the current object.
///
/// A 32-bit signed integer hash code.
- public override int GetHashCode() => base.GetHashCode();
+ public override int GetHashCode() => RuntimeHelpers.GetHashCode(this);
///
/// Invokes the work item.
@@ -158,10 +198,12 @@ public void Invoke()
return;
}
- if (IsDisposed)
+ if (!IsDisposed)
{
- disposable.Dispose();
+ return;
}
+
+ disposable.Dispose();
}
///
@@ -170,10 +212,12 @@ public void Invoke()
/// true to release both managed and unmanaged resources; false to release only unmanaged resources.
protected virtual void Dispose(bool disposing)
{
- if (disposing && Interlocked.Exchange(ref _isDisposed, 1) == 0)
+ if (!disposing || Interlocked.Exchange(ref _isDisposed, 1) != 0)
{
- Interlocked.Exchange(ref _disposable, Disposable.Empty)?.Dispose();
+ return;
}
+
+ Interlocked.Exchange(ref _disposable, Disposable.Empty)?.Dispose();
}
///
diff --git a/src/ReactiveUI.Primitives/Concurrency/ScheduledItem{TAbsolute,TValue}.cs b/src/ReactiveUI.Primitives/Concurrency/ScheduledItem{TAbsolute,TValue}.cs
index 2979d17..f8c1633 100644
--- a/src/ReactiveUI.Primitives/Concurrency/ScheduledItem{TAbsolute,TValue}.cs
+++ b/src/ReactiveUI.Primitives/Concurrency/ScheduledItem{TAbsolute,TValue}.cs
@@ -12,8 +12,19 @@ namespace ReactiveUI.Primitives.Concurrency;
public sealed class ScheduledItem : ScheduledItem
where TAbsolute : IComparable
{
+ ///
+ /// Sequencer passed to the scheduled action.
+ ///
private readonly ISequencer _scheduler;
+
+ ///
+ /// State passed to the scheduled action.
+ ///
private readonly TValue _state;
+
+ ///
+ /// Action invoked when the scheduled item runs.
+ ///
private readonly Func _action;
///
diff --git a/src/ReactiveUI.Primitives/Concurrency/Sequencer.Simple.cs b/src/ReactiveUI.Primitives/Concurrency/Sequencer.Simple.cs
index ae97f23..eb58b87 100644
--- a/src/ReactiveUI.Primitives/Concurrency/Sequencer.Simple.cs
+++ b/src/ReactiveUI.Primitives/Concurrency/Sequencer.Simple.cs
@@ -102,48 +102,17 @@ public static IDisposable Schedule(this ISequencer scheduler, DateTimeOffset due
/// The disposable object used to cancel the scheduled action (best effort).
public static IDisposable Schedule(this ISequencer scheduler, Action action)
{
- // InvokeRec1
- var group = new MultipleDisposable();
- var gate = new object();
-
-#pragma warning disable IDE0039 // Use local function
- Action? recursiveAction = null;
-#pragma warning restore IDE0039 // Use local function
- recursiveAction = () => action(() =>
+ if (scheduler == null)
{
- var isAdded = false;
- var isDone = false;
- var d = default(IDisposable);
- d = scheduler.Schedule(() =>
- {
- lock (gate)
- {
- if (isAdded)
- {
- group.Remove(d);
- }
- else
- {
- isDone = true;
- }
- }
-
- recursiveAction!();
- });
-
- lock (gate)
- {
- if (!isDone)
- {
- group.Add(d);
- isAdded = true;
- }
- }
- });
+ throw new ArgumentNullException(nameof(scheduler));
+ }
- group.Add(scheduler.Schedule(recursiveAction));
+ if (action == null)
+ {
+ throw new ArgumentNullException(nameof(action));
+ }
- return group;
+ return new RecursiveScheduleState(scheduler, action).Start();
}
///
@@ -187,6 +156,7 @@ public static IDisposable ScheduleAction(this ISequencer scheduler, TSta
///
/// Schedules an action to be executed.
///
+ /// The type of the state.
/// Sequencer to execute the action on.
/// A state object to be passed to .
/// Action to execute.
@@ -360,18 +330,123 @@ internal static IDisposable ScheduleAction(this ISequencer scheduler, TS
//// return scheduler.ScheduleLongRunning(action, static (a, c) => a(c));
////}
+ ///
+ /// Invokes an action and returns an empty disposable.
+ ///
+ /// Action to invoke.
+ /// An empty disposable.
private static IDisposable Invoke(Action action)
{
action();
return Disposable.Empty;
}
+ ///
+ /// Invokes a stateful action and returns an empty disposable.
+ ///
+ /// The type of the state.
+ /// Tuple containing the state and action.
+ /// An empty disposable.
private static IDisposable Invoke((TState state, Action action) tuple)
{
tuple.action(tuple.state);
return Disposable.Empty;
}
+ ///
+ /// Invokes a stateful disposable-returning action.
+ ///
+ /// The type of the state.
+ /// Tuple containing the state and action.
+ /// The disposable returned by the action.
private static IDisposable Invoke((TState state, Func action) tuple) =>
tuple.action(tuple.state);
+
+ ///
+ /// Holds state for recursive action scheduling.
+ ///
+ private sealed class RecursiveScheduleState : MultipleDisposable
+ {
+ ///
+ /// Sequencer used for recursive scheduling.
+ ///
+ private readonly ISequencer _scheduler;
+
+ ///
+ /// Recursive action supplied by the caller.
+ ///
+ private readonly Action _action;
+
+ ///
+ /// Guards handoff between scheduling and execution.
+ ///
+ private readonly object _gate = new();
+
+ ///
+ /// Cached delegate used to avoid recreating the recursive action.
+ ///
+ private readonly Action _recursiveAction;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Sequencer used for recursive scheduling.
+ /// Recursive action supplied by the caller.
+ public RecursiveScheduleState(ISequencer scheduler, Action action)
+ {
+ _scheduler = scheduler;
+ _action = action;
+ _recursiveAction = RunRecursiveAction;
+ }
+
+ ///
+ /// Starts recursive scheduling.
+ ///
+ /// The disposable object used to cancel recursive work.
+ public RecursiveScheduleState Start()
+ {
+ Add(_scheduler.Schedule(_recursiveAction));
+ return this;
+ }
+
+ ///
+ /// Invokes the caller-provided recursive action.
+ ///
+ private void RunRecursiveAction() => _action(Reschedule);
+
+ ///
+ /// Schedules the next recursive action invocation.
+ ///
+ private void Reschedule()
+ {
+ var isAdded = false;
+ var isDone = false;
+ IDisposable? disposable = null;
+ disposable = _scheduler.Schedule(() =>
+ {
+ lock (_gate)
+ {
+ if (isAdded)
+ {
+ Remove(disposable!);
+ }
+ else
+ {
+ isDone = true;
+ }
+ }
+
+ RunRecursiveAction();
+ });
+
+ lock (_gate)
+ {
+ if (!isDone)
+ {
+ Add(disposable);
+ isAdded = true;
+ }
+ }
+ }
+ }
}
diff --git a/src/ReactiveUI.Primitives/Concurrency/Sequencer.cs b/src/ReactiveUI.Primitives/Concurrency/Sequencer.cs
index c608ed5..484eca1 100644
--- a/src/ReactiveUI.Primitives/Concurrency/Sequencer.cs
+++ b/src/ReactiveUI.Primitives/Concurrency/Sequencer.cs
@@ -24,7 +24,16 @@ public static partial class Sequencer
///
public static ISequencer Default => TaskPoolSequencer.Default;
- internal static DateTimeOffset Now => DateTime.UtcNow;
+ ///
+ /// Gets the shared wall-clock time used by real-time sequencers.
+ ///
+#if NET8_0_OR_GREATER
+ internal static DateTimeOffset Now => TimeProvider.System.GetUtcNow();
+#else
+#pragma warning disable S6354 // TimeProvider is not available on supported .NET Framework target frameworks.
+ internal static DateTimeOffset Now => DateTimeOffset.UtcNow;
+#pragma warning restore S6354
+#endif
///
/// Normalizes the specified value to a positive value.
diff --git a/src/ReactiveUI.Primitives/Concurrency/SequencerQueue.cs b/src/ReactiveUI.Primitives/Concurrency/SequencerQueue.cs
index c7d8c51..e1552af 100644
--- a/src/ReactiveUI.Primitives/Concurrency/SequencerQueue.cs
+++ b/src/ReactiveUI.Primitives/Concurrency/SequencerQueue.cs
@@ -15,6 +15,14 @@ namespace ReactiveUI.Primitives.Concurrency;
public class SequencerQueue
where TAbsolute : IComparable
{
+ ///
+ /// Default initial capacity for scheduler queues.
+ ///
+ private const int DefaultCapacity = 1024;
+
+ ///
+ /// Priority queue storing scheduled work.
+ ///
private readonly PriorityQueue> _queue;
///
@@ -22,7 +30,7 @@ public class SequencerQueue
/// Creates a new scheduler queue with a default initial capacity.
///
public SequencerQueue()
- : this(1024)
+ : this(DefaultCapacity)
{
}
diff --git a/src/ReactiveUI.Primitives/Concurrency/TaskPoolSequencer.cs b/src/ReactiveUI.Primitives/Concurrency/TaskPoolSequencer.cs
index 05486de..45e71c4 100644
--- a/src/ReactiveUI.Primitives/Concurrency/TaskPoolSequencer.cs
+++ b/src/ReactiveUI.Primitives/Concurrency/TaskPoolSequencer.cs
@@ -13,6 +13,9 @@ namespace ReactiveUI.Primitives.Concurrency;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public sealed class TaskPoolSequencer : ISequencer
{
+ ///
+ /// Task factory used to schedule asynchronous work.
+ ///
private readonly TaskFactory _taskFactory;
///
@@ -95,7 +98,7 @@ public IDisposable Schedule(TState state, TimeSpan dueTime, Func
public static readonly ThreadPoolSequencer Instance = new();
+
+ ///
+ /// Guards access to outstanding timers.
+ ///
internal static readonly object Gate = new();
- internal static readonly Dictionary Timers = new();
+ ///
+ /// Keeps timers rooted until they fire or are cancelled.
+ ///
+ internal static readonly Dictionary Timers = [];
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
private ThreadPoolSequencer()
{
}
diff --git a/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencerBase{TAbsolute,TRelative}.cs b/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencerBase{TAbsolute,TRelative}.cs
index 1ae7a66..18b7960 100644
--- a/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencerBase{TAbsolute,TRelative}.cs
+++ b/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencerBase{TAbsolute,TRelative}.cs
@@ -71,7 +71,11 @@ public TAbsolute Clock
///
/// Relative time to advance the scheduler's clock by.
/// is negative.
- /// The scheduler is already running. VirtualTimeSequencer doesn't support running nested work dispatch loops. To simulate time slippage while running work on the scheduler, use .
+ ///
+ /// The scheduler is already running. VirtualTimeSequencer doesn't support running nested
+ /// work dispatch loops. To simulate time slippage while running work on the scheduler,
+ /// use .
+ ///
public void AdvanceBy(TRelative time)
{
var dt = Add(Clock, time);
@@ -102,7 +106,11 @@ public void AdvanceBy(TRelative time)
///
/// Absolute time to advance the scheduler's clock to.
/// is in the past.
- /// The scheduler is already running. VirtualTimeSequencer doesn't support running nested work dispatch loops. To simulate time slippage while running work on the scheduler, use .
+ ///
+ /// The scheduler is already running. VirtualTimeSequencer doesn't support running nested
+ /// work dispatch loops. To simulate time slippage while running work on the scheduler,
+ /// use .
+ ///
public void AdvanceTo(TAbsolute time)
{
var dueToClock = Comparer.Compare(time, Clock);
@@ -266,28 +274,30 @@ public void Sleep(TRelative time)
///
public void Start()
{
- if (!IsEnabled)
+ if (IsEnabled)
{
- IsEnabled = true;
- do
- {
- var next = GetNext();
- if (next != null)
- {
- if (Comparer.Compare(next.DueTime, Clock) > 0)
- {
- Clock = next.DueTime;
- }
+ return;
+ }
- next.Invoke();
- }
- else
+ IsEnabled = true;
+ do
+ {
+ var next = GetNext();
+ if (next != null)
+ {
+ if (Comparer.Compare(next.DueTime, Clock) > 0)
{
- IsEnabled = false;
+ Clock = next.DueTime;
}
+
+ next.Invoke();
+ }
+ else
+ {
+ IsEnabled = false;
}
- while (IsEnabled);
}
+ while (IsEnabled);
}
///
@@ -331,12 +341,12 @@ public void Stop()
/// Object implementing the requested service, if available; null otherwise.
protected virtual object? GetService(Type serviceType)
{
- if (serviceType == typeof(IStopwatchProvider))
+ if (serviceType != typeof(IStopwatchProvider))
{
- return this;
+ return null;
}
- return null;
+ return this;
}
///
@@ -353,19 +363,41 @@ public void Stop()
/// The corresponding relative time value.
protected abstract TRelative ToRelative(TimeSpan timeSpan);
+ ///
+ /// Converts the current clock value to a .
+ ///
+ /// The current virtual clock as a date-time offset.
private DateTimeOffset ClockToDateTimeOffset() => ToDateTimeOffset(Clock);
+ ///
+ /// Stopwatch backed by virtual time.
+ ///
private sealed class VirtualTimeStopwatch : IStopwatch
{
+ ///
+ /// Parent sequencer that owns the virtual clock.
+ ///
private readonly VirtualTimeSequencerBase _parent;
+
+ ///
+ /// Start time captured when the stopwatch was created.
+ ///
private readonly DateTimeOffset _start;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Parent virtual-time sequencer.
+ /// Start time for elapsed calculations.
public VirtualTimeStopwatch(VirtualTimeSequencerBase parent, DateTimeOffset start)
{
_parent = parent;
_start = start;
}
+ ///
+ /// Gets the elapsed virtual time.
+ ///
public TimeSpan Elapsed => _parent.ClockToDateTimeOffset() - _start;
}
}
diff --git a/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencerExtensions.cs b/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencerExtensions.cs
index 00b32a6..2099765 100644
--- a/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencerExtensions.cs
+++ b/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencerExtensions.cs
@@ -68,6 +68,11 @@ public static IDisposable ScheduleAbsolute(this VirtualTim
return scheduler.ScheduleAbsolute(action, dueTime, static (_, a) => Invoke(a));
}
+ ///
+ /// Invokes an action and returns an empty disposable.
+ ///
+ /// Action to invoke.
+ /// An empty disposable.
private static IDisposable Invoke(Action action)
{
action();
diff --git a/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencer{TAbsolute,TRelative}.cs b/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencer{TAbsolute,TRelative}.cs
index 0644df4..7a3fd79 100644
--- a/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencer{TAbsolute,TRelative}.cs
+++ b/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencer{TAbsolute,TRelative}.cs
@@ -13,6 +13,9 @@ namespace ReactiveUI.Primitives.Concurrency;
public abstract class VirtualTimeSequencer : VirtualTimeSequencerBase
where TAbsolute : IComparable
{
+ ///
+ /// Queue of scheduled virtual-time work.
+ ///
private readonly SequencerQueue _queue = new();
///
diff --git a/src/ReactiveUI.Primitives/ConnectableSignal{T}.cs b/src/ReactiveUI.Primitives/ConnectableSignal{T}.cs
index 8d92b1b..97b2b32 100644
--- a/src/ReactiveUI.Primitives/ConnectableSignal{T}.cs
+++ b/src/ReactiveUI.Primitives/ConnectableSignal{T}.cs
@@ -16,9 +16,24 @@ namespace ReactiveUI.Primitives;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public sealed class ConnectableSignal : IObservable
{
+ ///
+ /// Synchronizes connection state.
+ ///
private readonly object _gate = new();
+
+ ///
+ /// Source sequence to connect.
+ ///
private readonly IObservable _source;
+
+ ///
+ /// Multicast hub that receives source values.
+ ///
private readonly ISignal _hub;
+
+ ///
+ /// Active source connection.
+ ///
private IDisposable? _connection;
///
@@ -70,6 +85,10 @@ public static class ConnectableSignalMixins
///
/// Multicasts source values through the supplied hub.
///
+ /// The value type.
+ /// Source sequence to multicast.
+ /// Hub that receives source values.
+ /// A connectable signal.
public static ConnectableSignal Multicast(this IObservable source, ISignal hub)
{
if (source == null)
@@ -88,29 +107,47 @@ public static ConnectableSignal Multicast(this IObservable source, ISig
///
/// Publishes source values through a live signal hub.
///
+ /// The value type.
+ /// Source sequence to publish.
+ /// A connectable live signal.
public static ConnectableSignal PublishLive(this IObservable source) =>
source.Multicast(new Signal());
///
/// Replays source values through a bounded replay hub.
///
+ /// The value type.
+ /// Source sequence to replay.
+ /// Maximum number of values to replay.
+ /// A connectable replay signal.
public static ConnectableSignal ReplayLive(this IObservable source, int bufferSize) =>
source.Multicast(new ReplaySignal(bufferSize));
///
/// Replays source values through a replay hub constrained by count and time.
///
+ /// The value type.
+ /// Source sequence to replay.
+ /// Maximum number of values to replay.
+ /// Maximum replay window.
+ /// A connectable replay signal.
public static ConnectableSignal ReplayLive(this IObservable source, int bufferSize, TimeSpan window) =>
source.Multicast(new ReplaySignal(bufferSize, window));
///
/// Shares one live source subscription while at least one observer is subscribed.
///
+ /// The value type.
+ /// Source sequence to share.
+ /// A reference-counted live sequence.
public static IObservable ShareLive(this IObservable source) => source.PublishLive().RefCount();
///
/// Connects on first subscriber and disconnects when the last subscriber disposes.
///
+ /// The value type.
+ /// Connectable signal to reference count.
+ /// A reference-counted sequence.
public static IObservable RefCount(this ConnectableSignal source)
{
if (source == null)
@@ -122,10 +159,23 @@ public static IObservable RefCount(this ConnectableSignal source)
return ReactiveUI.Primitives.Signals.Signal.Create(gate.Subscribe);
}
+ ///
+ /// Connects on the first observer subscription.
+ ///
+ /// The value type.
+ /// Connectable signal to connect.
+ /// A sequence that connects after the first subscription.
+ public static IObservable AutoConnect(this ConnectableSignal source) =>
+ AutoConnect(source, 1);
+
///
/// Connects after observers have subscribed.
///
- public static IObservable AutoConnect(this ConnectableSignal source, int subscriberCount = 1)
+ /// The value type.
+ /// Connectable signal to connect.
+ /// Number of observers required before connecting.
+ /// A sequence that connects after the requested number of subscriptions.
+ public static IObservable AutoConnect(this ConnectableSignal source, int subscriberCount)
{
if (source == null)
{
@@ -137,37 +187,54 @@ public static IObservable AutoConnect(this ConnectableSignal source, in
throw new ArgumentOutOfRangeException(nameof(subscriberCount));
}
- var gate = new object();
- var count = 0;
- var connected = false;
- return ReactiveUI.Primitives.Signals.Signal.Create(observer =>
- {
- var subscription = source.Subscribe(observer);
- lock (gate)
- {
- count++;
- if (!connected && count >= subscriberCount)
- {
- connected = true;
- source.Connect();
- }
- }
-
- return subscription;
- });
+ var gate = AutoConnectGate.For(source, subscriberCount);
+ return ReactiveUI.Primitives.Signals.Signal.Create(gate.Subscribe);
}
+ ///
+ /// Tracks reference-counted connection state.
+ ///
+ /// The value type.
private sealed class RefCountGate
{
+ ///
+ /// Synchronizes reference-count state.
+ ///
private readonly object _gate = new();
+
+ ///
+ /// Connectable signal being reference-counted.
+ ///
private readonly ConnectableSignal _source;
+
+ ///
+ /// Active subscriber count.
+ ///
private int _count;
+
+ ///
+ /// Active source connection.
+ ///
private IDisposable? _connection;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Connectable signal being reference-counted.
private RefCountGate(ConnectableSignal source) => _source = source;
+ ///
+ /// Creates a reference-count gate for a connectable signal.
+ ///
+ /// Connectable signal being reference-counted.
+ /// A reference-count gate.
public static RefCountGate For(ConnectableSignal source) => new(source);
+ ///
+ /// Subscribes an observer and manages the shared connection lifetime.
+ ///
+ /// Observer to subscribe.
+ /// A disposable that removes the observer and may disconnect the source.
public IDisposable Subscribe(IObserver observer)
{
IDisposable subscription;
@@ -193,4 +260,77 @@ public IDisposable Subscribe(IObserver observer)
});
}
}
+
+ ///
+ /// Tracks auto-connect subscription state.
+ ///
+ /// The value type.
+ private sealed class AutoConnectGate
+ {
+ ///
+ /// Synchronizes auto-connect state.
+ ///
+ private readonly object _gate = new();
+
+ ///
+ /// Connectable signal being auto-connected.
+ ///
+ private readonly ConnectableSignal _source;
+
+ ///
+ /// Number of observers required before connecting.
+ ///
+ private readonly int _subscriberCount;
+
+ ///
+ /// Current subscriber count.
+ ///
+ private int _count;
+
+ ///
+ /// Value indicating whether the source has connected.
+ ///
+ private bool _connected;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Connectable signal being auto-connected.
+ /// Number of observers required before connecting.
+ private AutoConnectGate(ConnectableSignal source, int subscriberCount)
+ {
+ _source = source;
+ _subscriberCount = subscriberCount;
+ }
+
+ ///
+ /// Creates an auto-connect gate for a connectable signal.
+ ///
+ /// Connectable signal being auto-connected.
+ /// Number of observers required before connecting.
+ /// An auto-connect gate.
+ public static AutoConnectGate For(ConnectableSignal source, int subscriberCount) =>
+ new(source, subscriberCount);
+
+ ///
+ /// Subscribes an observer and connects when the threshold is reached.
+ ///
+ /// Observer to subscribe.
+ /// A disposable that removes the observer subscription.
+ public IDisposable Subscribe(IObserver observer)
+ {
+ var subscription = _source.Subscribe(observer);
+ lock (_gate)
+ {
+ _count++;
+ if (!_connected && _count >= _subscriberCount)
+ {
+ _connected = true;
+ _source.Connect();
+ }
+ }
+
+ return subscription;
+ }
+ }
}
diff --git a/src/ReactiveUI.Primitives/Core/Broadcaster{T}.cs b/src/ReactiveUI.Primitives/Core/Broadcaster{T}.cs
index fcd6afb..3ef0062 100644
--- a/src/ReactiveUI.Primitives/Core/Broadcaster{T}.cs
+++ b/src/ReactiveUI.Primitives/Core/Broadcaster{T}.cs
@@ -8,12 +8,22 @@ namespace ReactiveUI.Primitives.Core;
/// Copy-on-write observer broadcaster optimized for zero-allocation single-subscriber delivery.
///
/// The value type.
-internal struct Broadcaster
+internal struct Broadcaster : IEquatable>
{
+ ///
+ /// Stores either a single observer, an observer array, or .
+ ///
private object? _observers;
+ ///
+ /// Gets a value indicating whether at least one observer is registered.
+ ///
public bool HasObservers => Volatile.Read(ref _observers) is not null;
+ ///
+ /// Adds an observer to the broadcaster.
+ ///
+ /// Observer to add.
public void Add(IObserver observer)
{
if (_observers is IObserver[] many)
@@ -34,8 +44,15 @@ public void Add(IObserver observer)
Volatile.Write(ref _observers, observer);
}
+ ///
+ /// Removes all observers from the broadcaster.
+ ///
public void Clear() => Volatile.Write(ref _observers, null);
+ ///
+ /// Removes an observer from the broadcaster.
+ ///
+ /// Observer to remove.
public void Remove(IObserver observer)
{
if (ReferenceEquals(_observers, observer))
@@ -75,6 +92,10 @@ public void Remove(IObserver observer)
Volatile.Write(ref _observers, copy);
}
+ ///
+ /// Broadcasts a value to the current observers.
+ ///
+ /// Value to broadcast.
public void Next(T value)
{
var snapshot = Volatile.Read(ref _observers);
@@ -95,12 +116,16 @@ public void Next(T value)
}
}
- public void Error(Exception error)
+ ///
+ /// Broadcasts an error to the current observers.
+ ///
+ /// Error to broadcast.
+ public void Error(Exception exception)
{
var snapshot = Volatile.Read(ref _observers);
if (snapshot is IObserver single)
{
- single.OnError(error);
+ single.OnError(exception);
return;
}
@@ -111,10 +136,13 @@ public void Error(Exception error)
for (var i = 0; i < many.Length; i++)
{
- many[i].OnError(error);
+ many[i].OnError(exception);
}
}
+ ///
+ /// Broadcasts completion to the current observers.
+ ///
public void Completed()
{
var snapshot = Volatile.Read(ref _observers);
@@ -134,4 +162,16 @@ public void Completed()
many[i].OnCompleted();
}
}
+
+ ///
+ public readonly bool Equals(Broadcaster other) =>
+ ReferenceEquals(_observers, other._observers);
+
+ ///
+ public override readonly bool Equals(object? obj) =>
+ obj is Broadcaster other && Equals(other);
+
+ ///
+ public override readonly int GetHashCode() =>
+ _observers is null ? 0 : System.Runtime.CompilerServices.RuntimeHelpers.GetHashCode(_observers);
}
diff --git a/src/ReactiveUI.Primitives/Core/DisposedWitness{T}.cs b/src/ReactiveUI.Primitives/Core/DisposedWitness{T}.cs
index 1d8497a..4dcf9c4 100644
--- a/src/ReactiveUI.Primitives/Core/DisposedWitness{T}.cs
+++ b/src/ReactiveUI.Primitives/Core/DisposedWitness{T}.cs
@@ -4,18 +4,31 @@
namespace ReactiveUI.Primitives.Core;
+///
+/// Observer that rejects every notification because the subscription has already been disposed.
+///
+/// The observed value type.
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
internal sealed class DisposedWitness : IObserver
{
+ ///
+ /// Gets the shared disposed witness instance.
+ ///
public static readonly DisposedWitness Instance = new();
+ ///
+ /// Initializes a new instance of the class.
+ ///
private DisposedWitness()
{
}
+ ///
public void OnCompleted() => throw new ObjectDisposedException(string.Empty);
+ ///
public void OnError(Exception error) => throw new ObjectDisposedException(string.Empty, error);
+ ///
public void OnNext(T value) => throw new ObjectDisposedException(string.Empty);
}
diff --git a/src/ReactiveUI.Primitives/Core/EmptyWitness{T}.cs b/src/ReactiveUI.Primitives/Core/EmptyWitness{T}.cs
index c5fbbcb..f21da00 100644
--- a/src/ReactiveUI.Primitives/Core/EmptyWitness{T}.cs
+++ b/src/ReactiveUI.Primitives/Core/EmptyWitness{T}.cs
@@ -6,33 +6,83 @@
namespace ReactiveUI.Primitives.Core;
+///
+/// Delegate-backed observer that defaults missing handlers to no-op behavior.
+///
+/// The observed value type.
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
internal sealed class EmptyWitness : IObserver
{
+ ///
+ /// Gets the shared no-op witness instance.
+ ///
public static readonly EmptyWitness Instance = new(_ => { });
+
+ ///
+ /// Rethrows observer errors with their original stack information.
+ ///
private static readonly Action rethrow = e => ExceptionDispatchInfo.Capture(e).Throw();
+
+ ///
+ /// Completion callback that does nothing.
+ ///
private static readonly Action nop = () => { };
+
+ ///
+ /// Error callback that does nothing.
+ ///
private static readonly Action nope = _ => { };
+ ///
+ /// Callback invoked for each value.
+ ///
private readonly Action _onNext;
+
+ ///
+ /// Callback invoked for an error.
+ ///
private readonly Action _onError;
+
+ ///
+ /// Callback invoked for completion.
+ ///
private readonly Action _onCompleted;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Callback invoked for each value.
public EmptyWitness(Action onNext)
: this(onNext, rethrow, nop)
{
}
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Callback invoked for each value.
+ /// Callback invoked for an error.
public EmptyWitness(Action onNext, Action onError)
: this(onNext, onError, nop)
{
}
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Callback invoked for each value.
+ /// Callback invoked for completion.
public EmptyWitness(Action onNext, Action onCompleted)
: this(onNext, rethrow, onCompleted)
{
}
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Callback invoked for each value.
+ /// Callback invoked for an error.
+ /// Callback invoked for completion.
public EmptyWitness(Action onNext, Action onError, Action onCompleted)
{
_onNext = onNext;
@@ -48,10 +98,12 @@ public EmptyWitness(Action onNext, Action onError, Action onComple
///
/// Calls the action implementing .
///
+ /// Error notification.
public void OnError(Exception error) => (_onError ?? nope)(error);
///
/// Calls the action implementing .
///
+ /// Value notification.
public void OnNext(T value) => _onNext(value);
}
diff --git a/src/ReactiveUI.Primitives/Core/IObserver{TValue,TResult}.cs b/src/ReactiveUI.Primitives/Core/IObserver{TValue,TResult}.cs
index 9fc9d29..905ba96 100644
--- a/src/ReactiveUI.Primitives/Core/IObserver{TValue,TResult}.cs
+++ b/src/ReactiveUI.Primitives/Core/IObserver{TValue,TResult}.cs
@@ -9,13 +9,13 @@ namespace ReactiveUI.Primitives.Core;
///
///
/// The type of the elements received by the observer.
-/// This type parameter is contravariant. That is, you can use either the type you specified or any type that is less derived. For more information about covariance and contravariance, see Covariance and Contravariance in Generics.
+/// This type parameter is contravariant. That is, you can use either the type you specified or any type that is less derived.
///
///
/// The type of the result returned from the observer's notification handlers.
-/// This type parameter is covariant. That is, you can use either the type you specified or any type that is more derived. For more information about covariance and contravariance, see Covariance and Contravariance in Generics.
+/// This type parameter is covariant. That is, you can use either the type you specified or any type that is more derived.
///
-public interface IObserver
+public interface IObserver
{
///
/// Notifies the observer of a new element in the sequence.
diff --git a/src/ReactiveUI.Primitives/Core/IRequireCurrentThread.cs b/src/ReactiveUI.Primitives/Core/IRequireCurrentThread.cs
index 6617fb1..1cbc399 100644
--- a/src/ReactiveUI.Primitives/Core/IRequireCurrentThread.cs
+++ b/src/ReactiveUI.Primitives/Core/IRequireCurrentThread.cs
@@ -8,7 +8,7 @@ namespace ReactiveUI.Primitives.Core;
/// IRequireCurrentThread.
///
/// The Type.
-public interface IRequireCurrentThread : IObservable
+public interface IRequireCurrentThread : IObservable
{
///
/// Determines whether [is required subscribe on current thread].
diff --git a/src/ReactiveUI.Primitives/Core/ImmutableList{T}.cs b/src/ReactiveUI.Primitives/Core/ImmutableList{T}.cs
index b4b70b0..64f6575 100644
--- a/src/ReactiveUI.Primitives/Core/ImmutableList{T}.cs
+++ b/src/ReactiveUI.Primitives/Core/ImmutableList{T}.cs
@@ -4,17 +4,39 @@
namespace ReactiveUI.Primitives.Core;
+///
+/// Immutable array-backed list optimized for copy-on-write observer storage.
+///
+/// The item type.
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
-internal class ImmutableList
+internal sealed class ImmutableList
{
+ ///
+ /// Gets the shared empty list.
+ ///
public static readonly ImmutableList Empty = new();
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Items owned by the immutable list.
public ImmutableList(T[] data) => Items = data;
- private ImmutableList() => Items = new T[0];
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ private ImmutableList() => Items = [];
+ ///
+ /// Gets the immutable list items.
+ ///
public T[] Items { get; }
+ ///
+ /// Returns a new list with the value appended.
+ ///
+ /// Value to append.
+ /// A new immutable list containing the added value.
public ImmutableList Add(T value)
{
var newData = new T[Items.Length + 1];
@@ -23,6 +45,11 @@ public ImmutableList Add(T value)
return new ImmutableList(newData);
}
+ ///
+ /// Returns a new list with the first matching value removed.
+ ///
+ /// Value to remove.
+ /// A new immutable list without the value, or the current list when the value is absent.
public ImmutableList Remove(T value)
{
var i = IndexOf(value);
@@ -45,6 +72,11 @@ public ImmutableList Remove(T value)
return new ImmutableList(newData);
}
+ ///
+ /// Finds the first matching value.
+ ///
+ /// Value to find.
+ /// The value index, or -1 when the value is absent.
public int IndexOf(T value)
{
for (var i = 0; i < Items.Length; ++i)
diff --git a/src/ReactiveUI.Primitives/Core/ListWitness{T}.cs b/src/ReactiveUI.Primitives/Core/ListWitness{T}.cs
index e2b8579..612f848 100644
--- a/src/ReactiveUI.Primitives/Core/ListWitness{T}.cs
+++ b/src/ReactiveUI.Primitives/Core/ListWitness{T}.cs
@@ -4,14 +4,29 @@
namespace ReactiveUI.Primitives.Core;
+///
+/// Observer that forwards notifications to an immutable observer list.
+///
+/// The observed value type.
internal sealed class ListWitness : IObserver
{
+ ///
+ /// Immutable observer snapshot.
+ ///
private readonly ImmutableList> _observers;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Observers that receive forwarded notifications.
public ListWitness(ImmutableList> observers) => _observers = observers;
+ ///
+ /// Gets a value indicating whether the list contains observers.
+ ///
public bool HasObservers => _observers.Items.Length > 0;
+ ///
public void OnCompleted()
{
var targetObservers = _observers.Items;
@@ -21,6 +36,7 @@ public void OnCompleted()
}
}
+ ///
public void OnError(Exception error)
{
var targetObservers = _observers.Items;
@@ -30,6 +46,7 @@ public void OnError(Exception error)
}
}
+ ///
public void OnNext(T value)
{
var targetObservers = _observers.Items;
@@ -39,8 +56,18 @@ public void OnNext(T value)
}
}
+ ///
+ /// Returns a witness with the observer added.
+ ///
+ /// Observer to add.
+ /// The updated observer list witness.
internal IObserver Add(IObserver observer) => new ListWitness(_observers.Add(observer));
+ ///
+ /// Returns a witness with the observer removed.
+ ///
+ /// Observer to remove.
+ /// The updated observer list witness.
internal IObserver Remove(IObserver observer)
{
var i = Array.IndexOf(_observers.Items, observer);
diff --git a/src/ReactiveUI.Primitives/Core/Moment{T}.cs b/src/ReactiveUI.Primitives/Core/Moment{T}.cs
index e18fc63..cb0224d 100644
--- a/src/ReactiveUI.Primitives/Core/Moment{T}.cs
+++ b/src/ReactiveUI.Primitives/Core/Moment{T}.cs
@@ -59,7 +59,7 @@ public Moment(T value, DateTimeOffset timestamp)
///
public override int GetHashCode()
{
- var valueHashCode = Value == null ? 1963 : Value.GetHashCode();
+ var valueHashCode = Value is null ? 1963 : EqualityComparer.Default.GetHashCode(Value);
return Timestamp.GetHashCode() ^ valueHashCode;
}
diff --git a/src/ReactiveUI.Primitives/Core/PriorityQueue.cs b/src/ReactiveUI.Primitives/Core/PriorityQueue.cs
index 74b5020..5b94e95 100644
--- a/src/ReactiveUI.Primitives/Core/PriorityQueue.cs
+++ b/src/ReactiveUI.Primitives/Core/PriorityQueue.cs
@@ -4,26 +4,76 @@
namespace ReactiveUI.Primitives.Core;
+///
+/// Binary heap priority queue that preserves insertion order for equal-priority items.
+///
+/// The queued item type.
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
internal sealed class PriorityQueue
where T : IComparable
{
+ ///
+ /// Default queue capacity.
+ ///
+ private const int DefaultCapacity = 16;
+
+ ///
+ /// Number of children per heap node.
+ ///
+ private const int HeapBranchingFactor = 2;
+
+ ///
+ /// Offset from a node's doubled index to its left child.
+ ///
+ private const int LeftChildOffset = 1;
+
+ ///
+ /// Offset from a node's doubled index to its right child.
+ ///
+ private const int RightChildOffset = 2;
+
+ ///
+ /// Capacity divisor used to shrink sparse queues.
+ ///
+ private const int ShrinkDivisor = 4;
+
+ ///
+ /// Monotonic tie-breaker for equal-priority items.
+ ///
private long _count = long.MinValue;
+
+ ///
+ /// Heap storage.
+ ///
private IndexedItem[] _items;
+ ///
+ /// Initializes a new instance of the class.
+ ///
public PriorityQueue()
- : this(16)
+ : this(DefaultCapacity)
{
}
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Initial queue capacity.
public PriorityQueue(int capacity)
{
_items = new IndexedItem[capacity];
Count = 0;
}
+ ///
+ /// Gets the number of queued items.
+ ///
public int Count { get; private set; }
+ ///
+ /// Removes and returns the highest-priority item.
+ ///
+ /// The highest-priority item.
public T Dequeue()
{
var result = Peek();
@@ -31,12 +81,16 @@ public T Dequeue()
return result;
}
+ ///
+ /// Adds an item to the queue.
+ ///
+ /// Item to enqueue.
public void Enqueue(T item)
{
if (Count >= _items.Length)
{
var temp = _items;
- _items = new IndexedItem[_items.Length * 2];
+ _items = new IndexedItem[_items.Length * HeapBranchingFactor];
Array.Copy(temp, _items, temp.Length);
}
@@ -45,6 +99,10 @@ public void Enqueue(T item)
Percolate(index);
}
+ ///
+ /// Returns the highest-priority item without removing it.
+ ///
+ /// The highest-priority item.
public T Peek()
{
if (Count == 0)
@@ -55,6 +113,11 @@ public T Peek()
return _items[0].Value;
}
+ ///
+ /// Removes a matching item from the queue.
+ ///
+ /// Item to remove.
+ /// when the item was found and removed; otherwise, .
public bool Remove(T item)
{
for (var i = 0; i < Count; ++i)
@@ -69,6 +132,10 @@ public bool Remove(T item)
return false;
}
+ ///
+ /// Restores heap order from the supplied index downward.
+ ///
+ /// Index to heapify.
private void Heapify(int index)
{
if (index >= Count || index < 0)
@@ -78,8 +145,8 @@ private void Heapify(int index)
while (true)
{
- var left = (2 * index) + 1;
- var right = (2 * index) + 2;
+ var left = (HeapBranchingFactor * index) + LeftChildOffset;
+ var right = (HeapBranchingFactor * index) + RightChildOffset;
var first = index;
if (left < Count && IsHigherPriority(left, first))
@@ -103,8 +170,19 @@ private void Heapify(int index)
}
}
+ ///
+ /// Determines whether the left index has higher priority than the right index.
+ ///
+ /// Candidate item index.
+ /// Current item index.
+ /// when the left item should be ordered before the right item.
private bool IsHigherPriority(int left, int right) => _items[left].CompareTo(_items[right]) < 0;
+ ///
+ /// Restores heap order from the supplied index upward.
+ ///
+ /// Index to percolate.
+ /// The final index of the percolated item.
private int Percolate(int index)
{
if (index >= Count || index < 0)
@@ -112,18 +190,22 @@ private int Percolate(int index)
return index;
}
- var parent = (index - 1) / 2;
+ var parent = (index - 1) / HeapBranchingFactor;
while (parent >= 0 && parent != index && IsHigherPriority(index, parent))
{
// swap index and parent
(_items[parent], _items[index]) = (_items[index], _items[parent]);
index = parent;
- parent = (index - 1) / 2;
+ parent = (index - 1) / HeapBranchingFactor;
}
return index;
}
+ ///
+ /// Removes the item at the supplied index.
+ ///
+ /// Index to remove.
private void RemoveAt(int index)
{
_items[index] = _items[--Count];
@@ -134,19 +216,32 @@ private void RemoveAt(int index)
Heapify(index);
}
- if (Count < _items.Length / 4)
+ if (Count >= _items.Length / ShrinkDivisor)
{
- var temp = _items;
- _items = new IndexedItem[_items.Length / 2];
- Array.Copy(temp, 0, _items, 0, Count);
+ return;
}
+
+ var temp = _items;
+ _items = new IndexedItem[_items.Length / HeapBranchingFactor];
+ Array.Copy(temp, 0, _items, 0, Count);
}
- private struct IndexedItem : IComparable
+ ///
+ /// Heap item with an insertion-order tie-breaker.
+ ///
+ private struct IndexedItem : IComparable, IEquatable
{
+ ///
+ /// Insertion order id.
+ ///
public long Id;
+
+ ///
+ /// Queued value.
+ ///
public T Value;
+ ///
public int CompareTo(IndexedItem other)
{
var c = Value.CompareTo(other.Value);
@@ -157,5 +252,21 @@ public int CompareTo(IndexedItem other)
return c;
}
+
+ ///
+ public readonly bool Equals(IndexedItem other) =>
+ Id == other.Id && EqualityComparer.Default.Equals(Value, other.Value);
+
+ ///
+ public override readonly bool Equals(object? obj) =>
+ obj is IndexedItem other && Equals(other);
+
+ ///
+ public override readonly int GetHashCode()
+ {
+ var valueHash = Value is null ? 0 : EqualityComparer.Default.GetHashCode(Value);
+
+ return unchecked((Id.GetHashCode() * 397) ^ valueHash);
+ }
}
}
diff --git a/src/ReactiveUI.Primitives/Core/Spark.cs b/src/ReactiveUI.Primitives/Core/Spark.cs
index 6bb2357..36a611b 100644
--- a/src/ReactiveUI.Primitives/Core/Spark.cs
+++ b/src/ReactiveUI.Primitives/Core/Spark.cs
@@ -12,7 +12,10 @@ public static class Spark
///
/// Creates an object that represents an OnNext spark to an observer.
///
- /// The type of the elements received by the observer. Upon dematerialization of the spark into an observable sequence, this type is used as the element type for the sequence.
+ ///
+ /// The type of the elements received by the observer.
+ /// Upon dematerialization of the spark into an observable sequence, this type is used as the element type for the sequence.
+ ///
/// The value contained in the spark.
/// The OnNext spark containing the value.
public static Spark CreateOnNext(T value) => new Spark.OnNextSpark(value);
@@ -20,10 +23,17 @@ public static class Spark
///
/// Creates an object that represents an OnError spark to an observer.
///
- /// The type of the elements received by the observer. Upon dematerialization of the spark into an observable sequence, this type is used as the element type for the sequence.
+ ///
+ /// The type of the elements received by the observer.
+ /// Upon dematerialization of the spark into an observable sequence, this type is used as the element type for the sequence.
+ ///
/// The exception contained in the spark.
/// The OnError spark containing the exception.
/// is null.
+ [System.Diagnostics.CodeAnalysis.SuppressMessage(
+ "Sonar Code Smell",
+ "S4018:Generic methods should provide type parameters",
+ Justification = "The type parameter determines the returned spark value type.")]
public static Spark CreateOnError(Exception error)
{
if (error == null)
@@ -37,12 +47,26 @@ public static Spark CreateOnError(Exception error)
///
/// Creates an object that represents an OnCompleted spark to an observer.
///
- /// The type of the elements received by the observer. Upon dematerialization of the spark into an observable sequence, this type is used as the element type for the sequence.
+ ///
+ /// The type of the elements received by the observer.
+ /// Upon dematerialization of the spark into an observable sequence, this type is used as the element type for the sequence.
+ ///
/// The OnCompleted spark.
+ [System.Diagnostics.CodeAnalysis.SuppressMessage(
+ "Sonar Code Smell",
+ "S4018:Generic methods should provide type parameters",
+ Justification = "The type parameter determines the cached completed spark value type.")]
public static Spark CreateOnCompleted() => CompletedSparkCache.Instance;
+ ///
+ /// Holds the cached completed spark for a value type.
+ ///
+ /// The cached spark value type.
private static class CompletedSparkCache
{
+ ///
+ /// Gets the cached completed spark.
+ ///
public static readonly Spark Instance = new Spark.OnCompletedSpark();
}
}
diff --git a/src/ReactiveUI.Primitives/Core/Spark{T}.cs b/src/ReactiveUI.Primitives/Core/Spark{T}.cs
index 76d2bd3..e70018f 100644
--- a/src/ReactiveUI.Primitives/Core/Spark{T}.cs
+++ b/src/ReactiveUI.Primitives/Core/Spark{T}.cs
@@ -14,18 +14,14 @@ namespace ReactiveUI.Primitives.Core
///
/// The type of the elements received by the observer.
[Serializable]
-#pragma warning disable CS0659 // Type overrides Object.Equals(object o) but does not override Object.GetHashCode()
-#pragma warning disable CS0661 // Type defines operator == or operator != but does not override Object.GetHashCode()
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public abstract class Spark : IEquatable>
-#pragma warning restore CS0661 // Type defines operator == or operator != but does not override Object.GetHashCode()
-#pragma warning restore CS0659 // Type overrides Object.Equals(object o) but does not override Object.GetHashCode()
{
///
/// Initializes a new instance of the class.
/// Default constructor used by derived types.
///
- protected internal Spark()
+ private protected Spark()
{
}
@@ -54,27 +50,35 @@ protected internal Spark()
///
/// The first Spark<T> to compare, or null.
/// The second Spark<T> to compare, or null.
- /// true if the first Spark<T> value has a different observer message payload as the second Spark<T> value; otherwise, false.
+ ///
+ /// if the first Spark<T> value has a different observer message payload as the second Spark<T> value;
+ /// otherwise, .
+ ///
///
- /// Equality of Spark<T> objects is based on the equality of the observer message payload they represent, including the Spark Kind and the Value or Exception (if any).
- /// This means two Spark<T> objects can be equal even though they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method.
- /// In case one wants to determine whether two Spark<T> objects represent a different observer method call, use Object.ReferenceEquals identity equality instead.
+ /// Equality of Spark<T> objects is based on the equality of the observer message payload they represent,
+ /// including the Spark Kind and the Value or Exception (if any). This means two Spark<T> objects can be equal even though
+ /// they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method.
+ /// Use Object.ReferenceEquals identity equality to determine whether two Spark<T> objects represent a different observer method call.
///
- public static bool operator !=(Spark left, Spark right) => !(left == right);
+ public static bool operator !=(Spark? left, Spark? right) => !(left == right);
///
/// Determines whether the two specified Spark<T> objects have the same observer message payload.
///
/// The first Spark<T> to compare, or null.
/// The second Spark<T> to compare, or null.
- /// true if the first Spark<T> value has the same observer message payload as the second Spark<T> value; otherwise, false.
+ ///
+ /// if the first Spark<T> value has the same observer message payload as the second Spark<T> value;
+ /// otherwise, .
+ ///
///
- /// Equality of Spark<T> objects is based on the equality of the observer message payload they represent, including the Spark Kind and the Value or Exception (if any).
- /// This means two Spark<T> objects can be equal even though they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method.
- /// In case one wants to determine whether two Spark<T> objects represent a different observer method call, use Object.ReferenceEquals identity equality instead.
+ /// Equality of Spark<T> objects is based on the equality of the observer message payload they represent,
+ /// including the Spark Kind and the Value or Exception (if any). This means two Spark<T> objects can be equal even though
+ /// they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method.
+ /// Use Object.ReferenceEquals identity equality to determine whether two Spark<T> objects represent a different observer method call.
///
public static bool operator ==(Spark? left, Spark? right) =>
- ReferenceEquals(left, right) || (left is not null && left.Equals(right));
+ ReferenceEquals(left, right) || (left?.Equals(right) == true);
///
/// Determines whether the current Spark<T> object has the same observer message payload as a specified Spark<T> value.
@@ -82,9 +86,10 @@ protected internal Spark()
/// An object to compare to the current Spark<T> object.
/// true if both Spark<T> objects have the same observer message payload; otherwise, false.
///
- /// Equality of Spark<T> objects is based on the equality of the observer message payload they represent, including the Spark Kind and the Value or Exception (if any).
- /// This means two Spark<T> objects can be equal even though they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method.
- /// In case one wants to determine whether two Spark<T> objects represent the same observer method call, use Object.ReferenceEquals identity equality instead.
+ /// Equality of Spark<T> objects is based on the equality of the observer message payload they represent,
+ /// including the Spark Kind and the Value or Exception (if any). This means two Spark<T> objects can be equal even though
+ /// they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method.
+ /// Use Object.ReferenceEquals identity equality to determine whether two Spark<T> objects represent the same observer method call.
///
public abstract bool Equals(Spark? other);
@@ -94,12 +99,19 @@ protected internal Spark()
/// The System.Object to compare with the current Spark<T>.
/// true if the specified System.Object is equal to the current Spark<T>; otherwise, false.
///
- /// Equality of Spark<T> objects is based on the equality of the observer message payload they represent, including the Spark Kind and the Value or Exception (if any).
- /// This means two Spark<T> objects can be equal even though they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method.
- /// In case one wants to determine whether two Spark<T> objects represent the same observer method call, use Object.ReferenceEquals identity equality instead.
+ /// Equality of Spark<T> objects is based on the equality of the observer message payload they represent,
+ /// including the Spark Kind and the Value or Exception (if any). This means two Spark<T> objects can be equal even though
+ /// they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method.
+ /// Use Object.ReferenceEquals identity equality to determine whether two Spark<T> objects represent the same observer method call.
///
public override bool Equals(object? obj) => Equals(obj as Spark);
+ ///
+ /// Returns the hash code for this spark.
+ ///
+ /// A hash code for this spark.
+ public abstract override int GetHashCode();
+
///
/// Invokes the observer's method corresponding to the Spark.
///
@@ -153,10 +165,12 @@ public IObservable ToObservable(ISequencer scheduler)
return Signal.Create(observer => scheduler.Schedule(() =>
{
Accept(observer);
- if (Kind == SparkKind.OnNext)
+ if (Kind != SparkKind.OnNext)
{
- observer.OnCompleted();
+ return;
}
+
+ observer.OnCompleted();
}));
}
@@ -171,6 +185,7 @@ internal sealed class OnNextSpark : Spark
/// Initializes a new instance of the class.
/// Constructs a Spark of a new value.
///
+ /// The value carried by the spark.
public OnNextSpark(T value) => Value = value;
///
@@ -196,6 +211,7 @@ internal sealed class OnNextSpark : Spark
///
/// Returns the hash code for this instance.
///
+ /// A hash code for this instance.
public override int GetHashCode() => EqualityComparer.Default.GetHashCode(Value!);
///
@@ -226,6 +242,7 @@ public override bool Equals(Spark? other)
///
/// Returns a string representation of this instance.
///
+ /// A string representation of this instance.
public override string ToString() => string.Format(CultureInfo.CurrentCulture, "OnNext({0})", Value);
///
@@ -245,6 +262,7 @@ public override void Accept(IObserver observer)
///
/// Invokes the observer's method corresponding to the Spark and returns the produced result.
///
+ /// The result type.
/// Observer to invoke the Spark on.
/// Result produced by the observation.
public override TResult Accept(IObserver observer)
@@ -286,6 +304,7 @@ public override void Accept(Action onNext, Action onError, Action
///
/// Invokes the delegate corresponding to the Spark and returns the produced result.
///
+ /// The result type.
/// Delegate to invoke for an OnNext Spark.
/// Delegate to invoke for an OnError Spark.
/// Delegate to invoke for an OnCompleted Spark.
@@ -322,11 +341,16 @@ internal sealed class OnErrorSpark : Spark
/// Initializes a new instance of the class.
/// Constructs a Spark of an exception.
///
+ /// The exception carried by the spark.
public OnErrorSpark(Exception exception) => Exception = exception;
///
/// Gets throws the exception.
///
+ [System.Diagnostics.CodeAnalysis.SuppressMessage(
+ "Sonar Code Smell",
+ "S2372:Exceptions should not be thrown from property getters",
+ Justification = "Non-OnNext sparks intentionally throw when their Value is requested.")]
public override T Value
{
get
@@ -354,11 +378,14 @@ public override T Value
///
/// Returns the hash code for this instance.
///
+ /// A hash code for this instance.
public override int GetHashCode() => Exception.GetHashCode();
///
/// Indicates whether this instance and other are equal.
///
+ /// The other spark.
+ /// when the sparks are equal; otherwise, .
public override bool Equals(Spark? other)
{
if (ReferenceEquals(this, other))
@@ -382,6 +409,7 @@ public override bool Equals(Spark? other)
///
/// Returns a string representation of this instance.
///
+ /// A string representation of this instance.
public override string ToString() => string.Format(CultureInfo.CurrentCulture, "OnError({0})", Exception.GetType().FullName);
///
@@ -401,6 +429,7 @@ public override void Accept(IObserver observer)
///
/// Invokes the observer's method corresponding to the Spark and returns the produced result.
///
+ /// The result type.
/// Observer to invoke the Spark on.
/// Result produced by the observation.
public override TResult Accept(IObserver observer)
@@ -442,6 +471,7 @@ public override void Accept(Action onNext, Action onError, Action
///
/// Invokes the delegate corresponding to the Spark and returns the produced result.
///
+ /// The result type.
/// Delegate to invoke for an OnNext Spark.
/// Delegate to invoke for an OnError Spark.
/// Delegate to invoke for an OnCompleted Spark.
@@ -474,17 +504,13 @@ public override TResult Accept(Func onNext, Func
{
- ///
- /// Initializes a new instance of the class.
- /// Constructs a Spark of the end of a sequence.
- ///
- public OnCompletedSpark()
- {
- }
-
///
/// Gets throws an InvalidOperationException.
///
+ [System.Diagnostics.CodeAnalysis.SuppressMessage(
+ "Sonar Code Smell",
+ "S2372:Exceptions should not be thrown from property getters",
+ Justification = "Non-OnNext sparks intentionally throw when their Value is requested.")]
public override T Value => throw new InvalidOperationException("No Value");
///
@@ -505,11 +531,14 @@ public OnCompletedSpark()
///
/// Returns the hash code for this instance.
///
+ /// A hash code for this instance.
public override int GetHashCode() => typeof(T).GetHashCode() ^ 8510;
///
/// Indicates whether this instance and other are equal.
///
+ /// The other spark.
+ /// when the sparks are equal; otherwise, .
public override bool Equals(Spark? other)
{
if (ReferenceEquals(this, other))
@@ -528,6 +557,7 @@ public override bool Equals(Spark? other)
///
/// Returns a string representation of this instance.
///
+ /// A string representation of this instance.
public override string ToString() => "OnCompleted()";
///
@@ -547,6 +577,7 @@ public override void Accept(IObserver observer)
///
/// Invokes the observer's method corresponding to the Spark and returns the produced result.
///
+ /// The result type.
/// Observer to invoke the Spark on.
/// Result produced by the observation.
public override TResult Accept(IObserver observer)
@@ -588,6 +619,7 @@ public override void Accept(Action onNext, Action onError, Action
///
/// Invokes the delegate corresponding to the Spark and returns the produced result.
///
+ /// The result type.
/// Delegate to invoke for an OnNext Spark.
/// Delegate to invoke for an OnError Spark.
/// Delegate to invoke for an OnCompleted Spark.
diff --git a/src/ReactiveUI.Primitives/Core/ThrowWitness{T}.cs b/src/ReactiveUI.Primitives/Core/ThrowWitness{T}.cs
index e6dd09c..a9f9f83 100644
--- a/src/ReactiveUI.Primitives/Core/ThrowWitness{T}.cs
+++ b/src/ReactiveUI.Primitives/Core/ThrowWitness{T}.cs
@@ -4,21 +4,34 @@
namespace ReactiveUI.Primitives.Core;
+///
+/// Observer that ignores values and completion and rethrows errors.
+///
+/// The observed value type.
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
internal sealed class ThrowWitness : IObserver
{
+ ///
+ /// Gets the shared throw witness instance.
+ ///
public static readonly ThrowWitness Instance = new();
+ ///
+ /// Initializes a new instance of the class.
+ ///
private ThrowWitness()
{
}
+ ///
public void OnCompleted()
{
}
+ ///
public void OnError(Exception error) => error.Rethrow();
+ ///
public void OnNext(T value)
{
}
diff --git a/src/ReactiveUI.Primitives/Core/TimeInterval{T}.cs b/src/ReactiveUI.Primitives/Core/TimeInterval{T}.cs
index cefc75f..d5bdb0f 100644
--- a/src/ReactiveUI.Primitives/Core/TimeInterval{T}.cs
+++ b/src/ReactiveUI.Primitives/Core/TimeInterval{T}.cs
@@ -83,7 +83,7 @@ public override bool Equals(object? obj)
/// A hash code for the current TimeInterval value.
public override int GetHashCode()
{
- var valueHashCode = Value == null ? 1963 : Value.GetHashCode();
+ var valueHashCode = Value is null ? 1963 : EqualityComparer.Default.GetHashCode(Value);
return Interval.GetHashCode() ^ valueHashCode;
}
diff --git a/src/ReactiveUI.Primitives/Core/Witness.cs b/src/ReactiveUI.Primitives/Core/Witness.cs
index d731983..7e5684b 100644
--- a/src/ReactiveUI.Primitives/Core/Witness.cs
+++ b/src/ReactiveUI.Primitives/Core/Witness.cs
@@ -13,7 +13,14 @@ namespace ReactiveUI.Primitives.Core;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public static class Witness
{
+ ///
+ /// Completion callback that does nothing.
+ ///
private static readonly Action Nop = static () => { };
+
+ ///
+ /// Error callback that rethrows with preserved exception details.
+ ///
private static readonly Action Rethrow = static error => ExceptionDispatchInfo.Capture(error).Throw();
///
@@ -106,12 +113,33 @@ public static IObserver Safe(IObserver observer, IDisposable cancel)
return new SafeWitness(observer, cancel);
}
+ ///
+ /// Delegate-backed observer implementation.
+ ///
+ /// The observed value type.
private sealed class DelegateWitness : IObserver
{
+ ///
+ /// Callback invoked for each value.
+ ///
private readonly Action _onNext;
+
+ ///
+ /// Callback invoked for an error.
+ ///
private readonly Action _onError;
+
+ ///
+ /// Callback invoked for completion.
+ ///
private readonly Action _onCompleted;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Callback invoked for each value.
+ /// Callback invoked for an error.
+ /// Callback invoked for completion.
public DelegateWitness(Action onNext, Action onError, Action onCompleted)
{
_onNext = onNext;
@@ -119,25 +147,49 @@ public DelegateWitness(Action onNext, Action onError, Action onCom
_onCompleted = onCompleted;
}
+ ///
public void OnCompleted() => _onCompleted();
+ ///
public void OnError(Exception error) => _onError(error ?? throw new ArgumentNullException(nameof(error)));
+ ///
public void OnNext(T value) => _onNext(value);
}
+ ///
+ /// Observer wrapper that prevents notifications after termination.
+ ///
+ /// The observed value type.
private sealed class SafeWitness : IObserver
{
+ ///
+ /// Wrapped observer.
+ ///
private readonly IObserver _observer;
+
+ ///
+ /// Cancellation resource disposed on terminal notifications.
+ ///
private IDisposable? _cancel;
+
+ ///
+ /// Non-zero after the observer has stopped.
+ ///
private int _stopped;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Wrapped observer.
+ /// Cancellation resource disposed on terminal notifications.
public SafeWitness(IObserver observer, IDisposable cancel)
{
_observer = observer;
_cancel = cancel;
}
+ ///
public void OnCompleted()
{
if (Interlocked.Exchange(ref _stopped, 1) != 0)
@@ -155,6 +207,7 @@ public void OnCompleted()
}
}
+ ///
public void OnError(Exception error)
{
if (error == null)
@@ -177,6 +230,7 @@ public void OnError(Exception error)
}
}
+ ///
public void OnNext(T value)
{
if (Volatile.Read(ref _stopped) != 0)
@@ -196,6 +250,9 @@ public void OnNext(T value)
}
}
+ ///
+ /// Disposes the cancellation resource exactly once.
+ ///
private void DisposeCancel() => Interlocked.Exchange(ref _cancel, null)?.Dispose();
}
}
diff --git a/src/ReactiveUI.Primitives/Disposables/AssignmentSlot.cs b/src/ReactiveUI.Primitives/Disposables/AssignmentSlot.cs
index 97857d4..b3bc7c9 100644
--- a/src/ReactiveUI.Primitives/Disposables/AssignmentSlot.cs
+++ b/src/ReactiveUI.Primitives/Disposables/AssignmentSlot.cs
@@ -10,21 +10,37 @@ namespace ReactiveUI.Primitives.Disposables;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public sealed class AssignmentSlot : SingleDisposable
{
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ public AssignmentSlot()
+ {
+ }
+
///
/// Initializes a new instance of the class.
///
/// Action to invoke before the assigned disposable is disposed.
- public AssignmentSlot(Action? action = null)
+ public AssignmentSlot(Action? action)
: base(action)
{
}
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Initial assignment.
+ public AssignmentSlot(IDisposable disposable)
+ : base(disposable)
+ {
+ }
+
///
/// Initializes a new instance of the class.
///
/// Initial assignment.
/// Action to invoke before the assigned disposable is disposed.
- public AssignmentSlot(IDisposable disposable, Action? action = null)
+ public AssignmentSlot(IDisposable disposable, Action? action)
: base(disposable, action)
{
}
diff --git a/src/ReactiveUI.Primitives/Disposables/CancellationDisposable.cs b/src/ReactiveUI.Primitives/Disposables/CancellationDisposable.cs
index ef2b8f0..81e1968 100644
--- a/src/ReactiveUI.Primitives/Disposables/CancellationDisposable.cs
+++ b/src/ReactiveUI.Primitives/Disposables/CancellationDisposable.cs
@@ -10,6 +10,9 @@ namespace ReactiveUI.Primitives.Disposables;
///
public sealed class CancellationDisposable : IsDisposed
{
+ ///
+ /// Cancellation source owned by this disposable.
+ ///
private readonly CancellationTokenSource _cts;
///
@@ -50,19 +53,24 @@ public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
- GC.SuppressFinalize(this);
}
+ ///
+ /// Releases resources used by this disposable.
+ ///
+ /// when called from .
private void Dispose(bool disposing)
{
- if (!IsDisposed)
+ if (IsDisposed)
{
- if (disposing)
- {
- _cts.Cancel();
- }
+ return;
+ }
- IsDisposed = true;
+ if (disposing)
+ {
+ _cts.Cancel();
}
+
+ IsDisposed = true;
}
}
diff --git a/src/ReactiveUI.Primitives/Disposables/Disposable.cs b/src/ReactiveUI.Primitives/Disposables/Disposable.cs
index e65e014..300ea0b 100644
--- a/src/ReactiveUI.Primitives/Disposables/Disposable.cs
+++ b/src/ReactiveUI.Primitives/Disposables/Disposable.cs
@@ -23,8 +23,12 @@ public static class Disposable
public static IDisposable Create(Action dispose) =>
dispose == null ? Empty : new AnonymousDisposable(dispose);
+ ///
+ /// Disposable that performs no action.
+ ///
internal sealed class EmptyDisposable : IDisposable
{
+ ///
public void Dispose()
{
}
@@ -35,6 +39,9 @@ public void Dispose()
///
internal sealed class AnonymousDisposable : IDisposable
{
+ ///
+ /// Disposal action, cleared after the first dispose call.
+ ///
private volatile Action? _dispose;
///
diff --git a/src/ReactiveUI.Primitives/Disposables/MultipleDisposable.cs b/src/ReactiveUI.Primitives/Disposables/MultipleDisposable.cs
index 1765715..19827c3 100644
--- a/src/ReactiveUI.Primitives/Disposables/MultipleDisposable.cs
+++ b/src/ReactiveUI.Primitives/Disposables/MultipleDisposable.cs
@@ -10,14 +10,44 @@ namespace ReactiveUI.Primitives.Disposables;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public class MultipleDisposable : IsDisposed
{
+ ///
+ /// Initial capacity for overflow disposable storage.
+ ///
private const int OverflowInitialCapacity = 2;
+
+ ///
+ /// Growth factor for overflow disposable storage.
+ ///
private const int OverflowGrowthFactor = 2;
+ ///
+ /// Synchronizes mutations to the disposable set.
+ ///
private readonly object _gate = new();
+
+ ///
+ /// First inline disposable slot.
+ ///
private IDisposable? _slot0;
+
+ ///
+ /// Second inline disposable slot.
+ ///
private IDisposable? _slot1;
+
+ ///
+ /// Overflow disposable slots used after the inline slots are occupied.
+ ///
private IDisposable[]? _overflow;
+
+ ///
+ /// Number of active overflow disposable slots.
+ ///
private int _overflowCount;
+
+ ///
+ /// Value indicating whether this group is disposed.
+ ///
private bool _disposed;
///
@@ -117,10 +147,12 @@ public void Add(IDisposable disposable)
}
}
- if (shouldDispose)
+ if (!shouldDispose)
{
- disposable.Dispose();
+ return;
}
+
+ disposable.Dispose();
}
///
@@ -207,6 +239,10 @@ protected virtual void Dispose(bool disposing)
}
}
+ ///
+ /// Adds a disposable while the caller holds the gate.
+ ///
+ /// Disposable to add.
private void AddCore(IDisposable disposable)
{
if (_slot0 == null)
@@ -235,6 +271,11 @@ private void AddCore(IDisposable disposable)
_overflow[_overflowCount++] = disposable;
}
+ ///
+ /// Removes a disposable while the caller holds the gate.
+ ///
+ /// Disposable to remove.
+ /// when the item was removed; otherwise, .
private bool RemoveCore(IDisposable item)
{
if (_slot0 != null && EqualityComparer.Default.Equals(_slot0, item))
@@ -275,22 +316,35 @@ private bool RemoveCore(IDisposable item)
return false;
}
+ ///
+ /// Array-backed disposable group returned by the static factory.
+ ///
private sealed class MultipleDisposableBase : IDisposable
{
+ ///
+ /// Disposables to release, or after disposal.
+ ///
private IDisposable[]? _disposables;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Disposables owned by the group.
public MultipleDisposableBase(IDisposable[] disposables) =>
Volatile.Write(ref _disposables, disposables ?? throw new ArgumentNullException(nameof(disposables)));
+ ///
public void Dispose()
{
var disposables = Interlocked.Exchange(ref _disposables, null);
- if (disposables != null)
+ if (disposables == null)
+ {
+ return;
+ }
+
+ foreach (var disposable in disposables)
{
- foreach (var disposable in disposables)
- {
- disposable?.Dispose();
- }
+ disposable?.Dispose();
}
}
}
diff --git a/src/ReactiveUI.Primitives/Disposables/SingleDisposable.cs b/src/ReactiveUI.Primitives/Disposables/SingleDisposable.cs
index 86f6403..2d6a349 100644
--- a/src/ReactiveUI.Primitives/Disposables/SingleDisposable.cs
+++ b/src/ReactiveUI.Primitives/Disposables/SingleDisposable.cs
@@ -10,23 +10,49 @@ namespace ReactiveUI.Primitives.Disposables;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public class SingleDisposable : IsDisposed
{
+ ///
+ /// Marker used once the slot has been disposed.
+ ///
private static readonly IDisposable DisposedSentinel = new DisposedMarker();
+ ///
+ /// Action invoked before disposal.
+ ///
private readonly Action? _action;
+
+ ///
+ /// Assigned disposable or the disposed marker.
+ ///
private IDisposable? _disposable;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ public SingleDisposable()
+ {
+ }
+
///
/// Initializes a new instance of the class.
///
/// Action to invoke before the assigned disposable is disposed.
- public SingleDisposable(Action? action = null) => _action = action;
+ public SingleDisposable(Action? action) => _action = action;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The disposable.
+ public SingleDisposable(IDisposable disposable)
+ : this(disposable, null)
+ {
+ }
///
/// Initializes a new instance of the class.
///
/// The disposable.
/// Action to invoke before the assigned disposable is disposed.
- public SingleDisposable(IDisposable disposable, Action? action = null)
+ public SingleDisposable(IDisposable disposable, Action? action)
: this(action) => Create(disposable);
///
@@ -66,7 +92,7 @@ public void Create(IDisposable disposable)
return;
}
- throw new InvalidOperationException("The disposable slot has already been assigned.");
+ throw new InvalidOperationException($"The {nameof(disposable)} slot has already been assigned.");
}
///
@@ -99,8 +125,12 @@ protected virtual void Dispose(bool disposing)
disposable.Dispose();
}
+ ///
+ /// Disposable marker for disposed slots.
+ ///
private sealed class DisposedMarker : IDisposable
{
+ ///
public void Dispose()
{
}
diff --git a/src/ReactiveUI.Primitives/Disposables/SingleReplaceableDisposable.cs b/src/ReactiveUI.Primitives/Disposables/SingleReplaceableDisposable.cs
index 075d1b3..5af90c4 100644
--- a/src/ReactiveUI.Primitives/Disposables/SingleReplaceableDisposable.cs
+++ b/src/ReactiveUI.Primitives/Disposables/SingleReplaceableDisposable.cs
@@ -10,27 +10,53 @@ namespace ReactiveUI.Primitives.Disposables;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public class SingleReplaceableDisposable : IsDisposed
{
+ ///
+ /// Marker used once the slot has been disposed.
+ ///
private static readonly IDisposable DisposedSentinel = new DisposedMarker();
+ ///
+ /// Action invoked before disposal.
+ ///
private readonly Action? _action;
+
+ ///
+ /// Current disposable or the disposed marker.
+ ///
private IDisposable? _disposable;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ public SingleReplaceableDisposable()
+ {
+ }
+
///
/// Initializes a new instance of the class.
///
/// The action.
- public SingleReplaceableDisposable(Action? action = null) =>
+ public SingleReplaceableDisposable(Action? action) =>
_action = action;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The disposable.
+ public SingleReplaceableDisposable(IDisposable disposable)
+ : this(disposable, null)
+ {
+ }
+
///
/// Initializes a new instance of the class.
///
/// The disposable.
/// The action to call before disposal.
- public SingleReplaceableDisposable(IDisposable disposable, Action? action = null)
+ public SingleReplaceableDisposable(IDisposable disposable, Action? action)
{
- Create(disposable);
_action = action;
+ Create(disposable);
}
///
@@ -51,14 +77,20 @@ public bool IsDisposed
/// Creates the specified disposable.
///
/// The disposable.
+ /// is .
public void Create(IDisposable disposable)
{
+ if (disposable == null)
+ {
+ throw new ArgumentNullException(nameof(disposable));
+ }
+
while (true)
{
var current = Volatile.Read(ref _disposable);
if (ReferenceEquals(current, DisposedSentinel))
{
- disposable?.Dispose();
+ disposable.Dispose();
_action?.Invoke();
return;
}
@@ -99,8 +131,12 @@ protected virtual void Dispose(bool disposing)
_action?.Invoke();
}
+ ///
+ /// Disposable marker for disposed slots.
+ ///
private sealed class DisposedMarker : IDisposable
{
+ ///
public void Dispose()
{
}
diff --git a/src/ReactiveUI.Primitives/Disposables/Slot.cs b/src/ReactiveUI.Primitives/Disposables/Slot.cs
index bea4d38..7dce65b 100644
--- a/src/ReactiveUI.Primitives/Disposables/Slot.cs
+++ b/src/ReactiveUI.Primitives/Disposables/Slot.cs
@@ -10,21 +10,37 @@ namespace ReactiveUI.Primitives.Disposables;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public sealed class Slot : SingleReplaceableDisposable
{
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ public Slot()
+ {
+ }
+
///
/// Initializes a new instance of the class.
///
/// Action to call when the slot is disposed.
- public Slot(Action? action = null)
+ public Slot(Action? action)
: base(action)
{
}
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Initial disposable.
+ public Slot(IDisposable disposable)
+ : base(disposable)
+ {
+ }
+
///
/// Initializes a new instance of the class.
///
/// Initial disposable.
/// Action to call when the slot is disposed.
- public Slot(IDisposable disposable, Action? action = null)
+ public Slot(IDisposable disposable, Action? action)
: base(disposable, action)
{
}
diff --git a/src/ReactiveUI.Primitives/ExceptionMixins.cs b/src/ReactiveUI.Primitives/ExceptionMixins.cs
index 0b0cee2..2847271 100644
--- a/src/ReactiveUI.Primitives/ExceptionMixins.cs
+++ b/src/ReactiveUI.Primitives/ExceptionMixins.cs
@@ -4,8 +4,15 @@
namespace ReactiveUI.Primitives;
+///
+/// Exception helper methods.
+///
internal static class ExceptionMixins
{
+ ///
+ /// Throws the exception while preserving stack trace where required by the target framework.
+ ///
+ /// Exception to throw.
public static void Throw(this Exception exception)
{
#if NET472 || NETSTANDARD2_0
diff --git a/src/ReactiveUI.Primitives/Handle.cs b/src/ReactiveUI.Primitives/Handle.cs
index 23decf1..d669a9b 100644
--- a/src/ReactiveUI.Primitives/Handle.cs
+++ b/src/ReactiveUI.Primitives/Handle.cs
@@ -6,11 +6,31 @@
namespace ReactiveUI.Primitives;
+///
+/// Shared delegate handlers.
+///
internal static class Handle
{
+ ///
+ /// Action that does nothing.
+ ///
public static readonly Action Nop = () => { };
+
+ ///
+ /// Error handler that throws the supplied exception.
+ ///
public static readonly Action Throw = ex => ex.Throw();
+ ///
+ /// Converts an error into an empty observable sequence.
+ ///
+ /// The source value type.
+ /// Ignored exception.
+ /// An empty sequence.
+ [System.Diagnostics.CodeAnalysis.SuppressMessage(
+ "Sonar Code Smell",
+ "S4018:Generic methods should provide type parameters",
+ Justification = "The type parameter determines the empty sequence value type.")]
public static IObservable CatchIgnore(Exception ex) =>
Signal.Empty();
}
diff --git a/src/ReactiveUI.Primitives/Handle{T1,T2,T3}.cs b/src/ReactiveUI.Primitives/Handle{T1,T2,T3}.cs
index b60d981..cc86e28 100644
--- a/src/ReactiveUI.Primitives/Handle{T1,T2,T3}.cs
+++ b/src/ReactiveUI.Primitives/Handle{T1,T2,T3}.cs
@@ -4,10 +4,23 @@
namespace ReactiveUI.Primitives;
+///
+/// Shared delegate handlers for three-argument callbacks.
+///
+/// The first value type.
+/// The second value type.
+/// The third value type.
internal static class Handle
{
#pragma warning disable SA1313 // Parameter names should begin with lower-case letter
- public static readonly Action Ignore = (_, __, ___) => { };
- public static readonly Action Throw = (ex, _, __, ___) => ex.Throw();
+ ///
+ /// Callback that ignores all values.
+ ///
+ public static readonly Action Ignore = (_, _, _) => { };
+
+ ///
+ /// Error callback that throws the supplied exception.
+ ///
+ public static readonly Action Throw = (ex, _, _, _) => ex.Throw();
#pragma warning restore SA1313 // Parameter names should begin with lower-case letter
}
diff --git a/src/ReactiveUI.Primitives/Handle{T1,T2}.cs b/src/ReactiveUI.Primitives/Handle{T1,T2}.cs
index b59cc2e..a7219eb 100644
--- a/src/ReactiveUI.Primitives/Handle{T1,T2}.cs
+++ b/src/ReactiveUI.Primitives/Handle{T1,T2}.cs
@@ -4,8 +4,20 @@
namespace ReactiveUI.Primitives;
+///
+/// Shared delegate handlers for two-argument callbacks.
+///
+/// The first value type.
+/// The second value type.
internal static class Handle
{
- public static readonly Action Ignore = (_, __) => { };
- public static readonly Action Throw = (ex, _, __) => ex.Throw();
+ ///
+ /// Callback that ignores both values.
+ ///
+ public static readonly Action Ignore = (_, _) => { };
+
+ ///
+ /// Error callback that throws the supplied exception.
+ ///
+ public static readonly Action Throw = (ex, _, _) => ex.Throw();
}
diff --git a/src/ReactiveUI.Primitives/Handle{T}.cs b/src/ReactiveUI.Primitives/Handle{T}.cs
index be86e40..cc3fa77 100644
--- a/src/ReactiveUI.Primitives/Handle{T}.cs
+++ b/src/ReactiveUI.Primitives/Handle{T}.cs
@@ -4,9 +4,24 @@
namespace ReactiveUI.Primitives;
+///
+/// Shared delegate handlers for one-argument callbacks.
+///
+/// The value type.
internal static class Handle
{
- public static readonly Action Ignore = (T _) => { };
- public static readonly Func Identity = (T t) => t;
+ ///
+ /// Callback that ignores its value.
+ ///
+ public static readonly Action Ignore = (_) => { };
+
+ ///
+ /// Function that returns its input.
+ ///
+ public static readonly Func Identity = (t) => t;
+
+ ///
+ /// Error callback that throws the supplied exception.
+ ///
public static readonly Action Throw = (ex, _) => ex.Throw();
}
diff --git a/src/ReactiveUI.Primitives/LinqMixins.OperatorGate.cs b/src/ReactiveUI.Primitives/LinqMixins.OperatorGate.cs
new file mode 100644
index 0000000..f2800e5
--- /dev/null
+++ b/src/ReactiveUI.Primitives/LinqMixins.OperatorGate.cs
@@ -0,0 +1,22 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+namespace ReactiveUI.Primitives;
+
+///
+/// SelectMixins.
+///
+public static partial class LinqMixins
+{
+ ///
+ /// Per-subscription synchronization gate for operators that coordinate callbacks from multiple sources.
+ ///
+ private sealed class OperatorGate
+ {
+ ///
+ /// Gets the stable synchronization object for the subscription.
+ ///
+ internal object SyncRoot => this;
+ }
+}
diff --git a/src/ReactiveUI.Primitives/LinqMixins.cs b/src/ReactiveUI.Primitives/LinqMixins.cs
index e1d41a3..23ef249 100644
--- a/src/ReactiveUI.Primitives/LinqMixins.cs
+++ b/src/ReactiveUI.Primitives/LinqMixins.cs
@@ -99,13 +99,21 @@ public static IDisposable DisposeWith(this IDisposable disposable, MultipleDispo
return disposable;
}
+ ///
+ /// Disposes the with.
+ ///
+ /// The disposable.
+ /// A SingleDisposable.
+ public static SingleDisposable DisposeWith(this IDisposable disposable) =>
+ new(disposable);
+
///
/// Disposes the with.
///
/// The disposable.
/// The action.
/// A SingleDisposable.
- public static SingleDisposable DisposeWith(this IDisposable disposable, Action? action = null) =>
+ public static SingleDisposable DisposeWith(this IDisposable disposable, Action? action) =>
new(disposable, action);
///
diff --git a/src/ReactiveUI.Primitives/Signal/AsyncSignal{T}.cs b/src/ReactiveUI.Primitives/Signal/AsyncSignal{T}.cs
index edc0890..a6853d7 100644
--- a/src/ReactiveUI.Primitives/Signal/AsyncSignal{T}.cs
+++ b/src/ReactiveUI.Primitives/Signal/AsyncSignal{T}.cs
@@ -15,10 +15,30 @@ namespace ReactiveUI.Primitives.Signals;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public class AsyncSignal : IAwaitSignal
{
+ ///
+ /// Executes the new operation.
+ ///
+ /// The result.
private readonly object _observerLock = new();
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private T? _lastValue;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private bool _hasValue;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private Exception? _lastError;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private IObserver _outObserver = EmptyWitness.Instance;
///
@@ -102,6 +122,21 @@ public void OnCompleted()
}
}
+ ///
+ /// Specifies a callback action that will be invoked when the subject completes.
+ ///
+ /// Callback action that will be invoked when the subject completes.
+ /// is null.
+ public void OnCompleted(Action continuation)
+ {
+ if (continuation == null)
+ {
+ throw new ArgumentNullException(nameof(continuation));
+ }
+
+ SubscribeCompletion(continuation, true);
+ }
+
///
/// Called when [error].
///
@@ -182,11 +217,11 @@ public IDisposable Subscribe(IObserver observer)
var current = _outObserver;
if (current is EmptyWitness)
{
- _outObserver = new ListWitness(new ImmutableList>(new[] { observer }));
+ _outObserver = new ListWitness(new ImmutableList>([observer]));
}
else
{
- _outObserver = new ListWitness(new ImmutableList>(new[] { current, observer }));
+ _outObserver = new ListWitness(new ImmutableList>([current, observer]));
}
}
@@ -231,21 +266,6 @@ public void Dispose()
/// Object that can be awaited.
public IAwaitSignal GetAwaiter() => this;
- ///
- /// Specifies a callback action that will be invoked when the subject completes.
- ///
- /// Callback action that will be invoked when the subject completes.
- /// is null.
- public void OnCompleted(Action continuation)
- {
- if (continuation == null)
- {
- throw new ArgumentNullException(nameof(continuation));
- }
-
- OnCompleted(continuation, true);
- }
-
///
/// Gets the last element of the subject, potentially blocking until the subject completes successfully or exceptionally.
///
@@ -256,7 +276,7 @@ public T GetResult()
if (!IsCompleted)
{
var e = new ManualResetEvent(false);
- OnCompleted(() => e.Set(), false);
+ SubscribeCompletion(() => e.Set(), false);
e.WaitOne();
}
@@ -276,38 +296,65 @@ public T GetResult()
/// true to release both managed and unmanaged resources; false to release only unmanaged resources.
protected virtual void Dispose(bool disposing)
{
- if (!IsDisposed)
+ if (IsDisposed)
+ {
+ return;
+ }
+
+ if (disposing)
{
- if (disposing)
+ lock (_observerLock)
{
- lock (_observerLock)
- {
- _outObserver = DisposedWitness.Instance;
- _lastError = null;
- _lastValue = default;
- }
+ _outObserver = DisposedWitness.Instance;
+ _lastError = null;
+ _lastValue = default;
}
-
- IsDisposed = true;
}
+
+ IsDisposed = true;
}
+ ///
+ /// Executes the ThrowIfDisposed operation.
+ ///
private void ThrowIfDisposed()
{
- if (IsDisposed)
+ if (!IsDisposed)
{
- throw new ObjectDisposedException(string.Empty);
+ return;
}
+
+ throw new ObjectDisposedException(string.Empty);
}
- private void OnCompleted(Action continuation, bool originalContext) =>
+ ///
+ /// Executes the SubscribeCompletion operation.
+ ///
+ /// The continuation value.
+ /// The originalContext value.
+ private void SubscribeCompletion(Action continuation, bool originalContext) =>
Subscribe(new AwaitObserver(continuation, originalContext));
- private class AwaitObserver : IObserver
+ ///
+ /// Represents the AwaitObserver class.
+ ///
+ private sealed class AwaitObserver : IObserver
{
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly SynchronizationContext? _context;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly Action _callback;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The callback value.
+ /// The originalContext value.
public AwaitObserver(Action callback, bool originalContext)
{
if (originalContext)
@@ -318,14 +365,28 @@ public AwaitObserver(Action callback, bool originalContext)
_callback = callback;
}
+ ///
+ /// Executes the OnCompleted operation.
+ ///
public void OnCompleted() => InvokeOnOriginalContext();
+ ///
+ /// Executes the OnError operation.
+ ///
+ /// The error value.
public void OnError(Exception error) => InvokeOnOriginalContext();
+ ///
+ /// Executes the OnNext operation.
+ ///
+ /// The value.
public void OnNext(T value)
{
}
+ ///
+ /// Executes the InvokeOnOriginalContext operation.
+ ///
private void InvokeOnOriginalContext()
{
if (_context != null)
@@ -339,18 +400,41 @@ private void InvokeOnOriginalContext()
}
}
- private class ObserverHandler : IDisposable
+ ///
+ /// Represents the ObserverHandler class.
+ ///
+ private sealed class ObserverHandler : IDisposable
{
+ ///
+ /// Executes the new operation.
+ ///
+ /// The result.
private readonly object _gate = new();
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private AsyncSignal? _subject;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private IObserver? _observer;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The subject value.
+ /// The observer value.
public ObserverHandler(AsyncSignal subject, IObserver observer)
{
_subject = subject;
_observer = observer;
}
+ ///
+ /// Executes the Dispose operation.
+ ///
public void Dispose()
{
lock (_gate)
diff --git a/src/ReactiveUI.Primitives/Signal/BehaviourSignal{T}.cs b/src/ReactiveUI.Primitives/Signal/BehaviourSignal{T}.cs
index 5b7c8c6..77374a0 100644
--- a/src/ReactiveUI.Primitives/Signal/BehaviourSignal{T}.cs
+++ b/src/ReactiveUI.Primitives/Signal/BehaviourSignal{T}.cs
@@ -14,17 +14,42 @@ namespace ReactiveUI.Primitives.Signals;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public class BehaviourSignal : ISignal
{
+ ///
+ /// Executes the new operation.
+ ///
+ /// The result.
private readonly object _observerLock = new();
+#pragma warning disable S3459 // Broadcaster is a mutable struct whose default value is the empty broadcaster.
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private Broadcaster _broadcaster;
+#pragma warning restore S3459
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private bool _isStopped;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private T? _lastValue;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private Exception? _lastError;
///
/// Initializes a new instance of the class.
///
/// The default value.
- public BehaviourSignal(T defaultValue) => _lastValue = defaultValue;
+ public BehaviourSignal(T defaultValue)
+ {
+ _lastValue = defaultValue;
+ }
///
/// Gets the current value or throws an exception.
@@ -228,42 +253,72 @@ public void Dispose()
/// true to release both managed and unmanaged resources; false to release only unmanaged resources.
protected virtual void Dispose(bool disposing)
{
- if (!IsDisposed)
+ if (IsDisposed)
{
- if (disposing)
+ return;
+ }
+
+ if (disposing)
+ {
+ lock (_observerLock)
{
- lock (_observerLock)
- {
- _broadcaster.Clear();
- _lastError = null;
- _lastValue = default;
- }
+ _broadcaster.Clear();
+ _lastError = null;
+ _lastValue = default;
}
-
- IsDisposed = true;
}
+
+ IsDisposed = true;
}
+ ///
+ /// Executes the ThrowIfDisposed operation.
+ ///
private void ThrowIfDisposed()
{
- if (IsDisposed)
+ if (!IsDisposed)
{
- throw new ObjectDisposedException(string.Empty);
+ return;
}
+
+ throw new ObjectDisposedException(string.Empty);
}
- private class ObserverHandler : IDisposable
+ ///
+ /// Represents the ObserverHandler class.
+ ///
+ private sealed class ObserverHandler : IDisposable
{
+ ///
+ /// Executes the new operation.
+ ///
+ /// The result.
private readonly object _lock = new();
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private BehaviourSignal? _subject;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private IObserver? _observer;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The subject value.
+ /// The observer value.
public ObserverHandler(BehaviourSignal subject, IObserver observer)
{
_subject = subject;
_observer = observer;
}
+ ///
+ /// Executes the Dispose operation.
+ ///
public void Dispose()
{
lock (_lock)
diff --git a/src/ReactiveUI.Primitives/Signal/BufferSignal{T,TResult}.cs b/src/ReactiveUI.Primitives/Signal/BufferSignal{T,TResult}.cs
index f441cc4..e209b3f 100644
--- a/src/ReactiveUI.Primitives/Signal/BufferSignal{T,TResult}.cs
+++ b/src/ReactiveUI.Primitives/Signal/BufferSignal{T,TResult}.cs
@@ -4,16 +4,46 @@
namespace ReactiveUI.Primitives.Signals;
+///
+/// Represents the BufferSignal class.
+///
+/// The T type.
+/// The TResult type.
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
-internal class BufferSignal : Signal>
- where TResult : IList?
+internal sealed class BufferSignal : Signal
+ where TResult : class, IList
{
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly int _skip;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly int _count;
- private IList? _buffer;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
+ private TResult? _buffer;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private int _index;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private IDisposable? _subscription;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The source value.
+ /// The count value.
+ /// The skip value.
public BufferSignal(IObservable source, int count, int skip)
{
_skip = skip;
@@ -31,7 +61,7 @@ public BufferSignal(IObservable source, int count, int skip)
if (idx == 0)
{
// Reset buffer.
- buffer = new List();
+ buffer = CreateBuffer();
_buffer = buffer;
}
@@ -71,26 +101,38 @@ public BufferSignal(IObservable source, int count, int skip)
});
}
+ ///
+ /// Executes the Dispose operation.
+ ///
+ /// The disposing value.
protected override void Dispose(bool disposing)
{
- if (IsDisposed)
+ if (IsDisposed || !disposing)
{
+ base.Dispose(disposing);
return;
}
- Dispose(disposing);
- if (disposing)
+ var buffer = _buffer;
+ _buffer = null;
+
+ if (buffer != null)
{
- var buffer = _buffer;
- _buffer = null;
+ OnNext(buffer);
+ }
- if (buffer != null)
- {
- OnNext(buffer);
- }
+ _subscription?.Dispose();
+ _subscription = null;
+ base.Dispose(disposing);
+ }
- _subscription?.Dispose();
- _subscription = null;
- }
+ ///
+ /// Executes the CreateBuffer operation.
+ ///
+ /// The result.
+ private TResult CreateBuffer()
+ {
+ var buffer = new List(_count);
+ return (TResult)(IList)buffer;
}
}
diff --git a/src/ReactiveUI.Primitives/Signal/CommandSignal{TResult}.cs b/src/ReactiveUI.Primitives/Signal/CommandSignal{TResult}.cs
index 378218a..503479d 100644
--- a/src/ReactiveUI.Primitives/Signal/CommandSignal{TResult}.cs
+++ b/src/ReactiveUI.Primitives/Signal/CommandSignal{TResult}.cs
@@ -13,37 +13,87 @@ namespace ReactiveUI.Primitives.Signals;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public sealed class CommandSignal : IDisposable
{
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly Func> _execute;
+
+ ///
+ /// Executes the new operation.
+ ///
+ /// The result.
private readonly object _gate = new();
+
+ ///
+ /// Executes the new operation.
+ ///
+ /// The result.
private readonly Signal _results = new();
+
+ ///
+ /// Executes the new operation.
+ ///
+ /// The result.
private readonly Signal _faults = new();
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly IDisposable? _canRunSubscription;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private bool _canRun;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private bool _disposed;
///
/// Initializes a new instance of the class.
///
/// The async operation to execute.
- /// Optional gating signal. When omitted, execution is always allowed.
- public CommandSignal(Func> execute, IObservable? canRun = null)
+ public CommandSignal(Func> execute)
+ : this(execute, null)
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The async operation to execute.
+ /// Gating signal. When null, execution is always allowed.
+ public CommandSignal(Func> execute, IObservable? canRun)
{
_execute = execute ?? throw new ArgumentNullException(nameof(execute));
_canRun = canRun == null;
IsRunning = new StateSignal(false);
- if (canRun != null)
+ if (canRun == null)
{
- _canRunSubscription = canRun.Subscribe(value => _canRun = value, _faults.OnNext);
+ return;
}
+
+ _canRunSubscription = canRun.Subscribe(value => _canRun = value, _faults.OnNext);
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The synchronous operation to execute.
+ public CommandSignal(Func execute)
+ : this(execute, null)
+ {
}
///
/// Initializes a new instance of the class.
///
/// The synchronous operation to execute.
- /// Optional gating signal. When omitted, execution is always allowed.
- public CommandSignal(Func execute, IObservable? canRun = null)
+ /// Gating signal. When null, execution is always allowed.
+ public CommandSignal(Func execute, IObservable? canRun)
: this(_ => Task.FromResult((execute ?? throw new ArgumentNullException(nameof(execute)))()), canRun)
{
}
@@ -68,12 +118,18 @@ public CommandSignal(Func execute, IObservable? canRun = null)
///
public bool CanRun => Volatile.Read(ref _canRun);
+ ///
+ /// Executes the command if allowed and publishes the result or fault.
+ ///
+ /// The command result.
+ public Task ExecuteAsync() => ExecuteAsync(CancellationToken.None);
+
///
/// Executes the command if allowed and publishes the result or fault.
///
/// Cancellation token for the operation.
/// The command result.
- public async Task ExecuteAsync(CancellationToken cancellationToken = default)
+ public async Task ExecuteAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();
lock (_gate)
@@ -103,7 +159,9 @@ public async Task ExecuteAsync(CancellationToken cancellationToken = de
}
}
- ///
+ ///
+ /// Executes the Dispose operation.
+ ///
public void Dispose()
{
if (_disposed)
@@ -118,11 +176,16 @@ public void Dispose()
IsRunning.Dispose();
}
+ ///
+ /// Executes the ThrowIfDisposed operation.
+ ///
private void ThrowIfDisposed()
{
- if (_disposed)
+ if (!_disposed)
{
- throw new ObjectDisposedException(nameof(CommandSignal));
+ return;
}
+
+ throw new ObjectDisposedException(nameof(CommandSignal));
}
}
diff --git a/src/ReactiveUI.Primitives/Signal/ISignal{T}.cs b/src/ReactiveUI.Primitives/Signal/ISignal{T}.cs
index 1b6633c..baacb69 100644
--- a/src/ReactiveUI.Primitives/Signal/ISignal{T}.cs
+++ b/src/ReactiveUI.Primitives/Signal/ISignal{T}.cs
@@ -8,6 +8,4 @@ namespace ReactiveUI.Primitives.Signals;
/// ISubject.
///
/// The Type.
-public interface ISignal : ISignal
-{
-}
+public interface ISignal : ISignal;
diff --git a/src/ReactiveUI.Primitives/Signal/KeepSignal{T}.cs b/src/ReactiveUI.Primitives/Signal/KeepSignal{T}.cs
index 7abc0f5..046ff44 100644
--- a/src/ReactiveUI.Primitives/Signal/KeepSignal{T}.cs
+++ b/src/ReactiveUI.Primitives/Signal/KeepSignal{T}.cs
@@ -6,21 +6,46 @@
namespace ReactiveUI.Primitives.Signals;
+///
+/// Represents the KeepSignal class.
+///
+/// The T type.
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
-internal sealed class KeepSignal : IObservable, IRequireCurrentThread
+internal sealed class KeepSignal : IRequireCurrentThread
{
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly IObservable _source;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly Func _predicate;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The source value.
+ /// The predicate value.
public KeepSignal(IObservable source, Func predicate)
{
_source = source;
_predicate = predicate;
}
+ ///
+ /// Executes the IsRequiredSubscribeOnCurrentThread operation.
+ ///
+ /// The result.
public bool IsRequiredSubscribeOnCurrentThread() =>
_source is IRequireCurrentThread currentThread && currentThread.IsRequiredSubscribeOnCurrentThread();
+ ///
+ /// Executes the Subscribe operation.
+ ///
+ /// The observer value.
+ /// The result.
public IDisposable Subscribe(IObserver observer)
{
if (observer == null)
@@ -31,36 +56,70 @@ public IDisposable Subscribe(IObserver observer)
return _source.Subscribe(new KeepObserver(observer, _predicate));
}
+ ///
+ /// Represents the KeepObserver class.
+ ///
private sealed class KeepObserver : IObserver
{
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly IObserver _observer;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly Func _predicate;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private bool _stopped;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The observer value.
+ /// The predicate value.
public KeepObserver(IObserver observer, Func predicate)
{
_observer = observer;
_predicate = predicate;
}
+ ///
+ /// Executes the OnCompleted operation.
+ ///
public void OnCompleted()
{
- if (!_stopped)
+ if (_stopped)
{
- _stopped = true;
- _observer.OnCompleted();
+ return;
}
+
+ _stopped = true;
+ _observer.OnCompleted();
}
+ ///
+ /// Executes the OnError operation.
+ ///
+ /// The error value.
public void OnError(Exception error)
{
- if (!_stopped)
+ if (_stopped)
{
- _stopped = true;
- _observer.OnError(error);
+ return;
}
+
+ _stopped = true;
+ _observer.OnError(error);
}
+ ///
+ /// Executes the OnNext operation.
+ ///
+ /// The value.
public void OnNext(T value)
{
if (_stopped)
@@ -79,10 +138,12 @@ public void OnNext(T value)
return;
}
- if (keep)
+ if (!keep)
{
- _observer.OnNext(value);
+ return;
}
+
+ _observer.OnNext(value);
}
}
}
diff --git a/src/ReactiveUI.Primitives/Signal/MapSignal{TSource,TResult}.cs b/src/ReactiveUI.Primitives/Signal/MapSignal{TSource,TResult}.cs
index cea9d72..7601281 100644
--- a/src/ReactiveUI.Primitives/Signal/MapSignal{TSource,TResult}.cs
+++ b/src/ReactiveUI.Primitives/Signal/MapSignal{TSource,TResult}.cs
@@ -6,21 +6,47 @@
namespace ReactiveUI.Primitives.Signals;
+///
+/// Represents the MapSignal class.
+///
+/// The TSource type.
+/// The TResult type.
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
-internal sealed class MapSignal : IObservable, IRequireCurrentThread
+internal sealed class MapSignal : IRequireCurrentThread
{
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly IObservable _source;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly Func _selector;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The source value.
+ /// The selector value.
public MapSignal(IObservable source, Func selector)
{
_source = source;
_selector = selector;
}
+ ///
+ /// Executes the IsRequiredSubscribeOnCurrentThread operation.
+ ///
+ /// The result.
public bool IsRequiredSubscribeOnCurrentThread() =>
_source is IRequireCurrentThread currentThread && currentThread.IsRequiredSubscribeOnCurrentThread();
+ ///
+ /// Executes the Subscribe operation.
+ ///
+ /// The observer value.
+ /// The result.
public IDisposable Subscribe(IObserver observer)
{
if (observer == null)
@@ -31,36 +57,70 @@ public IDisposable Subscribe(IObserver observer)
return _source.Subscribe(new MapObserver(observer, _selector));
}
+ ///
+ /// Represents the MapObserver class.
+ ///
private sealed class MapObserver : IObserver
{
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly IObserver _observer;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly Func _selector;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private bool _stopped;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The observer value.
+ /// The selector value.
public MapObserver(IObserver observer, Func selector)
{
_observer = observer;
_selector = selector;
}
+ ///
+ /// Executes the OnCompleted operation.
+ ///
public void OnCompleted()
{
- if (!_stopped)
+ if (_stopped)
{
- _stopped = true;
- _observer.OnCompleted();
+ return;
}
+
+ _stopped = true;
+ _observer.OnCompleted();
}
+ ///
+ /// Executes the OnError operation.
+ ///
+ /// The error value.
public void OnError(Exception error)
{
- if (!_stopped)
+ if (_stopped)
{
- _stopped = true;
- _observer.OnError(error);
+ return;
}
+
+ _stopped = true;
+ _observer.OnError(error);
}
+ ///
+ /// Executes the OnNext operation.
+ ///
+ /// The value.
public void OnNext(TSource value)
{
if (_stopped)
diff --git a/src/ReactiveUI.Primitives/Signal/ReadOnlyState{T}.cs b/src/ReactiveUI.Primitives/Signal/ReadOnlyState{T}.cs
index 5f44475..83c3cde 100644
--- a/src/ReactiveUI.Primitives/Signal/ReadOnlyState{T}.cs
+++ b/src/ReactiveUI.Primitives/Signal/ReadOnlyState{T}.cs
@@ -13,7 +13,14 @@ namespace ReactiveUI.Primitives.Signals;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public sealed class ReadOnlyState : IObservable, IDisposable
{
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly StateSignal _inner;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly IDisposable _subscription;
///
@@ -42,10 +49,15 @@ public ReadOnlyState(IObservable source, T initialValue)
///
public IObservable Changed => _inner;
- ///
+ ///
+ /// Executes the Subscribe operation.
+ ///
+ /// The result.
public IDisposable Subscribe(IObserver observer) => _inner.Subscribe(observer);
- ///
+ ///
+ /// Executes the Dispose operation.
+ ///
public void Dispose()
{
_subscription.Dispose();
@@ -83,9 +95,12 @@ public static ReadOnlyState ToReadOnlyState(
throw new ArgumentNullException(nameof(selector));
}
- return new ReadOnlyState(ReactiveUI.Primitives.Signals.Signal.CreateSafe(observer => source.Subscribe(
- value => observer.OnNext(selector(value)),
- observer.OnError,
- observer.OnCompleted)), initialValue);
+ return new ReadOnlyState(
+ ReactiveUI.Primitives.Signals.Signal.CreateSafe(
+ observer => source.Subscribe(
+ value => observer.OnNext(selector(value)),
+ observer.OnError,
+ observer.OnCompleted)),
+ initialValue);
}
}
diff --git a/src/ReactiveUI.Primitives/Signal/ReplaySignal{T}.cs b/src/ReactiveUI.Primitives/Signal/ReplaySignal{T}.cs
index b806150..30e522f 100644
--- a/src/ReactiveUI.Primitives/Signal/ReplaySignal{T}.cs
+++ b/src/ReactiveUI.Primitives/Signal/ReplaySignal{T}.cs
@@ -15,18 +15,72 @@ namespace ReactiveUI.Primitives.Signals;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public class ReplaySignal : ISignal
{
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly int _bufferSize;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly TimeSpan _window;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly DateTimeOffset _startTime;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly ISequencer _scheduler;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private readonly bool _usesWindow;
+
+ ///
+ /// Executes the new operation.
+ ///
+ /// The result.
private readonly object _observerLock = new();
+#pragma warning disable S3459 // Broadcaster is a mutable struct whose default value is the empty broadcaster.
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private Broadcaster _broadcaster;
+#pragma warning restore S3459
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private bool _isStopped;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private Exception? _lastError;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private Queue>? _queue;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private T[]? _ring;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private int _ringCount;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private int _ringNext;
///
@@ -64,7 +118,7 @@ public ReplaySignal(int bufferSize, TimeSpan window, ISequencer scheduler)
}
else
{
- _ring = bufferSize == 0 ? Array.Empty() : new T[bufferSize];
+ _ring = bufferSize == 0 ? [] : new T[bufferSize];
}
}
@@ -237,7 +291,6 @@ public void OnNext(T value)
_queue!.Enqueue(new TimeInterval(value, interval));
Trim();
}
-
}
_broadcaster.Next(value);
@@ -305,33 +358,43 @@ public IDisposable Subscribe(IObserver observer)
/// true to release both managed and unmanaged resources; false to release only unmanaged resources.
protected virtual void Dispose(bool disposing)
{
- if (!IsDisposed)
+ if (IsDisposed)
{
- if (disposing)
+ return;
+ }
+
+ if (disposing)
+ {
+ lock (_observerLock)
{
- lock (_observerLock)
- {
- _broadcaster.Clear();
- _lastError = null;
- _queue = null;
- _ring = null;
- _ringCount = 0;
- _ringNext = 0;
- }
+ _broadcaster.Clear();
+ _lastError = null;
+ _queue = null;
+ _ring = null;
+ _ringCount = 0;
+ _ringNext = 0;
}
-
- IsDisposed = true;
}
+
+ IsDisposed = true;
}
+ ///
+ /// Executes the ThrowIfDisposed operation.
+ ///
private void ThrowIfDisposed()
{
- if (IsDisposed)
+ if (!IsDisposed)
{
- throw new ObjectDisposedException(string.Empty);
+ return;
}
+
+ throw new ObjectDisposedException(string.Empty);
}
+ ///
+ /// Executes the Trim operation.
+ ///
private void Trim()
{
while (_queue!.Count > _bufferSize)
@@ -352,6 +415,10 @@ private void Trim()
}
}
+ ///
+ /// Executes the AppendToRing operation.
+ ///
+ /// The value.
private void AppendToRing(T value)
{
var ring = _ring!;
@@ -367,12 +434,18 @@ private void AppendToRing(T value)
_ringNext = 0;
}
- if (_ringCount < ring.Length)
+ if (_ringCount >= ring.Length)
{
- _ringCount++;
+ return;
}
+
+ _ringCount++;
}
+ ///
+ /// Executes the ReplayRing operation.
+ ///
+ /// The observer value.
private void ReplayRing(IObserver observer)
{
var ring = _ring!;
@@ -398,18 +471,41 @@ private void ReplayRing(IObserver observer)
}
}
- private class ObserverHandler : IDisposable
+ ///
+ /// Represents the ObserverHandler class.
+ ///
+ private sealed class ObserverHandler : IDisposable
{
+ ///
+ /// Executes the new operation.
+ ///
+ /// The result.
private readonly object _lock = new();
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private ReplaySignal? _subject;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private IObserver? _observer;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The subject value.
+ /// The observer value.
public ObserverHandler(ReplaySignal subject, IObserver observer)
{
_subject = subject;
_observer = observer;
}
+ ///
+ /// Executes the Dispose operation.
+ ///
public void Dispose()
{
lock (_lock)
diff --git a/src/ReactiveUI.Primitives/Signal/Signal{T}.cs b/src/ReactiveUI.Primitives/Signal/Signal{T}.cs
index b1c75b4..d303ea5 100644
--- a/src/ReactiveUI.Primitives/Signal/Signal{T}.cs
+++ b/src/ReactiveUI.Primitives/Signal/Signal{T}.cs
@@ -14,17 +14,66 @@ namespace ReactiveUI.Primitives.Signals;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public class Signal : ISignal
{
+ ///
+ /// Stores state for the signal implementation.
+ ///
+ private const int InitialSubscriptionCapacity = 4;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private static readonly Action NoopOnNext = static _ => { };
+
+ ///
+ /// Executes the ThrowDisposed operation.
+ ///
+ /// The result.
private static readonly Action ThrowDisposedOnNext = static _ => ThrowDisposed();
+ ///
+ /// Executes the new operation.
+ ///
+ /// The result.
private readonly object _observerLock = new();
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private Exception? _exception;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private SignalSubscription? _singleActionSubscription;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private SignalSubscription?[]? _subscriptions;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private int _subscriptionCount;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private int _subscriptionTail;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private Action _onNext = NoopOnNext;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private bool _isDisposed;
+
+ ///
+ /// Stores state for the signal implementation.
+ ///
private bool _isStopped;
///
@@ -97,10 +146,12 @@ public void OnError(Exception error)
}
Error(subscriptions, error);
- if (hasActionSubscribers)
+ if (!hasActionSubscribers)
{
- ExceptionDispatchInfo.Capture(error).Throw();
+ return;
}
+
+ ExceptionDispatchInfo.Capture(error).Throw();
}
///
@@ -158,6 +209,11 @@ public IDisposable Subscribe(IObserver observer)
return Disposable.Empty;
}
+ ///
+ /// Executes the SubscribeAction operation.
+ ///
+ /// The onNext value.
+ /// The result.
internal IDisposable SubscribeAction(Action onNext)
{
if (onNext == null)
@@ -210,23 +266,40 @@ internal IDisposable SubscribeAction(Action onNext)
/// true to release both managed and unmanaged resources; false to release only unmanaged resources.
protected virtual void Dispose(bool disposing)
{
- if (!IsDisposed)
+ if (IsDisposed)
{
- if (disposing)
- {
- lock (_observerLock)
- {
- ClearObserversLocked();
- _exception = null;
- _onNext = ThrowDisposedOnNext;
- _isDisposed = true;
- }
- }
+ return;
+ }
+
+ if (!disposing)
+ {
+ return;
+ }
+
+ SignalSubscription? singleActionSubscription;
+ SignalSubscription?[]? subscriptions;
+ lock (_observerLock)
+ {
+ singleActionSubscription = _singleActionSubscription;
+ subscriptions = ClearObserversLocked();
+ _exception = null;
+ _onNext = ThrowDisposedOnNext;
+ _isDisposed = true;
}
+
+ singleActionSubscription?.Dispose();
+ DisposeSubscriptions(subscriptions);
}
+ ///
+ /// Executes the ThrowDisposed operation.
+ ///
private static void ThrowDisposed() => throw new ObjectDisposedException(string.Empty);
+ ///
+ /// Executes the Completed operation.
+ ///
+ /// The subscriptions value.
private static void Completed(SignalSubscription?[]? subscriptions)
{
if (subscriptions == null)
@@ -240,7 +313,12 @@ private static void Completed(SignalSubscription?[]? subscriptions)
}
}
- private static void Error(SignalSubscription?[]? subscriptions, Exception error)
+ ///
+ /// Executes the Error operation.
+ ///
+ /// The subscriptions value.
+ /// The exception value.
+ private static void Error(SignalSubscription?[]? subscriptions, Exception exception)
{
if (subscriptions == null)
{
@@ -249,10 +327,15 @@ private static void Error(SignalSubscription?[]? subscriptions, Exception error)
for (var i = 0; i < subscriptions.Length; i++)
{
- subscriptions[i]?.Observer?.OnError(error);
+ subscriptions[i]?.Observer?.OnError(exception);
}
}
+ ///
+ /// Executes the HasActionSubscribers operation.
+ ///
+ /// The subscriptions value.
+ /// The result.
private static bool HasActionSubscribers(SignalSubscription?[]? subscriptions)
{
if (subscriptions == null)
@@ -271,20 +354,46 @@ private static bool HasActionSubscribers(SignalSubscription?[]? subscriptions)
return false;
}
+ ///
+ /// Executes the DisposeSubscriptions operation.
+ ///
+ /// The subscriptions value.
+ private static void DisposeSubscriptions(SignalSubscription?[]? subscriptions)
+ {
+ if (subscriptions == null)
+ {
+ return;
+ }
+
+ for (var i = 0; i < subscriptions.Length; i++)
+ {
+ subscriptions[i]?.Dispose();
+ }
+ }
+
+ ///
+ /// Executes the ThrowIfDisposed operation.
+ ///
private void ThrowIfDisposed()
{
- if (IsDisposed)
+ if (!IsDisposed)
{
- ThrowDisposed();
+ return;
}
+
+ ThrowDisposed();
}
+ ///
+ /// Executes the AddSubscriptionLocked operation.
+ ///
+ /// The subscription value.
private void AddSubscriptionLocked(SignalSubscription subscription)
{
var subscriptions = _subscriptions;
if (subscriptions == null)
{
- subscriptions = new SignalSubscription[4];
+ subscriptions = new SignalSubscription[InitialSubscriptionCapacity];
Volatile.Write(ref _subscriptions, subscriptions);
}
@@ -315,6 +424,10 @@ private void AddSubscriptionLocked(SignalSubscription subscription)
_subscriptionCount++;
}
+ ///
+ /// Executes the ClearObserversLocked operation.
+ ///
+ /// The result.
private SignalSubscription?[]? ClearObserversLocked()
{
_singleActionSubscription = null;
@@ -326,6 +439,9 @@ private void AddSubscriptionLocked(SignalSubscription subscription)
return subscriptions;
}
+ ///
+ /// Executes the PromoteSingleActionObserverLocked operation.
+ ///
private void PromoteSingleActionObserverLocked()
{
var single = _singleActionSubscription;
@@ -338,6 +454,10 @@ private void PromoteSingleActionObserverLocked()
AddSubscriptionLocked(single);
}
+ ///
+ /// Executes the Remove operation.
+ ///
+ /// The subscription value.
private void Remove(SignalSubscription subscription)
{
lock (_observerLock)
@@ -368,6 +488,10 @@ private void Remove(SignalSubscription subscription)
}
}
+ ///
+ /// Executes the DispatchSubscriptions operation.
+ ///
+ /// The value.
private void DispatchSubscriptions(T value)
{
var subscriptions = Volatile.Read(ref _subscriptions);
@@ -396,10 +520,21 @@ private void DispatchSubscriptions(T value)
}
}
+ ///
+ /// Represents the SignalSubscription class.
+ ///
private sealed class SignalSubscription : IDisposable
{
+ ///
+ /// Stores state for the signal implementation.
+ ///
private Signal? _subject;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The subject value.
+ /// The observer value.
public SignalSubscription(Signal subject, IObserver observer)
{
_subject = subject;
@@ -407,6 +542,11 @@ public SignalSubscription(Signal subject, IObserver observer)
Index = -1;
}
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The subject value.
+ /// The onNext value.
public SignalSubscription(Signal subject, Action onNext)
{
_subject = subject;
@@ -414,12 +554,24 @@ public SignalSubscription(Signal subject, Action onNext)
Index = -1;
}
+ ///
+ /// Gets or sets the value.
+ ///
public int Index { get; set; }
+ ///
+ /// Gets the value.
+ ///
public IObserver? Observer { get; }
+ ///
+ /// Gets the value.
+ ///
public Action? OnNext { get; }
+ ///
+ /// Executes the Dispose operation.
+ ///
public void Dispose()
{
var subject = Interlocked.Exchange(ref _subject, null);
diff --git a/src/ReactiveUI.Primitives/Signal/TaskSignal.cs b/src/ReactiveUI.Primitives/Signal/TaskSignal.cs
index c43ab9b..c29e4e0 100644
--- a/src/ReactiveUI.Primitives/Signal/TaskSignal.cs
+++ b/src/ReactiveUI.Primitives/Signal/TaskSignal.cs
@@ -12,6 +12,31 @@ namespace ReactiveUI.Primitives.Signals;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public static class TaskSignal
{
+ ///
+ /// Creates the specified source.
+ ///
+ /// The type of the result.
+ /// The observable factory.
+ ///
+ /// An AsyncObservable.
+ ///
+ /// observableFactory.
+ public static ITaskSignal Create(Func, IObservable> observableFactory) =>
+ Instance(observableFactory, null, null);
+
+ ///
+ /// Creates the specified source.
+ ///
+ /// The type of the result.
+ /// The observable factory.
+ /// The scheduler.
+ ///
+ /// An AsyncObservable.
+ ///
+ /// observableFactory.
+ public static ITaskSignal Create(Func, IObservable> observableFactory, ISequencer? scheduler) =>
+ Instance(observableFactory, scheduler, null);
+
///
/// Creates the specified source.
///
@@ -23,10 +48,21 @@ public static class TaskSignal
/// An AsyncObservable.
///
/// observableFactory.
- public static ITaskSignal Create(Func, IObservable> observableFactory, ISequencer? scheduler = null, CancellationTokenSource? cancellationTokenSource = null) =>
+ public static ITaskSignal Create(
+ Func, IObservable> observableFactory,
+ ISequencer? scheduler,
+ CancellationTokenSource? cancellationTokenSource) =>
Instance(observableFactory, scheduler, cancellationTokenSource);
- private static ITaskSignal Instance(Func, IObservable> observableFactory, ISequencer? scheduler, CancellationTokenSource? cancellationTokenSource)
+ ///
+ /// Executes the Instance operation.
+ ///
+ /// The TResult type.
+ /// The observableFactory value.
+ /// The scheduler value.
+ /// The cancellationTokenSource value.
+ /// The result.
+ private static TaskSignal Instance(Func, IObservable> observableFactory, ISequencer? scheduler, CancellationTokenSource? cancellationTokenSource)
{
if (observableFactory is null)
{
diff --git a/src/ReactiveUI.Primitives/Signal/TaskSignal{T}.cs b/src/ReactiveUI.Primitives/Signal/TaskSignal{T}.cs
index 3315dd7..cb6f9c9 100644
--- a/src/ReactiveUI.Primitives/Signal/TaskSignal{T}.cs
+++ b/src/ReactiveUI.Primitives/Signal/TaskSignal{T}.cs
@@ -12,18 +12,26 @@ namespace ReactiveUI.Primitives.Signals;
///
/// The object that provides notification information.
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
-internal class TaskSignal : ITaskSignal
+internal sealed class TaskSignal : ITaskSignal
{
- private readonly ISequencer _scheduler;
+ ///
+ /// Stores state for the signal implementation.
+ ///
+ private readonly ISequencer _sequencer;
+
+ ///
+ /// Executes the new operation.
+ ///
+ /// The result.
private readonly MultipleDisposable? _cleanUp = new();
///
/// Initializes a new instance of the class.
///
/// The observable factory.
- /// The scheduler.
+ /// The sequencer.
/// The cancellation token source.
- public TaskSignal(Func, IObservable> observableFactory, ISequencer? scheduler = null, CancellationTokenSource? cancellationTokenSource = null)
+ public TaskSignal(Func, IObservable> observableFactory, ISequencer? sequencer = null, CancellationTokenSource? cancellationTokenSource = null)
{
if (observableFactory is null)
{
@@ -31,7 +39,7 @@ public TaskSignal(Func, IObservable> observableFactory, ISeque
}
CancellationTokenSource = cancellationTokenSource ?? new();
- _scheduler = scheduler ?? CurrentThreadSequencer.Instance;
+ _sequencer = sequencer ?? CurrentThreadSequencer.Instance;
Source = observableFactory(this);
}
@@ -77,7 +85,7 @@ public void GetOperationCanceled(IObserver observer) =>
/// The observer.
/// A Disposable.
public IDisposable Subscribe(IObserver observer) =>
- Source!.WitnessOn(_scheduler).Subscribe(observer).DisposeWith(_cleanUp!);
+ Source!.WitnessOn(_sequencer).Subscribe(observer).DisposeWith(_cleanUp!);
///
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
@@ -85,27 +93,29 @@ public IDisposable Subscribe(IObserver observer) =>
public void Dispose()
{
Dispose(true);
- GC.SuppressFinalize(this);
}
///
/// Releases unmanaged and - optionally - managed resources.
///
/// true to release both managed and unmanaged resources; false to release only unmanaged resources.
- protected virtual void Dispose(bool disposing)
+ private void Dispose(bool disposing)
{
- if (_cleanUp?.IsDisposed == false && disposing)
+ if (_cleanUp?.IsDisposed != false || !disposing)
{
- try
- {
- CancellationTokenSource?.Cancel();
- }
- catch (ObjectDisposedException)
- {
- }
-
- _cleanUp?.Dispose();
- CancellationTokenSource?.Dispose();
+ return;
}
+
+ try
+ {
+ CancellationTokenSource?.Cancel();
+ }
+ catch (ObjectDisposedException)
+ {
+ // The token source can be disposed by the task completion path.
+ }
+
+ _cleanUp?.Dispose();
+ CancellationTokenSource?.Dispose();
}
}
diff --git a/src/ReactiveUI.Primitives/SignalOperatorMixins.Coordinators.cs b/src/ReactiveUI.Primitives/SignalOperatorMixins.Coordinators.cs
new file mode 100644
index 0000000..97e051a
--- /dev/null
+++ b/src/ReactiveUI.Primitives/SignalOperatorMixins.Coordinators.cs
@@ -0,0 +1,687 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using ReactiveUI.Primitives.Disposables;
+
+namespace ReactiveUI.Primitives;
+
+///
+/// Coordinator helpers for multi-source signal operators.
+///
+public static partial class LinqMixins
+{
+ ///
+ /// Coordinates race subscriptions and forwards only the winning source.
+ ///
+ /// The source value type.
+ private sealed class RaceCoordinator : IDisposable
+ {
+ ///
+ /// The downstream observer.
+ ///
+ private readonly IObserver _observer;
+
+ ///
+ /// The active subscriptions.
+ ///
+ private readonly MultipleDisposable _subscriptions = new();
+
+ ///
+ /// The winning source index.
+ ///
+ private int _winner = -1;
+
+ ///
+ /// The next source index.
+ ///
+ private int _index;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The downstream observer.
+ internal RaceCoordinator(IObserver observer) => _observer = observer;
+
+ ///
+ /// Releases the active subscriptions.
+ ///
+ public void Dispose() => _subscriptions.Dispose();
+
+ ///
+ /// Starts observing the candidate source streams.
+ ///
+ /// The candidate source streams.
+ /// The coordinator that owns the subscription cleanup.
+ internal RaceCoordinator Run(IObservable> sources)
+ {
+ _subscriptions.Add(sources.Subscribe(OnSource, _observer.OnError, OnOuterCompleted));
+ return this;
+ }
+
+ ///
+ /// Forwards a value from a candidate source.
+ ///
+ /// The candidate source index.
+ /// The value to forward.
+ private void OnNext(int candidate, T value)
+ {
+ if (!Win(candidate))
+ {
+ return;
+ }
+
+ _observer.OnNext(value);
+ }
+
+ ///
+ /// Forwards an error from a candidate source.
+ ///
+ /// The candidate source index.
+ /// The error to forward.
+ private void OnError(int candidate, Exception error)
+ {
+ if (!Win(candidate))
+ {
+ return;
+ }
+
+ _observer.OnError(error);
+ }
+
+ ///
+ /// Forwards completion from a candidate source.
+ ///
+ /// The candidate source index.
+ private void OnCompleted(int candidate)
+ {
+ if (!Win(candidate))
+ {
+ return;
+ }
+
+ _observer.OnCompleted();
+ }
+
+ ///
+ /// Handles completion of the outer sequence.
+ ///
+ private void OnOuterCompleted()
+ {
+ // Race completion is controlled by the first inner source to win.
+ }
+
+ ///
+ /// Subscribes to a candidate source.
+ ///
+ /// The source to observe.
+ private void OnSource(IObservable source)
+ {
+ var current = Interlocked.Increment(ref _index) - 1;
+ _subscriptions.Add(source.Subscribe(
+ value => OnNext(current, value),
+ error => OnError(current, error),
+ () => OnCompleted(current)));
+ }
+
+ ///
+ /// Attempts to make a candidate source the winner.
+ ///
+ /// The candidate source index.
+ /// true when the candidate is the winning source; otherwise, false.
+ private bool Win(int candidate)
+ {
+ var current = Volatile.Read(ref _winner);
+ if (current == candidate)
+ {
+ return true;
+ }
+
+ if (current >= 0)
+ {
+ return false;
+ }
+
+ return Interlocked.CompareExchange(ref _winner, candidate, -1) == -1;
+ }
+ }
+
+ ///
+ /// Coordinates a two-source zip operation.
+ ///
+ /// The left value type.
+ /// The right value type.
+ /// The result value type.
+ private sealed class ZipCoordinator
+ {
+ ///
+ /// The synchronization gate.
+ ///
+ private readonly OperatorGate _gate = new();
+
+ ///
+ /// The downstream observer.
+ ///
+ private readonly IObserver _observer;
+
+ ///
+ /// The projection function.
+ ///
+ private readonly Func _selector;
+
+ ///
+ /// The queued left values.
+ ///
+ private readonly Queue _leftQueue = new();
+
+ ///
+ /// The queued right values.
+ ///
+ private readonly Queue _rightQueue = new();
+
+ ///
+ /// A value indicating whether the left source completed.
+ ///
+ private bool _leftCompleted;
+
+ ///
+ /// A value indicating whether the right source completed.
+ ///
+ private bool _rightCompleted;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The downstream observer.
+ /// The projection function.
+ internal ZipCoordinator(IObserver observer, Func selector)
+ {
+ _observer = observer;
+ _selector = selector;
+ }
+
+ ///
+ /// Subscribes to both zip sources.
+ ///
+ /// The left source.
+ /// The right source.
+ /// The subscription cleanup.
+ internal MultipleDisposable Run(IObservable left, IObservable right) =>
+ new(
+ left.Subscribe(OnLeftNext, _observer.OnError, OnLeftCompleted),
+ right.Subscribe(OnRightNext, _observer.OnError, OnRightCompleted));
+
+ ///
+ /// Queues a left value.
+ ///
+ /// The value to queue.
+ private void OnLeftNext(TLeft value)
+ {
+ lock (_gate.SyncRoot)
+ {
+ _leftQueue.Enqueue(value);
+ }
+
+ Drain();
+ }
+
+ ///
+ /// Queues a right value.
+ ///
+ /// The value to queue.
+ private void OnRightNext(TRight value)
+ {
+ lock (_gate.SyncRoot)
+ {
+ _rightQueue.Enqueue(value);
+ }
+
+ Drain();
+ }
+
+ ///
+ /// Marks the left source as complete.
+ ///
+ private void OnLeftCompleted()
+ {
+ lock (_gate.SyncRoot)
+ {
+ _leftCompleted = true;
+ }
+
+ Drain();
+ }
+
+ ///
+ /// Marks the right source as complete.
+ ///
+ private void OnRightCompleted()
+ {
+ lock (_gate.SyncRoot)
+ {
+ _rightCompleted = true;
+ }
+
+ Drain();
+ }
+
+ ///
+ /// Emits all currently available pairs.
+ ///
+ private void Drain()
+ {
+ while (TryTake(out var left, out var right))
+ {
+ _observer.OnNext(_selector(left, right));
+ }
+ }
+
+ ///
+ /// Attempts to remove the next available pair from the queues.
+ ///
+ /// The left value.
+ /// The right value.
+ /// true when a pair was available; otherwise, false.
+ private bool TryTake(out TLeft left, out TRight right)
+ {
+ lock (_gate.SyncRoot)
+ {
+ if (_leftQueue.Count != 0 && _rightQueue.Count != 0)
+ {
+ left = _leftQueue.Dequeue();
+ right = _rightQueue.Dequeue();
+ return true;
+ }
+
+ if ((_leftCompleted && _leftQueue.Count == 0) || (_rightCompleted && _rightQueue.Count == 0))
+ {
+ _observer.OnCompleted();
+ }
+
+ left = default!;
+ right = default!;
+ return false;
+ }
+ }
+ }
+
+ ///
+ /// Coordinates a two-source combine-latest operation.
+ ///
+ /// The left value type.
+ /// The right value type.
+ /// The result value type.
+ private sealed class CombineLatestCoordinator
+ {
+ ///
+ /// The synchronization gate.
+ ///
+ private readonly OperatorGate _gate = new();
+
+ ///
+ /// The downstream observer.
+ ///
+ private readonly IObserver _observer;
+
+ ///
+ /// The projection function.
+ ///
+ private readonly Func _selector;
+
+ ///
+ /// A value indicating whether the left source has produced a value.
+ ///
+ private bool _hasLeft;
+
+ ///
+ /// A value indicating whether the right source has produced a value.
+ ///
+ private bool _hasRight;
+
+ ///
+ /// A value indicating whether the left source completed.
+ ///
+ private bool _leftDone;
+
+ ///
+ /// A value indicating whether the right source completed.
+ ///
+ private bool _rightDone;
+
+ ///
+ /// The latest left value.
+ ///
+ private TLeft? _latestLeft;
+
+ ///
+ /// The latest right value.
+ ///
+ private TRight? _latestRight;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The downstream observer.
+ /// The projection function.
+ internal CombineLatestCoordinator(IObserver observer, Func selector)
+ {
+ _observer = observer;
+ _selector = selector;
+ }
+
+ ///
+ /// Subscribes to both combine-latest sources.
+ ///
+ /// The left source.
+ /// The right source.
+ /// The subscription cleanup.
+ internal MultipleDisposable Run(IObservable left, IObservable right) =>
+ new(
+ left.Subscribe(OnLeftNext, _observer.OnError, OnLeftCompleted),
+ right.Subscribe(OnRightNext, _observer.OnError, OnRightCompleted));
+
+ ///
+ /// Handles a left value.
+ ///
+ /// The left value.
+ private void OnLeftNext(TLeft value)
+ {
+ if (!TryUpdateLeft(value, out var projected))
+ {
+ return;
+ }
+
+ _observer.OnNext(projected);
+ }
+
+ ///
+ /// Handles a right value.
+ ///
+ /// The right value.
+ private void OnRightNext(TRight value)
+ {
+ if (!TryUpdateRight(value, out var projected))
+ {
+ return;
+ }
+
+ _observer.OnNext(projected);
+ }
+
+ ///
+ /// Marks the left source as complete.
+ ///
+ private void OnLeftCompleted()
+ {
+ if (!CompleteLeft())
+ {
+ return;
+ }
+
+ _observer.OnCompleted();
+ }
+
+ ///
+ /// Marks the right source as complete.
+ ///
+ private void OnRightCompleted()
+ {
+ if (!CompleteRight())
+ {
+ return;
+ }
+
+ _observer.OnCompleted();
+ }
+
+ ///
+ /// Updates the latest left value.
+ ///
+ /// The new value.
+ /// The projected result.
+ /// true when a result is available; otherwise, false.
+ private bool TryUpdateLeft(TLeft value, out TResult result)
+ {
+ lock (_gate.SyncRoot)
+ {
+ _latestLeft = value;
+ _hasLeft = true;
+ return TryProject(out result);
+ }
+ }
+
+ ///
+ /// Updates the latest right value.
+ ///
+ /// The new value.
+ /// The projected result.
+ /// true when a result is available; otherwise, false.
+ private bool TryUpdateRight(TRight value, out TResult result)
+ {
+ lock (_gate.SyncRoot)
+ {
+ _latestRight = value;
+ _hasRight = true;
+ return TryProject(out result);
+ }
+ }
+
+ ///
+ /// Marks the left source as complete.
+ ///
+ /// true when both sources are complete; otherwise, false.
+ private bool CompleteLeft()
+ {
+ lock (_gate.SyncRoot)
+ {
+ _leftDone = true;
+ return _rightDone;
+ }
+ }
+
+ ///
+ /// Marks the right source as complete.
+ ///
+ /// true when both sources are complete; otherwise, false.
+ private bool CompleteRight()
+ {
+ lock (_gate.SyncRoot)
+ {
+ _rightDone = true;
+ return _leftDone;
+ }
+ }
+
+ ///
+ /// Projects the current latest values.
+ ///
+ /// The projected value.
+ /// true when both sources have values; otherwise, false.
+ private bool TryProject(out TResult result)
+ {
+ if (!_hasLeft || !_hasRight)
+ {
+ result = default!;
+ return false;
+ }
+
+ result = _selector(_latestLeft!, _latestRight!);
+ return true;
+ }
+ }
+
+ ///
+ /// Coordinates a switch operation.
+ ///
+ /// The source value type.
+ private sealed class SwitchCoordinator : IDisposable
+ {
+ ///
+ /// The synchronization gate.
+ ///
+ private readonly OperatorGate _gate = new();
+
+ ///
+ /// The downstream observer.
+ ///
+ private readonly IObserver _observer;
+
+ ///
+ /// The active subscriptions.
+ ///
+ private readonly MultipleDisposable _subscriptions = new();
+
+ ///
+ /// The active inner subscription.
+ ///
+ private readonly SingleReplaceableDisposable _innerSlot = new();
+
+ ///
+ /// A value indicating whether the outer source completed.
+ ///
+ private bool _outerCompleted;
+
+ ///
+ /// A value indicating whether an inner source is active.
+ ///
+ private bool _innerActive;
+
+ ///
+ /// The current inner source version.
+ ///
+ private int _version;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The downstream observer.
+ internal SwitchCoordinator(IObserver observer) => _observer = observer;
+
+ ///
+ /// Releases the active subscriptions.
+ ///
+ public void Dispose()
+ {
+ _innerSlot.Dispose();
+ _subscriptions.Dispose();
+ }
+
+ ///
+ /// Subscribes to the outer source.
+ ///
+ /// The outer source.
+ /// The coordinator that owns the subscription cleanup.
+ internal SwitchCoordinator Run(IObservable> sources)
+ {
+ _subscriptions.Add(_innerSlot);
+ _subscriptions.Add(sources.Subscribe(OnSource, _observer.OnError, OnOuterCompleted));
+ return this;
+ }
+
+ ///
+ /// Switches to a new inner source.
+ ///
+ /// The new inner source.
+ private void OnSource(IObservable source)
+ {
+ int current;
+ lock (_gate.SyncRoot)
+ {
+ current = ++_version;
+ _innerActive = true;
+ }
+
+ _innerSlot.Create(source.Subscribe(
+ value => OnNext(current, value),
+ error => OnError(current, error),
+ () => OnCompleted(current)));
+ }
+
+ ///
+ /// Marks the outer source as complete.
+ ///
+ private void OnOuterCompleted()
+ {
+ lock (_gate.SyncRoot)
+ {
+ _outerCompleted = true;
+ }
+
+ TryComplete();
+ }
+
+ ///
+ /// Forwards an inner value when it belongs to the current source.
+ ///
+ /// The inner version.
+ /// The value to forward.
+ private void OnNext(int version, T value)
+ {
+ if (!IsCurrent(version))
+ {
+ return;
+ }
+
+ _observer.OnNext(value);
+ }
+
+ ///
+ /// Forwards an inner error when it belongs to the current source.
+ ///
+ /// The inner version.
+ /// The error to forward.
+ private void OnError(int version, Exception error)
+ {
+ if (!IsCurrent(version))
+ {
+ return;
+ }
+
+ _observer.OnError(error);
+ }
+
+ ///
+ /// Completes an inner source when it belongs to the current source.
+ ///
+ /// The inner version.
+ private void OnCompleted(int version)
+ {
+ lock (_gate.SyncRoot)
+ {
+ if (version == _version)
+ {
+ _innerActive = false;
+ }
+ }
+
+ TryComplete();
+ }
+
+ ///
+ /// Determines whether a version is the current inner source.
+ ///
+ /// The candidate version.
+ /// true if the version is current; otherwise, false.
+ private bool IsCurrent(int version)
+ {
+ lock (_gate.SyncRoot)
+ {
+ return version == _version;
+ }
+ }
+
+ ///
+ /// Completes the observer when both outer and inner sources are complete.
+ ///
+ private void TryComplete()
+ {
+ lock (_gate.SyncRoot)
+ {
+ if (_outerCompleted && !_innerActive)
+ {
+ _observer.OnCompleted();
+ }
+ }
+ }
+ }
+}
diff --git a/src/ReactiveUI.Primitives/SignalOperatorMixins.cs b/src/ReactiveUI.Primitives/SignalOperatorMixins.cs
index 2dac042..1af73ae 100644
--- a/src/ReactiveUI.Primitives/SignalOperatorMixins.cs
+++ b/src/ReactiveUI.Primitives/SignalOperatorMixins.cs
@@ -2,7 +2,6 @@
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.
-using System.Collections.Concurrent;
using ReactiveUI.Primitives.Concurrency;
using ReactiveUI.Primitives.Core;
using ReactiveUI.Primitives.Disposables;
@@ -94,10 +93,12 @@ public static IObservable KeepNotNull(this IObservable source)
return Signal.CreateSafe(observer => source.Subscribe(
value =>
{
- if (value != null)
+ if (value == null)
{
- observer.OnNext(value);
+ return;
}
+
+ observer.OnNext(value);
},
observer.OnError,
observer.OnCompleted));
@@ -106,6 +107,10 @@ public static IObservable KeepNotNull(this IObservable source)
///
/// Projects only values assignable to .
///
+ [System.Diagnostics.CodeAnalysis.SuppressMessage(
+ "Major Code Smell",
+ "S4018:Generic methods should provide type parameters",
+ Justification = "LINQ-style OfType requires the caller to provide the result type.")]
public static IObservable OfType(this IObservable