A focused collection of high–value Reactive Extensions (Rx) operators that do not ship with System.Reactive but are commonly needed when building reactive .NET applications.
The goal of this library is to:
- Reduce boilerplate for frequent reactive patterns (timers, buffering, throttling, heartbeats, etc.)
- Provide pragmatic, allocation‑aware helpers for performance sensitive scenarios
- Avoid additional dependencies – only
System.Reactiveis required
Supported Target Frameworks: .NET 4.6.2, .NET 4.7.2, .NET 4.8.1, .NET 8, .NET 9, .NET 10.
- Installation
- Quick Start
- API Catalog
- Operator Categories & Examples
- Null / Signal Helpers
- Timing, Scheduling & Flow Control
- Inactivity / Liveness
- Error Handling & Resilience
- Combining, Partitioning & Logical Helpers
- Async / Task Integration
- Backpressure / Conflation
- Selective & Conditional Emission
- Buffering & Transformation
- Subscription / Side Effects
- Utility & Miscellaneous
- Async Observables (
IObservableAsync<T>)- Async Quick Start
- Async API Catalog
- Core Interfaces & Types
- Factory Methods
- Transformation Operators
- Filtering Operators
- Combining & Merging
- Error Handling & Retry
- Timing & Scheduling (Async)
- Aggregation & Element Operators
- Side Effects & Lifecycle
- Multicasting & Sharing
- Bridging & Conversion
- Subjects
- Async Disposables
- Performance Notes
- Thread Safety
- License
# Package coming soon (example)
dotnet add package ReactiveUI.ExtensionsReference the project directly while developing locally.
using System;
using System.Reactive.Linq;
using ReactiveUI.Extensions;
var source = Observable.Interval(TimeSpan.FromMilliseconds(120))
.Take(10)
.Select(i => (long?) (i % 3 == 0 ? null : i));
// 1. Filter nulls + convert to a Unit signal.
var signal = source.WhereIsNotNull().AsSignal();
// 2. Add a heartbeat if the upstream goes quiet for 500ms.
var withHeartbeat = source.WhereIsNotNull()
.Heartbeat(TimeSpan.FromMilliseconds(500), Scheduler.Default);
// 3. Retry with exponential backoff up to 5 times.
var resilient = Observable.Defer(() =>
Observable.Throw<long>(new InvalidOperationException("Boom")))
.RetryWithBackoff(maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(100));
// 4. Conflate bursty updates.
var conflated = source.Conflate(TimeSpan.FromMilliseconds(300), Scheduler.Default);
using (conflated.Subscribe(Console.WriteLine))
{
Console.ReadLine();
}Below is the full list of extension methods (grouped logically).
Some overloads omitted for brevity.
| Category | Operators |
|---|---|
| Null & Signal | WhereIsNotNull, AsSignal |
| Timing & Scheduling | SyncTimer, Schedule (overloads), ScheduleSafe, ThrottleFirst, ThrottleDistinct, DebounceImmediate |
| Inactivity / Liveness | Heartbeat, DetectStale, BufferUntilInactive |
| Error Handling | CatchIgnore, CatchAndReturn, OnErrorRetry (overloads), RetryWithBackoff |
| Combining & Aggregation | CombineLatestValuesAreAllTrue, CombineLatestValuesAreAllFalse, GetMax, GetMin, Partition |
| Logical / Boolean | Not, WhereTrue, WhereFalse |
| Async / Task | SelectAsyncSequential, SelectLatestAsync, SelectAsyncConcurrent, SubscribeAsync (overloads), SynchronizeSynchronous, SynchronizeAsync, SubscribeSynchronous (overloads), ToHotTask |
| Backpressure | Conflate |
| Filtering / Conditional | Filter (Regex), TakeUntil (predicate), WaitUntil, SampleLatest, SwitchIfEmpty, DropIfBusy |
| Buffering | BufferUntil, BufferUntilInactive, BufferUntilIdle, Pairwise, ScanWithInitial |
| Transformation & Utility | Shuffle, ForEach, FromArray, Using, While, Start, OnNext (params helper), DoOnSubscribe, DoOnDispose, ToReadOnlyBehavior, ToPropertyObservable |
IObservable<string?> raw = GetPossiblyNullStream();
IObservable<string> cleaned = raw.WhereIsNotNull();
IObservable<Unit> signal = cleaned.AsSignal();// Shared timer for a given period (one underlying timer per distinct TimeSpan)
var sharedTimer = ReactiveExtensions.SyncTimer(TimeSpan.FromSeconds(1));
// Delay emission of a single value
42.Schedule(TimeSpan.FromMilliseconds(250), Scheduler.Default)
.Subscribe(v => Console.WriteLine($"Delayed: {v}"));
// Safe scheduling when a scheduler may be null
IScheduler? maybeScheduler = null;
maybeScheduler.ScheduleSafe(() => Console.WriteLine("Ran inline"));
// ThrottleFirst: allow first item per window, ignore rest
var throttled = Observable.Interval(TimeSpan.FromMilliseconds(50))
.ThrottleFirst(TimeSpan.FromMilliseconds(200));
// DebounceImmediate: emit first immediately then debounce rest
var debounced = Observable.Interval(TimeSpan.FromMilliseconds(40))
.DebounceImmediate(TimeSpan.FromMilliseconds(250));
// ThrottleDistinct: throttle but only emit when the value actually changes
var source = Observable.Interval(TimeSpan.FromMilliseconds(50)).Take(20);
var distinctThrottled = source.ThrottleDistinct(TimeSpan.FromMilliseconds(200));// Heartbeat emits IHeartbeat<T> where IsHeartbeat == true during quiet periods
var heartbeats = Observable.Interval(TimeSpan.FromMilliseconds(400))
.Take(5)
.Heartbeat(TimeSpan.FromMilliseconds(300), Scheduler.Default);
// DetectStale emits IStale<T>: one stale marker after inactivity, or fresh update wrappers
var staleAware = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(500))
.Take(3)
.DetectStale(TimeSpan.FromMilliseconds(300), Scheduler.Default);
// BufferUntilInactive groups events separated by inactivity
var bursts = Observable.Interval(TimeSpan.FromMilliseconds(60)).Take(20);
var groups = bursts.BufferUntilInactive(TimeSpan.FromMilliseconds(200));var flaky = Observable.Create<int>(o =>
{
o.OnNext(1);
o.OnError(new InvalidOperationException("Fail"));
return () => { };
});
// Ignore all errors and complete silently
var flakySafe = flaky.CatchIgnore();
// Replace error with a fallback value
var withFallback = flaky.CatchAndReturn(-1);
// Retry only specific exception type with logging
var retried = flaky.OnErrorRetry<int, InvalidOperationException>(ex => Console.WriteLine(ex.Message), retryCount: 3);
// Retry with exponential backoff
var backoff = flaky.RetryWithBackoff(maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(100));var a = Observable.Interval(TimeSpan.FromMilliseconds(150)).Select(i => i % 2 == 0);
var b = Observable.Interval(TimeSpan.FromMilliseconds(170)).Select(i => i % 3 == 0);
var allTrue = new[] { a, b }.CombineLatestValuesAreAllTrue();
var allFalse = new[] { a, b }.CombineLatestValuesAreAllFalse();
var numbers = Observable.Range(1, 10);
var (even, odd) = numbers.Partition(n => n % 2 == 0); // Partition stream
var toggles = a.Not(); // Negate booleansIObservable<int> inputs = Observable.Range(1, 5);
// Sequential (preserves order)
var seq = inputs.SelectAsyncSequential(async i => { await Task.Delay(50); return i * 2; });
// Latest only (cancels previous)
var latest = inputs.SelectLatestAsync(async i => { await Task.Delay(100); return i; });
// Limited parallelism
var concurrent = inputs.SelectAsyncConcurrent(async i => { await Task.Delay(100); return i; }, maxConcurrency: 2);
// Asynchronous subscription (serializing tasks)
inputs.SubscribeAsync(async i => await Task.Delay(10));
// Synchronous gate: ensures per-item async completion before next is emitted
inputs.SubscribeSynchronous(async i => await Task.Delay(25));
// ToHotTask: convert an observable to a Task that starts immediately
var source = Observable.Return(42);
var task = source.ToHotTask();
var result = await task; // 42// Conflate: enforce minimum spacing between emissions while always outputting the most recent value
var noisy = Observable.Interval(TimeSpan.FromMilliseconds(20)).Take(30);
var conflated = noisy.Conflate(TimeSpan.FromMilliseconds(200), Scheduler.Default);// TakeUntil predicate (inclusive)
var untilFive = Observable.Range(1, 100).TakeUntil(x => x == 5);
// WaitUntil first match then complete
var firstEven = Observable.Range(1, 10).WaitUntil(x => x % 2 == 0);
// SampleLatest: sample the latest value whenever a trigger fires
var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(10);
var trigger = Observable.Interval(TimeSpan.FromMilliseconds(300)).Take(3);
var sampled = source.SampleLatest(trigger);
// SwitchIfEmpty: provide a fallback if the source completes without emitting
var empty = Observable.Empty<int>();
var fallback = Observable.Return(42);
var result = empty.SwitchIfEmpty(fallback); // emits 42
// DropIfBusy: drop values if the previous async operation is still running
var inputs = Observable.Range(1, 5);
var processed = inputs.DropIfBusy(async x => { await Task.Delay(200); Console.WriteLine(x); });// BufferUntil - collect chars between delimiters
var chars = "<a><bc><d>".ToCharArray().ToObservable();
var frames = chars.BufferUntil('<', '>'); // emits "<a>", "<bc>", "<d>"
// Shuffle arrays in-place
var arrays = Observable.Return(new[] { 1, 2, 3, 4, 5 });
var shuffled = arrays.Shuffle();
// BufferUntilIdle: emit a batch when the stream goes quiet
var events = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(10);
var batches = events.BufferUntilIdle(TimeSpan.FromMilliseconds(250));
// Pairwise: emit consecutive pairs
var numbers = Observable.Range(1, 5);
var pairs = numbers.Pairwise(); // emits (1,2), (2,3), (3,4), (4,5)
// ScanWithInitial: scan that always emits the initial value first
var values = Observable.Return(5);
var accumulated = values.ScanWithInitial(10, (acc, x) => acc + x); // emits 10, then 15var stream = Observable.Range(1, 3)
.DoOnSubscribe(() => Console.WriteLine("Subscribed"))
.DoOnDispose(() => Console.WriteLine("Disposed"));
using (stream.Subscribe(Console.WriteLine))
{
// auto dispose at using end
}// Emit list contents quickly with low allocations
var listSource = Observable.Return<IEnumerable<int>>(new List<int> { 1, 2, 3 });
listSource.ForEach().Subscribe(Console.WriteLine);
// Using helper for deterministic disposal
var value = new MemoryStream().Using(ms => ms.Length);
// While loop (reactive)
var counter = 0;
ReactiveExtensions.While(() => counter++ < 3, () => Console.WriteLine(counter))
.Subscribe();
// Batch push with OnNext params
var subj = new Subject<int>();
subj.OnNext(1, 2, 3, 4);
// ToReadOnlyBehavior: create a read-only behavior subject
var (observable, observer) = ReactiveExtensions.ToReadOnlyBehavior(10);
observer.OnNext(20); // observable emits 10, then 20
// ToPropertyObservable: observe property changes on INotifyPropertyChanged
public class ViewModel : INotifyPropertyChanged
{
private string _name;
public string Name
{
get => _name;
set { _name = value; PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Name))); }
}
public event PropertyChangedEventHandler? PropertyChanged;
}
var vm = new ViewModel();
var nameChanges = vm.ToPropertyObservable(x => x.Name);
vm.Name = "Hello"; // observable emits "Hello"A fully async-native observable framework that mirrors the System.Reactive programming model but uses ValueTask, CancellationToken, and IAsyncDisposable throughout. Every observer callback (OnNextAsync, OnErrorResumeAsync, OnCompletedAsync) is asynchronous, enabling true end-to-end async/await pipelines without blocking threads.
Key design goals:
- Async all the way down – no synchronous observer callbacks; every notification is
ValueTask-based - Cancellation-first – every operator and subscription accepts
CancellationToken - Async disposal – subscriptions return
IAsyncDisposablefor clean async resource cleanup - Interop – bidirectional bridge between
IObservable<T>andIObservableAsync<T> - Configurable concurrency – subjects offer serial or concurrent publishing, stateful or stateless modes
Note: Async Observables require .NET 8 or later.
using ReactiveUI.Extensions.Async;
using ReactiveUI.Extensions.Async.Subjects;
using ReactiveUI.Extensions.Async.Disposables;
// 1. Create an async observable from a factory
var source = ObservableAsync.Range(1, 5);
// 2. Apply operators – filter, transform, throttle
var pipeline = source
.Where(x => x % 2 != 0) // keep odd numbers
.Select(x => x * 10) // multiply by 10
.Do(x => Console.WriteLine($"Processing: {x}"));
// 3. Subscribe asynchronously
await using var subscription = await pipeline.SubscribeAsync(
async (value, ct) => Console.WriteLine($"Received: {value}"),
CancellationToken.None);
// 4. Use subjects for multicasting
var subject = SubjectAsync.Create<string>();
await using var sub = await subject.Values.SubscribeAsync(
async (msg, ct) => Console.WriteLine(msg),
CancellationToken.None);
await subject.OnNextAsync("Hello, async Rx!", CancellationToken.None);
// 5. Bridge from classic IObservable<T>
var classic = Observable.Interval(TimeSpan.FromMilliseconds(200)).Take(5);
var asyncVersion = classic.ToObservableAsync();
// 6. Aggregate results
int count = await source.CountAsync(CancellationToken.None);
int first = await source.FirstAsync(CancellationToken.None);
var items = await source.ToListAsync(CancellationToken.None);| Category | Operators / Types |
|---|---|
| Core Interfaces | IObservableAsync<T>, IObserverAsync<T>, ObservableAsync<T>, ConnectableObservableAsync<T> |
| Factory Methods | Create, CreateAsBackgroundJob, Return, Empty, Never, Throw, Range, Interval, Timer, Defer, FromAsync, ToObservableAsync (Task / IAsyncEnumerable / IEnumerable) |
| Transformation | Select (sync & async), SelectMany, Scan (sync & async), Cast, OfType, GroupBy |
| Filtering | Where (sync & async), Take, Skip, TakeWhile, SkipWhile, TakeUntil (observable / task / token / predicate), Distinct, DistinctBy, DistinctUntilChanged, DistinctUntilChangedBy |
| Combining | Merge (overloads), Concat (overloads), Switch, CombineLatest (2–8 sources), Zip, Prepend, StartWith |
| Error Handling | Catch, CatchAndIgnoreErrorResume, Retry (infinite & count-limited), OnErrorResumeAsFailure |
| Timing | Throttle, Delay, Timeout (with optional fallback) |
| Aggregation | AggregateAsync, CountAsync, LongCountAsync, AnyAsync, AllAsync, ContainsAsync, FirstAsync, FirstOrDefaultAsync, LastAsync, LastOrDefaultAsync, SingleAsync, SingleOrDefaultAsync, ToListAsync, ToDictionaryAsync |
| Side Effects | Do (sync & async), OnDispose (sync & async), Using |
| Multicasting | Publish, StatelessPublish, ReplayLatest, RefCount, Multicast |
| Scheduling | ObserveOn (AsyncContext / SynchronizationContext / TaskScheduler / IScheduler), Yield |
| Subscription | SubscribeAsync (multiple overloads), ForEachAsync, WaitCompletionAsync |
| Conversion | ToObservable (async → classic), ToObservableAsync (classic → async), ToAsyncEnumerable, Wrap |
| Subjects | SubjectAsync.Create, SubjectAsync.CreateBehavior, SubjectAsync.CreateReplayLatest |
| Disposables | DisposableAsync, CompositeDisposableAsync |
| Mixins | AsObserverAsync, MapValues, ToDisposableAsync |
| Type | Description |
|---|---|
IObservableAsync<T> |
Async push-based notification provider. SubscribeAsync returns ValueTask<IAsyncDisposable>. |
IObserverAsync<T> |
Receives async notifications: OnNextAsync, OnErrorResumeAsync, OnCompletedAsync. Implements IAsyncDisposable. |
ObservableAsync<T> |
Abstract base class for building custom async observables. Override SubscribeAsyncCore. |
ConnectableObservableAsync<T> |
Extends ObservableAsync<T> with ConnectAsync for deferred subscription to the underlying source. |
ISubjectAsync<T> |
Async subject: exposes Values (IObservableAsync<T>) and publishing methods (OnNextAsync, OnErrorResumeAsync, OnCompletedAsync). |
Result |
Completion token carrying either success or a failure Exception. Used with OnCompletedAsync. |
AsyncContext |
Wraps a SynchronizationContext or TaskScheduler for scheduling async observer notifications. |
// Implementing a custom async observable
public class TickObservable : ObservableAsync<int>
{
protected override async ValueTask<IAsyncDisposable> SubscribeAsyncCore(
IObserverAsync<int> observer, CancellationToken cancellationToken)
{
for (int i = 0; i < 5 && !cancellationToken.IsCancellationRequested; i++)
{
await observer.OnNextAsync(i, cancellationToken);
await Task.Delay(100, cancellationToken);
}
await observer.OnCompletedAsync(Result.Success);
return DisposableAsync.Empty;
}
}| Method | Description |
|---|---|
ObservableAsync.Create<T>(subscribe) |
Create from an async delegate that receives (observer, ct) and returns IAsyncDisposable. |
ObservableAsync.CreateAsBackgroundJob<T>(subscribe) |
Like Create, but the subscribe delegate runs as a background job, returning immediately. |
ObservableAsync.Return<T>(value) |
Emits a single value then completes. |
ObservableAsync.Empty<T>() |
Completes immediately without emitting any values. |
ObservableAsync.Never<T>() |
Never emits and never completes. |
ObservableAsync.Throw<T>(exception) |
Completes immediately with the specified error. |
ObservableAsync.Range(start, count) |
Emits a sequence of integers. |
ObservableAsync.Interval(period) |
Emits incrementing long values at a regular interval. Accepts optional TimeProvider. |
ObservableAsync.Timer(dueTime) |
Emits 0L after the specified delay. Supports one-shot and periodic overloads. Accepts optional TimeProvider. |
ObservableAsync.Defer<T>(factory) |
Defers creation of the observable until subscription time. |
ObservableAsync.FromAsync<T>(func) |
Wraps an async function as a single-element observable. |
task.ToObservableAsync() |
Converts a Task<T> to an async observable. |
asyncEnumerable.ToObservableAsync() |
Converts an IAsyncEnumerable<T> to an async observable. |
enumerable.ToObservableAsync() |
Converts an IEnumerable<T> to an async observable. |
// Create from factory delegate
var custom = ObservableAsync.Create<string>(async (observer, ct) =>
{
await observer.OnNextAsync("Hello", ct);
await observer.OnNextAsync("World", ct);
await observer.OnCompletedAsync(Result.Success);
return DisposableAsync.Empty;
});
// From a Task
var fromTask = Task.FromResult(42).ToObservableAsync();
// From IAsyncEnumerable
async IAsyncEnumerable<int> GenerateAsync()
{
for (int i = 0; i < 5; i++)
{
await Task.Delay(50);
yield return i;
}
}
var fromAsyncEnum = GenerateAsync().ToObservableAsync();
// Periodic timer (with TimeProvider support)
var ticks = ObservableAsync.Interval(TimeSpan.FromSeconds(1));
// Deferred creation
var deferred = ObservableAsync.Defer<int>(() =>
ObservableAsync.Return(DateTime.Now.Second));| Operator | Description |
|---|---|
Select(selector) |
Projects each element using a synchronous transform function. |
Select(asyncSelector) |
Projects each element using an async transform function. |
SelectMany(selector) |
Flattens nested async observables produced by a projection. Multiple overloads for collection selectors. |
Scan(seed, accumulator) |
Applies an accumulator over the sequence, emitting each intermediate result. Sync and async overloads. |
Cast<TResult>() |
Casts each element to the specified type. |
OfType<TResult>() |
Filters elements that are assignable to the specified type. |
GroupBy(keySelector) |
Groups elements by key, emitting GroupedAsyncObservable<TKey, T> per group. |
var source = ObservableAsync.Range(1, 5);
// Sync projection
var doubled = source.Select(x => x * 2);
// Async projection
var projected = source.Select(async (x, ct) =>
{
await Task.Delay(10, ct);
return x.ToString();
});
// Flat map: each item produces a sub-sequence
var flattened = source.SelectMany(x => ObservableAsync.Range(x * 10, 3));
// Running total with Scan
var runningTotal = source.Scan(0, (acc, x) => acc + x); // 1, 3, 6, 10, 15
// Group by even/odd
var grouped = source.GroupBy(x => x % 2 == 0 ? "even" : "odd");| Operator | Description |
|---|---|
Where(predicate) |
Filters elements using a synchronous predicate. |
Where(asyncPredicate) |
Filters elements using an async predicate. |
Take(count) |
Takes the first N elements then completes. |
Skip(count) |
Skips the first N elements. |
TakeWhile(predicate) |
Takes elements while the predicate holds, then completes. |
SkipWhile(predicate) |
Skips elements while the predicate holds, then emits the rest. |
TakeUntil(other) |
Takes elements until another async observable, Task, CancellationToken, or predicate signals. |
Distinct() |
Emits only elements not previously seen. |
DistinctBy(keySelector) |
Emits only elements with unique keys. |
DistinctUntilChanged() |
Suppresses consecutive duplicate elements. |
DistinctUntilChangedBy(keySelector) |
Suppresses consecutive elements with the same key. |
var source = ObservableAsync.Range(1, 10);
// Basic filter
var evens = source.Where(x => x % 2 == 0); // 2, 4, 6, 8, 10
// Async filter
var asyncFiltered = source.Where(async (x, ct) =>
{
await Task.Delay(1, ct);
return x > 5;
});
// Take and Skip
var firstThree = source.Take(3); // 1, 2, 3
var skipThree = source.Skip(3); // 4, 5, 6, 7, 8, 9, 10
// TakeWhile / SkipWhile
var whileLow = source.TakeWhile(x => x < 4); // 1, 2, 3
var afterLow = source.SkipWhile(x => x < 4); // 4, 5, 6, 7, 8, 9, 10
// TakeUntil with a cancellation token
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var bounded = ObservableAsync.Interval(TimeSpan.FromMilliseconds(100))
.TakeUntil(cts.Token);
// Distinct and DistinctUntilChanged
var items = new[] { 1, 2, 2, 3, 1, 3 }.ToObservableAsync();
var unique = items.Distinct(); // 1, 2, 3
var noConsecDups = items.DistinctUntilChanged(); // 1, 2, 3, 1, 3| Operator | Description |
|---|---|
Merge(sources) |
Merges multiple async observables, interleaving values as they arrive. Supports max concurrency. |
Concat(sources) |
Concatenates multiple async observables sequentially – subscribes to the next only after the previous completes. |
Switch() |
Flattens a higher-order async observable by always switching to the most recent inner sequence. |
CombineLatest(other, selector) |
Combines the latest values from 2–8 sources whenever any source emits. |
Zip(other) |
Pairs elements from two sources by index. Optional result selector. |
Prepend(value) |
Prepends a single value before the source sequence. |
StartWith(values) |
Prepends one or more values before the source sequence. |
var a = ObservableAsync.Range(1, 3); // 1, 2, 3
var b = ObservableAsync.Range(10, 3); // 10, 11, 12
// Merge: interleave two streams
var merged = ObservableAsync.Merge(a, b);
// Concat: sequential
var sequential = ObservableAsync.Concat(a, b); // 1, 2, 3, 10, 11, 12
// CombineLatest
var combined = a.CombineLatest(b, (x, y) => $"{x}+{y}");
// Zip by index
var zipped = a.Zip(b, (x, y) => x + y); // 11, 13, 15
// Prepend / StartWith
var withPrefix = a.Prepend(0); // 0, 1, 2, 3
var withMany = a.StartWith([-2, -1, 0]); // -2, -1, 0, 1, 2, 3| Operator | Description |
|---|---|
Catch(handler) |
Catches errors and switches to a fallback async observable. |
CatchAndIgnoreErrorResume() |
Catches error-resume notifications and suppresses them, allowing the sequence to continue. |
Retry() |
Re-subscribes to the source indefinitely on error. |
Retry(count) |
Re-subscribes up to count times on error. |
OnErrorResumeAsFailure() |
Converts OnErrorResumeAsync calls into OnCompletedAsync with a Result.Failure. |
// Catch and switch to fallback
var flaky = ObservableAsync.Throw<int>(new InvalidOperationException("Oops"));
var safe = flaky.Catch(ex => ObservableAsync.Return(-1)); // emits -1
// Retry up to 3 times
var retried = flaky.Retry(3);
// Retry indefinitely
var forever = flaky.Retry();
// Convert error-resume to failure completion
var strict = someSource.OnErrorResumeAsFailure();| Operator | Description |
|---|---|
Throttle(timeSpan) |
Suppresses values that arrive within the specified window after the last emission. Accepts optional TimeProvider. |
Delay(timeSpan) |
Delays every emission by the specified duration. Accepts optional TimeProvider. |
Timeout(timeSpan) |
Raises a TimeoutException if no value arrives within the specified window. |
Timeout(timeSpan, fallback) |
Switches to a fallback source on timeout instead of faulting. |
ObserveOn(context) |
Shifts observer notifications to the specified AsyncContext, SynchronizationContext, TaskScheduler, or IScheduler. |
Yield() |
Yields control (via Task.Yield) between each notification, ensuring other work can interleave. |
var source = ObservableAsync.Interval(TimeSpan.FromMilliseconds(50)).Take(10);
// Throttle rapid emissions (debounce)
var throttled = source.Throttle(TimeSpan.FromMilliseconds(200));
// Delay each value by 100ms
var delayed = source.Delay(TimeSpan.FromMilliseconds(100));
// Timeout if nothing arrives in 2 seconds
var guarded = source.Timeout(TimeSpan.FromSeconds(2));
// Timeout with fallback
var withFallback = source.Timeout(
TimeSpan.FromSeconds(2),
ObservableAsync.Return(999L));
// Schedule notifications onto a specific context
var onContext = source.ObserveOn(new AsyncContext(SynchronizationContext.Current!));
// Yield between each notification
var yielded = source.Yield();| Operator | Description |
|---|---|
AggregateAsync(seed, accumulator) |
Applies an accumulator over the sequence and returns the final result. |
CountAsync() |
Returns the number of elements. |
LongCountAsync() |
Returns the element count as long. |
AnyAsync() / AnyAsync(predicate) |
Returns true if the sequence has any elements (optionally matching a predicate). |
AllAsync(predicate) |
Returns true if all elements match the predicate. |
ContainsAsync(value) |
Returns true if the sequence contains the specified value. |
FirstAsync() |
Returns the first element or throws if empty. |
FirstOrDefaultAsync() |
Returns the first element or default if empty. |
LastAsync() |
Returns the last element or throws if empty. |
LastOrDefaultAsync() |
Returns the last element or default if empty. |
SingleAsync() |
Returns the only element; throws if empty or more than one. |
SingleOrDefaultAsync() |
Returns the only element or default; throws if more than one. |
ToListAsync() |
Collects all elements into a List<T>. |
ToDictionaryAsync(keySelector) |
Collects all elements into a Dictionary<TKey, T>. |
ForEachAsync(action) |
Invokes an async action for each element. |
WaitCompletionAsync() |
Awaits the completion of the observable without capturing values. |
var source = ObservableAsync.Range(1, 5);
var ct = CancellationToken.None;
int count = await source.CountAsync(ct); // 5
int sum = await source.AggregateAsync(0, (a, x) => a + x, ct); // 15
bool hasEvens = await source.AnyAsync(x => x % 2 == 0, ct); // true
bool allPos = await source.AllAsync(x => x > 0, ct); // true
bool has3 = await source.ContainsAsync(3, ct); // true
int first = await source.FirstAsync(ct); // 1
int last = await source.LastAsync(ct); // 5
List<int> all = await source.ToListAsync(ct); // [1, 2, 3, 4, 5]
var dict = await source.ToDictionaryAsync(x => x.ToString(), ct); // {"1":1, "2":2, ...}
// Iterate with async action
await source.ForEachAsync(async (x, ct2) =>
Console.WriteLine($"Item: {x}"), ct);
// Wait for completion without capturing values
await source.WaitCompletionAsync(ct);| Operator | Description |
|---|---|
Do(onNext) |
Performs a synchronous side effect for each element without altering the sequence. |
Do(asyncOnNext) |
Performs an async side effect for each element. |
OnDispose(action) |
Registers a synchronous action to run when the subscription is disposed. |
OnDispose(asyncAction) |
Registers an async action to run on disposal. |
Using(resourceFactory, observableFactory) |
Creates a disposable resource tied to the subscription lifetime. |
var source = ObservableAsync.Range(1, 3);
// Sync side effect
var logged = source.Do(x => Console.WriteLine($"[LOG] {x}"));
// Async side effect
var asyncLogged = source.Do(async (x, ct) =>
{
await Task.Delay(1, ct);
Console.WriteLine($"[ASYNC LOG] {x}");
});
// Cleanup on disposal
var withCleanup = source.OnDispose(() => Console.WriteLine("Cleaned up!"));
// Async cleanup
var withAsyncCleanup = source.OnDispose(async () =>
{
await Task.Delay(10);
Console.WriteLine("Async cleanup done!");
});
// Using: tie a resource to the subscription lifetime
var withResource = ObservableAsync.Using(
() => new MemoryStream(),
stream => ObservableAsync.Return(stream.Length));| Operator | Description |
|---|---|
Publish() |
Returns a ConnectableObservableAsync<T> that multicasts the source to multiple observers using a serial subject. |
StatelessPublish() |
Returns a connectable observable using a stateless serial subject. |
ReplayLatest() |
Returns a connectable observable that replays the most recent value to new subscribers. |
RefCount() |
Automatically connects a connectable observable when the first observer subscribes and disconnects when the last unsubscribes. |
Multicast(subjectFactory) |
General-purpose multicasting with a custom subject factory. |
var source = ObservableAsync.Interval(TimeSpan.FromMilliseconds(100)).Take(5);
// Publish + explicit connect
var published = source.Publish();
await using var sub1 = await published.SubscribeAsync(
async (v, ct) => Console.WriteLine($"Sub1: {v}"), CancellationToken.None);
await using var sub2 = await published.SubscribeAsync(
async (v, ct) => Console.WriteLine($"Sub2: {v}"), CancellationToken.None);
await using var connection = await published.ConnectAsync(CancellationToken.None);
// RefCount: auto-connect on first subscriber, auto-disconnect on last
var shared = source.Publish().RefCount();
await using var sub3 = await shared.SubscribeAsync(
async (v, ct) => Console.WriteLine($"Shared: {v}"), CancellationToken.None);
// ReplayLatest: new subscribers get the most recent value immediately
var replayed = source.ReplayLatest().RefCount();| Method | Description |
|---|---|
observable.ToObservableAsync() |
Wraps a classic IObservable<T> as IObservableAsync<T>. Synchronous notifications are forwarded sequentially through async callbacks. |
asyncObservable.ToObservable() |
Exposes an IObservableAsync<T> as a classic IObservable<T>. Async callbacks are awaited; the synchronous observer is notified on the completing thread. |
asyncObservable.ToAsyncEnumerable() |
Converts an async observable to IAsyncEnumerable<T> for consumption with await foreach. |
source.Wrap(converter) |
Wraps each value using a converter function producing a new async observable type. |
// Classic IObservable<T> → IObservableAsync<T>
IObservable<int> classic = Observable.Range(1, 5);
IObservableAsync<int> async1 = classic.ToObservableAsync();
// IObservableAsync<T> → classic IObservable<T>
IObservable<int> backToClassic = async1.ToObservable();
// IObservableAsync<T> → IAsyncEnumerable<T>
var asyncEnum = ObservableAsync.Range(1, 5).ToAsyncEnumerable();
await foreach (var item in asyncEnum)
{
Console.WriteLine(item);
}Subjects act as both an observer and an observable, enabling manual publishing of values to multiple subscribers.
| Factory Method | Description |
|---|---|
SubjectAsync.Create<T>() |
Creates a subject with default options (serial publishing, stateful). |
SubjectAsync.Create<T>(options) |
Creates a subject with configurable publishing (Serial / Concurrent) and statefulness. |
SubjectAsync.CreateBehavior<T>(startValue) |
Creates a behavior subject that replays the latest value (starting with startValue) to new subscribers. |
SubjectAsync.CreateReplayLatest<T>() |
Creates a subject that replays only the most recent value to new subscribers (no initial value required). |
Publishing options:
| Option | Behavior |
|---|---|
PublishingOption.Serial |
Notifications are delivered to observers one at a time. Safe for single-producer scenarios. |
PublishingOption.Concurrent |
Notifications are delivered concurrently to all observers. Useful when observers are independent. |
// Basic subject
var subject = SubjectAsync.Create<int>();
await using var sub = await subject.Values.SubscribeAsync(
async (value, ct) => Console.WriteLine($"Got: {value}"),
CancellationToken.None);
await subject.OnNextAsync(1, CancellationToken.None);
await subject.OnNextAsync(2, CancellationToken.None);
await subject.OnCompletedAsync(Result.Success);
// Behavior subject: new subscribers receive the current value
var behavior = SubjectAsync.CreateBehavior("initial");
await using var behaviorSub = await behavior.Values.SubscribeAsync(
async (value, ct) => Console.WriteLine($"Behavior: {value}"), // prints "initial"
CancellationToken.None);
await behavior.OnNextAsync("updated", CancellationToken.None); // prints "updated"
// Concurrent publishing for fan-out scenarios
var concurrent = SubjectAsync.Create<int>(new SubjectCreationOptions
{
PublishingOption = PublishingOption.Concurrent,
IsStateless = false
});
// ReplayLatest: replays the last value without a required start value
var replay = SubjectAsync.CreateReplayLatest<double>();
await replay.OnNextAsync(3.14, CancellationToken.None);
// Late subscriber still receives 3.14
await using var lateSub = await replay.Values.SubscribeAsync(
async (v, ct) => Console.WriteLine($"Replay: {v}"),
CancellationToken.None);| Type | Description |
|---|---|
DisposableAsync.Empty |
A no-op IAsyncDisposable – useful as a default or placeholder. |
DisposableAsync.Create(func) |
Creates an IAsyncDisposable from a Func<ValueTask> delegate. Ensures the delegate runs at most once. |
CompositeDisposableAsync |
A thread-safe collection of IAsyncDisposable objects that are disposed together. Supports Add, Remove, and bulk disposal. |
// Create from a delegate
var disposable = DisposableAsync.Create(async () =>
{
await Task.Delay(10);
Console.WriteLine("Resource released");
});
await disposable.DisposeAsync();
// Compose multiple async disposables
var composite = new CompositeDisposableAsync();
composite.Add(DisposableAsync.Create(() => { Console.WriteLine("A"); return default; }));
composite.Add(DisposableAsync.Create(() => { Console.WriteLine("B"); return default; }));
Console.WriteLine($"Count: {composite.Count}"); // 2
await composite.DisposeAsync(); // disposes A and B
Console.WriteLine($"IsDisposed: {composite.IsDisposed}"); // trueFastForEachpath avoids iterator allocations forList<T>,IList<T>, and arrays.SyncTimerensures only one shared timer per period reducing timer overhead.Conflatehelps tame high–frequency producers without dropping the final value of a burst.HeartbeatandDetectStaleuse lightweight scheduling primitives.- Most operators avoid capturing lambdas in hot loops where practical.
- Async observable operators use
ValueTaskthroughout to minimize allocations on the hot path. ObserverAsync<T>base class detects concurrent re-entrant calls, raisingConcurrentObserverCallsExceptionin debug scenarios.- Subject variants (serial vs. concurrent, stateful vs. stateless) let you choose the right trade-off between safety and throughput.
- All operators are pure functional transformations unless documented otherwise.
SyncTimeruses aConcurrentDictionaryand returns a hotIConnectableObservablethat connects once per uniqueTimeSpan.- Methods returning shared observables (
SyncTimer,Partitionresult sequences) are safe for multi-subscriber usage unless the upstream is inherently side-effecting. ISubjectAsync<T>subjects withPublishingOption.Serialserialize notifications;PublishingOption.Concurrentvariants deliver to all observers concurrently.CompositeDisposableAsyncis thread-safe for concurrentAdd/Remove/DisposeAsynccalls.
MIT – see LICENSE file.
Issues / PRs welcome. Please keep additions dependency–free and focused on broadly useful reactive patterns.
(Keep this section updated as the library evolves.)
- Added async task projection helpers (
SelectAsyncSequential,SelectLatestAsync,SelectAsyncConcurrent). - Added liveness operators (
Heartbeat,DetectStale,BufferUntilInactive). - Added resilience (
RetryWithBackoff, expandedOnErrorRetryoverloads). - Added flow control (
Conflate,ThrottleFirst,DebounceImmediate,ThrottleDistinct). - Added buffering and transformation operators (
BufferUntilIdle,Pairwise,ScanWithInitial). - Added filtering and conditional operators (
SampleLatest,SwitchIfEmpty,DropIfBusy). - Added utility operators (
ToReadOnlyBehavior,ToHotTask,ToPropertyObservable). - Fixed
SynchronizeSynchronousto properly propagate OnError and OnCompleted events. - Removed DisposeWith extension use System.Reactive.Disposables.Fluent from System.Reactive.
- Added fully async-native observable framework (
IObservableAsync<T>,IObserverAsync<T>) with 60+ operators, subjects, bridge extensions, and async disposables.
Happy reactive coding!