diff --git a/build.gradle b/build.gradle index 276a28e114..8bf8616439 100644 --- a/build.gradle +++ b/build.gradle @@ -47,10 +47,13 @@ dependencies { testImplementation "org.reactivestreams:reactive-streams-tck-flow:$reactiveStreamsVersion" testImplementation "org.junit.jupiter:junit-jupiter:$jupiterVersion" - testRuntimeOnly "org.junit.vintage:junit-vintage-engine:$jupiterVersion" - // The missing piece – required by Gradle 9+ - testRuntimeOnly "org.junit.platform:junit-platform-launcher:$jupiterLauncherVersion" // match your JUnit version family + // Explicitly add these for IDE compatibility (especially Eclipse) + testImplementation "org.junit.platform:junit-platform-commons:$jupiterLauncherVersion" + testImplementation "org.junit.platform:junit-platform-launcher:$jupiterLauncherVersion" + + testRuntimeOnly "org.junit.vintage:junit-vintage-engine:$jupiterVersion" + testRuntimeOnly "org.junit.platform:junit-platform-launcher:$jupiterLauncherVersion" // already have this } // === Experimental JDK handling for Outreach Program === diff --git a/src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java b/src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java index feb7fc2087..42e01e8432 100644 --- a/src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java +++ b/src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java @@ -21,6 +21,7 @@ import io.reactivex.rxjava4.annotations.NonNull; import io.reactivex.rxjava4.disposables.*; +import io.reactivex.rxjava4.internal.util.AwaitCoordinatorStatic; import io.reactivex.rxjava4.plugins.RxJavaPlugins; /** @@ -84,7 +85,7 @@ public CompletionStageDisposable(@NonNull CompletionStage stage, @NonNull Dis */ public void await() { state.lazySet(true);; - Streamer.await(stage); + AwaitCoordinatorStatic.await(stage); } /** @@ -93,7 +94,7 @@ public void await() { */ public void await(DisposableContainer canceller) { state.lazySet(true);; - Streamer.await(stage, canceller); + AwaitCoordinatorStatic.await(stage, canceller); } /** diff --git a/src/main/java/io/reactivex/rxjava4/core/Streamable.java b/src/main/java/io/reactivex/rxjava4/core/Streamable.java index 3c9f9e9846..12d6d3dcbb 100644 --- a/src/main/java/io/reactivex/rxjava4/core/Streamable.java +++ b/src/main/java/io/reactivex/rxjava4/core/Streamable.java @@ -14,7 +14,7 @@ package io.reactivex.rxjava4.core; import java.lang.reflect.InvocationTargetException; -import java.util.Objects; +import java.util.*; import java.util.concurrent.*; import io.reactivex.rxjava4.annotations.*; @@ -22,6 +22,7 @@ import io.reactivex.rxjava4.exceptions.Exceptions; import io.reactivex.rxjava4.functions.*; import io.reactivex.rxjava4.internal.operators.streamable.*; +import io.reactivex.rxjava4.internal.util.AwaitCoordinatorStatic; import io.reactivex.rxjava4.schedulers.Schedulers; import io.reactivex.rxjava4.subscribers.TestSubscriber; @@ -170,6 +171,49 @@ static Streamable fromPublisher(@NonNull Flow.Publisher source, @NonNu .toStreamable(); } + /** + * Generates a sequence in order which the stages complete in any form. + * @param the common element type + * @param stages the iterable of stages to be relayed in the order they complete + * @param executor the executor to run the blocking operator + * @return the new Streamable instance + */ + @SuppressWarnings("unchecked") + @NonNull + static <@NonNull T> Streamable> fromStages(@NonNull Iterable> stages, ExecutorService executor) { + return create(emitter -> { + var list = new ArrayList>(); + for(var stage : stages) { + list.add(stage); + } + while (list.size() != 0) { + var winner = AwaitCoordinatorStatic.awaitFirstIndex(list, emitter.canceller()); + emitter.emit((CompletionStage)list.remove(winner)); + } + }, executor); + } + + /** + * Emits the elements of each inner sequence produced by the outher sequence. + * @param the common element type + * @param sources the streamable of inner streamables + * @param exec the executorservice where to run the virtual wait + * @return the new Streamable instance. + */ + static <@NonNull T> Streamable concat(Streamable> sources, ExecutorService exec) { + return create(emitter -> { + try (var mainSource = sources.forEach(item -> { + try (var innerSource = item.forEach(inner -> { + emitter.emit(inner); + }, emitter.canceller().derive(), exec)) { + innerSource.await(emitter.canceller()); + } + }, emitter.canceller(), exec)) { + mainSource.await(emitter.canceller()); + }; + }, exec); + } + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo // Operators // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo diff --git a/src/main/java/io/reactivex/rxjava4/core/Streamer.java b/src/main/java/io/reactivex/rxjava4/core/Streamer.java index efa8dd97c9..47a527fb51 100644 --- a/src/main/java/io/reactivex/rxjava4/core/Streamer.java +++ b/src/main/java/io/reactivex/rxjava4/core/Streamer.java @@ -14,11 +14,11 @@ package io.reactivex.rxjava4.core; import java.util.*; -import java.util.concurrent.*; -import java.util.function.Function; +import java.util.concurrent.CompletionStage; -import io.reactivex.rxjava4.annotations.*; +import io.reactivex.rxjava4.annotations.NonNull; import io.reactivex.rxjava4.disposables.*; +import io.reactivex.rxjava4.internal.util.AwaitCoordinator; /** * A realized stream which can then be consumed asynchronously in steps. @@ -31,7 +31,7 @@ * TODO proper docs * @since 4.0.0 */ -public interface Streamer<@NonNull T> extends AutoCloseable { +public interface Streamer<@NonNull T> extends AutoCloseable, AwaitCoordinator { // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo // API @@ -195,79 +195,4 @@ default void awaitFinish() { default void awaitFinish(@NonNull DisposableContainer cancellation) { await(finish(cancellation), cancellation); } - - // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo - // ASYNC/AWAIT "Language" keyword implementations - // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo - - /** - * The {@code await} keyword for async/await. - * @param the type of the returned value if any. - * @param stage the stage to await virtual-blockingly - * @return the awaited value - */ - @Nullable - static T await(@NonNull CompletionStage stage) { - return await(stage, null); - } - - /** - * The cancellable {@code await} keyword for async/await. - * @param the type of the returned value if any. - * @param stage the stage to await virtual-blockingly - * @param canceller the container that can trigger a cancellation on demand - * @return the awaited value - */ - @Nullable - static T await(@NonNull CompletionStage stage, @Nullable DisposableContainer canceller) { - var f = stage.toCompletableFuture(); - if (canceller == null) { - return f.join(); - } - var d = Disposable.fromFuture(f, true); - try (var _ = canceller.subscribe(d)) { - return f.join(); - } - } - - /** - * Runs a function while turning it into a CompletionStage with a canceller supplied too. - * @param the return type of the function - * @param function the function to apply - * @param canceller the canceller to use - * @param executor the executor to use - * @return the new stage - */ - static CompletionStage runStage(Function function, - DisposableContainer canceller, Executor executor) { - var loopback = new SerialDisposable(); - canceller.add(loopback); - - // new Exception().printStackTrace(); - - var f = CompletableFuture.supplyAsync(() -> { - try { - return function.apply(canceller); - } finally { - canceller.delete(loopback); - } - }, executor); - - var d = Disposable.fromFuture(f, true); - loopback.replace(d); - - return f; - } - - /** - * Runs a function while turning it into a CompletionStage with a canceller supplied too. - * @param the return type of the function - * @param function the function to apply - * @param canceller the canceller to use - * @return the new stage - */ - static CompletionStage runStage(Function function, - DisposableContainer canceller) { - return runStage(function, canceller, Executors.newVirtualThreadPerTaskExecutor()); - } } diff --git a/src/main/java/io/reactivex/rxjava4/disposables/CompositeDisposable.java b/src/main/java/io/reactivex/rxjava4/disposables/CompositeDisposable.java index df5be23c65..616363dfd4 100644 --- a/src/main/java/io/reactivex/rxjava4/disposables/CompositeDisposable.java +++ b/src/main/java/io/reactivex/rxjava4/disposables/CompositeDisposable.java @@ -268,4 +268,14 @@ public void reset() { resources = null; } } + + @Override + public DisposableContainer derive() { + var result = new CompositeDisposable(); + + add(result); + result.add(Disposable.fromRunnable(() -> delete(result))); + + return result; + } } diff --git a/src/main/java/io/reactivex/rxjava4/disposables/DisposableContainer.java b/src/main/java/io/reactivex/rxjava4/disposables/DisposableContainer.java index 5586be4b03..f39d5a9e4f 100644 --- a/src/main/java/io/reactivex/rxjava4/disposables/DisposableContainer.java +++ b/src/main/java/io/reactivex/rxjava4/disposables/DisposableContainer.java @@ -56,6 +56,14 @@ public interface DisposableContainer extends Disposable { */ void clear(); + /** + * Create a derived sub container that can get cancelled by this container, + * but cancelling the subcontainer does not cancel this container. + * @return the derived subcontainer + * @since 4.0 + */ + DisposableContainer derive(); + /** * Registers a {@link Disposable} with this container so that it can be removed and disposed * via a simple {@link #dispose()} call to the returned Disposable. @@ -133,5 +141,10 @@ public void reset() { public void clear() { // Who cares? } + + @Override + public DisposableContainer derive() { + return NEVER; + } } } diff --git a/src/main/java/io/reactivex/rxjava4/internal/disposables/ListCompositeDisposable.java b/src/main/java/io/reactivex/rxjava4/internal/disposables/ListCompositeDisposable.java index a64faa29df..24b9600fd2 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/disposables/ListCompositeDisposable.java +++ b/src/main/java/io/reactivex/rxjava4/internal/disposables/ListCompositeDisposable.java @@ -198,4 +198,14 @@ public void reset() { } } + @Override + public DisposableContainer derive() { + var result = new ListCompositeDisposable(); + + add(result); + result.add(Disposable.fromRunnable(() -> delete(result))); + + return result; + } + } diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisher.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisher.java index 3e1c3fe2de..e3b4745969 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisher.java +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisher.java @@ -22,7 +22,7 @@ import io.reactivex.rxjava4.disposables.DisposableContainer; import io.reactivex.rxjava4.internal.fuseable.HasUpstreamPublisher; import io.reactivex.rxjava4.internal.subscriptions.SubscriptionHelper; -import io.reactivex.rxjava4.internal.util.ExceptionHelper; +import io.reactivex.rxjava4.internal.util.*; import io.reactivex.rxjava4.internal.virtual.VirtualResumable; public record StreamableFromPublisher(@NonNull Publisher source, @@ -89,7 +89,7 @@ public void onComplete() { @Override public @NonNull CompletionStage next(@NonNull DisposableContainer canceller) { // System.out.println("next()"); - return Streamer.runStage(_ -> { + return AwaitCoordinatorStatic.runStage(_ -> { item.lazySet(null); // System.out.println("Requesting the next item"); SubscriptionHelper.deferredRequest(upstream, requester, 1); @@ -143,7 +143,7 @@ public void onComplete() { @Override public @NonNull CompletionStage finish(@NonNull DisposableContainer cancellation) { // new Exception("StreamableFromPublisher::finish").printStackTrace(); - return Streamer.runStage(_ -> { + return AwaitCoordinatorStatic.runStage(_ -> { SubscriptionHelper.cancel(upstream); return null; }, cancellation, executor); diff --git a/src/main/java/io/reactivex/rxjava4/internal/util/AwaitCoordinator.java b/src/main/java/io/reactivex/rxjava4/internal/util/AwaitCoordinator.java new file mode 100644 index 0000000000..1c7ccddf4b --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/internal/util/AwaitCoordinator.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.util; + +import java.util.concurrent.*; +import java.util.function.Function; + +import io.reactivex.rxjava4.annotations.*; +import io.reactivex.rxjava4.disposables.DisposableContainer; + +/** + * Static methods to coordinate {@link CompletionStage}s for various operators. + */ +public interface AwaitCoordinator { + + /** + * The {@code await} keyword for async/await. + * @param the type of the returned value if any. + * @param stage the stage to await virtual-blockingly + * @return the awaited value + */ + @Nullable + default T await(@NonNull CompletionStage stage) { + return AwaitCoordinatorStatic.await(stage, null); + } + + /** + * The cancellable {@code await} keyword for async/await. + * @param the type of the returned value if any. + * @param stage the stage to await virtual-blockingly + * @param canceller the container that can trigger a cancellation on demand + * @return the awaited value + */ + @Nullable + default T await(@NonNull CompletionStage stage, @Nullable DisposableContainer canceller) { + return AwaitCoordinatorStatic.await(stage, canceller); + } + + /** + * Runs a function while turning it into a CompletionStage with a canceller supplied too. + * @param the return type of the function + * @param function the function to apply + * @param canceller the canceller to use + * @param executor the executor to use + * @return the new stage + */ + default CompletionStage runStage(Function function, + DisposableContainer canceller, Executor executor) { + return AwaitCoordinatorStatic.runStage(function, canceller, executor); + } + + /** + * Runs a function while turning it into a CompletionStage with a canceller supplied too. + * @param the return type of the function + * @param function the function to apply + * @param canceller the canceller to use + * @return the new stage + */ + default CompletionStage runStage(Function function, + DisposableContainer canceller) { + return runStage(function, canceller, Executors.newVirtualThreadPerTaskExecutor()); + } +} diff --git a/src/main/java/io/reactivex/rxjava4/internal/util/AwaitCoordinatorStatic.java b/src/main/java/io/reactivex/rxjava4/internal/util/AwaitCoordinatorStatic.java new file mode 100644 index 0000000000..cdff8f0c77 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/internal/util/AwaitCoordinatorStatic.java @@ -0,0 +1,198 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.util; + +import java.util.*; +import java.util.concurrent.*; +import java.util.function.Function; + +import io.reactivex.rxjava4.annotations.*; +import io.reactivex.rxjava4.core.Notification; +import io.reactivex.rxjava4.disposables.*; +import io.reactivex.rxjava4.exceptions.Exceptions; + +/** + * Static Async coordination methods. + * @since 4.0.0 + */ +public interface AwaitCoordinatorStatic { + + /** + * The {@code await} keyword for async/await. + * @param the type of the returned value if any. + * @param stage the stage to await virtual-blockingly + * @return the awaited value + */ + @Nullable + static T await(@NonNull CompletionStage stage) { + return await(stage, null); + } + + /** + * The cancellable {@code await} keyword for async/await. + * @param the type of the returned value if any. + * @param stage the stage to await virtual-blockingly + * @param canceller the container that can trigger a cancellation on demand + * @return the awaited value + */ + @Nullable + static T await(@NonNull CompletionStage stage, @Nullable DisposableContainer canceller) { + var f = stage.toCompletableFuture(); + if (canceller == null) { + return f.join(); + } + var d = Disposable.fromFuture(f, true); + try (var _ = canceller.subscribe(d)) { + return f.join(); + } + } + + /** + * Runs a function while turning it into a CompletionStage with a canceller supplied too. + * @param the return type of the function + * @param function the function to apply + * @param canceller the canceller to use + * @param executor the executor to use + * @return the new stage + */ + static CompletionStage runStage(Function function, + DisposableContainer canceller, Executor executor) { + var loopback = new SerialDisposable(); + canceller.add(loopback); + + // new Exception().printStackTrace(); + + var f = CompletableFuture.supplyAsync(() -> { + try { + return function.apply(canceller); + } finally { + canceller.delete(loopback); + } + }, executor); + + var d = Disposable.fromFuture(f, true); + loopback.replace(d); + + return f; + } + + /** + * Await all stages to completely some way and then return a list for each outcome in Notification format. + * @param the common element type + * @param stages the stages to await all + * @param canceller to trigger a cancellation + * @return the list of outcomes as notifications + */ + static List> awaitAll(Iterable> stages, DisposableContainer canceller) { + var result = new ArrayList>(); + for (var stage : stages) { + try { + var v = await(stage, canceller); + if (v == null) { + result.add(Notification.createOnComplete()); + } else { + result.add(Notification.createOnNext(v)); + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + result.add(Notification.createOnError(ex)); + } + } + return result; + } + + /** + * Await for the very first responder, including value, failure or completion. + * @param the common element type + * @param stages the stages to await all + * @param canceller to trigger a cancellation + * @return the list of outcomes as notifications + */ + static T awaitFirst(Iterable> stages, DisposableContainer canceller) { + var winner = new CompletableFuture>(); + + for (var stage : stages) { + stage.whenComplete((_, _) -> { + winner.complete(stage); + }); + } + return await(winner, canceller).toCompletableFuture().getNow(null); + } + + /** + * Await for the very first successful responder and return its value. + * @param the common element type + * @param stages the stages to await all + * @param canceller to trigger a cancellation + * @return the list of outcomes as notifications + */ + @NonNull + @SuppressWarnings("unchecked") + static CompletionStage awaitFirstStage( + @NonNull Iterable> stages, + @NonNull DisposableContainer canceller) { + var winner = new CompletableFuture>(); + + for (var stage : stages) { + stage.whenComplete((_, _) -> { + winner.complete(stage); + }); + } + return (CompletionStage)await(winner, canceller).toCompletableFuture().getNow(null); + } + + /** + * Await for the very first successful responder and return its index. + * @param the common element type + * @param stages the stages to await all + * @param canceller to trigger a cancellation + * @return the list of outcomes as notifications + */ + @NonNull + static int awaitFirstIndex( + @NonNull List> stages, + @NonNull DisposableContainer canceller) { + var winner = new CompletableFuture(); + + for (int i = 0; i < stages.size(); i++) { + var stage = stages.get(i); + var fi = i; + stage.whenComplete((_, _) -> { + winner.complete(fi); + }); + } + return await(winner, canceller); + } + + /** + * Await for the very first successful responder and return its value. + * @param the common element type + * @param stages the stages to await all + * @param canceller to trigger a cancellation + * @return the list of outcomes as notifications + */ + static T awaitFirstSuccess(Iterable> stages, DisposableContainer canceller) { + var winner = new CompletableFuture>(); + + for (var stage : stages) { + stage.whenComplete((_, e) -> { + if (e == null) { + winner.complete(stage); + } + }); + } + return await(winner, canceller).toCompletableFuture().getNow(null); + } + +} diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java index 126ee906f7..14b5d810e8 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java @@ -262,4 +262,19 @@ public void rangeTransformTake() throws Throwable { assertEquals(1, cancelled.get(), "Cancellation count "); }); } + + @Test + public void concat() throws Throwable { + TestHelper.withVirtual(exec -> { + + var srcs = Flowable.just(Streamable.just(1), Streamable.empty(), Streamable.just(2)) + .toStreamable(); + + Streamable.concat(srcs, exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2); + + }); + } }