diff --git a/README.md b/README.md index 5d29be0..d4378d9 100644 --- a/README.md +++ b/README.md @@ -163,8 +163,9 @@ Creation APIs live on `ReactiveUI.Primitives.Signals.Signal`. | `Signal.Throw(Exception)` | Terminate with an error. | | `Signal.Range(int start, int count)` | Emit an integer range and complete. | | `Signal.Repeat(T value)` / `Repeat(T value, int count)` | Repeat indefinitely or a fixed number of times. | -| `Signal.Unfold(...)` | Generate a finite sequence from state. | +| `Signal.Unfold(...)` / `Signal.Generate(...)` | Generate a finite sequence from state. | | `Signal.Use(...)` | Tie a resource lifetime to a subscription. | +| `Signal.FromEventPattern(...)` | Convert .NET events to `EventPattern` values. | | `Signal.FromEnumerable(IEnumerable)` | Convert an enumerable. | | `Signal.FromEnumerable(IEnumerable, CancellationToken)` | Convert an enumerable and stop synchronous enumeration when cancelled. | | `Signal.FromAsyncEnumerable(IAsyncEnumerable, CancellationToken)` | Convert an async enumerable on modern TFMs. | @@ -300,12 +301,13 @@ height.Value = 600; | quiet-period sampling | `Throttle` | | periodic sampling | `Sample` | | timeout | `Timeout` | +| schedule subscription | `SubscribeOn` | | timestamp values | `Timestamp` | | measure intervals | `TimeInterval` | | fixed-size buffers | `Buffer(count)`, `Buffer(count, skip)` | -| collect to list/array signal | `CollectList`, `CollectArray` | -| collect asynchronously | `CollectListAsync`, `CollectArrayAsync` | -| first value task | `FirstAsync`, `FirstOrDefaultAsync` | +| collect to list/array signal | `CollectList`, `CollectArray`, `ToList`, `ToArray` | +| collect asynchronously | `CollectListAsync`, `CollectArrayAsync`, `ToListAsync`, `ToArrayAsync` | +| first/last value task | `FirstAsync`, `FirstOrDefaultAsync`, `LastAsync`, `LastOrDefaultAsync` | Timer example: @@ -396,7 +398,7 @@ using var subscription = failed.Subscribe( ## Sequencers -Sequencers live in `ReactiveUI.Primitives.Concurrency` and implement `ISequencer`. +Sequencers live in `ReactiveUI.Primitives.Concurrency` and implement `ISequencer`. The core `ReactiveUI.Primitives` package does not reference WPF or Windows Forms; UI-thread sequencers are provided by the optional `ReactiveUI.Primitives.Wpf` and `ReactiveUI.Primitives.WinForms` packages. | Sequencer | Purpose | |---|---| @@ -404,7 +406,9 @@ Sequencers live in `ReactiveUI.Primitives.Concurrency` and implement `ISequencer | `Sequencer.CurrentThread` / `CurrentThreadSequencer.Instance` | Queue recursive/current-thread work deterministically. | | `ThreadPoolSequencer.Instance` | Schedule work through the thread pool. | | `TaskPoolSequencer.Instance` | Schedule work through tasks. | -| `DispatcherSequencer` | Schedule onto a WPF dispatcher on Windows TFMs. | +| `SynchronizationContextSequencer` | Schedule through a `SynchronizationContext`. | +| `DispatcherSequencer` | Schedule onto a WPF dispatcher from `ReactiveUI.Primitives.Wpf`. | +| `ControlSequencer` | Schedule onto a Windows Forms control from `ReactiveUI.Primitives.WinForms`. | | `VirtualClock` / `TestClock` | Virtual-time scheduling for deterministic tests. | Scheduling APIs include absolute, relative, recursive, and action-based overloads: @@ -548,8 +552,9 @@ ReactiveUI.Primitives is not a byte-for-byte clone of System.Reactive. It keeps | `DelaySubscription` | `DelayStart` | Delay source subscription. | | `Timeout` | `Timeout` | Error on missing value before due time. | | `Buffer(count)` | `Buffer(count)` | Fixed-size buffers. | -| `ToList` / `ToArray` | `CollectList` / `CollectArray` | Signal results. | -| `FirstAsync` | `FirstAsync` | Task result. | +| `SubscribeOn` | `SubscribeOn` | Schedule source subscription. | +| `ToList` / `ToArray` | `ToList` / `ToArray` or `CollectList` / `CollectArray` | Signal results. | +| `FirstAsync` / `LastAsync` | `FirstAsync` / `LastAsync` | Task result. | | `CountAsync` / `AnyAsync` | `CountAsync` / `AnyAsync` | Task-shaped terminal helpers, including cancellation overloads. | ### Disposable mapping @@ -573,7 +578,9 @@ ReactiveUI.Primitives is not a byte-for-byte clone of System.Reactive. It keeps | `CurrentThreadSequencer.Instance` | `Sequencer.CurrentThread` or `CurrentThreadSequencer.Instance` | | `ThreadPoolSequencer.Instance` | `ThreadPoolSequencer.Instance` | | task-pool scheduling | `TaskPoolSequencer.Instance` | -| dispatcher scheduling | `DispatcherSequencer` | +| synchronization-context scheduling | `SynchronizationContextSequencer` | +| WPF dispatcher scheduling | `DispatcherSequencer` from `ReactiveUI.Primitives.Wpf` | +| Windows Forms control scheduling | `ControlSequencer` from `ReactiveUI.Primitives.WinForms` | | `TestScheduler` / virtual time | `VirtualClock` or `TestClock` | ### Testing migration @@ -603,41 +610,81 @@ Use the generated bridge only at boundaries. Prefer native ReactiveUI.Primitives Benchmarks live in `src/benchmarks/ReactiveUI.Primitives.Benchmarks`. The benchmark project may reference System.Reactive and R3 to compare throughput and allocation behavior; the production package must not. -The latest joined BenchmarkDotNet ShortRun was captured on 2026-05-25 with .NET SDK 10.0.300 on Windows 11, using: +Full BenchmarkDotNet runs were captured on 2026-05-27 with .NET SDK 10.0.300 on Windows 11: ```powershell -dotnet run --project src/benchmarks/ReactiveUI.Primitives.Benchmarks/ReactiveUI.Primitives.Benchmarks.csproj --configuration Release --no-build -- -f '*' -j Short --join +dotnet run --project src/benchmarks/ReactiveUI.Primitives.Benchmarks/ReactiveUI.Primitives.Benchmarks.csproj --configuration Release --no-build -- --filter "*" --join --launchCount 1 --warmupCount 1 --iterationCount 3 ``` -Raw artifacts for the joined run are under `BenchmarkDotNet.Artifacts/results/BenchmarkRun-joined-2026-05-25-21-12-14-report.*`. The focused `FromEnumerable` row was captured in `src/BenchmarkDotNet.Artifacts/results/ReactiveUI.Primitives.Benchmarks.FactoryFromEnumerableBenchmarks-report.*` after the dedicated inline fast path was added. ShortRun is useful for fast regression checks; rerun with a longer BenchmarkDotNet job before making release claims. +Short local jobs are useful for fast regression checks; rerun with a longer BenchmarkDotNet job before making release claims. Current local test coverage after this pass is 82.80% line coverage and 75.50% branch coverage from `coverage-local.cobertura.xml`; the 100% coverage target remains an active work item. + +`N/A` means the current benchmark harness does not contain a matching measured row for that library. | Scenario | ReactiveUI.Primitives | System.Reactive | R3 | |---|---:|---:|---:| -| Completed task bridge | 17.6833 ns / 88 B | 1,348.2890 ns / 793 B | n/a | -| Pocket / composite dispose | 90.8799 ns / 408 B | 138.6110 ns / 512 B | n/a | -| Current-thread schedule | 22.8205 ns / 88 B | 28.3162 ns / 88 B | n/a | -| Safe witness wrapper | 40.2300 ns / 168 B | n/a | n/a | -| Completed spark | 0.3007 ns / 0 B | n/a | n/a | -| Return subscribe | 0.4417 ns / 0 B | 91.5187 ns / 120 B | 49.3844 ns / 72 B | -| Empty subscribe | 7.3897 ns / 40 B | 79.6293 ns / 96 B | 43.8897 ns / 48 B | -| Range subscribe | 55.9990 ns / 96 B | 4,153.4012 ns / 2,472 B | 119.9919 ns / 72 B | -| Repeat subscribe | 10.3262 ns / 0 B | 3,951.5395 ns / 2,408 B | 116.7110 ns / 72 B | -| FromEnumerable subscribe | 48.9910 ns / 40 B | 3,740.3600 ns / 2,504 B | 131.3610 ns / 80 B | -| Throw subscribe | 100.3490 ns / 120 B | 190.9367 ns / 240 B | 158.5640 ns / 192 B | -| Map + Keep | 213.9322 ns / 208 B | 4,463.8969 ns / 2,616 B | 423.8154 ns / 264 B | -| DistinctBy + Count + Any | 427.3704 ns / 992 B | 8,842.7094 ns / 5,896 B | 932.2863 ns / 1,280 B | -| StartWith + Append + DefaultIfEmpty | 79.0351 ns / 184 B | 1,511.0960 ns / 1,257 B | 226.6506 ns / 280 B | -| SelectMany over ranges | 1,174.3683 ns / 712 B | 5,989.3754 ns / 3,872 B | 1,530.4454 ns / 1,032 B | -| Zip over ranges | 1,920.5231 ns / 1,320 B | 5,434.1159 ns / 2,976 B | 1,103.3186 ns / 648 B | -| Replay subscribe | 491.2126 ns / 320 B | 944.9225 ns / 696 B | n/a | -| Behaviour signal, 32 values | 717.1898 ns / 176 B | 735.4731 ns / 200 B | 831.3793 ns / 184 B | -| Behaviour signal, 1024 values | 19,587.6333 ns / 176 B | 18,925.1658 ns / 200 B | 21,464.7502 ns / 184 B | -| Signal subscribe/dispose, 8 subscribers | 415.4351 ns / 1,176 B | 506.4101 ns / 1,288 B | 719.0130 ns / 840 B | -| Signal subscribe/dispose, 64 subscribers | 4,503.8029 ns / 8,864 B | 8,526.7609 ns / 38,472 B | 5,480.4075 ns / 6,216 B | -| Signal emit, 32 values | 108.2371 ns / 160 B | 122.6897 ns / 136 B | 213.9175 ns / 152 B | -| Signal emit, 1024 values | 2,130.8298 ns / 160 B | 1,994.6875 ns / 136 B | 3,677.6208 ns / 152 B | - -Current benchmark coverage is intentionally visible rather than overstated. The next benchmark expansion areas are factory/adapters (`Never`, `Create`, `Defer`, `FromEnumerable`, `FromAsyncEnumerable`, `Start`, `Unfold`, `Use`), time/scheduler operators (`Delay`, `DelayStart`, `Throttle`, `Sample`, `Timestamp`, `TimeInterval`, `Timeout`, `ObserveOn`), higher-order combinators (`Concat`, `Merge`, `Race`, `Switch`, `CombineLatest`, `WithLatest`, `ForkJoin`), terminal/collection APIs, connectable/share APIs, and state/task command surfaces. +| Return subscribe | 0.2089 ns / 0 B | 45.7505 ns / 120 B | 28.1108 ns / 72 B | +| Empty subscribe | 2.6805 ns / 40 B | 40.1536 ns / 96 B | 25.8399 ns / 48 B | +| Range subscribe | 46.0466 ns / 96 B | 2,452.1549 ns / 2,472 B | 63.6342 ns / 72 B | +| Repeat subscribe | 6.7455 ns / 0 B | 2,275.7650 ns / 2,408 B | 64.8692 ns / 72 B | +| Throw subscribe | 54.9685 ns / 120 B | 106.8594 ns / 240 B | 85.8239 ns / 192 B | +| FromEnumerable subscribe | 49.3764 ns / 40 B | 2,171.2470 ns / 2,504 B | 70.3934 ns / 80 B | +| Completed task bridge | 8.7892 ns / 88 B | 769.1087 ns / 793 B | N/A | +| Create subscribe | 43.7376 ns / 248 B | N/A | N/A | +| CreateSafe subscribe | 44.1792 ns / 248 B | N/A | N/A | +| Defer subscribe | 66.0839 ns / 240 B | N/A | N/A | +| Start subscribe | 51.7062 ns / 376 B | N/A | N/A | +| Unfold subscribe | 167.5562 ns / 736 B | N/A | N/A | +| Use subscribe | 67.7116 ns / 432 B | N/A | N/A | +| FromAsyncEnumerable subscribe | 1,921.1208 ns / 2,052 B | N/A | N/A | +| Never subscribe/dispose | 0.2163 ns / 0 B | N/A | N/A | +| Map + Keep over range | 130.0149 ns / 208 B | 2,461.1312 ns / 2,616 B | 256.9470 ns / 264 B | +| Aggregate + Any + Count | 229.9964 ns / 992 B | 5,073.0502 ns / 5,896 B | 529.5892 ns / 1,280 B | +| StartWith + Append + DefaultIfEmpty | 45.2433 ns / 184 B | 869.3671 ns / 1,257 B | 128.0685 ns / 280 B | +| SelectMany over ranges | 939.2041 ns / 712 B | 3,357.9581 ns / 3,872 B | 965.1100 ns / 1,032 B | +| Zip over ranges | 38.8399 ns / 232 B | 2,903.8925 ns / 2,976 B | 658.2362 ns / 648 B | +| Concat ranges | 67.0788 ns / 256 B | N/A | N/A | +| Merge ranges | 66.9464 ns / 256 B | N/A | N/A | +| Race ranges | 37.7646 ns / 192 B | N/A | N/A | +| Switch ranges | 797.3144 ns / 1,376 B | N/A | N/A | +| CombineLatest ranges | 95.0119 ns / 504 B | N/A | N/A | +| WithLatest ranges | 104.8306 ns / 504 B | N/A | N/A | +| ForkJoin ranges | 67.7277 ns / 480 B | N/A | N/A | +| Delay range | 3,132.9781 ns / 38,816 B | N/A | N/A | +| DelayStart range | 874.3075 ns / 25,520 B | N/A | N/A | +| Throttle burst | 2,504.3725 ns / 38,384 B | N/A | N/A | +| Sample latest | 1,045.5780 ns / 26,072 B | N/A | N/A | +| Timestamp range | 394.0507 ns / 312 B | N/A | N/A | +| TimeInterval range | 478.5383 ns / 736 B | N/A | N/A | +| Timeout never | 976.3098 ns / 25,816 B | N/A | N/A | +| ObserveOn immediate | 21.8563 ns / 96 B | N/A | N/A | +| Replay subscribe | 324.2733 ns / 320 B | 665.7033 ns / 696 B | N/A | +| BehaviorSignal 32 values | 554.5077 ns / 176 B | 581.5846 ns / 200 B | 594.3121 ns / 184 B | +| BehaviorSignal 1024 values | 15,698.1415 ns / 176 B | 15,826.6246 ns / 200 B | 15,702.7802 ns / 184 B | +| Signal emit, 32 values | 65.8803 ns / 136 B | 90.0502 ns / 136 B | 116.1177 ns / 152 B | +| Signal emit, 1024 values | 1,650.9938 ns / 136 B | 1,676.8777 ns / 136 B | 1,984.4349 ns / 152 B | +| Signal subscribe/dispose, 8 observers | 240.6380 ns / 592 B | 284.9100 ns / 1,288 B | 450.5067 ns / 840 B | +| Signal subscribe/dispose, 64 observers | 2,599.1562 ns / 3,800 B | 3,600.1331 ns / 38,472 B | 3,401.7292 ns / 6,216 B | +| Publish live connect | 125.5809 ns / 384 B | N/A | N/A | +| Share live subscribe | 225.6140 ns / 848 B | N/A | N/A | +| Replay live late subscribe | 595.9243 ns / 568 B | N/A | N/A | +| RefCount subscribe | 222.4420 ns / 848 B | N/A | N/A | +| AutoConnect subscribe | 167.9847 ns / 728 B | N/A | N/A | +| StateSignal updates | 552.4185 ns / 176 B | N/A | N/A | +| ReadOnlyState projection | 123.5420 ns / 248 B | N/A | N/A | +| TaskSignal subscribe | 2,384.2121 ns / 3,875 B | N/A | N/A | +| Command execute | 114.0456 ns / 600 B | N/A | N/A | +| Command result subscribe | 137.9245 ns / 672 B | N/A | N/A | +| CollectList range | 115.9544 ns / 688 B | N/A | N/A | +| CollectArray range | 83.1385 ns / 656 B | N/A | N/A | +| CollectArrayAsync range | 33.8094 ns / 384 B | N/A | N/A | +| FirstAsync range | 5.9320 ns / 56 B | N/A | N/A | +| ToTask range | 13.9715 ns / 192 B | N/A | N/A | +| Count(predicate) range | 55.0441 ns / 144 B | N/A | N/A | +| All + Contains range | 204.1768 ns / 1,024 B | N/A | N/A | +| Pocket dispose | 60.2873 ns / 408 B | 93.1830 ns / 512 B | N/A | +| CurrentThread schedule | 12.4912 ns / 88 B | 14.7694 ns / 88 B | N/A | +| Safe witness | 21.7079 ns / 168 B | N/A | N/A | +| Completed Spark | 0.0006 ns / 0 B | N/A | N/A | Performance constraints used by the project: @@ -652,6 +699,8 @@ Performance constraints used by the project: | Path | Purpose | |---|---| | `src/ReactiveUI.Primitives` | Production runtime library. | +| `src/ReactiveUI.Primitives.Wpf` | Optional WPF dispatcher integration library. | +| `src/ReactiveUI.Primitives.WinForms` | Optional Windows Forms control integration library. | | `src/ReactiveUI.Primitives.SystemReactiveBridge.Generator` | Source generator for System.Reactive bridge adapters. | | `src/ReactiveUI.Primitives.R3Bridge.Generator` | Source generator for R3 bridge adapters. | | `src/ReactiveUI.Primitives.Tests` | Test project using Microsoft Testing Platform/TUnit-style validation. | @@ -665,6 +714,7 @@ For NuGet package verification, inspect the generated `.nupkg` and confirm: - The nuspec contains `README.md`. - Bridge generator DLLs are present under `analyzers/dotnet/cs`. - Production runtime dependencies do not include System.Reactive or R3. +- The core `ReactiveUI.Primitives` package does not reference WPF or Windows Forms assemblies; those integrations ship from `ReactiveUI.Primitives.Wpf` and `ReactiveUI.Primitives.WinForms`. ## Practical migration checklist diff --git a/src/Directory.Build.props b/src/Directory.Build.props index b205410..7493ad3 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -55,6 +55,14 @@ $(NetCoreTargetFrameworks);$(NetFrameworkTargetFrameworks) + + net8.0-windows;net9.0-windows;net10.0-windows + $(WindowsNetCoreTargetFrameworks);$(NetFrameworkTargetFrameworks) + + + $(NetCoreTargetFrameworks) + net9.0;net10.0 + $(NetCoreTargetFrameworks) diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 0389a14..195198c 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -20,7 +20,12 @@ + + + + + diff --git a/src/ReactiveUI.Primitives.Blazor/Components/ReactiveComponentBase.cs b/src/ReactiveUI.Primitives.Blazor/Components/ReactiveComponentBase.cs new file mode 100644 index 0000000..ea23b19 --- /dev/null +++ b/src/ReactiveUI.Primitives.Blazor/Components/ReactiveComponentBase.cs @@ -0,0 +1,213 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using Microsoft.AspNetCore.Components; +using ReactiveUI.Primitives.Blazor.Concurrency; +using ReactiveUI.Primitives.Concurrency; +using ReactiveUI.Primitives.Disposables; + +namespace ReactiveUI.Primitives.Blazor.Components; + +/// +/// Base component that tracks reactive subscriptions and refreshes through Blazor's renderer dispatcher. +/// +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public abstract class ReactiveComponentBase : ComponentBase, IDisposable +{ + /// + /// Tracks subscriptions owned by the component. + /// + private readonly MultipleDisposable _subscriptions = new(); + + /// + /// Value indicating whether the component has been disposed. + /// + private bool _disposed; + + /// + /// Initializes a new instance of the class. + /// + protected ReactiveComponentBase() => + RendererSequencer = new BlazorRendererSequencer(InvokeAsync); + + /// + /// Gets a value indicating whether the component has been disposed. + /// + protected bool IsDisposed => _disposed; + + /// + /// Gets a sequencer that schedules work through the Blazor renderer dispatcher. + /// + protected ISequencer RendererSequencer { get; } + + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => $"IsDisposed = {IsDisposed}"; + + /// + /// Disposes the component and all tracked subscriptions. + /// + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + /// + /// Tracks a subscription so it is disposed when the component is disposed. + /// + /// The subscription to track. + /// The supplied subscription, or when the component has already been disposed. + /// is . + protected IDisposable Track(IDisposable subscription) + { + if (subscription == null) + { + throw new ArgumentNullException(nameof(subscription)); + } + + if (IsDisposed) + { + subscription.Dispose(); + return Disposable.Empty; + } + + _subscriptions.Add(subscription); + return subscription; + } + + /// + /// Subscribes to a source and refreshes the component after each value. + /// + /// The source value type. + /// The source sequence. + /// Action invoked for each value on the Blazor renderer dispatcher. + /// A tracked subscription. + /// or is . + protected IDisposable Observe(IObservable source, Action onNext) => + Observe(source, onNext, null, null); + + /// + /// Subscribes to a source and refreshes the component after each observed signal. + /// + /// The source value type. + /// The source sequence. + /// Action invoked for each value on the Blazor renderer dispatcher. + /// Optional action invoked when the source errors. + /// Optional action invoked when the source completes. + /// A tracked subscription. + /// or is . + protected IDisposable Observe( + IObservable source, + Action onNext, + Action? onError, + Action? onCompleted) => + Observe(source, onNext, onError, onCompleted, true); + + /// + /// Subscribes to a source and refreshes the component after each observed signal. + /// + /// The source value type. + /// The source sequence. + /// Action invoked for each value on the Blazor renderer dispatcher. + /// Optional action invoked when the source errors. + /// Optional action invoked when the source completes. + /// A value indicating whether to call after callbacks. + /// A tracked subscription. + /// or is . + protected IDisposable Observe( + IObservable source, + Action onNext, + Action? onError, + Action? onCompleted, + bool refreshAfterCallbacks) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (onNext == null) + { + throw new ArgumentNullException(nameof(onNext)); + } + + return Track(source.Subscribe( + value => _ = InvokeAsync(() => + { + onNext(value); + Refresh(refreshAfterCallbacks); + }), + error => _ = InvokeAsync(() => + { + if (onError == null) + { + OnObservedError(error); + } + else + { + onError(error); + } + + Refresh(refreshAfterCallbacks); + }), + () => _ = InvokeAsync(() => + { + onCompleted?.Invoke(); + Refresh(refreshAfterCallbacks); + }))); + } + + /// + /// Invalidates the component through Blazor's renderer dispatcher. + /// + /// A task that completes when the renderer has accepted the invalidation callback. + protected Task InvalidateAsync() => InvokeAsync(StateHasChanged); + + /// + /// Handles an unhandled subscription error. + /// + /// The observed error. + /// Always thrown to surface the subscription error. + protected virtual void OnObservedError(Exception error) + { + if (error == null) + { + throw new ArgumentNullException(nameof(error)); + } + + throw new InvalidOperationException("The reactive subscription failed.", error); + } + + /// + /// Releases resources used by the component. + /// + /// when managed resources should be released. + protected virtual void Dispose(bool disposing) + { + if (!disposing || _disposed) + { + return; + } + + _disposed = true; + _subscriptions.Dispose(); + } + + /// + /// Refreshes the component when requested and when it is still active. + /// + /// A value indicating whether refresh is requested. + private void Refresh(bool shouldRefresh) + { + if (!shouldRefresh || IsDisposed) + { + return; + } + + StateHasChanged(); + } +} diff --git a/src/ReactiveUI.Primitives.Blazor/Concurrency/BlazorRendererSequencer.cs b/src/ReactiveUI.Primitives.Blazor/Concurrency/BlazorRendererSequencer.cs new file mode 100644 index 0000000..5f2324c --- /dev/null +++ b/src/ReactiveUI.Primitives.Blazor/Concurrency/BlazorRendererSequencer.cs @@ -0,0 +1,139 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using ReactiveUI.Primitives.Concurrency; +using ReactiveUI.Primitives.Disposables; + +namespace ReactiveUI.Primitives.Blazor.Concurrency; + +/// +/// Sequencer that schedules work through a Blazor renderer dispatcher delegate. +/// +/// +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed class BlazorRendererSequencer : ISequencer +{ + /// + /// Delegate used to marshal work through Blazor's renderer. + /// + private readonly Func _invokeAsync; + + /// + /// Initializes a new instance of the class. + /// + /// A delegate such as ComponentBase.InvokeAsync that runs work through the renderer. + /// is . + public BlazorRendererSequencer(Func invokeAsync) => + _invokeAsync = invokeAsync ?? throw new ArgumentNullException(nameof(invokeAsync)); + + /// + /// Gets the scheduler's notion of current time. + /// + public DateTimeOffset Now => TimeProvider.System.GetUtcNow(); + + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + + /// + /// Schedules an action to be executed. + /// + /// The type of the state passed to the scheduled action. + /// State passed to the action to be executed. + /// Action to be executed. + /// The disposable object used to cancel the scheduled action on a best-effort basis. + /// is . + public IDisposable Schedule(TState state, Func action) + { + if (action == null) + { + throw new ArgumentNullException(nameof(action)); + } + + var cancelable = new BooleanDisposable(); + _ = _invokeAsync(() => + { + if (cancelable.IsDisposed) + { + return; + } + + action(this, state); + }); + return cancelable; + } + + /// + /// Schedules an action to be executed after dueTime. + /// + /// The type of the state passed to the scheduled action. + /// State passed to the action to be executed. + /// Relative time after which to execute the action. + /// Action to be executed. + /// The disposable object used to cancel the scheduled action on a best-effort basis. + /// is . + public IDisposable Schedule(TState state, TimeSpan dueTime, Func action) + { + if (action == null) + { + throw new ArgumentNullException(nameof(action)); + } + + var cancellation = new CancellationDisposable(); + _ = DelayThenDispatchAsync(state, Sequencer.Normalize(dueTime), action, cancellation.Token); + return cancellation; + } + + /// + /// Schedules an action to be executed at dueTime. + /// + /// The type of the state passed to the scheduled action. + /// State passed to the action to be executed. + /// Absolute time at which to execute the action. + /// Action to be executed. + /// The disposable object used to cancel the scheduled action on a best-effort basis. + public IDisposable Schedule(TState state, DateTimeOffset dueTime, Func action) => + Schedule(state, Sequencer.Normalize(dueTime - Now), action); + + /// + /// Delays work and then dispatches it through the renderer. + /// + /// The type of the state passed to the scheduled action. + /// State passed to the action to be executed. + /// The normalized due time. + /// Action to be executed. + /// Token used to cancel delayed work. + /// A task representing the asynchronous delay and dispatch. + private async Task DelayThenDispatchAsync( + TState state, + TimeSpan dueTime, + Func action, + CancellationToken cancellationToken) + { + try + { + await Task.Delay(dueTime, cancellationToken).ConfigureAwait(false); + if (cancellationToken.IsCancellationRequested) + { + return; + } + + await _invokeAsync(() => + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + action(this, state); + }).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Cancellation is the expected disposal path for delayed renderer work. + } + } +} diff --git a/src/ReactiveUI.Primitives.Blazor/ReactiveUI.Primitives.Blazor.csproj b/src/ReactiveUI.Primitives.Blazor/ReactiveUI.Primitives.Blazor.csproj new file mode 100644 index 0000000..3cd097d --- /dev/null +++ b/src/ReactiveUI.Primitives.Blazor/ReactiveUI.Primitives.Blazor.csproj @@ -0,0 +1,17 @@ + + + + $(BlazorTargetFrameworks) + enable + enable + preview + Blazor integration helpers for ReactiveUI.Primitives. + system.reactive;rx;reactive;primitives;blazor;components;subscriptions + + + + + + + + diff --git a/src/ReactiveUI.Primitives.Maui/Concurrency/MauiDispatcherSequencer.cs b/src/ReactiveUI.Primitives.Maui/Concurrency/MauiDispatcherSequencer.cs new file mode 100644 index 0000000..ffc8ec2 --- /dev/null +++ b/src/ReactiveUI.Primitives.Maui/Concurrency/MauiDispatcherSequencer.cs @@ -0,0 +1,123 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using Microsoft.Maui.Dispatching; +using ReactiveUI.Primitives.Disposables; + +namespace ReactiveUI.Primitives.Concurrency; + +/// +/// MAUI dispatcher sequencer that schedules work through an . +/// +/// +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed class MauiDispatcherSequencer : ISequencer +{ + /// + /// Initializes a new instance of the class. + /// + /// The dispatcher used to marshal work to the UI thread. + /// is . + public MauiDispatcherSequencer(IDispatcher dispatcher) => + Dispatcher = dispatcher ?? throw new ArgumentNullException(nameof(dispatcher)); + + /// + /// Gets the dispatcher used to marshal work to the UI thread. + /// + public IDispatcher Dispatcher { get; } + + /// + /// Gets the scheduler's notion of current time. + /// + public DateTimeOffset Now => TimeProvider.System.GetUtcNow(); + + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + + /// + /// Schedules an action to be executed. + /// + /// The type of the state passed to the scheduled action. + /// State passed to the action to be executed. + /// Action to be executed. + /// The disposable object used to cancel the scheduled action on a best-effort basis. + /// is . + public IDisposable Schedule(TState state, Func action) + { + if (action == null) + { + throw new ArgumentNullException(nameof(action)); + } + + var cancelable = new BooleanDisposable(); + Dispatcher.Dispatch(() => + { + if (cancelable.IsDisposed) + { + return; + } + + action(this, state); + }); + + return cancelable; + } + + /// + /// Schedules an action to be executed after dueTime. + /// + /// The type of the state passed to the scheduled action. + /// State passed to the action to be executed. + /// Relative time after which to execute the action. + /// Action to be executed. + /// The disposable object used to cancel the scheduled action on a best-effort basis. + /// is . + public IDisposable Schedule(TState state, TimeSpan dueTime, Func action) + { + if (action == null) + { + throw new ArgumentNullException(nameof(action)); + } + + var cancelable = new BooleanDisposable(); + var timer = Dispatcher.CreateTimer(); + timer.Interval = Sequencer.Normalize(dueTime); + timer.IsRepeating = false; + timer.Tick += OnTick; + timer.Start(); + + return Disposable.Create(() => + { + cancelable.Dispose(); + timer.Stop(); + timer.Tick -= OnTick; + }); + + void OnTick(object? sender, EventArgs eventArgs) + { + timer.Stop(); + timer.Tick -= OnTick; + if (cancelable.IsDisposed) + { + return; + } + + action(this, state); + } + } + + /// + /// Schedules an action to be executed at dueTime. + /// + /// The type of the state passed to the scheduled action. + /// State passed to the action to be executed. + /// Absolute time at which to execute the action. + /// Action to be executed. + /// The disposable object used to cancel the scheduled action on a best-effort basis. + public IDisposable Schedule(TState state, DateTimeOffset dueTime, Func action) => + Schedule(state, Sequencer.Normalize(dueTime - Now), action); +} diff --git a/src/ReactiveUI.Primitives.Maui/Concurrency/MauiDispatcherSequencerMixins.cs b/src/ReactiveUI.Primitives.Maui/Concurrency/MauiDispatcherSequencerMixins.cs new file mode 100644 index 0000000..2b84f3c --- /dev/null +++ b/src/ReactiveUI.Primitives.Maui/Concurrency/MauiDispatcherSequencerMixins.cs @@ -0,0 +1,29 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using Microsoft.Maui.Dispatching; + +namespace ReactiveUI.Primitives.Concurrency; + +/// +/// Convenience helpers for MAUI dispatcher sequencers. +/// +public static class MauiDispatcherSequencerMixins +{ + /// + /// Adapts a MAUI dispatcher to an . + /// + /// The dispatcher to adapt. + /// A sequencer that schedules through . + /// is . + public static MauiDispatcherSequencer ToSequencer(this IDispatcher dispatcher) + { + if (dispatcher == null) + { + throw new ArgumentNullException(nameof(dispatcher)); + } + + return new MauiDispatcherSequencer(dispatcher); + } +} diff --git a/src/ReactiveUI.Primitives.Maui/ReactiveUI.Primitives.Maui.csproj b/src/ReactiveUI.Primitives.Maui/ReactiveUI.Primitives.Maui.csproj new file mode 100644 index 0000000..b9ed076 --- /dev/null +++ b/src/ReactiveUI.Primitives.Maui/ReactiveUI.Primitives.Maui.csproj @@ -0,0 +1,17 @@ + + + + $(MauiTargetFrameworks) + enable + enable + preview + MAUI dispatcher integration sequencers for ReactiveUI.Primitives. + system.reactive;rx;reactive;primitives;maui;dispatcher;scheduler;sequencer + + + + + + + + diff --git a/src/ReactiveUI.Primitives.WinForms/Concurrency/ControlSequencer.cs b/src/ReactiveUI.Primitives.WinForms/Concurrency/ControlSequencer.cs new file mode 100644 index 0000000..584b8ae --- /dev/null +++ b/src/ReactiveUI.Primitives.WinForms/Concurrency/ControlSequencer.cs @@ -0,0 +1,150 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Windows.Forms; +using ReactiveUI.Primitives.Disposables; +using FormsTimer = System.Windows.Forms.Timer; + +namespace ReactiveUI.Primitives.Concurrency; + +/// +/// Windows Forms sequencer that schedules work through a UI control. +/// +/// +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed class ControlSequencer : ISequencer +{ + /// + /// Initializes a new instance of the class. + /// + /// The control used to marshal work to the UI thread. + /// is . + public ControlSequencer(Control control) => + Control = control ?? throw new ArgumentNullException(nameof(control)); + + /// + /// Gets the control used to marshal work to the UI thread. + /// + public Control Control { get; } + + /// + /// Gets the scheduler's notion of current time. + /// + public DateTimeOffset Now + { + get + { +#if NET8_0_OR_GREATER + return TimeProvider.System.GetUtcNow(); +#else +#pragma warning disable S6354 // TimeProvider is not available on supported .NET Framework target frameworks. + return DateTimeOffset.UtcNow; +#pragma warning restore S6354 +#endif + } + } + + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + + /// + /// Schedules an action to be executed. + /// + /// The type of the state passed to the scheduled action. + /// State passed to the action to be executed. + /// Action to be executed. + /// The disposable object used to cancel the scheduled action on a best-effort basis. + /// is . + public IDisposable Schedule(TState state, Func action) + { + if (action == null) + { + throw new ArgumentNullException(nameof(action)); + } + + var cancelable = new BooleanDisposable(); + Control.BeginInvoke((MethodInvoker)(() => + { + if (cancelable.IsDisposed) + { + return; + } + + action(this, state); + })); + + return cancelable; + } + + /// + /// Schedules an action to be executed after dueTime. + /// + /// The type of the state passed to the scheduled action. + /// State passed to the action to be executed. + /// Relative time after which to execute the action. + /// Action to be executed. + /// The disposable object used to cancel the scheduled action on a best-effort basis. + /// is . + public IDisposable Schedule(TState state, TimeSpan dueTime, Func action) + { + if (action == null) + { + throw new ArgumentNullException(nameof(action)); + } + + var timer = new FormsTimer + { + Interval = ToTimerInterval(Sequencer.Normalize(dueTime)), + }; + + timer.Tick += (_, _) => + { + timer.Stop(); + timer.Dispose(); + action(this, state); + }; + timer.Start(); + + return Disposable.Create(() => + { + timer.Stop(); + timer.Dispose(); + }); + } + + /// + /// Schedules an action to be executed at dueTime. + /// + /// The type of the state passed to the scheduled action. + /// State passed to the action to be executed. + /// Absolute time at which to execute the action. + /// Action to be executed. + /// The disposable object used to cancel the scheduled action on a best-effort basis. + public IDisposable Schedule(TState state, DateTimeOffset dueTime, Func action) => + Schedule(state, Sequencer.Normalize(dueTime - Now), action); + + /// + /// Converts the due time to a Windows Forms timer interval. + /// + /// The normalized due time. + /// The timer interval in milliseconds. + private static int ToTimerInterval(TimeSpan dueTime) + { + var totalMilliseconds = dueTime.TotalMilliseconds; + if (totalMilliseconds <= 1) + { + return 1; + } + + if (totalMilliseconds >= int.MaxValue) + { + return int.MaxValue; + } + + return (int)Math.Ceiling(totalMilliseconds); + } +} diff --git a/src/ReactiveUI.Primitives.WinForms/ReactiveUI.Primitives.WinForms.csproj b/src/ReactiveUI.Primitives.WinForms/ReactiveUI.Primitives.WinForms.csproj new file mode 100644 index 0000000..737f496 --- /dev/null +++ b/src/ReactiveUI.Primitives.WinForms/ReactiveUI.Primitives.WinForms.csproj @@ -0,0 +1,17 @@ + + + + $(WindowsLibraryTargetFrameworks) + enable + enable + preview + true + Windows Forms integration sequencers for ReactiveUI.Primitives. + system.reactive;rx;reactive;primitives;winforms;windows-forms;scheduler;sequencer + + + + + + + diff --git a/src/ReactiveUI.Primitives/Concurrency/DispatcherSequencer.cs b/src/ReactiveUI.Primitives.Wpf/Concurrency/DispatcherSequencer.cs similarity index 86% rename from src/ReactiveUI.Primitives/Concurrency/DispatcherSequencer.cs rename to src/ReactiveUI.Primitives.Wpf/Concurrency/DispatcherSequencer.cs index 523cca6..45dac3a 100644 --- a/src/ReactiveUI.Primitives/Concurrency/DispatcherSequencer.cs +++ b/src/ReactiveUI.Primitives.Wpf/Concurrency/DispatcherSequencer.cs @@ -2,12 +2,9 @@ // ReactiveUI Association Incorporated licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. -#if WINDOWS - using System; using System.Windows.Threading; using ReactiveUI.Primitives.Disposables; -using static ReactiveUI.Primitives.Disposables.Disposable; namespace ReactiveUI.Primitives.Concurrency; @@ -16,7 +13,7 @@ namespace ReactiveUI.Primitives.Concurrency; /// /// [System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] -public partial class DispatcherSequencer : ISequencer +public class DispatcherSequencer : ISequencer { /// /// Initializes a new instance of the class. @@ -37,7 +34,25 @@ public DispatcherSequencer(Dispatcher dispatcher) => /// /// Gets the scheduler's notion of current time. /// - public DateTimeOffset Now => Sequencer.Now; + public DateTimeOffset Now + { + get + { +#if NET8_0_OR_GREATER + return TimeProvider.System.GetUtcNow(); +#else +#pragma warning disable S6354 // TimeProvider is not available on supported .NET Framework target frameworks. + return DateTimeOffset.UtcNow; +#pragma warning restore S6354 +#endif + } + } + + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; /// /// Schedules an action to be executed. @@ -97,7 +112,7 @@ public IDisposable Schedule(TState state, TimeSpan dueTime, Func + return Disposable.Create(() => { timer?.Stop(); timer = null; @@ -117,4 +132,3 @@ public IDisposable Schedule(TState state, TimeSpan dueTime, Func(TState state, DateTimeOffset dueTime, Func action) => Schedule(state, Sequencer.Normalize(dueTime - Now), action); } -#endif diff --git a/src/ReactiveUI.Primitives.Wpf/ReactiveUI.Primitives.Wpf.csproj b/src/ReactiveUI.Primitives.Wpf/ReactiveUI.Primitives.Wpf.csproj new file mode 100644 index 0000000..4adfe4c --- /dev/null +++ b/src/ReactiveUI.Primitives.Wpf/ReactiveUI.Primitives.Wpf.csproj @@ -0,0 +1,17 @@ + + + + $(WindowsLibraryTargetFrameworks) + enable + enable + preview + true + WPF integration sequencers for ReactiveUI.Primitives. + system.reactive;rx;reactive;primitives;wpf;dispatcher;scheduler;sequencer + + + + + + + diff --git a/src/ReactiveUI.Primitives.slnx b/src/ReactiveUI.Primitives.slnx index 3e9ae0f..193e48e 100644 --- a/src/ReactiveUI.Primitives.slnx +++ b/src/ReactiveUI.Primitives.slnx @@ -22,4 +22,8 @@ + + + + diff --git a/src/ReactiveUI.Primitives/Concurrency/SynchronizationContextSequencer.cs b/src/ReactiveUI.Primitives/Concurrency/SynchronizationContextSequencer.cs new file mode 100644 index 0000000..ccafd06 --- /dev/null +++ b/src/ReactiveUI.Primitives/Concurrency/SynchronizationContextSequencer.cs @@ -0,0 +1,116 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using ReactiveUI.Primitives.Disposables; +using Timer = System.Threading.Timer; + +namespace ReactiveUI.Primitives.Concurrency; + +/// +/// Sequencer that posts work through a . +/// +/// +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed class SynchronizationContextSequencer : ISequencer +{ + /// + /// Initializes a new instance of the class. + /// + /// The synchronization context used to schedule work. + /// is . + public SynchronizationContextSequencer(SynchronizationContext context) => + Context = context ?? throw new ArgumentNullException(nameof(context)); + + /// + /// Gets a sequencer for the current synchronization context. + /// + /// There is no current synchronization context. + public static SynchronizationContextSequencer Current => + new(SynchronizationContext.Current ?? throw new InvalidOperationException("There is no current synchronization context.")); + + /// + /// Gets the synchronization context used to schedule work. + /// + public SynchronizationContext Context { get; } + + /// + /// Gets the scheduler's notion of current time. + /// + public DateTimeOffset Now => Sequencer.Now; + + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + + /// + public IDisposable Schedule(TState state, Func action) + { + if (action == null) + { + throw new ArgumentNullException(nameof(action)); + } + + var cancelable = new BooleanDisposable(); + Context.Post( + _ => + { + if (cancelable.IsDisposed) + { + return; + } + + action(this, state); + }, + null); + + return cancelable; + } + + /// + public IDisposable Schedule(TState state, TimeSpan dueTime, Func action) + { + if (action == null) + { + throw new ArgumentNullException(nameof(action)); + } + + var cancelable = new BooleanDisposable(); + Timer? timer = null; + timer = new Timer( + _ => + { + if (cancelable.IsDisposed) + { + return; + } + + Context.Post( + __ => + { + if (!cancelable.IsDisposed) + { + action(this, state); + } + + timer?.Dispose(); + }, + null); + }, + null, + Sequencer.Normalize(dueTime), + Timeout.InfiniteTimeSpan); + + return Disposable.Create(() => + { + cancelable.Dispose(); + timer.Dispose(); + }); + } + + /// + public IDisposable Schedule(TState state, DateTimeOffset dueTime, Func action) => + Schedule(state, Sequencer.Normalize(dueTime - Now), action); +} diff --git a/src/ReactiveUI.Primitives/Core/EventPattern{TEventArgs}.cs b/src/ReactiveUI.Primitives/Core/EventPattern{TEventArgs}.cs new file mode 100644 index 0000000..3cb7912 --- /dev/null +++ b/src/ReactiveUI.Primitives/Core/EventPattern{TEventArgs}.cs @@ -0,0 +1,74 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +namespace ReactiveUI.Primitives.Core; + +/// +/// Represents a .NET event notification as a value. +/// +/// The event arguments type. +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public readonly struct EventPattern : IEquatable> + where TEventArgs : EventArgs +{ + /// + /// Initializes a new instance of the struct. + /// + /// The event sender. + /// The event arguments. + public EventPattern(object? sender, TEventArgs eventArgs) + { + Sender = sender; + EventArgs = eventArgs ?? throw new ArgumentNullException(nameof(eventArgs)); + } + + /// + /// Gets the event sender. + /// + public object? Sender { get; } + + /// + /// Gets the event arguments. + /// + public TEventArgs EventArgs { get; } + + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString(); + + /// + /// Compares two event pattern values for equality. + /// + /// The first value. + /// The second value. + /// when the values are equal; otherwise, . + public static bool operator ==(EventPattern left, EventPattern right) => left.Equals(right); + + /// + /// Compares two event pattern values for inequality. + /// + /// The first value. + /// The second value. + /// when the values are not equal; otherwise, . + public static bool operator !=(EventPattern left, EventPattern right) => !left.Equals(right); + + /// + public bool Equals(EventPattern other) => + ReferenceEquals(Sender, other.Sender) && EqualityComparer.Default.Equals(EventArgs, other.EventArgs); + + /// + public override bool Equals(object? obj) => obj is EventPattern other && Equals(other); + + /// + public override int GetHashCode() + { + var senderHashCode = Sender?.GetHashCode() ?? 0; + return (senderHashCode * 397) ^ EqualityComparer.Default.GetHashCode(EventArgs); + } + + /// + public override string ToString() => $"{Sender}: {EventArgs}"; +} diff --git a/src/ReactiveUI.Primitives/DebuggerDisplay.Partials.cs b/src/ReactiveUI.Primitives/DebuggerDisplay.Partials.cs index 90ec635..b9a4d7e 100644 --- a/src/ReactiveUI.Primitives/DebuggerDisplay.Partials.cs +++ b/src/ReactiveUI.Primitives/DebuggerDisplay.Partials.cs @@ -39,17 +39,6 @@ public sealed partial class CurrentThreadSequencer private string DebuggerDisplay => ToString() ?? string.Empty; } -#if WINDOWS - public partial class DispatcherSequencer - { - /// - /// Gets the debugger display text. - /// - [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] - private string DebuggerDisplay => ToString() ?? string.Empty; - } -#endif - public sealed partial class ImmediateSequencer { /// diff --git a/src/ReactiveUI.Primitives/ReactiveUI.Primitives.csproj b/src/ReactiveUI.Primitives/ReactiveUI.Primitives.csproj index 0f72b04..6ae0eb4 100644 --- a/src/ReactiveUI.Primitives/ReactiveUI.Primitives.csproj +++ b/src/ReactiveUI.Primitives/ReactiveUI.Primitives.csproj @@ -7,12 +7,6 @@ preview - - true - true - $(DefineConstants);WINDOWS - - 15.0 diff --git a/src/ReactiveUI.Primitives/SignalOperatorParityMixins.cs b/src/ReactiveUI.Primitives/SignalOperatorParityMixins.cs index 2416280..d30a700 100644 --- a/src/ReactiveUI.Primitives/SignalOperatorParityMixins.cs +++ b/src/ReactiveUI.Primitives/SignalOperatorParityMixins.cs @@ -174,6 +174,34 @@ public static IObservable ObserveOn(this IObservable source, ISequencer return source.WitnessOn(scheduler); } + /// + /// Schedules source subscription on the supplied sequencer. + /// + /// The value type. + /// The source sequence. + /// The sequencer used to perform subscription. + /// A sequence that subscribes to on . + /// or is . + public static IObservable SubscribeOn(this IObservable source, ISequencer scheduler) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (scheduler == null) + { + throw new ArgumentNullException(nameof(scheduler)); + } + + return Signal.Create(observer => + { + var subscription = new SingleReplaceableDisposable(); + var scheduled = scheduler.Schedule(() => subscription.Create(source.Subscribe(observer))); + return MultipleDisposable.Create(scheduled, subscription); + }); + } + /// /// Alias for using the System.Reactive operator name. /// @@ -1210,6 +1238,40 @@ public static Task ToTask(this IObservable source, CancellationToken ca /// is . public static Task ToTask(this Task task) => task ?? throw new ArgumentNullException(nameof(task)); + /// + /// Awaits source completion and returns the last value produced by the source. + /// + /// The value type. + /// The source sequence. + /// A task that completes with the final source value. + public static Task LastAsync(this IObservable source) => source.ToTask(); + + /// + /// Awaits source completion and returns the last value produced by the source, or when the source is empty. + /// + /// The value type. + /// The source sequence. + /// A task that completes with the final source value, or when the source is empty. + public static Task LastOrDefaultAsync(this IObservable source) => + source.LastOrDefaultAsync(default!); + + /// + /// Awaits source completion and returns the last value produced by the source, or when the source is empty. + /// + /// The value type. + /// The source sequence. + /// The fallback value to use when the source is empty. + /// A task that completes with the final source value, or when the source is empty. + public static Task LastOrDefaultAsync(this IObservable source, T defaultValue) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + return source.DefaultIfEmpty(defaultValue).ToTask(); + } + /// /// Awaits the source count as a task. /// @@ -1323,6 +1385,22 @@ public static Task CollectArrayAsync(this IObservable source) return completion.Task; } + /// + /// Collects all values into an array. + /// + /// The value type. + /// The source sequence. + /// A sequence that emits a single array containing all source values. + public static IObservable ToArray(this IObservable source) => source.CollectArray(); + + /// + /// Collects all values into an array task. + /// + /// The value type. + /// The source sequence. + /// A task that completes with all source values in an array. + public static Task ToArrayAsync(this IObservable source) => source.CollectArrayAsync(); + /// /// Collects all values into a list task. /// @@ -1348,6 +1426,22 @@ public static Task> CollectListAsync(this IObservable source) return completion.Task; } + /// + /// Collects all values into a list. + /// + /// The value type. + /// The source sequence. + /// A sequence that emits a single list containing all source values. + public static IObservable> ToList(this IObservable source) => source.CollectList(); + + /// + /// Collects all values into a list task. + /// + /// The value type. + /// The source sequence. + /// A task that completes with all source values in a list. + public static Task> ToListAsync(this IObservable source) => source.CollectListAsync(); + /// /// Awaits the first source value and applies the configured empty-source behavior. /// diff --git a/src/ReactiveUI.Primitives/Signals/Signal{Factories}.cs b/src/ReactiveUI.Primitives/Signals/Signal{Factories}.cs index a22ac19..79ecd5e 100644 --- a/src/ReactiveUI.Primitives/Signals/Signal{Factories}.cs +++ b/src/ReactiveUI.Primitives/Signals/Signal{Factories}.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for full license information. using ReactiveUI.Primitives.Concurrency; +using ReactiveUI.Primitives.Core; using ReactiveUI.Primitives.Disposables; using ReactiveUI.Primitives.Signals.Core; @@ -128,6 +129,16 @@ public static IObservable Unfold( true); } + /// + /// Generates a finite signal from state. Alias of . + /// + public static IObservable Generate( + TState initialState, + Func condition, + Func iterate, + Func resultSelector) => + Unfold(initialState, condition, iterate, resultSelector); + /// /// Creates a signal whose subscription lifetime owns a resource. /// @@ -171,6 +182,61 @@ public static IObservable Using(Func resourceFactory where TResource : IDisposable => Use(resourceFactory, signalFactory); + /// + /// Converts an event into a signal of event pattern values. + /// + public static IObservable> FromEventPattern( + Action addHandler, + Action removeHandler) + { + if (addHandler == null) + { + throw new ArgumentNullException(nameof(addHandler)); + } + + if (removeHandler == null) + { + throw new ArgumentNullException(nameof(removeHandler)); + } + + return Create>(observer => + { + void Handler(object? sender, EventArgs eventArgs) => + observer.OnNext(new EventPattern(sender, eventArgs)); + + addHandler(Handler); + return Disposable.Create(() => removeHandler(Handler)); + }); + } + + /// + /// Converts an event into a signal of event pattern values. + /// + public static IObservable> FromEventPattern( + Action> addHandler, + Action> removeHandler) + where TEventArgs : EventArgs + { + if (addHandler == null) + { + throw new ArgumentNullException(nameof(addHandler)); + } + + if (removeHandler == null) + { + throw new ArgumentNullException(nameof(removeHandler)); + } + + return Create>(observer => + { + void Handler(object? sender, TEventArgs eventArgs) => + observer.OnNext(new EventPattern(sender, eventArgs)); + + addHandler(Handler); + return Disposable.Create(() => removeHandler(Handler)); + }); + } + /// /// Creates a signal from an enumerable sequence. /// @@ -474,6 +540,25 @@ public static IObservable Every(TimeSpan period, ISequencer scheduler) /// public static IObservable Timer(TimeSpan dueTime, ISequencer scheduler) => After(dueTime, scheduler); + /// + /// Emits a single zero tick at the specified absolute due time. + /// + public static IObservable Timer(DateTimeOffset dueTime) => + Timer(dueTime, ThreadPoolSequencer.Instance); + + /// + /// Emits a single zero tick at the specified absolute due time. + /// + public static IObservable Timer(DateTimeOffset dueTime, ISequencer scheduler) + { + if (scheduler == null) + { + throw new ArgumentNullException(nameof(scheduler)); + } + + return After(Sequencer.Normalize(dueTime - scheduler.Now), scheduler); + } + /// /// Creates a timer that emits first after and then at . /// @@ -543,6 +628,11 @@ public static IObservable Race(params IObservable[] sources) return FromEnumerable(validated).Race(); } + /// + /// Mirrors the first supplied signal to produce a value or terminal signal. + /// + public static IObservable Amb(params IObservable[] sources) => Race(sources); + /// /// Zips two signals with a result selector. /// @@ -631,14 +721,39 @@ private static IObservable[] ValidateSources(IObservable[] sources) private static IDisposable SubscribeAsyncEnumerable(IAsyncEnumerable values, IObserver observer, CancellationToken cancellationToken) { var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + var disposed = 0; IAsyncEnumerator? enumerator = null; _ = Task.Run( - async () => await PumpAsyncEnumerable(values, observer, cts, enumeratorReference => enumerator = enumeratorReference).ConfigureAwait(false), + async () => + { + try + { + await PumpAsyncEnumerable(values, observer, cts, enumeratorReference => enumerator = enumeratorReference).ConfigureAwait(false); + } + finally + { + Volatile.Write(ref disposed, 1); + } + }, CancellationToken.None); return Disposable.Create(() => { - cts.Cancel(); + if (Interlocked.Exchange(ref disposed, 1) != 0) + { + return; + } + + try + { + cts.Cancel(); + } + catch (ObjectDisposedException) + { + // The async enumerable completed and released its linked token before disposal reached this callback. + return; + } + var current = Volatile.Read(ref enumerator); if (current == null) { diff --git a/src/tests/ReactiveUI.Primitives.Tests/CoverageRuntimeTests.cs b/src/tests/ReactiveUI.Primitives.Tests/CoverageRuntimeTests.cs index f0b57bf..949d463 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/CoverageRuntimeTests.cs +++ b/src/tests/ReactiveUI.Primitives.Tests/CoverageRuntimeTests.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using ReactiveUI.Primitives.Concurrency; using ReactiveUI.Primitives.Core; @@ -439,6 +440,40 @@ public async Task SequencersCoverValidationAndExecutionBranches() Assert.Throws(() => ThreadPoolSequencer.Instance.Schedule(One, null!)); Assert.Throws(() => ThreadPoolSequencer.Instance.Schedule(One, TimeSpan.Zero, null!)); + + var synchronizationContext = new ImmediateSynchronizationContext(); + Assert.Throws(CreateSynchronizationContextSequencerWithoutContext); + var previousContext = SynchronizationContext.Current; + try + { + SynchronizationContext.SetSynchronizationContext(synchronizationContext); + Assert.Same(synchronizationContext, SynchronizationContextSequencer.Current.Context); + } + finally + { + SynchronizationContext.SetSynchronizationContext(previousContext); + } + + var synchronizationSequencer = new SynchronizationContextSequencer(synchronizationContext); + Assert.True(synchronizationSequencer.Now > DateTimeOffset.MinValue); + Assert.Throws(() => synchronizationSequencer.Schedule(One, null!)); + Assert.Throws(() => synchronizationSequencer.Schedule(One, TimeSpan.Zero, null!)); + + var synchronizationValues = new List(); + using var synchronizationSubscription = synchronizationSequencer.Schedule(One, (_, state) => + { + synchronizationValues.Add(state); + return Disposable.Empty; + }); + + var delayedSynchronizationCompletion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + using var delayedSynchronizationSubscription = synchronizationSequencer.Schedule(Two, TimeSpan.Zero, (_, state) => + { + delayedSynchronizationCompletion.TrySetResult(state); + return Disposable.Empty; + }); + var delayedValue = await delayedSynchronizationCompletion.Task.WaitAsync(TimeSpan.FromSeconds(TimeoutSeconds)); + Assert.Equal(ExpectedOneTwo, (IEnumerable)[.. synchronizationValues, delayedValue]); } /// @@ -498,6 +533,12 @@ private static void CreateScheduledItemWithoutAction() => private static void CreateScheduledItemWithoutComparer() => _ = new ScheduledItem(Sequencer.Immediate, "x", (_, _) => Disposable.Empty, One, null!); + /// + /// Creates a synchronization-context sequencer without a context. + /// + private static void CreateSynchronizationContextSequencerWithoutContext() => + _ = new SynchronizationContextSequencer(null!); + /// /// Compares a scheduled item through the non-generic comparable interface. /// @@ -582,6 +623,15 @@ public ExposedMultipleDisposable(IDisposable disposable) public void DisposeFalse() => Dispose(false); } + /// + /// Synchronization context that runs posted work immediately. + /// + private sealed class ImmediateSynchronizationContext : SynchronizationContext + { + /// + public override void Post(SendOrPostCallback d, object? state) => d(state); + } + /// /// Records observer values and terminal signals. /// diff --git a/src/tests/ReactiveUI.Primitives.Tests/FactoryOperatorContractTests.cs b/src/tests/ReactiveUI.Primitives.Tests/FactoryOperatorContractTests.cs index 0890a11..c650ea2 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/FactoryOperatorContractTests.cs +++ b/src/tests/ReactiveUI.Primitives.Tests/FactoryOperatorContractTests.cs @@ -386,6 +386,7 @@ public void CombiningOperatorsPreserveCoreOrderingSemantics() var rangeConcatenated = new List(); var rangeMerged = new List(); var rangeRace = new List(); + var rangeAmb = new List(); var rangeLatest = new List(); var rangeWithLatest = new List(); var rangeForkJoin = new List(); @@ -400,6 +401,7 @@ public void CombiningOperatorsPreserveCoreOrderingSemantics() rangeConcatSignal.Subscribe(rangeObserver); Signal.Merge(Signal.Range(FirstValue, SecondValue), Signal.Range(RetrySuccessAttempt, SecondValue)).Subscribe(rangeMerged.Add); Signal.Race(Signal.Range(FirstValue, SecondValue), Signal.Range(RetrySuccessAttempt, SecondValue)).Subscribe(rangeRace.Add); + Signal.Amb(Signal.Range(FirstValue, SecondValue), Signal.Range(RetrySuccessAttempt, SecondValue)).Subscribe(rangeAmb.Add); Signal.CombineLatest(Signal.Range(FirstValue, SecondValue), Signal.Range(ProjectionMultiplier, SecondValue), static (left, right) => left + right).Subscribe(rangeLatest.Add); Signal.Range(FirstValue, SecondValue).WithLatest(Signal.Range(ProjectionMultiplier, SecondValue), static (left, right) => left + right).Subscribe(rangeWithLatest.Add); Signal.ForkJoin(Signal.Range(FirstValue, SecondValue), Signal.Range(ProjectionMultiplier, SecondValue), static (left, right) => left + right).Subscribe(rangeForkJoin.Add); @@ -416,6 +418,7 @@ public void CombiningOperatorsPreserveCoreOrderingSemantics() Assert.Equal(1, rangeObserver.Completed); Assert.Equal(FourItemExpected, rangeMerged); Assert.Equal(TakeWhileExpected, rangeRace); + Assert.Equal(TakeWhileExpected, rangeAmb); Assert.Equal(new[] { ProjectedSecondBucketPeerValue, RangeZipShorterSecondResult }, rangeLatest); Assert.Equal(new[] { ProjectedSecondBucketPeerValue, RangeZipShorterSecondResult }, rangeWithLatest); Assert.Equal(new[] { RangeZipShorterSecondResult }, rangeForkJoin); @@ -493,6 +496,25 @@ async IAsyncEnumerable Values([EnumeratorCancellation] CancellationToken to Assert.True(disposed); } + /// + /// Verifies completed async enumerable subscriptions can be disposed without racing a disposed token source. + /// + /// A task that completes when asynchronous assertions have run. + [Test] + public async Task AsyncEnumerableFactoryCanDisposeAfterCompletion() + { + static async IAsyncEnumerable Values() + { + yield return FirstValue; + await Task.Yield(); + yield return SecondValue; + } + + var values = await Signal.FromAsyncEnumerable(Values()).CollectArrayAsync(); + + Assert.Equal(TakeWhileExpected, (IEnumerable)values); + } + /// /// Verifies timer factories use an injected virtual sequencer. /// @@ -501,9 +523,11 @@ public void TimeFactoriesUseInjectedScheduler() { var clock = new TestClock(); var after = new List(); + var absoluteTimer = new List(); var every = new List(); Signal.After(TimeSpan.FromTicks(AfterTicks), clock).Subscribe(after.Add); + Signal.Timer(clock.Now.AddTicks(AfterTicks), clock).Subscribe(absoluteTimer.Add); var subscription = Signal.Every(TimeSpan.FromTicks(EveryTicks), clock).Subscribe(every.Add); clock.AdvanceBy(TimeSpan.FromTicks(InitialAdvanceTicks)); @@ -512,6 +536,7 @@ public void TimeFactoriesUseInjectedScheduler() clock.AdvanceBy(TimeSpan.FromTicks(FirstValue)); Assert.Equal(OneShotTimerExpected, after); + Assert.Equal(OneShotTimerExpected, absoluteTimer); clock.AdvanceBy(TimeSpan.FromTicks(InitialAdvanceTicks)); subscription.Dispose(); @@ -655,28 +680,47 @@ public async Task FactoryAliasesAndGuardsCoverParityBranches() Assert.Throws(() => Signal.Unfold(0, null!, static state => state, static state => state)); Assert.Throws(() => Signal.Unfold(0, static _ => true, null!, static state => state)); Assert.Throws(() => Signal.Unfold(0, static _ => true, static state => state, null!)); + Assert.Throws(() => Signal.FromEventPattern(null!, _ => { })); + Assert.Throws(() => Signal.FromEventPattern(_ => { }, null!)); + Assert.Throws(() => Signal.FromEventPattern(null!, _ => { })); + Assert.Throws(() => Signal.FromEventPattern(_ => { }, null!)); Assert.Throws(() => Signal.Start((Func)null!)); Assert.Throws(() => Signal.Start(static () => FirstValue, null!)); Assert.Throws(() => Signal.Start((Action)null!)); Assert.Throws(() => Signal.After(TimeSpan.Zero, null!)); + Assert.Throws(() => Signal.Timer(DateTimeOffset.UnixEpoch, null!)); Assert.Throws(() => Signal.Every(TimeSpan.FromTicks(-1))); Assert.Throws(() => Signal.Timer(TimeSpan.Zero, TimeSpan.Zero, null!)); Assert.Throws(() => Signal.FromAsync((Func>)null!)); Assert.Throws(() => Signal.FromAsync((Func>)null!)); + Assert.Throws(() => ((IObservable)null!).SubscribeOn(Sequencer.Immediate)); + Assert.Throws(() => Signal.Empty().SubscribeOn(null!)); Signal.Range(FirstValue, 0).Subscribe(values.Add, errors.Add, () => completed++); Signal.Repeat(FirstValue, 0).Subscribe(values.Add, errors.Add, () => completed++); + Signal.Generate(FirstValue, value => value <= SecondValue, value => value + 1, value => value).Subscribe(values.Add); + Signal.Range(FirstValue, SecondValue).SubscribeOn(Sequencer.Immediate).Subscribe(values.Add); new[] { FirstValue, SecondValue }.ToObservable(cancelled.Token).Subscribe(values.Add, errors.Add, () => completed++); Signal.Start(() => throw new InvalidOperationException("start failed"), Sequencer.Immediate).Subscribe(values.Add, errors.Add, () => completed++); + var eventSource = new EventSource(); + var eventValues = new List>(); + using (Signal.FromEventPattern(handler => eventSource.Raised += handler, handler => eventSource.Raised -= handler).Subscribe(eventValues.Add)) + { + eventSource.Raise(); + } + var fromAsync = await Signal.FromAsync(() => Task.FromResult(RetryResult)).ToTask(); var fromAsyncWithToken = await Signal.FromAsync(static token => Task.FromResult(token.IsCancellationRequested ? -1 : RetrySuccessAttempt)).ToTask(); Assert.Equal(RetryResult, fromAsync); Assert.Equal(RetrySuccessAttempt, fromAsyncWithToken); - Assert.Equal(0, values.Count); + Assert.Equal(new[] { FirstValue, SecondValue, FirstValue, SecondValue }, values); Assert.Equal(SecondValue, completed); Assert.Equal(1, errors.Count); + Assert.Equal(1, eventValues.Count); + Assert.Same(eventSource, eventValues[0].Sender); + Assert.Same(EventArgs.Empty, eventValues[0].EventArgs); } /// @@ -752,14 +796,44 @@ private static async Task VerifyTaskAliasOperators() { var converted = new[] { 4, AfterTicks }.ToObservable(); var last = await converted.ToTask(); + var lastAlias = await converted.LastAsync(); + var lastDefault = await Signal.Empty().LastOrDefaultAsync(RetryResult); + var array = await Signal.Range(FirstValue, FourthValue).ToArrayAsync(); + var list = await Signal.Range(FirstValue, FourthValue).ToListAsync(); +#pragma warning disable S6966 // This verifies the observable ToArray/ToList aliases, not async enumerable materialization. + var observedArray = await Signal.Range(FirstValue, SecondValue).ToArray().ToTask(); + var observedList = await Signal.Range(FirstValue, SecondValue).ToList().ToTask(); +#pragma warning restore S6966 var first = await Signal.FromEnumerable([RepeatValue, ProjectionMultiplier]).FirstAsync().ToTask(); var started = await Signal.Start(() => ProjectedSecondValue, Sequencer.CurrentThread).ToTask(); Assert.Equal(AfterTicks, last); + Assert.Equal(AfterTicks, lastAlias); + Assert.Equal(RetryResult, lastDefault); + Assert.Equal(FourItemExpected, (IEnumerable)array); + Assert.Equal(FourItemExpected, (IEnumerable)list); + Assert.Equal((IEnumerable)[FirstValue, SecondValue], observedArray); + Assert.Equal([FirstValue, SecondValue], (IEnumerable)observedList); Assert.Equal(RepeatValue, first); Assert.Equal(ProjectedSecondValue, started); } + /// + /// Test event source. + /// + private sealed class EventSource + { + /// + /// Raised when is called. + /// + public event EventHandler? Raised; + + /// + /// Raises the event. + /// + public void Raise() => Raised?.Invoke(this, EventArgs.Empty); + } + /// /// Records observer values and terminal signals. ///