Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io;

import org.apache.flink.annotation.Internal;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * An {@link AvailabilityProvider} that reports available only when all child {@link
 * AvailabilityProvider}s are available.
 *
 * <p>Instances are obtained through {@link #of(List)}. The list of children is copied when the
 * composite is created, so later modifications of the caller's list are not observed.
 */
@Internal
public final class CompositeAvailabilityProvider implements AvailabilityProvider {

    /** Private snapshot of the child providers, taken at construction time. */
    private final List<AvailabilityProvider> providers;

    private CompositeAvailabilityProvider(List<AvailabilityProvider> providers) {
        this.providers = new ArrayList<>(providers);
    }

    /**
     * Combines the children's current availability futures with {@link
     * AvailabilityProvider#and}, starting from {@link AvailabilityProvider#AVAILABLE}.
     *
     * <p>The children are queried on every call, so the returned future reflects their state at
     * the time of the call. With no children the result is {@link AvailabilityProvider#AVAILABLE}.
     *
     * @return a future representing the availability of all children together
     */
    @Override
    public CompletableFuture<?> getAvailableFuture() {
        CompletableFuture<?> result = AvailabilityProvider.AVAILABLE;
        for (AvailabilityProvider provider : providers) {
            result = AvailabilityProvider.and(result, provider.getAvailableFuture());
        }
        return result;
    }

    /**
     * Checks availability by asking each child directly and stopping at the first unavailable
     * one.
     *
     * <p>Without this override every poll would go through {@link #getAvailableFuture()}, which
     * visits all children and may build a new combined future just to inspect it. This method is
     * polled by the task loop, so the cheaper check is preferable.
     *
     * @return {@code true} if every child reports available (trivially {@code true} with no
     *     children), {@code false} otherwise
     */
    @Override
    public boolean isAvailable() {
        for (AvailabilityProvider provider : providers) {
            if (!provider.isAvailable()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Creates a provider representing the conjunction of the given providers.
     *
     * @param providers the child providers; the list is copied, not retained
     * @return the single element itself when the list has exactly one entry (no wrapper is
     *     created), otherwise a new composite over a copy of the list
     */
    public static AvailabilityProvider of(List<AvailabilityProvider> providers) {
        if (providers.size() == 1) {
            return providers.get(0);
        }
        return new CompositeAvailabilityProvider(providers);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.AvailabilityProvider;

/**
* Interface for operators that absorb downstream availability internally, handling back-pressure by
* deferring emits and yielding the mailbox rather than blocking.
*
* <p>{@code OperatorChain} injects the composite downstream {@link AvailabilityProvider} via {@link
* #setDownstreamAvailabilityProvider} and lets the operator represent the chain's availability.
*
* <p>Since this interface extends {@link AvailabilityProvider}, the operator itself is what {@code
* OperatorChain} surfaces upward in place of the downstream providers it was given.
*/
@Internal
public interface SupportsSoftBackpressure extends AvailabilityProvider {

/**
* Called once by {@code OperatorChain} during construction to inject downstream availability.
*
* <p>{@code OperatorChain} only makes this call when it collected at least one downstream
* provider for the operator; otherwise the method is never invoked. Implementations should
* therefore not rely on a provider always having been set.
*
* @param provider the availability of everything downstream of this operator; when there is
*     exactly one downstream provider it is passed as-is rather than wrapped in a composite
*/
void setDownstreamAvailabilityProvider(AvailabilityProvider provider);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
Expand Down Expand Up @@ -222,6 +223,10 @@ public void flush() throws IOException {
recordWriter.flushAll();
}

/**
 * Exposes the availability of the record writer backing this output.
 *
 * @return this output's {@code recordWriter}, viewed as an {@link AvailabilityProvider}
 */
public AvailabilityProvider getOutputAvailabilityProvider() {
    return this.recordWriter;
}

@Override
public void close() {
recordWriter.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.CompositeAvailabilityProvider;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
Expand Down Expand Up @@ -61,6 +63,7 @@
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.SupportsSoftBackpressure;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
Expand Down Expand Up @@ -153,6 +156,8 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>

protected boolean isClosed;

private AvailabilityProvider operatorChainAvailabilityProvider = null;

public OperatorChain(
StreamTask<OUT, OP> containingTask,
RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {
Expand Down Expand Up @@ -198,6 +203,7 @@ public OperatorChain(
// we create the chain of operators and grab the collector that leads into the chain
List<StreamOperatorWrapper<?, ?>> allOpWrappers =
new ArrayList<>(chainedConfigs.size());
List<AvailabilityProvider> mainAvailProviders = new ArrayList<>();
this.mainOperatorOutput =
createOutputCollector(
containingTask,
Expand All @@ -207,7 +213,8 @@ public OperatorChain(
recordWriterOutputs,
allOpWrappers,
containingTask.getMailboxExecutorFactory(),
operatorFactory != null);
operatorFactory != null,
mainAvailProviders);

if (operatorFactory != null) {
Tuple2<OP, Optional<ProcessingTimeService>> mainOperatorAndTimeService =
Expand All @@ -232,6 +239,22 @@ public OperatorChain(
mainOperatorAndTimeService.f1,
true);

// Soft-backpressure head absorbs downstream availability.
if (mainOperator instanceof SupportsSoftBackpressure) {
SupportsSoftBackpressure head = (SupportsSoftBackpressure) mainOperator;
if (!mainAvailProviders.isEmpty()) {
head.setDownstreamAvailabilityProvider(
CompositeAvailabilityProvider.of(mainAvailProviders));
}
mainAvailProviders = new ArrayList<>();
mainAvailProviders.add(head);
}

if (!mainAvailProviders.isEmpty()) {
operatorChainAvailabilityProvider =
CompositeAvailabilityProvider.of(mainAvailProviders);
}

// add main operator to end of chain
allOpWrappers.add(mainOperatorWrapper);

Expand Down Expand Up @@ -485,6 +508,10 @@ public boolean isClosed() {
return isClosed;
}

/**
 * Returns the provider representing the availability of this operator chain.
 *
 * @return the chain's availability provider, or {@code null} if none was set up during
 *     construction (the backing field starts out as {@code null} and is only assigned when
 *     at least one provider was collected); callers must null-check the result
 */
@Nullable
public AvailabilityProvider getOperatorChainAvailabilityProvider() {
    return operatorChainAvailabilityProvider;
}

/** Wrapper class to access the chained sources and their outputs. */
public static class ChainedSource {
private final WatermarkGaugeExposingOutput<StreamRecord<?>> chainedSourceOutput;
Expand Down Expand Up @@ -712,7 +739,8 @@ private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs,
List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
MailboxExecutorFactory mailboxExecutorFactory,
boolean shouldAddMetric) {
boolean shouldAddMetric,
List<AvailabilityProvider> downstreamAvailProviders) {
List<OutputWithChainingCheck<StreamRecord<T>>> allOutputs = new ArrayList<>(4);

// create collectors for the network outputs
Expand All @@ -723,6 +751,9 @@ private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
(RecordWriterOutput<T>) recordWriterOutputs.get(streamOutput.getDataSetId());

allOutputs.add(recordWriterOutput);
if (downstreamAvailProviders != null) {
downstreamAvailProviders.add(recordWriterOutput.getOutputAvailabilityProvider());
}
}

// Create collectors for the chained outputs
Expand All @@ -741,7 +772,8 @@ private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
allOperatorWrappers,
outputEdge.getOutputTag(),
mailboxExecutorFactory,
shouldAddMetric);
shouldAddMetric,
downstreamAvailProviders);
checkState(output instanceof OutputWithChainingCheck);
allOutputs.add((OutputWithChainingCheck) output);
// If the operator has multiple downstream chained operators, only one of them should
Expand Down Expand Up @@ -820,7 +852,10 @@ private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createOperatorC
List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
OutputTag<IN> outputTag,
MailboxExecutorFactory mailboxExecutorFactory,
boolean shouldAddMetricForPrevOperator) {
boolean shouldAddMetricForPrevOperator,
@Nullable List<AvailabilityProvider> parentDownstreamAvailProviders) {
List<AvailabilityProvider> myDownstreamAvailProviders = new ArrayList<>();

// create the output that the operator writes to first. this may recursively create more
// operators
WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput =
Expand All @@ -832,7 +867,8 @@ private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createOperatorC
recordWriterOutputs,
allOperatorWrappers,
mailboxExecutorFactory,
true);
true,
myDownstreamAvailProviders);

OneInputStreamOperator<IN, OUT> chainedOperator =
createOperator(
Expand All @@ -843,6 +879,22 @@ private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createOperatorC
allOperatorWrappers,
false);

// Soft-backpressure operator absorbs downstream; only it is surfaced upward.
if (chainedOperator instanceof SupportsSoftBackpressure) {
SupportsSoftBackpressure op = (SupportsSoftBackpressure) chainedOperator;
if (!myDownstreamAvailProviders.isEmpty()) {
op.setDownstreamAvailabilityProvider(
CompositeAvailabilityProvider.of(myDownstreamAvailProviders));
}
if (parentDownstreamAvailProviders != null) {
parentDownstreamAvailProviders.add(op);
}
} else {
if (parentDownstreamAvailProviders != null) {
parentDownstreamAvailProviders.addAll(myDownstreamAvailProviders);
}
}

return wrapOperatorIntoOutput(
chainedOperator,
containingTask,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>

@Nullable private final AvailabilityProvider changelogWriterAvailabilityProvider;

@Nullable private AvailabilityProvider operatorChainAvailabilityProvider = null;

private long initializeStateEndTs;

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -695,6 +697,10 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
// waiting for changelog availability is reported as busy
timer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());
resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();
} else if (operatorChainAvailabilityProvider != null
&& !operatorChainAvailabilityProvider.isAvailable()) {
timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());
resumeFuture = operatorChainAvailabilityProvider.getAvailableFuture();
} else {
// data availability has changed in the meantime; retry immediately
return;
Expand Down Expand Up @@ -808,6 +814,8 @@ void restoreInternal() throws Exception {
? new FinishedOperatorChain<>(this, recordWriter)
: new RegularOperatorChain<>(this, recordWriter);
mainOperator = operatorChain.getMainOperator();
operatorChainAvailabilityProvider =
operatorChain.getOperatorChainAvailabilityProvider();

getEnvironment()
.getTaskStateManager()
Expand Down Expand Up @@ -1188,7 +1196,9 @@ public boolean hasMail() {
private boolean taskIsAvailable() {
return recordWriter.isAvailable()
&& (changelogWriterAvailabilityProvider == null
|| changelogWriterAvailabilityProvider.isAvailable());
|| changelogWriterAvailabilityProvider.isAvailable())
&& (operatorChainAvailabilityProvider == null
|| operatorChainAvailabilityProvider.isAvailable());
}

public CanEmitBatchOfRecordsChecker getCanEmitBatchOfRecords() {
Expand Down
Loading