Skip to content

Commit 863140e

Browse files
committed
[FLINK-38342][runtime] Support record rescale information
Co-authored-by: Matthias Pohl <mpohl@confluent.io> Co-authored-by: Wei Zhong <weizhong0618@gmail.com> Co-authored-by: Ferenc Csaky <fcsaky@apache.org> Co-authored-by: Rui Fan <1996fanrui@gmail.com> Signed-off-by: Yuepeng Pan <panyuepeng@apache.org>
1 parent b6cb13a commit 863140e

23 files changed

Lines changed: 525 additions & 23 deletions

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java

Lines changed: 105 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@
115115
import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
116116
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
117117
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
118+
import org.apache.flink.runtime.scheduler.adaptive.timeline.DefaultRescaleTimeline;
119+
import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale;
120+
import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline;
121+
import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminalState;
122+
import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
123+
import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause;
118124
import org.apache.flink.runtime.scheduler.adaptivebatch.NonAdaptiveExecutionPlanSchedulingContext;
119125
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
120126
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
@@ -149,6 +155,7 @@
149155
import java.util.Iterator;
150156
import java.util.List;
151157
import java.util.Map;
158+
import java.util.Objects;
152159
import java.util.Optional;
153160
import java.util.concurrent.CompletableFuture;
154161
import java.util.concurrent.Executor;
@@ -162,6 +169,7 @@
162169
import static org.apache.flink.configuration.TraceOptions.CHECKPOINT_SPAN_DETAIL_LEVEL;
163170
import static org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking;
164171
import static org.apache.flink.runtime.scheduler.adaptive.ForwardEdgesAdapter.copyJobGraphWithAdaptedForwardEdges;
172+
import static org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline.NoOpRescaleTimeline;
165173

166174
/**
167175
* A {@link SchedulerNG} implementation that uses the declarative resource management and
@@ -453,6 +461,8 @@ public JobRescaleConfigInfo toJobRescaleConfigInfo() {
453461

454462
private final Supplier<Temporal> clock = Instant::now;
455463

464+
private final RescaleTimeline rescaleTimeline;
465+
456466
public AdaptiveScheduler(
457467
Settings settings,
458468
JobGraph jobGraph,
@@ -546,6 +556,12 @@ public AdaptiveScheduler(
546556
this.initialParallelismStore = vertexParallelismStore;
547557
this.jobInformation = new JobGraphJobInformation(jobGraph, vertexParallelismStore);
548558

559+
final int rescaleHistoryMax = settings.getRescaleHistoryMax();
560+
this.rescaleTimeline =
561+
rescaleHistoryMax <= 0
562+
? NoOpRescaleTimeline.INSTANCE
563+
: new DefaultRescaleTimeline(() -> jobInformation, rescaleHistoryMax);
564+
549565
this.declarativeSlotPool = declarativeSlotPool;
550566
this.initializationTimestamp = initializationTimestamp;
551567
this.ioExecutor = ioExecutor;
@@ -730,6 +746,11 @@ static VertexParallelismStore computeVertexParallelismStoreForExecution(
730746
jobGraph.getVertices(), defaultMaxParallelismFunc);
731747
}
732748

749+
@Override
750+
public RescaleTimeline getRescaleTimeline() {
751+
return rescaleTimeline;
752+
}
753+
733754
private void newResourcesAvailable(Collection<? extends PhysicalSlot> physicalSlots) {
734755
state.tryRun(
735756
ResourceListener.class,
@@ -1125,6 +1146,7 @@ public void updateJobResourceRequirements(JobResourceRequirements jobResourceReq
11251146
if (maybeUpdateVertexParallelismStore.isPresent()) {
11261147
this.jobInformation =
11271148
new JobGraphJobInformation(jobGraph, maybeUpdateVertexParallelismStore.get());
1149+
recordRescaleForNewResourceRequirements();
11281150
declareDesiredResources();
11291151
state.tryRun(
11301152
ResourceListener.class,
@@ -1133,6 +1155,27 @@ public void updateJobResourceRequirements(JobResourceRequirements jobResourceReq
11331155
}
11341156
}
11351157

1158+
private void recordRescaleForNewResourceRequirements() {
1159+
rescaleTimeline.updateRescale(
1160+
rescale ->
1161+
rescale.addSchedulerState(state)
1162+
.setTerminatedReason(TerminatedReason.RESOURCE_REQUIREMENTS_UPDATED)
1163+
.setEndToNow()
1164+
.log());
1165+
rescaleTimeline.newRescale(true);
1166+
rescaleTimeline.updateRescale(
1167+
rescale ->
1168+
rescale.setStartToNow()
1169+
.setDesiredVertexParallelism(jobInformation)
1170+
.setTriggerCause(TriggerCause.UPDATE_REQUIREMENT)
1171+
.setDesiredSlots(jobInformation)
1172+
.setMinimalRequiredSlots(jobInformation)
1173+
.setPreRescaleSlotsAndParallelisms(
1174+
jobInformation,
1175+
rescaleTimeline.getLatestRescale(TerminalState.COMPLETED))
1176+
.log());
1177+
}
1178+
11361179
// ----------------------------------------------------------------
11371180

11381181
@Override
@@ -1288,6 +1331,15 @@ public void goToCanceling(
12881331
OperatorCoordinatorHandler operatorCoordinatorHandler,
12891332
List<ExceptionHistoryEntry> failureCollection) {
12901333

1334+
if (!rescaleTimeline.isIdling()) {
1335+
rescaleTimeline.updateRescale(
1336+
rescale ->
1337+
rescale.addSchedulerState(state)
1338+
.setEndToNow()
1339+
.setTerminatedReason(TerminatedReason.JOB_CANCELED)
1340+
.log());
1341+
}
1342+
12911343
transitionToState(
12921344
new Canceling.Factory(
12931345
this,
@@ -1308,6 +1360,8 @@ public void goToRestarting(
13081360
@Nullable VertexParallelism restartWithParallelism,
13091361
List<ExceptionHistoryEntry> failureCollection) {
13101362

1363+
recordRescaleForJobRestarting(restartWithParallelism);
1364+
13111365
for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
13121366
final int attemptNumber =
13131367
executionVertex.getCurrentExecutionAttempt().getAttemptNumber();
@@ -1336,13 +1390,54 @@ public void goToRestarting(
13361390
}
13371391
}
13381392

1393+
private void recordRescaleForJobRestarting(VertexParallelism restartWithParallelism) {
1394+
// For the normal rescaling restarting.
1395+
if (Objects.nonNull(restartWithParallelism)) {
1396+
rescaleTimeline.updateRescale(
1397+
rescale ->
1398+
rescale.setMinimalRequiredSlots(jobInformation)
1399+
.setDesiredVertexParallelism(jobInformation)
1400+
.setDesiredSlots(jobInformation)
1401+
.log());
1402+
return;
1403+
}
1404+
// For the failover restarting.
1405+
if (!rescaleTimeline.isIdling()) {
1406+
// Process by https://lists.apache.org/thread/hh7w2p6lnmbo1q6d9ngkttdyrw4lp74h.
1407+
LOG.info(
1408+
"Merge the current non-terminated rescale and the new rescale triggered by recoverable failover into the current rescale.");
1409+
rescaleTimeline.updateRescale(Rescale::clearSchedulerStates);
1410+
} else if (rescaleTimeline.isIdling()) {
1411+
rescaleTimeline.newRescale(false);
1412+
}
1413+
rescaleTimeline.updateRescale(
1414+
rescale ->
1415+
rescale.setStartToNow()
1416+
.setTriggerCause(TriggerCause.RECOVERABLE_FAILOVER)
1417+
.setMinimalRequiredSlots(jobInformation)
1418+
.setPreRescaleSlotsAndParallelisms(
1419+
jobInformation,
1420+
rescaleTimeline.getLatestRescale(TerminalState.COMPLETED))
1421+
.setDesiredVertexParallelism(jobInformation)
1422+
.setDesiredSlots(jobInformation)
1423+
.log());
1424+
}
1425+
13391426
@Override
13401427
public void goToFailing(
13411428
ExecutionGraph executionGraph,
13421429
ExecutionGraphHandler executionGraphHandler,
13431430
OperatorCoordinatorHandler operatorCoordinatorHandler,
13441431
Throwable failureCause,
13451432
List<ExceptionHistoryEntry> failureCollection) {
1433+
rescaleTimeline.updateRescale(
1434+
rescale ->
1435+
rescale.setEndToNow()
1436+
.addSchedulerState(state, failureCause)
1437+
.setTerminatedReason(TerminatedReason.JOB_FAILED)
1438+
.setStringifiedException(
1439+
ExceptionUtils.stringifyException(failureCause))
1440+
.log());
13461441
transitionToState(
13471442
new Failing.Factory(
13481443
this,
@@ -1381,6 +1476,12 @@ public CompletableFuture<String> goToStopWithSavepoint(
13811476

13821477
@Override
13831478
public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
1479+
rescaleTimeline.updateRescale(
1480+
rescale ->
1481+
rescale.addSchedulerState(state)
1482+
.setEndToNow()
1483+
.setTerminatedReason(TerminatedReason.JOB_FINISHED)
1484+
.log());
13841485
transitionToState(new Finished.Factory(this, archivedExecutionGraph, LOG));
13851486
}
13861487

@@ -1650,6 +1751,9 @@ <T extends State> T transitionToState(StateFactory<T> targetState) {
16501751
final JobStatus previousJobStatus = state.getJobStatus();
16511752

16521753
state.onLeave(targetState.getStateClass());
1754+
1755+
rescaleTimeline.updateRescale(rescale -> rescale.addSchedulerState(state));
1756+
16531757
T targetStateInstance = targetState.getState();
16541758
state = targetStateInstance;
16551759

@@ -1669,8 +1773,7 @@ <T extends State> T transitionToState(StateFactory<T> targetState) {
16691773
}
16701774
}
16711775

1672-
@VisibleForTesting
1673-
State getState() {
1776+
public State getState() {
16741777
return state;
16751778
}
16761779

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
package org.apache.flink.runtime.scheduler.adaptive;
2020

2121
import org.apache.flink.api.common.JobStatus;
22+
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
23+
import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline;
24+
import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause;
2225

2326
import org.slf4j.Logger;
2427

@@ -39,9 +42,24 @@ public JobStatus getJobStatus() {
3942

4043
/** Starts the scheduling by going into the {@link WaitingForResources} state. */
4144
void startScheduling() {
45+
recordRescaleForInitialScheduling();
4246
context.goToWaitingForResources(null);
4347
}
4448

49+
private void recordRescaleForInitialScheduling() {
50+
RescaleTimeline rescaleTimeline = context.getRescaleTimeline();
51+
rescaleTimeline.newRescale(true);
52+
JobInformation jobInformation = rescaleTimeline.getJobInformation();
53+
rescaleTimeline.updateRescale(
54+
rescale ->
55+
rescale.setTriggerCause(TriggerCause.INITIAL_SCHEDULE)
56+
.setStartTimestamp(getDurable().getEnterTimestamp())
57+
.setDesiredVertexParallelism(jobInformation)
58+
.setDesiredSlots(jobInformation)
59+
.setMinimalRequiredSlots(jobInformation)
60+
.log());
61+
}
62+
4563
/** Context of the {@link Created} state. */
4664
interface Context
4765
extends StateWithoutExecutionGraph.Context, StateTransitions.ToWaitingForResources {}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.apache.flink.runtime.scheduler.GlobalFailureHandler;
3434
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
3535
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
36+
import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline;
37+
import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
3638
import org.apache.flink.util.IterableUtils;
3739
import org.apache.flink.util.Preconditions;
3840
import org.apache.flink.util.concurrent.FutureUtils;
@@ -42,6 +44,7 @@
4244
import javax.annotation.Nullable;
4345

4446
import java.time.Duration;
47+
import java.time.Instant;
4548
import java.util.Collections;
4649
import java.util.concurrent.CompletableFuture;
4750
import java.util.concurrent.Executor;
@@ -97,6 +100,9 @@ private void handleExecutionGraphCreation(
97100
CreatingExecutionGraph.class.getSimpleName(),
98101
Executing.class.getSimpleName(),
99102
throwable);
103+
104+
recordRescaleForException(throwable);
105+
100106
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, throwable));
101107
} else {
102108
for (ExecutionVertex vertex :
@@ -108,6 +114,8 @@ private void handleExecutionGraphCreation(
108114
context.tryToAssignSlots(executionGraphWithVertexParallelism);
109115

110116
if (result.isSuccess()) {
117+
recordRescaleForGraphCreation(executionGraphWithVertexParallelism);
118+
111119
getLogger()
112120
.debug(
113121
"Successfully reserved and assigned the required slots for the ExecutionGraph.");
@@ -155,6 +163,31 @@ private void handleExecutionGraphCreation(
155163
}
156164
}
157165

166+
private void recordRescaleForGraphCreation(
167+
ExecutionGraphWithVertexParallelism executionGraphWithVertexParallelism) {
168+
RescaleTimeline rescaleTimeline = context.getRescaleTimeline();
169+
rescaleTimeline.updateRescale(
170+
rescale ->
171+
rescale.setPostRescaleVertexParallelism(
172+
rescaleTimeline.getJobInformation(),
173+
executionGraphWithVertexParallelism.getVertexParallelism())
174+
.setPostRescaleSlots(
175+
executionGraphWithVertexParallelism.jobSchedulingPlan
176+
.getSlotAssignments())
177+
.log());
178+
}
179+
180+
private void recordRescaleForException(Throwable throwable) {
181+
final long epochMilli = Instant.now().toEpochMilli();
182+
context.getRescaleTimeline()
183+
.updateRescale(
184+
rescale ->
185+
rescale.setTerminatedReason(TerminatedReason.EXCEPTION_OCCURRED)
186+
.setEndTimestamp(epochMilli)
187+
.addSchedulerState(this, throwable)
188+
.log());
189+
}
190+
158191
@Override
159192
public JobStatus getJobStatus() {
160193
return JobStatus.CREATED;

0 commit comments

Comments
 (0)