diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/CompositeAvailabilityProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/CompositeAvailabilityProvider.java new file mode 100644 index 0000000000000..1f6985e37beef --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/CompositeAvailabilityProvider.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io; + +import org.apache.flink.annotation.Internal; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** Reports available only when all child {@link AvailabilityProvider}s are available. */ +@Internal +public class CompositeAvailabilityProvider implements AvailabilityProvider { + private final List providers; + + private CompositeAvailabilityProvider(List providers) { + this.providers = new ArrayList<>(providers); + } + + @Override + public CompletableFuture getAvailableFuture() { + CompletableFuture result = AvailabilityProvider.AVAILABLE; + for (AvailabilityProvider p : providers) { + result = AvailabilityProvider.and(result, p.getAvailableFuture()); + } + return result; + } + + public static AvailabilityProvider of(List providers) { + if (providers.size() == 1) { + return providers.get(0); + } + return new CompositeAvailabilityProvider(providers); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SupportsSoftBackpressure.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SupportsSoftBackpressure.java new file mode 100644 index 0000000000000..38c1624720362 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SupportsSoftBackpressure.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.io.AvailabilityProvider; + +/** + * Interface for operators that absorb downstream availability internally, handling back-pressure by + * deferring emits and yielding the mailbox rather than blocking. + * + *

{@code OperatorChain} injects the composite downstream {@link AvailabilityProvider} via {@link + * #setDownstreamAvailabilityProvider} and lets the operator represent the chain's availability. + */ +@Internal +public interface SupportsSoftBackpressure extends AvailabilityProvider { + + /** + * Called once by {@code OperatorChain} during construction to inject downstream availability. + */ + void setDownstreamAvailabilityProvider(AvailabilityProvider provider); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index 815fa13bc3963..3fe4aa1c10867 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.event.WatermarkEvent; +import org.apache.flink.runtime.io.AvailabilityProvider; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; @@ -222,6 +223,10 @@ public void flush() throws IOException { recordWriter.flushAll(); } + public AvailabilityProvider getOutputAvailabilityProvider() { + return recordWriter; + } + @Override public void close() { recordWriter.close(); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 78d202c62c7ae..89dba13f4f4ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -30,6 +30,8 @@ import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.runtime.io.CompositeAvailabilityProvider; import org.apache.flink.runtime.io.network.api.StopMode; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate; @@ -61,6 +63,7 @@ import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; +import org.apache.flink.streaming.api.operators.SupportsSoftBackpressure; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput; import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; @@ -153,6 +156,8 @@ public abstract class OperatorChain> protected boolean isClosed; + private AvailabilityProvider operatorChainAvailabilityProvider = null; + public OperatorChain( StreamTask containingTask, RecordWriterDelegate>> recordWriterDelegate) { @@ -198,6 +203,7 @@ public OperatorChain( // we create the chain of operators and grab the collector that leads into the chain List> allOpWrappers = new ArrayList<>(chainedConfigs.size()); + List mainAvailProviders = new ArrayList<>(); this.mainOperatorOutput = createOutputCollector( containingTask, @@ -207,7 +213,8 @@ public OperatorChain( recordWriterOutputs, allOpWrappers, containingTask.getMailboxExecutorFactory(), - operatorFactory != null); + operatorFactory != null, + mainAvailProviders); if (operatorFactory != null) { Tuple2> mainOperatorAndTimeService = @@ -232,6 +239,22 @@ public OperatorChain( mainOperatorAndTimeService.f1, true); + // Soft-backpressure head absorbs downstream availability. + if (mainOperator instanceof SupportsSoftBackpressure) { + SupportsSoftBackpressure head = (SupportsSoftBackpressure) mainOperator; + if (!mainAvailProviders.isEmpty()) { + head.setDownstreamAvailabilityProvider( + CompositeAvailabilityProvider.of(mainAvailProviders)); + } + mainAvailProviders = new ArrayList<>(); + mainAvailProviders.add(head); + } + + if (!mainAvailProviders.isEmpty()) { + operatorChainAvailabilityProvider = + CompositeAvailabilityProvider.of(mainAvailProviders); + } + // add main operator to end of chain allOpWrappers.add(mainOperatorWrapper); @@ -485,6 +508,10 @@ public boolean isClosed() { return isClosed; } + public AvailabilityProvider getOperatorChainAvailabilityProvider() { + return operatorChainAvailabilityProvider; + } + /** Wrapper class to access the chained sources and their's outputs. */ public static class ChainedSource { private final WatermarkGaugeExposingOutput> chainedSourceOutput; @@ -712,7 +739,8 @@ private WatermarkGaugeExposingOutput> createOutputCollector( Map> recordWriterOutputs, List> allOperatorWrappers, MailboxExecutorFactory mailboxExecutorFactory, - boolean shouldAddMetric) { + boolean shouldAddMetric, + List downstreamAvailProviders) { List>> allOutputs = new ArrayList<>(4); // create collectors for the network outputs @@ -723,6 +751,9 @@ private WatermarkGaugeExposingOutput> createOutputCollector( (RecordWriterOutput) recordWriterOutputs.get(streamOutput.getDataSetId()); allOutputs.add(recordWriterOutput); + if (downstreamAvailProviders != null) { + downstreamAvailProviders.add(recordWriterOutput.getOutputAvailabilityProvider()); + } } // Create collectors for the chained outputs @@ -741,7 +772,8 @@ private WatermarkGaugeExposingOutput> createOutputCollector( allOperatorWrappers, outputEdge.getOutputTag(), mailboxExecutorFactory, - shouldAddMetric); + shouldAddMetric, + downstreamAvailProviders); checkState(output instanceof OutputWithChainingCheck); allOutputs.add((OutputWithChainingCheck) output); // If the operator has multiple downstream chained operators, only one of them should @@ -820,7 +852,10 @@ private WatermarkGaugeExposingOutput> createOperatorC List> allOperatorWrappers, OutputTag outputTag, MailboxExecutorFactory mailboxExecutorFactory, - boolean shouldAddMetricForPrevOperator) { + boolean shouldAddMetricForPrevOperator, + @Nullable List parentDownstreamAvailProviders) { + List myDownstreamAvailProviders = new ArrayList<>(); + // create the output that the operator writes to first. this may recursively create more // operators WatermarkGaugeExposingOutput> chainedOperatorOutput = @@ -832,7 +867,8 @@ private WatermarkGaugeExposingOutput> createOperatorC recordWriterOutputs, allOperatorWrappers, mailboxExecutorFactory, - true); + true, + myDownstreamAvailProviders); OneInputStreamOperator chainedOperator = createOperator( @@ -843,6 +879,22 @@ private WatermarkGaugeExposingOutput> createOperatorC allOperatorWrappers, false); + // Soft-backpressure operator absorbs downstream; only it is surfaced upward. + if (chainedOperator instanceof SupportsSoftBackpressure) { + SupportsSoftBackpressure op = (SupportsSoftBackpressure) chainedOperator; + if (!myDownstreamAvailProviders.isEmpty()) { + op.setDownstreamAvailabilityProvider( + CompositeAvailabilityProvider.of(myDownstreamAvailProviders)); + } + if (parentDownstreamAvailProviders != null) { + parentDownstreamAvailProviders.add(op); + } + } else { + if (parentDownstreamAvailProviders != null) { + parentDownstreamAvailProviders.addAll(myDownstreamAvailProviders); + } + } + return wrapOperatorIntoOutput( chainedOperator, containingTask, diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index e06164125fd41..abe7e244e2b2f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -328,6 +328,8 @@ public abstract class StreamTask> @Nullable private final AvailabilityProvider changelogWriterAvailabilityProvider; + @Nullable private AvailabilityProvider operatorChainAvailabilityProvider = null; + private long initializeStateEndTs; // ------------------------------------------------------------------------ @@ -695,6 +697,10 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E // waiting for changelog availability is reported as busy timer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond()); resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture(); + } else if (operatorChainAvailabilityProvider != null + && !operatorChainAvailabilityProvider.isAvailable()) { + timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond()); + resumeFuture = operatorChainAvailabilityProvider.getAvailableFuture(); } else { // data availability has changed in the meantime; retry immediately return; @@ -808,6 +814,8 @@ void restoreInternal() throws Exception { ? new FinishedOperatorChain<>(this, recordWriter) : new RegularOperatorChain<>(this, recordWriter); mainOperator = operatorChain.getMainOperator(); + operatorChainAvailabilityProvider = + operatorChain.getOperatorChainAvailabilityProvider(); getEnvironment() .getTaskStateManager() @@ -1188,7 +1196,9 @@ public boolean hasMail() { private boolean taskIsAvailable() { return recordWriter.isAvailable() && (changelogWriterAvailabilityProvider == null - || changelogWriterAvailabilityProvider.isAvailable()); + || changelogWriterAvailabilityProvider.isAvailable()) + && (operatorChainAvailabilityProvider == null + || operatorChainAvailabilityProvider.isAvailable()); } public CanEmitBatchOfRecordsChecker getCanEmitBatchOfRecords() { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java index d425fc61b008e..081ce8709100c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java @@ -19,8 +19,11 @@ package org.apache.flink.streaming.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.runtime.io.network.api.writer.AvailabilityTestResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -37,6 +40,7 @@ import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; +import org.apache.flink.streaming.api.operators.SupportsSoftBackpressure; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorChain; import org.apache.flink.streaming.runtime.tasks.RegularOperatorChain; @@ -49,7 +53,9 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import static org.apache.flink.streaming.api.operators.StreamOperatorUtils.setupStreamOperator; import static org.assertj.core.api.Assertions.assertThat; @@ -276,6 +282,143 @@ public void invoke(String value, Context ctx) throws Exception { } } + /** + * Exercises the head branch of {@link OperatorChain}: when the main operator implements {@link + * SupportsSoftBackpressure}, the chain availability provider must be the head operator itself. + */ + @Test + void testChainAvailabilityProviderForHeadSoftBackpressureOperator() throws Exception { + JobVertex chainedVertex = buildChainedVertexWithSoftBackpressureOperator(true); + + StreamConfig streamConfig = new StreamConfig(chainedVertex.getConfiguration()); + + try (MockEnvironment environment = createMockEnvironment(chainedVertex.getName())) { + environment.addOutputs( + Collections.singletonList(new AvailabilityTestResultPartitionWriter(true))); + StreamTask> mockTask = + createMockTask(streamConfig, environment); + OperatorChain> operatorChain = + createOperatorChain(streamConfig, environment, mockTask); + + AvailabilityProvider chainAvailability = + operatorChain.getOperatorChainAvailabilityProvider(); + assertThat(chainAvailability) + .as("Head soft-backpressure operator should represent chain" + " availability") + .isSameAs(operatorChain.getMainOperator()); + assertThat(chainAvailability.isAvailable()).as("Operator starts available").isTrue(); + } + } + + /** + * Exercises the chained branch of {@link OperatorChain}: when a {@link + * SupportsSoftBackpressure} operator sits behind the head, it replaces the downstream leaves in + * the parent's collector, so the chain availability provider must be that chained operator. + */ + @Test + void testChainAvailabilityProviderForChainedSoftBackpressureOperator() throws Exception { + JobVertex chainedVertex = buildChainedVertexWithSoftBackpressureOperator(false); + + StreamConfig streamConfig = new StreamConfig(chainedVertex.getConfiguration()); + + try (MockEnvironment environment = createMockEnvironment(chainedVertex.getName())) { + environment.addOutputs( + Collections.singletonList(new AvailabilityTestResultPartitionWriter(true))); + StreamTask> mockTask = + createMockTask(streamConfig, environment); + OperatorChain> operatorChain = + createOperatorChain(streamConfig, environment, mockTask); + + SupportsSoftBackpressure chainedSbp = findSingleSoftBackpressureOperator(operatorChain); + AvailabilityProvider chainAvailability = + operatorChain.getOperatorChainAvailabilityProvider(); + assertThat(chainAvailability) + .as( + "Chained soft-backpressure operator should replace downstream" + + " leaves and act as chain availability boundary") + .isSameAs(chainedSbp); + assertThat(chainAvailability) + .as( + "Head operator (not the soft-backpressure one) is not the chain" + + " availability provider") + .isNotSameAs(operatorChain.getMainOperator()); + assertThat(chainAvailability.isAvailable()).as("Operator starts available").isTrue(); + } + } + + private static JobVertex buildChainedVertexWithSoftBackpressureOperator( + boolean softBackpressureAtHead) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataStream input = env.fromData(1, 2, 3); + + if (softBackpressureAtHead) { + input = + input.transform( + "SoftBackpressureMap", + input.getType(), + new SoftBackpressureMapOperator<>(value -> value)); + input = input.map(value -> value); + } else { + input = input.map(value -> value); + input = + input.transform( + "SoftBackpressureMap", + input.getType(), + new SoftBackpressureMapOperator<>(value -> value)); + } + + input.map(value -> value) + .startNewChain() + .addSink( + new SinkFunction() { + @Override + public void invoke(Integer value, Context ctx) {} + }); + + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + assertThat(jobGraph.getVerticesSortedTopologicallyFromSources()).hasSize(3); + return jobGraph.getVerticesSortedTopologicallyFromSources().get(1); + } + + private static SupportsSoftBackpressure findSingleSoftBackpressureOperator( + OperatorChain chain) { + SupportsSoftBackpressure found = null; + for (StreamOperatorWrapper wrapper : chain.getAllOperators()) { + if (wrapper.getStreamOperator() instanceof SupportsSoftBackpressure) { + assertThat(found) + .as("Expected exactly one SupportsSoftBackpressure operator in chain") + .isNull(); + found = (SupportsSoftBackpressure) wrapper.getStreamOperator(); + } + } + assertThat(found).as("Expected a SupportsSoftBackpressure operator in chain").isNotNull(); + return found; + } + + /** + * A {@link StreamMap} that also implements {@link SupportsSoftBackpressure} for testing + * OperatorChain availability-provider wiring without depending on flink-streaming-java + * operators. + */ + private static class SoftBackpressureMapOperator extends StreamMap + implements SupportsSoftBackpressure { + + private static final long serialVersionUID = 1L; + + SoftBackpressureMapOperator(MapFunction mapper) { + super(mapper); + } + + @Override + public void setDownstreamAvailabilityProvider(AvailabilityProvider provider) {} + + @Override + public CompletableFuture getAvailableFuture() { + return AvailabilityProvider.AVAILABLE; + } + } + private > OperatorChain createOperatorChain( StreamConfig streamConfig, Environment environment, StreamTask task) { return new TestOperatorChain<>( diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/CompositeAvailabilityProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/CompositeAvailabilityProviderTest.java new file mode 100644 index 0000000000000..6e4fd5e04ee42 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/CompositeAvailabilityProviderTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.runtime.io.CompositeAvailabilityProvider; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link CompositeAvailabilityProvider}. */ +class CompositeAvailabilityProviderTest { + + @Test + void testSingleChildReturnsChildDirectly() { + TestAvailabilityProvider child = new TestAvailabilityProvider(); + AvailabilityProvider composite = + CompositeAvailabilityProvider.of(Collections.singletonList(child)); + + assertThat(composite).isSameAs(child); + assertThat(composite.getAvailableFuture()).isSameAs(child.future); + assertThat(composite.isAvailable()).isFalse(); + + child.future.complete(null); + assertThat(composite.isAvailable()).isTrue(); + } + + @Test + void testMultipleChildrenWrappedInComposite() { + TestAvailabilityProvider first = new TestAvailabilityProvider(); + TestAvailabilityProvider second = new TestAvailabilityProvider(); + List children = Arrays.asList(first, second); + + AvailabilityProvider composite = CompositeAvailabilityProvider.of(children); + + assertThat(composite).isInstanceOf(CompositeAvailabilityProvider.class); + assertThat(composite.isAvailable()).isFalse(); + } + + @Test + void testGetAvailableFutureCompletesOnlyWhenAllChildrenComplete() { + TestAvailabilityProvider first = new TestAvailabilityProvider(); + TestAvailabilityProvider second = new TestAvailabilityProvider(); + AvailabilityProvider composite = + CompositeAvailabilityProvider.of(Arrays.asList(first, second)); + + CompletableFuture available = composite.getAvailableFuture(); + assertThat(available.isDone()).isFalse(); + + first.future.complete(null); + assertThat(available.isDone()) + .as("composite future should remain incomplete while one child is pending") + .isFalse(); + + second.future.complete(null); + assertThat(available.isDone()).isTrue(); + assertThat(composite.isAvailable()).isTrue(); + } + + /** Minimal {@link AvailabilityProvider} backed by a mutable {@link CompletableFuture}. */ + private static final class TestAvailabilityProvider implements AvailabilityProvider { + private final CompletableFuture future = new CompletableFuture<>(); + + @Override + public CompletableFuture getAvailableFuture() { + return future; + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java index 323fe4106f30c..ce32e8fdd529a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.AvailabilityProvider; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.datastream.AsyncDataStream; @@ -37,6 +38,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.operators.SupportsSoftBackpressure; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue; import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue; @@ -57,6 +59,7 @@ import java.util.HashSet; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -92,7 +95,7 @@ @Internal public class AsyncWaitOperator extends AbstractUdfStreamOperator> - implements OneInputStreamOperator, BoundedOneInput { + implements OneInputStreamOperator, BoundedOneInput, SupportsSoftBackpressure { private static final long serialVersionUID = 1L; private static final String STATE_NAME = "_async_wait_operator_state_"; @@ -139,6 +142,8 @@ public class AsyncWaitOperator /** Whether retry is disabled due to task finish, initially set to false. */ private transient AtomicBoolean retryDisabledOnFinish; + private AvailabilityProvider downstreamAvailabilityProvider; + public AsyncWaitOperator( StreamOperatorParameters parameters, @Nonnull AsyncFunction asyncFunction, @@ -390,6 +395,25 @@ private void waitInFlightInputsFinished() throws InterruptedException { */ private void outputCompletedElement() { if (queue.hasCompletedElements()) { + // Defer emission until downstream becomes available. + if (downstreamAvailabilityProvider != null + && !downstreamAvailabilityProvider.isAvailable()) { + downstreamAvailabilityProvider + .getAvailableFuture() + .thenRun( + () -> { + try { + mailboxExecutor.execute( + this::outputCompletedElement, + "AsyncWaitOperator#outputCompletedElement(deferred)"); + } catch (RejectedExecutionException e) { + LOG.debug( + "Deferred element emission is ignored since the mailbox rejected the execution.", + e); + } + }); + return; + } // emit only one element to not block the mailbox thread unnecessarily queue.emitCompletedElement(timestampedCollector); // if there are more completed elements, emit them with subsequent mails @@ -431,6 +455,16 @@ private ScheduledFuture registerTimer( timeoutTimestamp, timestamp -> callback.accept(null)); } + @Override + public CompletableFuture getAvailableFuture() { + return queue.getAvailableFuture(); + } + + @Override + public void setDownstreamAvailabilityProvider(AvailabilityProvider provider) { + this.downstreamAvailabilityProvider = provider; + } + /** A delegator holds the real {@link ResultHandler} to handle retries. */ private class RetryableResultHandlerDelegator implements ResultFuture { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java index 6d8fa95f94f72..84a94a531d3a1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Optional; import java.util.Queue; +import java.util.concurrent.CompletableFuture; /** * Ordered {@link StreamElementQueue} implementation. The ordered stream element queue provides @@ -52,11 +53,14 @@ public final class OrderedStreamElementQueue implements StreamElementQueue< /** Queue for the inserted StreamElementQueueEntries. */ private final Queue> queue; + private final AvailabilityHelper availabilityHelper = new AvailabilityHelper(); + public OrderedStreamElementQueue(int capacity) { Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0."); this.capacity = capacity; this.queue = new ArrayDeque<>(capacity); + this.availabilityHelper.resetAvailable(); } @Override @@ -67,8 +71,12 @@ public boolean hasCompletedElements() { @Override public void emitCompletedElement(TimestampedCollector output) { if (hasCompletedElements()) { + boolean isFullBefore = isFull(); final StreamElementQueueEntry head = queue.poll(); head.emitResult(output); + if (isFullBefore && !isFull()) { + availabilityHelper.getUnavailableToResetAvailable().complete(null); + } } } @@ -93,7 +101,7 @@ public int size() { @Override public Optional> tryPut(StreamElement streamElement) { - if (queue.size() < capacity) { + if (!isFull()) { StreamElementQueueEntry queueEntry = createEntry(streamElement); queue.add(queueEntry); @@ -104,6 +112,10 @@ public Optional> tryPut(StreamElement streamElement) { queue.size(), capacity); + if (isFull()) { + availabilityHelper.resetUnavailable(); + } + return Optional.of(queueEntry); } else { LOG.debug( @@ -116,6 +128,15 @@ public Optional> tryPut(StreamElement streamElement) { } } + @Override + public CompletableFuture getAvailableFuture() { + return availabilityHelper.getAvailableFuture(); + } + + private boolean isFull() { + return queue.size() >= capacity; + } + private StreamElementQueueEntry createEntry(StreamElement streamElement) { if (streamElement.isRecord()) { return new StreamRecordQueueEntry<>((StreamRecord) streamElement); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java index bb6c18dc31069..e26ab02959feb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.io.AvailabilityProvider; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; @@ -29,7 +30,7 @@ /** Interface for stream element queues for the {@link AsyncWaitOperator}. */ @Internal -public interface StreamElementQueue { +public interface StreamElementQueue extends AvailabilityProvider { /** * Tries to put the given element in the queue. This operation succeeds if the queue has diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java index 289b1b89087af..7e79c2cb8cd68 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java @@ -38,6 +38,7 @@ import java.util.Optional; import java.util.Queue; import java.util.Set; +import java.util.concurrent.CompletableFuture; /** * Unordered implementation of the {@link StreamElementQueue}. The unordered stream element queue @@ -63,6 +64,8 @@ public final class UnorderedStreamElementQueue implements StreamElementQueu private int numberOfEntries; + private final AvailabilityHelper availabilityHelper = new AvailabilityHelper(); + public UnorderedStreamElementQueue(int capacity) { Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0."); @@ -70,11 +73,12 @@ public UnorderedStreamElementQueue(int capacity) { // most likely scenario are 4 segments this.segments = new ArrayDeque<>(4); this.numberOfEntries = 0; + this.availabilityHelper.resetAvailable(); } @Override public Optional> tryPut(StreamElement streamElement) { - if (size() < capacity) { + if (!isFull()) { StreamElementQueueEntry queueEntry; if (streamElement.isRecord()) { queueEntry = addRecord((StreamRecord) streamElement); @@ -92,6 +96,10 @@ public Optional> tryPut(StreamElement streamElement) { size(), capacity); + if (isFull()) { + availabilityHelper.resetUnavailable(); + } + return Optional.of(queueEntry); } else { LOG.debug( @@ -155,6 +163,7 @@ public void emitCompletedElement(TimestampedCollector output) { if (segments.isEmpty()) { return; } + boolean isFullBefore = isFull(); final Segment currentSegment = segments.getFirst(); numberOfEntries -= currentSegment.emitCompleted(output); @@ -163,6 +172,9 @@ public void emitCompletedElement(TimestampedCollector output) { if (segments.size() > 1 && currentSegment.isEmpty()) { segments.pop(); } + if (isFullBefore && !isFull()) { + availabilityHelper.getUnavailableToResetAvailable().complete(null); + } } @Override @@ -184,6 +196,15 @@ public int size() { return numberOfEntries; } + @Override + public CompletableFuture getAvailableFuture() { + return availabilityHelper.getAvailableFuture(); + } + + private boolean isFull() { + return size() >= capacity; + } + /** An entry that notifies the respective segment upon completion. */ static class SegmentedStreamRecordQueueEntry extends StreamRecordQueueEntry { private final Segment segment; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 6ed1fa364ea2b..b16a603b630d0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.io.AvailabilityProvider; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -55,6 +56,7 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness; import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder; +import org.apache.flink.streaming.runtime.tasks.mailbox.Mail; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies; @@ -486,6 +488,134 @@ private void testProcessingTime(AsyncDataStream.OutputMode mode) throws Exceptio } } + /** + * Tests that the OperatorChain availability provider exposed to {@link + * org.apache.flink.streaming.runtime.tasks.StreamTask} flips to unavailable when an {@link + * AsyncWaitOperator}'s queue is full, and recovers once a slot is freed by an emitted result. + * This is the integration that drives the soft-backpressure path in {@code + * StreamTask#processInput}. + */ + @Test + void testStreamTaskChainAvailabilityFlipsWithAsyncQueueCapacity() throws Exception { + SharedReference>> resultFutures = sharedObjects.add(new ArrayList<>()); + + StreamTaskMailboxTestHarnessBuilder builder = + new StreamTaskMailboxTestHarnessBuilder<>( + OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) + .addInput(BasicTypeInfo.INT_TYPE_INFO); + + try (StreamTaskMailboxTestHarness harness = + builder.setupOutputForSingletonOperatorChain( + new AsyncWaitOperatorFactory<>( + new CollectableFuturesAsyncFunction<>(resultFutures), + TIMEOUT, + 2, + AsyncDataStream.OutputMode.ORDERED)) + .build()) { + + AvailabilityProvider chainAvailability = + harness.getOperatorChain().getOperatorChainAvailabilityProvider(); + assertThat(chainAvailability) + .as("Singleton AsyncWait chain must expose an availability provider") + .isNotNull(); + assertThat(chainAvailability.isAvailable()) + .as("Empty async queue starts available") + .isTrue(); + + // Fill the capacity-2 queue. The async function never auto-completes. + harness.processElement(new StreamRecord<>(1, 0L)); + harness.processElement(new StreamRecord<>(2, 1L)); + + assertThat(chainAvailability.isAvailable()) + .as( + "After filling capacity, chain availability must drop to false so" + + " StreamTask#processInput enters the soft-backpressure" + + " branch") + .isFalse(); + + // Complete one entry; the head emits and frees a slot, restoring availability. + completeWithSingle(resultFutures.get().get(0), 10); + harness.processAll(); + + assertThat(chainAvailability.isAvailable()) + .as("Freeing one slot must restore chain availability") + .isTrue(); + + // Drain the rest to keep the harness clean. + completeWithSingle(resultFutures.get().get(1), 20); + harness.processAll(); + assertThat(harness.getOutput()) + .containsExactly(new StreamRecord<>(10, 0L), new StreamRecord<>(20, 1L)); + } + } + + /** + * Tests that {@link AsyncWaitOperator} defers element emission while the configured downstream + * {@link AvailabilityProvider} is unavailable, and that completing the downstream's future + * schedules a deferred mail that finally drains the buffered element. + */ + @Test + void testOutputCompletedElementDefersWhenDownstreamUnavailable() throws Exception { + final OneInputStreamOperatorTestHarness testHarness = + createTestHarness( + new ImmediateAsyncFunction(), + TIMEOUT, + 2, + AsyncDataStream.OutputMode.ORDERED); + + testHarness.open(); + + final AsyncWaitOperator operator = + (AsyncWaitOperator) testHarness.getOneInputOperator(); + final CompletableFuture downstreamFuture = new CompletableFuture<>(); + operator.setDownstreamAvailabilityProvider(() -> downstreamFuture); + + synchronized (testHarness.getCheckpointLock()) { + testHarness.processElement(new StreamRecord<>(7, 0L)); + } + + // ImmediateAsyncFunction completes synchronously; draining runs the resulting + // outputCompletedElement mail, which must observe the unavailable downstream + // and register a thenRun callback instead of emitting. + drainMailbox(testHarness); + assertThat(testHarness.getOutput()) + .as("Element must not be emitted while downstream is unavailable") + .isEmpty(); + + // Signal downstream availability. The thenRun callback runs synchronously on + // this thread and enqueues a deferred outputCompletedElement mail. + downstreamFuture.complete(null); + drainMailbox(testHarness); + assertThat(testHarness.getOutput()).containsExactly(new StreamRecord<>(7, 0L)); + + synchronized (testHarness.getCheckpointLock()) { + testHarness.endInput(); + testHarness.close(); + } + } + + private static void drainMailbox(OneInputStreamOperatorTestHarness testHarness) + throws Exception { + for (Mail mail : testHarness.getTaskMailbox().drain()) { + mail.run(); + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private static void completeWithSingle(ResultFuture resultFuture, Object value) { + ((ResultFuture) resultFuture).complete(Collections.singletonList(value)); + } + + /** A synchronous {@link AsyncFunction} that completes the result on the calling thread. */ + private static class ImmediateAsyncFunction implements AsyncFunction { + private static final long serialVersionUID = 1L; + + @Override + public void asyncInvoke(Integer input, ResultFuture resultFuture) { + resultFuture.complete(Collections.singletonList(input)); + } + } + /** Tests that the AsyncWaitOperator works together with chaining. */ @Test void testOperatorChainWithProcessingTime() throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java index e5f83ab901d6d..b79d094a1e76a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted; import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSuccessfully; @@ -70,4 +71,54 @@ void testCompletionOrder() { assertThat(queue.size()).isZero(); assertThat(queue.isEmpty()).isTrue(); } + + /** + * Tests that {@link OrderedStreamElementQueue#isAvailable()} flips to false once the queue is + * full and that {@link OrderedStreamElementQueue#getAvailableFuture()} returns an incomplete + * future in that state. + */ + @Test + void testIsAvailableTogglesWithCapacity() { + final OrderedStreamElementQueue queue = new OrderedStreamElementQueue<>(2); + + assertThat(queue.isAvailable()).isTrue(); + assertThat(queue.getAvailableFuture().isDone()).isTrue(); + + putSuccessfully(queue, new StreamRecord<>(1, 0L)); + assertThat(queue.isAvailable()) + .as("queue with one free slot remaining should still be available") + .isTrue(); + + putSuccessfully(queue, new StreamRecord<>(2, 1L)); + assertThat(queue.isAvailable()).as("queue at capacity should be unavailable").isFalse(); + assertThat(queue.getAvailableFuture().isDone()).isFalse(); + } + + /** + * Tests that the future returned by {@link OrderedStreamElementQueue#getAvailableFuture()} + * completes once a slot is freed by emitting a completed head element, and that the queue + * becomes available again. + */ + @Test + void testGetAvailableFutureCompletesWhenSlotFreed() { + final OrderedStreamElementQueue queue = new OrderedStreamElementQueue<>(2); + + ResultFuture entry1 = putSuccessfully(queue, new StreamRecord<>(1, 0L)); + putSuccessfully(queue, new StreamRecord<>(2, 1L)); + + CompletableFuture availableFuture = queue.getAvailableFuture(); + assertThat(availableFuture.isDone()).isFalse(); + assertThat(queue.isAvailable()).isFalse(); + + // Completing the head and emitting it frees one slot, which must complete the future + // captured while the queue was full. + entry1.complete(Collections.singleton(10)); + assertThat(popCompleted(queue)).containsExactly(new StreamRecord<>(10, 0L)); + + assertThat(availableFuture.isDone()) + .as("availability future should be completed when capacity is freed") + .isTrue(); + assertThat(queue.isAvailable()).isTrue(); + assertThat(queue.getAvailableFuture().isDone()).isTrue(); + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java index 35b8292aef482..c358552d3b216 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java @@ -25,6 +25,8 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted; import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSuccessfully; @@ -93,4 +95,54 @@ void testCompletionOrder() { assertThat(queue.isEmpty()).isTrue(); assertThat(popCompleted(queue)).isEmpty(); } + + /** + * Tests that {@link UnorderedStreamElementQueue#isAvailable()} starts true, stays true while + * the queue has free capacity, and flips to false once it becomes full; the future returned by + * {@link UnorderedStreamElementQueue#getAvailableFuture()} mirrors this state. + */ + @Test + void testIsAvailableTogglesWithCapacity() { + final UnorderedStreamElementQueue queue = new UnorderedStreamElementQueue<>(2); + + assertThat(queue.isAvailable()).isTrue(); + assertThat(queue.getAvailableFuture().isDone()).isTrue(); + + putSuccessfully(queue, new StreamRecord<>(1, 0L)); + assertThat(queue.isAvailable()) + .as("queue with one free slot remaining should still be available") + .isTrue(); + + putSuccessfully(queue, new StreamRecord<>(2, 1L)); + assertThat(queue.isAvailable()).as("queue at capacity should be unavailable").isFalse(); + assertThat(queue.getAvailableFuture().isDone()).isFalse(); + } + + /** + * Tests that the future returned by {@link UnorderedStreamElementQueue#getAvailableFuture()} + * completes once a slot is freed by emitting a completed element from the head segment, and + * that the queue becomes available again. + */ + @Test + void testGetAvailableFutureCompletesWhenSlotFreed() { + final UnorderedStreamElementQueue queue = new UnorderedStreamElementQueue<>(2); + + ResultFuture entry1 = putSuccessfully(queue, new StreamRecord<>(1, 0L)); + putSuccessfully(queue, new StreamRecord<>(2, 1L)); + + CompletableFuture availableFuture = queue.getAvailableFuture(); + assertThat(availableFuture.isDone()).isFalse(); + assertThat(queue.isAvailable()).isFalse(); + + // Completing an entry in the head segment and emitting it frees one slot, which must + // complete the future captured while the queue was full. + entry1.complete(Collections.singleton(11)); + assertThat(popCompleted(queue)).containsExactly(new StreamRecord<>(11, 0L)); + + assertThat(availableFuture.isDone()) + .as("availability future should be completed when capacity is freed") + .isTrue(); + assertThat(queue.isAvailable()).isTrue(); + assertThat(queue.getAvailableFuture().isDone()).isTrue(); + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java index 4e733481536b8..b813bd30cfb2b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java @@ -69,6 +69,10 @@ public TestTaskStateManager getTaskStateManager() { return streamTask; } + public OperatorChain getOperatorChain() { + return streamTask.operatorChain; + } + public TimerService getTimerService() { return streamTask.getTimerService(); }