diff --git a/src/DynamicData/Cache/ObservableCacheEx.Adapt.cs b/src/DynamicData/Cache/ObservableCacheEx.Adapt.cs
new file mode 100644
index 00000000..1e895a26
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.Adapt.cs
@@ -0,0 +1,69 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// ObservableCache extensions for Adapt.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Injects a side effect into the changeset stream by calling .
+ /// for every changeset, then forwarding it downstream unchanged.
+ ///
+ /// The type of items in the cache.
+ /// The type of the key.
+ /// The source to observe and adapt.
+ /// The whose Adapt method is called for each changeset.
+ /// An observable that emits the same changesets as , after the adaptor has processed each one.
+ ///
+ ///
+ /// This is a thin wrapper around Rx's Do operator. The adaptor receives each changeset
+ /// as a side effect; the changeset itself is forwarded downstream unmodified.
+ ///
+ ///
+ /// or is .
+ ///
+ ///
+ public static IObservable> Adapt(this IObservable> source, IChangeSetAdaptor adaptor)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ adaptor.ThrowArgumentNullExceptionIfNull(nameof(adaptor));
+
+ return source.Do(adaptor.Adapt);
+ }
+
+ ///
+ /// The source to observe and adapt.
+ /// The whose Adapt method is called for each changeset.
+ /// This overload operates on . Delegates to Rx's Do operator.
+ public static IObservable> Adapt(this IObservable> source, ISortedChangeSetAdaptor adaptor)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ adaptor.ThrowArgumentNullExceptionIfNull(nameof(adaptor));
+
+ return source.Do(adaptor.Adapt);
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.AutoRefresh.cs b/src/DynamicData/Cache/ObservableCacheEx.AutoRefresh.cs
new file mode 100644
index 00000000..4633ce84
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.AutoRefresh.cs
@@ -0,0 +1,130 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// ObservableCache extensions for AutoRefresh.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Automatically refresh downstream operators when any properties change.
+ ///
+ /// The object of the change set.
+ /// The key of the change set.
+ /// The source to monitor for property-driven refresh signals.
+ /// An optional buffer duration. Batches multiple refresh signals into a single changeset, improving performance when many elements change in quick succession. This greatly increases performance when many elements have successive property changes.
+ /// An optional throttle applied to each item's property change notifications, preventing excessive refresh invocations.
+ /// An optional for scheduling work.
+ /// An observable change set with additional refresh changes.
+ ///
+ public static IObservable> AutoRefresh(this IObservable> source, TimeSpan? changeSetBuffer = null, TimeSpan? propertyChangeThrottle = null, IScheduler? scheduler = null)
+ where TObject : INotifyPropertyChanged
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return source.AutoRefreshOnObservable(
+ (t, _) =>
+ {
+ if (propertyChangeThrottle is null)
+ {
+ return t.WhenAnyPropertyChanged();
+ }
+
+ return t.WhenAnyPropertyChanged().Throttle(propertyChangeThrottle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
+ },
+ changeSetBuffer,
+ scheduler);
+ }
+
+ ///
+ /// Automatically refresh downstream operators when properties change.
+ ///
+ /// The object of the change set.
+ /// The key of the change set.
+ /// The type of the property.
+ /// The source to monitor for property-driven refresh signals.
+ /// A that specify a property to observe changes. When it changes a Refresh is invoked.
+ /// An optional buffer duration. Batches multiple refresh signals into a single changeset, improving performance when many elements change in quick succession. This greatly increases performance when many elements have successive property changes.
+ /// An optional throttle applied to each item's property change notifications, preventing excessive refresh invocations.
+ /// An optional for scheduling work.
+ /// An observable change set with additional refresh changes.
+ public static IObservable> AutoRefresh(this IObservable> source, Expression> propertyAccessor, TimeSpan? changeSetBuffer = null, TimeSpan? propertyChangeThrottle = null, IScheduler? scheduler = null)
+ where TObject : INotifyPropertyChanged
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return source.AutoRefreshOnObservable(
+ (t, _) =>
+ {
+ if (propertyChangeThrottle is null)
+ {
+ return t.WhenPropertyChanged(propertyAccessor, false);
+ }
+
+ return t.WhenPropertyChanged(propertyAccessor, false).Throttle(propertyChangeThrottle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
+ },
+ changeSetBuffer,
+ scheduler);
+ }
+
+ ///
+ /// Automatically refresh downstream operator. The refresh is triggered when the observable receives a notification.
+ ///
+ /// The object of the change set.
+ /// The key of the change set.
+ /// The type of evaluation.
+ /// The source to monitor for observable-driven refresh signals.
+ /// The observable which acts on items within the collection and produces a value when the item should be refreshed.
+ /// An optional buffer duration. Batches multiple refresh signals into a single changeset, improving performance when many elements change in quick succession. This greatly increases performance when many elements require a refresh.
+ /// An optional for scheduling work.
+ /// An observable change set with additional refresh changes.
+ ///
+ public static IObservable> AutoRefreshOnObservable(this IObservable> source, Func> reevaluator, TimeSpan? changeSetBuffer = null, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull => source.AutoRefreshOnObservable((t, _) => reevaluator(t), changeSetBuffer, scheduler);
+
+ ///
+ /// Automatically refresh downstream operator. The refresh is triggered when the observable receives a notification.
+ ///
+ /// The object of the change set.
+ /// The key of the change set.
+ /// The type of evaluation.
+ /// The source to monitor for observable-driven refresh signals.
+ /// The observable which acts on items within the collection and produces a value when the item should be refreshed.
+ /// An optional buffer duration. Batches multiple refresh signals into a single changeset, improving performance when many elements change in quick succession. This greatly increases performance when many elements require a refresh.
+ /// An optional for scheduling work.
+ /// An observable change set with additional refresh changes.
+ ///
+ /// Worth noting: Per-item observable errors are silently ignored (not forwarded to the downstream observer). Only source stream errors propagate.
+ ///
+ public static IObservable> AutoRefreshOnObservable(this IObservable> source, Func> reevaluator, TimeSpan? changeSetBuffer = null, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ reevaluator.ThrowArgumentNullExceptionIfNull(nameof(reevaluator));
+
+ return new AutoRefresh(source, reevaluator, changeSetBuffer, scheduler).Run();
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.Batch.cs b/src/DynamicData/Cache/ObservableCacheEx.Batch.cs
new file mode 100644
index 00000000..316933b0
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.Batch.cs
@@ -0,0 +1,134 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// ObservableCache extensions for Batch and BatchIf.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Collects changesets emitted within a time window and merges them into a single changeset.
+ /// Uses Rx's Buffer operator followed by .
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to batch.
+ /// The time window for batching.
+ /// The scheduler for timing. Defaults to .
+ /// An observable that emits merged changesets, one per time window.
+ ///
+ ///
+ /// All changesets received during the time window are concatenated into a single changeset.
+ /// This is useful for reducing UI update frequency when the source emits many rapid changes.
+ ///
+ ///
+ /// EventBehavior
+ /// - AddBuffered and included in the merged changeset at the end of the time window.
+ /// - UpdateBuffered and included in the merged changeset.
+ /// - RemoveBuffered and included in the merged changeset.
+ /// - RefreshBuffered and included in the merged changeset.
+ /// - OnCompletedAny remaining buffered changes are flushed, then completion is forwarded.
+ ///
+ /// Worth noting: The merged changeset may contain contradictory changes (e.g., Add then Remove for the same key). Downstream operators handle this correctly, but raw inspection of the changeset may be surprising.
+ ///
+ /// is .
+ ///
+ ///
+ public static IObservable> Batch(this IObservable> source, TimeSpan timeSpan, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return source.Buffer(timeSpan, scheduler ?? GlobalConfig.DefaultScheduler).FlattenBufferResult();
+ }
+
+ ///
+ /// This overload delegates to the primary overload with initialPauseState: false.
+ public static IObservable> BatchIf(this IObservable> source, IObservable pauseIfTrueSelector, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull => BatchIf(source, pauseIfTrueSelector, false, scheduler);
+
+ ///
+ /// This overload delegates to the primary overload with default initialPauseState: false.
+ public static IObservable> BatchIf(this IObservable> source, IObservable pauseIfTrueSelector, bool initialPauseState = false, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull => new BatchIf(source, pauseIfTrueSelector, null, initialPauseState, scheduler: scheduler).Run();
+
+ ///
+ /// This overload omits initialPauseState (defaults to ) but accepts a timeout.
+ public static IObservable> BatchIf(this IObservable> source, IObservable pauseIfTrueSelector, TimeSpan? timeOut = null, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull => BatchIf(source, pauseIfTrueSelector, false, timeOut, scheduler);
+
+ ///
+ /// Conditionally buffers changesets while a pause signal is active, then flushes all buffered
+ /// changes as a single merged changeset when the signal resumes.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to conditionally buffer.
+ /// An that when , buffering begins. When , the buffer is flushed.
+ /// If , starts in a paused (buffering) state.
+ /// A that maximum time the buffer stays open. When elapsed, the buffer is flushed regardless of pause state.
+ /// The for timeout timing.
+ /// An observable that emits changesets, buffered or passthrough depending on pause state.
+ ///
+ ///
+ /// While paused, incoming changesets are accumulated. On resume (or timeout), all buffered changesets
+ /// are merged into a single changeset and emitted. While not paused, changesets pass through immediately.
+ ///
+ ///
+ /// EventBehavior
+ /// - AddBuffered while paused; forwarded immediately while active.
+ /// - UpdateBuffered while paused; forwarded immediately while active.
+ /// - RemoveBuffered while paused; forwarded immediately while active.
+ /// - RefreshBuffered while paused; forwarded immediately while active.
+ /// - OnErrorBuffered data is lost.
+ /// - OnCompletedAny remaining buffered data is flushed before completion.
+ ///
+ /// Worth noting: If the source completes while paused, buffered data IS flushed before OnCompleted. However, if the source errors while paused, buffered data is lost.
+ ///
+ /// or is .
+ ///
+ ///
+ public static IObservable> BatchIf(this IObservable> source, IObservable pauseIfTrueSelector, bool initialPauseState = false, TimeSpan? timeOut = null, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ pauseIfTrueSelector.ThrowArgumentNullExceptionIfNull(nameof(pauseIfTrueSelector));
+
+ return new BatchIf(source, pauseIfTrueSelector, timeOut, initialPauseState, scheduler: scheduler).Run();
+ }
+
+ ///
+ /// The source to conditionally buffer.
+ /// An that controls buffering: begins buffering, flushes the buffer.
+ /// If , starts in a paused (buffering) state.
+ /// An optional timer. The buffer is flushed each time the timer produces a value, and buffering ceases when it completes.
+ /// An optional for scheduling work.
+ /// This overload accepts an explicit timer observable instead of a timeout.
+ public static IObservable> BatchIf(this IObservable> source, IObservable pauseIfTrueSelector, bool initialPauseState = false, IObservable? timer = null, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull => new BatchIf(source, pauseIfTrueSelector, null, initialPauseState, timer, scheduler).Run();
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.Bind.cs b/src/DynamicData/Cache/ObservableCacheEx.Bind.cs
new file mode 100644
index 00000000..1d94d768
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.Bind.cs
@@ -0,0 +1,367 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// ObservableCache extensions for Bind.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Binds the results to the specified observable collection using the default update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The that will receive the changes.
+ /// The number of changes before a reset notification is triggered.
+ /// An observable which will emit change sets.
+ /// source.
+ ///
+ public static IObservable> Bind(this IObservable> source, IObservableCollection destination, int refreshThreshold = BindingOptions.DefaultResetThreshold)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ destination.ThrowArgumentNullExceptionIfNull(nameof(destination));
+
+ // if user has not specified different defaults, use system wide defaults instead.
+ // This is a hack to retro fit system wide defaults which override the hard coded defaults above
+ var defaults = DynamicDataOptions.Binding;
+
+ var options = refreshThreshold == BindingOptions.DefaultResetThreshold
+ ? defaults
+ : defaults with { ResetThreshold = refreshThreshold };
+
+ return source?.Bind(destination, new ObservableCollectionAdaptor(options)) ?? throw new ArgumentNullException(nameof(source));
+ }
+
+ ///
+ /// Binds the results to the specified observable collection using the default update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The that will receive the changes.
+ /// The that controls binding behavior.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, IObservableCollection destination, BindingOptions options)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ destination.ThrowArgumentNullExceptionIfNull(nameof(destination));
+
+ return source?.Bind(destination, new ObservableCollectionAdaptor(options)) ?? throw new ArgumentNullException(nameof(source));
+ }
+
+ ///
+ /// Binds the results to the specified binding collection using the specified update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The that will receive the changes.
+ /// The that applies changes to the bound collection.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, IObservableCollection destination, IObservableCollectionAdaptor updater)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ destination.ThrowArgumentNullExceptionIfNull(nameof(destination));
+ updater.ThrowArgumentNullExceptionIfNull(nameof(updater));
+
+ return Observable.Create>(
+ observer =>
+ source.SynchronizeSafe(InternalEx.NewLock()).Select(
+ changes =>
+ {
+ updater.Adapt(changes, destination);
+ return changes;
+ }).SubscribeSafe(observer));
+ }
+
+ ///
+ /// Binds the results to the specified readonly observable collection using the default update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The output that will be populated with the results.
+ /// The that controls binding behavior.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, out ReadOnlyObservableCollection readOnlyObservableCollection, BindingOptions options)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ var target = new ObservableCollectionExtended();
+ readOnlyObservableCollection = new ReadOnlyObservableCollection(target);
+ return source.Bind(target, new ObservableCollectionAdaptor(options));
+ }
+
+ ///
+ /// Binds the results to the specified readonly observable collection using the default update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The output that will be populated with the results.
+ /// The number of changes before a reset notification is triggered.
+ /// When , uses Replace instead of Remove/Add for updates in the bound collection. Not all platforms support replace notifications.
+ /// An optional that controls how the target collection is updated.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, out ReadOnlyObservableCollection readOnlyObservableCollection, int resetThreshold = BindingOptions.DefaultResetThreshold, bool useReplaceForUpdates = BindingOptions.DefaultUseReplaceForUpdates, IObservableCollectionAdaptor? adaptor = null)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ if (adaptor is not null)
+ {
+ var target = new ObservableCollectionExtended();
+ readOnlyObservableCollection = new ReadOnlyObservableCollection(target);
+ return source.Bind(target, adaptor);
+ }
+
+ // if user has not specified different defaults, use system wide defaults instead.
+ // This is a hack to retro fit system wide defaults which override the hard coded defaults above
+ var defaults = DynamicDataOptions.Binding;
+
+ var options = resetThreshold == BindingOptions.DefaultResetThreshold && useReplaceForUpdates == BindingOptions.DefaultUseReplaceForUpdates
+ ? defaults
+ : defaults with { ResetThreshold = resetThreshold, UseReplaceForUpdates = useReplaceForUpdates };
+
+ return source.Bind(out readOnlyObservableCollection, options);
+ }
+
+ ///
+ /// Binds the results to the specified observable collection using the default update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The that will receive the changes.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, IObservableCollection destination)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ destination.ThrowArgumentNullExceptionIfNull(nameof(destination));
+
+ return source.Bind(destination, DynamicDataOptions.Binding);
+ }
+
+ ///
+ /// Binds the results to the specified observable collection using the default update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The that will receive the changes.
+ /// The that controls binding behavior.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, IObservableCollection destination, BindingOptions options)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ destination.ThrowArgumentNullExceptionIfNull(nameof(destination));
+
+ var updater = new SortedObservableCollectionAdaptor(options);
+ return source.Bind(destination, updater);
+ }
+
+ ///
+ /// Binds the results to the specified binding collection using the specified update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The that will receive the changes.
+ /// The that applies changes to the bound collection.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, IObservableCollection destination, ISortedObservableCollectionAdaptor updater)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ destination.ThrowArgumentNullExceptionIfNull(nameof(destination));
+ updater.ThrowArgumentNullExceptionIfNull(nameof(updater));
+
+ return Observable.Create>(
+ observer =>
+ source.SynchronizeSafe(InternalEx.NewLock()).Select(
+ changes =>
+ {
+ updater.Adapt(changes, destination);
+ return changes;
+ }).SubscribeSafe(observer));
+ }
+
+ ///
+ /// Binds the results to the specified readonly observable collection using the default update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The output that will be populated with the results.
+ /// The that controls binding behavior.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, out ReadOnlyObservableCollection readOnlyObservableCollection, BindingOptions options)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ var target = new ObservableCollectionExtended();
+ var result = new ReadOnlyObservableCollection(target);
+ var updater = new SortedObservableCollectionAdaptor(options);
+ readOnlyObservableCollection = result;
+ return source.Bind(target, updater);
+ }
+
+ ///
+ /// Binds the results to the specified readonly observable collection using the default update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The output that will be populated with the results.
+ /// The number of changes before a reset event is called on the observable collection.
+ /// When , uses Replace instead of Remove/Add for updates in the bound collection. Not all platforms support replace notifications.
+ /// An that specify an adaptor to change the algorithm to update the target collection.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, out ReadOnlyObservableCollection readOnlyObservableCollection, int resetThreshold = BindingOptions.DefaultResetThreshold, bool useReplaceForUpdates = BindingOptions.DefaultUseReplaceForUpdates, ISortedObservableCollectionAdaptor? adaptor = null)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ // if user has not specified different defaults, use system wide defaults instead.
+ // This is a hack to retro fit system wide defaults which override the hard coded defaults above
+ var defaults = DynamicDataOptions.Binding;
+ var options = resetThreshold == BindingOptions.DefaultResetThreshold && useReplaceForUpdates == BindingOptions.DefaultUseReplaceForUpdates
+ ? defaults
+ : defaults with { ResetThreshold = resetThreshold, UseReplaceForUpdates = useReplaceForUpdates };
+
+ adaptor ??= new SortedObservableCollectionAdaptor(options);
+
+ var target = new ObservableCollectionExtended();
+ readOnlyObservableCollection = new ReadOnlyObservableCollection(target);
+ return source.Bind(target, adaptor);
+ }
+
+#if SUPPORTS_BINDINGLIST
+
+ ///
+ /// Binds a clone of the observable change set to the target observable collection.
+ ///
+ /// The object type.
+ /// The key type.
+ /// The source to bind to a collection.
+ /// The that will receive the changes.
+ /// The reset threshold.
+ /// An observable which will emit change sets.
+ ///
+ /// source
+ /// or
+ /// targetCollection.
+ ///
+ public static IObservable> Bind<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] TObject, TKey>(this IObservable> source, BindingList bindingList, int resetThreshold = BindingOptions.DefaultResetThreshold)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ bindingList.ThrowArgumentNullExceptionIfNull(nameof(bindingList));
+
+ return source.Adapt(new BindingListAdaptor(bindingList, resetThreshold));
+ }
+
+ ///
+ /// Binds a clone of the observable change set to the target observable collection.
+ ///
+ /// The object type.
+ /// The key type.
+ /// The source to bind to a collection.
+ /// The that will receive the changes.
+ /// The reset threshold.
+ /// An observable which will emit change sets.
+ ///
+ /// source
+ /// or
+ /// targetCollection.
+ ///
+ public static IObservable> Bind<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] TObject, TKey>(this IObservable> source, BindingList bindingList, int resetThreshold = BindingOptions.DefaultResetThreshold)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ bindingList.ThrowArgumentNullExceptionIfNull(nameof(bindingList));
+
+ return source.Adapt(new SortedBindingListAdaptor(bindingList, resetThreshold));
+ }
+
+#endif
+
+ ///
+ /// Converts moves changes to remove + add.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to convert move events into remove/add pairs.
+ /// the same SortedChangeSets, except all moves are replaced with remove + add.
+ public static IObservable> TreatMovesAsRemoveAdd(this IObservable> source)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ static IEnumerable> ReplaceMoves(IChangeSet items)
+ {
+ foreach (var change in items.ToConcreteType())
+ {
+ if (change.Reason == ChangeReason.Moved)
+ {
+ yield return new Change(ChangeReason.Remove, change.Key, change.Current, change.PreviousIndex);
+
+ yield return new Change(ChangeReason.Add, change.Key, change.Current, change.CurrentIndex);
+ }
+ else
+ {
+ yield return change;
+ }
+ }
+ }
+
+ return source.Select(changes => new SortedChangeSet(changes.SortedItems, ReplaceMoves(changes)));
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.ChangeStream.cs b/src/DynamicData/Cache/ObservableCacheEx.ChangeStream.cs
new file mode 100644
index 00000000..f045dcca
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.ChangeStream.cs
@@ -0,0 +1,208 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// ObservableCache extensions for changeset stream lifecycle helpers.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Buffers the initial burst of changesets for the specified duration, merges them into a single
+ /// changeset, then passes all subsequent changesets through without buffering.
+ ///
+ /// The object type.
+ /// The type of the key.
+ /// The source to buffer during the initial loading period.
+ /// The time window to buffer, measured from when the first changeset arrives.
+ /// The scheduler for timing. Defaults to .
+ /// An observable that emits one merged changeset for the initial burst, then passthrough for the rest.
+ ///
+ ///
+ /// Useful for aggregating the initial snapshot (which may arrive as many small changesets) into a
+ /// single changeset for efficient downstream processing, while leaving subsequent live updates untouched.
+ ///
+ /// Internally uses , Rx Buffer, and .
+ ///
+ ///
+ ///
+ public static IObservable> BufferInitial(this IObservable> source, TimeSpan initialBuffer, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull => source.DeferUntilLoaded().Publish(
+ shared =>
+ {
+ var initial = shared.Buffer(initialBuffer, scheduler ?? GlobalConfig.DefaultScheduler).FlattenBufferResult().Take(1);
+
+ return initial.Concat(shared);
+ });
+
+ ///
+ /// Suppresses all emissions until the first non-empty changeset arrives, then replays that changeset and all subsequent ones.
+ /// If the source never produces a non-empty changeset, the stream waits indefinitely.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to defer until the first changeset arrives.
+ /// An observable that begins emitting changesets once the first non-empty changeset is received.
+ ///
+ /// Worth noting: Blocks indefinitely if the cache or stream never receives any data. Ensure the source will eventually emit at least one changeset.
+ ///
+ ///
+ public static IObservable> DeferUntilLoaded(this IObservable> source)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return new DeferUntilLoaded(source).Run();
+ }
+
+ ///
+ public static IObservable> DeferUntilLoaded(this IObservableCache source)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return new DeferUntilLoaded(source).Run();
+ }
+
+ ///
+ /// Skips the initial snapshot changeset that Connect() typically emits, then forwards all subsequent changesets.
+ /// Internally uses DeferUntilLoaded().Skip(1).
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to skip the initial changeset.
+ /// An observable that skips the first changeset and forwards all others.
+ /// is .
+ ///
+ ///
+ public static IObservable> SkipInitial(this IObservable> source)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return source.DeferUntilLoaded().Skip(1);
+ }
+
+ ///
+ /// Prepends an empty changeset to the source stream, ensuring subscribers always receive an immediate
+ /// (empty) notification on subscription. Uses Rx's StartWith.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to prepend an empty changeset to.
+ /// An observable that emits an empty changeset first, then all source changesets.
+ ///
+ public static IObservable> StartWithEmpty(this IObservable> source)
+ where TObject : notnull
+ where TKey : notnull => source.StartWith(ChangeSet.Empty);
+
+ ///
+ /// The source to prepend an empty changeset to.
+ /// An observable that emits an empty sorted changeset first, then all source changesets.
+ /// Overload for .
+ public static IObservable> StartWithEmpty(this IObservable> source)
+ where TObject : notnull
+ where TKey : notnull => source.StartWith(SortedChangeSet.Empty);
+
+ ///
+ /// The source to prepend an empty changeset to.
+ /// An observable that emits an empty virtual changeset first, then all source changesets.
+ /// Overload for .
+ public static IObservable> StartWithEmpty(this IObservable> source)
+ where TObject : notnull
+ where TKey : notnull => source.StartWith(VirtualChangeSet.Empty);
+
+ ///
+ /// The source to prepend an empty changeset to.
+ /// An observable that emits an empty paged changeset first, then all source changesets.
+ /// Overload for .
+ public static IObservable> StartWithEmpty(this IObservable> source)
+ where TObject : notnull
+ where TKey : notnull => source.StartWith(PagedChangeSet.Empty);
+
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The grouping key type.
+ /// The source to prepend an empty changeset to.
+ /// An observable that emits an empty group changeset first, then all source changesets.
+ /// Overload for .
+ public static IObservable> StartWithEmpty(this IObservable> source)
+ where TObject : notnull
+ where TKey : notnull
+ where TGroupKey : notnull => source.StartWith(GroupChangeSet.Empty);
+
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The grouping key type.
+ /// The source to prepend an empty changeset to.
+ /// An observable that emits an empty immutable group changeset first, then all source changesets.
+ /// Overload for .
+ public static IObservable> StartWithEmpty(this IObservable> source)
+ where TObject : notnull
+ where TKey : notnull
+ where TGroupKey : notnull => source.StartWith(ImmutableGroupChangeSet.Empty);
+
+ ///
+ /// The type of the item.
+ /// The source of to prepend an empty changeset to.
+ /// An observable that emits an empty collection first, then all source collections.
+ /// Overload for .
+ public static IObservable> StartWithEmpty(this IObservable> source) => source.StartWith(ReadOnlyCollectionLight.Empty);
+
+ ///
+ /// The source to prepend an initial item to.
+ /// The item to prepend. The key is extracted from .
+ /// Overload for items that implement . Delegates to the explicit key overload.
+ public static IObservable> StartWithItem(this IObservable> source, TObject item)
+ where TObject : IKey
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return source.StartWithItem(item, item.Key);
+ }
+
+ ///
+ /// Prepends a changeset containing a single Add for the given item and key to the source stream.
+ /// The Rx equivalent of StartWith, but wrapped as a DynamicData changeset.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to prepend an initial item to.
+ /// The item to prepend.
+ /// The key for the item.
+ /// An observable that emits a single-item Add changeset first, then all source changesets.
+ public static IObservable> StartWithItem(this IObservable> source, TObject item, TKey key)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ var change = new Change(ChangeReason.Add, key, item);
+ return source.StartWith(new ChangeSet { change });
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.Combinators.cs b/src/DynamicData/Cache/ObservableCacheEx.Combinators.cs
new file mode 100644
index 00000000..2ab95df5
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.Combinators.cs
@@ -0,0 +1,549 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// ObservableCache extensions for set-style combinators (And, Or, Xor, Except).
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Applied a logical And operator between the collections i.e items which are in all of the
+ /// sources are included.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to combine.
+ /// The additional streams to combine with.
+ /// An observable which emits change sets.
+ /// source or others.
+ ///
+ public static IObservable> And(this IObservable> source, params IObservable>[] others)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return others is null || others.Length == 0
+ ? throw new ArgumentNullException(nameof(others))
+ : source.Combine(CombineOperator.And, others);
+ }
+
+ ///
+ /// Applied a logical And operator between the collections i.e items which are in all of the sources are included.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The of streams to combine.
+ /// An observable which emits change sets.
+ ///
+ /// source
+ /// or
+ /// others.
+ ///
+ public static IObservable> And(this ICollection>> sources)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return sources.Combine(CombineOperator.And);
+ }
+
+ ///
+ /// Dynamically apply a logical And operator between the items in the outer observable list.
+ /// Items which are in all of the sources are included in the result.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The of streams to combine.
+ /// An observable which emits change sets.
+ public static IObservable> And(this IObservableList>> sources)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return sources.Combine(CombineOperator.And);
+ }
+
+ ///
+ /// Dynamically apply a logical And operator between the items in the outer observable list.
+ /// Items which are in all of the sources are included in the result.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The of changeset streams to combine.
+ /// An observable which emits change sets.
+ public static IObservable> And(this IObservableList> sources)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return sources.Combine(CombineOperator.And);
+ }
+
+ ///
+ /// Dynamically apply a logical And operator between the items in the outer observable list.
+ /// Items which are in all of the sources are included in the result.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The of changeset streams to combine.
+ /// An observable which emits change sets.
+ public static IObservable> And(this IObservableList> sources)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return sources.Combine(CombineOperator.And);
+ }
+
+ private static IObservable> Combine(this IObservableList> source, CombineOperator type)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return Observable.Create>(
+ observer =>
+ {
+ var connections = source.Connect().Transform(x => x.Connect()).AsObservableList();
+ var subscriber = connections.Combine(type).SubscribeSafe(observer);
+ return new CompositeDisposable(connections, subscriber);
+ });
+ }
+
+ private static IObservable> Combine(this IObservableList> source, CombineOperator type)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return Observable.Create>(
+ observer =>
+ {
+ var connections = source.Connect().Transform(x => x.Connect()).AsObservableList();
+ var subscriber = connections.Combine(type).SubscribeSafe(observer);
+ return new CompositeDisposable(connections, subscriber);
+ });
+ }
+
+ private static IObservable> Combine(this IObservableList>> source, CombineOperator type)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return new DynamicCombiner(source, type).Run();
+ }
+
+ private static IObservable> Combine(this ICollection>> sources, CombineOperator type)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return Observable.Create>(
+ observer =>
+ {
+ void UpdateAction(IChangeSet updates)
+ {
+ try
+ {
+ observer.OnNext(updates);
+ }
+ catch (Exception ex)
+ {
+ observer.OnError(ex);
+ }
+ }
+
+ var subscriber = Disposable.Empty;
+ try
+ {
+ var combiner = new Combiner(type, UpdateAction);
+ subscriber = combiner.Subscribe([.. sources]);
+ }
+ catch (Exception ex)
+ {
+ observer.OnError(ex);
+ observer.OnCompleted();
+ }
+
+ return subscriber;
+ });
+ }
+
+ private static IObservable> Combine(this IObservable> source, CombineOperator type, params IObservable>[] combineTarget)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ combineTarget.ThrowArgumentNullExceptionIfNull(nameof(combineTarget));
+
+ return Observable.Create>(
+ observer =>
+ {
+ void UpdateAction(IChangeSet updates)
+ {
+ try
+ {
+ observer.OnNext(updates);
+ }
+ catch (Exception ex)
+ {
+ observer.OnError(ex);
+ observer.OnCompleted();
+ }
+ }
+
+ var subscriber = Disposable.Empty;
+ try
+ {
+ var list = combineTarget.ToList();
+ list.Insert(0, source);
+
+ var combiner = new Combiner(type, UpdateAction);
+ subscriber = combiner.Subscribe([.. list]);
+ }
+ catch (Exception ex)
+ {
+ observer.OnError(ex);
+ observer.OnCompleted();
+ }
+
+ return subscriber;
+ });
+ }
+
+ ///
+ /// Dynamically apply a logical Except operator between the collections
+ /// Items from the first collection in the outer list are included unless contained in any of the other lists.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to combine.
+ /// The additional streams to combine with.
+ /// An observable which emits change sets.
+ ///
+ /// source
+ /// or
+ /// others.
+ ///
+ ///
+ public static IObservable> Except(this IObservable> source, params IObservable>[] others)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ if (others is null || others.Length == 0)
+ {
+ throw new ArgumentNullException(nameof(others));
+ }
+
+ return source.Combine(CombineOperator.Except, others);
+ }
+
+ ///
+ /// Dynamically apply a logical Except operator between the collections
+ /// Items from the first collection in the outer list are included unless contained in any of the other lists.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The of streams to combine.
+ /// An observable which emits change sets.
+ ///
+ /// source
+ /// or
+ /// others.
+ ///
+ public static IObservable> Except(this ICollection>> sources)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return sources.Combine(CombineOperator.Except);
+ }
+
+ ///
+ /// Dynamically apply a logical Except operator between the collections
+ /// Items from the first collection in the outer list are included unless contained in any of the other lists.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The of streams to combine.
+ /// An observable which emits change sets.
+ public static IObservable> Except(this IObservableList>> sources)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return sources.Combine(CombineOperator.Except);
+ }
+
+ ///
+ /// Dynamically apply a logical Except operator between the items in the outer observable list.
+ /// Items which are in any of the sources are included in the result.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The of changeset streams to combine.
+ /// An observable which emits change sets.
+ public static IObservable> Except(this IObservableList> sources)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return sources.Combine(CombineOperator.Except);
+ }
+
+ ///
+ /// Dynamically apply a logical Except operator between the items in the outer observable list.
+ /// Items which are in any of the sources are included in the result.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The of changeset streams to combine.
+ /// An observable which emits change sets.
+ public static IObservable> Except(this IObservableList> sources)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return sources.Combine(CombineOperator.Except);
+ }
+
+ ///
+ /// Combines multiple changeset streams using logical OR (union). An item appears downstream if it exists in any source.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to combine.
+ /// The additional streams to combine with.
+ /// A changeset stream containing items present in any of the sources.
+ ///
+ ///
+ /// Items are tracked via reference counting across all sources. An item appears downstream as long as
+ /// at least one source contains it. When the last source holding a key removes it, the item is removed downstream.
+ ///
+ ///
+ /// EventBehavior
+ /// - AddIf this is the first source to provide the key, an Add is emitted. If other sources already have the key, the reference count is incremented but no emission occurs.
+ /// - UpdateIf the item is currently downstream, an Update is emitted.
+ /// - RemoveReference count decremented. If the count reaches zero (no source holds the key), a Remove is emitted. Otherwise no emission.
+ /// - RefreshIf the item is downstream, a Refresh is forwarded.
+ ///
+ ///
+ /// or is .
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static IObservable> Or(this IObservable> source, params IObservable>[] others)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ if (others is null || others.Length == 0)
+ {
+ throw new ArgumentNullException(nameof(others));
+ }
+
+ return source.Combine(CombineOperator.Or, others);
+ }
+
+ ///
+ /// The of streams to combine.
+ /// This overload accepts a pre-built collection of sources instead of a params array.
+ public static IObservable> Or(this ICollection>> sources)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return sources.Combine(CombineOperator.Or);
+ }
+
+ ///
+ /// Dynamically apply a logical Or operator between the items in the outer observable list.
+ /// Items which are in any of the sources are included in the result.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The of streams to combine.
+ /// An observable which emits change sets.
+ public static IObservable> Or(this IObservableList>> sources)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return sources.Combine(CombineOperator.Or);
+ }
+
+ ///
+ /// Dynamically apply a logical Or operator between the items in the outer observable list.
+ /// Items which are in any of the sources are included in the result.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The of changeset streams to combine.
+ /// An observable which emits change sets.
+ public static IObservable> Or(this IObservableList> sources)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return sources.Combine(CombineOperator.Or);
+ }
+
+ ///
+ /// Dynamically apply a logical Or operator between the items in the outer observable list.
+ /// Items which are in any of the sources are included in the result.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The of changeset streams to combine.
+ /// An observable which emits change sets.
+ public static IObservable> Or(this IObservableList> sources)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return sources.Combine(CombineOperator.Or);
+ }
+
+ ///
+ /// Combines multiple changeset streams using logical XOR (symmetric difference).
+ /// An item appears downstream only if it exists in exactly one source.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to combine.
+ /// The additional streams to combine with.
+ /// A changeset stream containing items present in exactly one source.
+ ///
+ ///
+ /// Items are tracked via reference counting. An item appears downstream only when exactly one
+ /// source holds it. Adding the same key from a second source removes it from the result;
+ /// removing from that second source restores it.
+ ///
+ ///
+ ///