refactor: make supervisor diff+restart algorithm smarter on submission#19541
refactor: make supervisor diff+restart algorithm smarter on submission#19541jtuglu1 wants to merge 3 commits into
Conversation
b6ae435 to
d3e9127
Compare
1edda80 to
b18c3b3
Compare
d773ffa to
82dc32b
Compare
82dc32b to
fa855ef
Compare
There was a problem hiding this comment.
Pull request overview
This PR refactors supervisor “diff + restart” behavior so that supervisor specs can decide whether a changed submission actually requires a restart, while still persisting byte-level spec changes to metadata storage even when a restart is skipped. It introduces a SupervisorSpec.requireRestart(old) hook, implements smarter restart logic for seekable-stream supervisors, and adds IOConfig/spec builders plus equality semantics to support structured comparisons rather than opaque JSON byte diffs.
Changes:
- Add
SupervisorSpec.requireRestart(SupervisorSpec old)(default: restart on any non-identical change) and use it fromSupervisorManager.shouldUpdateSupervisor. - Persist spec updates even when restart is skipped via
SupervisorManager.updateSupervisorSpecWithoutRestart, wired intoSupervisorResourcePOST handling. - Introduce/extend builders and
equals/hashCodeimplementations across seekable-stream specs/configs (Kafka/Kinesis/RabbitStream) and update tests accordingly.
Reviewed changes
Copilot reviewed 34 out of 34 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java | Adds requireRestart(old) hook to let specs participate in restart decisions. |
| indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java | Splits “spec changed” vs “restart required”, adds no-restart persistence path. |
| indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java | Persists changed specs even when restart is skipped. |
| indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java | Implements structured restart decision + adds toBuilder() and spec equality/hash. |
| indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java | Adds equals/hashCode and toBuilder() for IOConfig comparisons/copying. |
| indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIngestionSpec.java | Adds equals/hashCode so spec equality can be structured. |
| indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SupervisorIOConfigBuilder.java | Enhances builder (copyFromBase, more fields) and adds a default IOConfig builder. |
| indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/LagAggregator.java | Makes default lag aggregator comparable across serde (stateless equality). |
| indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java | Adds equals/hashCode for structured comparisons. |
| extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java | Adds toBuilder() to support restart comparison/copy patterns. |
| extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java | Adds equals/hashCode and toBuilder() for IOConfig comparisons/copying. |
| extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java | Adds equals/hashCode to ensure tuning changes affect comparisons. |
| extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisIOConfigBuilder.java | New builder for Kinesis IOConfig with copyFrom/build support. |
| extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java | Adds toBuilder() to support restart comparison/copy patterns. |
| extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java | Adds equals/hashCode and toBuilder() for IOConfig comparisons/copying. |
| extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java | Adds equals/hashCode to ensure tuning changes affect comparisons. |
| extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java | Expands builder (copyFrom + additional fields), relies on base bounded/serverPriority fields. |
| extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java | Adds toBuilder() to support restart comparison/copy patterns. |
| extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java | Adds equals/hashCode and toBuilder() for IOConfig comparisons/copying. |
| extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfig.java | Adds equals/hashCode to ensure tuning changes affect comparisons. |
| extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamIOConfigBuilder.java | New builder for RabbitStream IOConfig with copyFrom/build support. |
| indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java | Updates test spec to support toBuilder() and uses IOConfig builders. |
| indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java | Replaces inline IOConfig anonymous classes with builders for stable equality. |
| indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java | Adds requireRestart tests; updates IOConfig construction to builder pattern. |
| indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java | Adds equality/hashCode tests for IOConfig + related value types. |
| indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java | Adjusts POST expectations to include no-restart persistence call. |
| indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java | Adds tests for updateSupervisorSpecWithoutRestart and introduces versioned test spec. |
| extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java | Updates tests to use new Kinesis IOConfig builder. |
| extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java | Adds equality/hashCode tests and builder-based construction. |
| extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java | Updates sampler tests to use Kinesis IOConfig builder. |
| extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java | Updates tests to use Kafka IOConfig builder. |
| extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java | Adds equality/hashCode tests and builder-based construction. |
| extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java | Updates tests to use RabbitStream IOConfig builder. |
| extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfigTest.java | Adds equality/hashCode tests and builder-based construction. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 1 |
| P2 | 2 |
| P3 | 0 |
| Total | 3 |
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 1 |
| P2 | 2 |
| P3 | 0 |
| Total | 3 |
Reviewed 34 of 34 changed files.
This is an automated review by Codex GPT-5.5
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 0 |
| P2 | 2 |
| P3 | 0 |
| Total | 2 |
Reviewed 36 of 36 changed files.
This is an automated review by Codex GPT-5.5
cecemei
left a comment
There was a problem hiding this comment.
overall looks good to me, left a few minor comments
| synchronized (lock) { | ||
| Preconditions.checkState(started, "SupervisorManager not started"); | ||
| final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec); | ||
| // Persist whenever the spec actually changed (or is new) — independent of whether a restart is |
There was a problem hiding this comment.
this method is now createOrUpdateAndStartSupervisor(spec, false) ? also we dont need shouldUpdateSupervisor anymore?
There was a problem hiding this comment.
shouldUpdateSupervisor is still used in CompactionSupervisorManager
Perhaps we can remove the usage there and then remove shouldUpdateSupervisor entirely?
| * taskCount change under autoscaling) is persisted without recreating the supervisor; otherwise the | ||
| * supervisor is stopped and recreated (the only behavior when the flag is false). | ||
| */ | ||
| public SpecUpdateOutcome createOrUpdateAndStartSupervisor(SupervisorSpec spec, boolean skipRestartIfUnmodified) |
There was a problem hiding this comment.
nit: maybe we need another forceRestart? since supervisor might not restart when spec is modified but it didnt require so?
There was a problem hiding this comment.
skipRestartIfUnModified=false -> should just mean we restart irrespective of semantic differences in the spec (e.g. current behavior)?
|
|
||
| return Response.ok(ImmutableMap.of("id", spec.getId(), "restarted", true)).build(); | ||
| final boolean restarted = outcome == SupervisorManager.SpecUpdateOutcome.RESTARTED; | ||
| return Response.ok(ImmutableMap.of("id", spec.getId(), "restarted", restarted)).build(); |
There was a problem hiding this comment.
nit: my hunch is that maybe we should have a modified boolean field as well since sometimes it modifies but doesnt restart?
There was a problem hiding this comment.
I think this makes sense. There's currently no distinction between whether a supervisor was unchanged and restarted vs changed and restarted
There was a problem hiding this comment.
+1 – this makes sense.
19edbf8 to
5168809
Compare
FrankChen021
left a comment
There was a problem hiding this comment.
I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.
Reviewed 38 of 38 changed files.
This is an automated review by Codex GPT-5.5
| * @param old the currently-running supervisor spec | ||
| * @return true if the supervisor must be restarted to apply this spec | ||
| */ | ||
| default boolean requireRestart(SupervisorSpec old) |
There was a problem hiding this comment.
It would make more sense to invoke this method on the currently running (i.e. old) supervisor spec and pass the new one as an argument, since the question we are asking is "should we restart the current supervisor?".
This would also align this method to validateSpecUpdateTo(), which takes the new proposed spec as an argument.
| */ | ||
| public SupervisorIOConfigBuilder<?, ?> toBuilder() | ||
| { | ||
| return new SupervisorIOConfigBuilder.DefaultSupervisorIOConfigBuilder().copyFromBase(this); |
There was a problem hiding this comment.
If sub-classes must always override this method, we should make it abstract.
| return new LagStats(maxLag, totalLag, avgLag); | ||
| } | ||
|
|
||
| // Stateless: all instances are equal. Needed because Jackson creates fresh instances on |
There was a problem hiding this comment.
Alternatively, you can make the constructor private and add a static @JsonCreator method which just returns the DEFAULT instance.
| */ | ||
| public Builder<?> toBuilder() | ||
| { | ||
| throw new UnsupportedOperationException("No builder is available for this supervisor spec."); |
There was a problem hiding this comment.
Can this method be abstract instead?
| this.taskStorage = spec.taskStorage; | ||
| this.taskMaster = spec.taskMaster; | ||
| this.indexerMetadataStorageCoordinator = spec.indexerMetadataStorageCoordinator; | ||
| this.indexTaskClientFactory = spec.indexTaskClientFactory; | ||
| this.mapper = spec.mapper; | ||
| this.emitter = spec.emitter; | ||
| this.monitorSchedulerConfig = spec.monitorSchedulerConfig; | ||
| this.rowIngestionMetersFactory = spec.rowIngestionMetersFactory; | ||
| this.supervisorStateManagerConfig = spec.supervisorStateManagerConfig; |
There was a problem hiding this comment.
Will the builder ever use these dependencies?
The output of the build() method will only ever be used for an equality check which should not involve the services anyway. So the build() method could simply pass these as null, atleast for now.
| if (!isSpecChangedAndValidate(spec)) { | ||
| return false; | ||
| } | ||
| return spec.requireRestart(currentSupervisor.rhs); |
There was a problem hiding this comment.
Maybe use this for readability:
| if (!isSpecChangedAndValidate(spec)) { | |
| return false; | |
| } | |
| return spec.requireRestart(currentSupervisor.rhs); | |
| return isSpecChangedAndValidate(spec) && spec.requireRestart(currentSupervisor.rhs); |
| * Result of applying a submitted supervisor spec. {@code modified} means the persisted spec changed; | ||
| * {@code restarted} means the running supervisor was stopped and recreated. | ||
| */ | ||
| public static final class SpecUpdateResult |
There was a problem hiding this comment.
Please pull this into a separate file of its own named SupervisorSpecUpdateResult.
| * taskCount change under autoscaling) is persisted without recreating the supervisor; otherwise the | ||
| * supervisor is stopped and recreated (the only behavior when the flag is false). | ||
| */ | ||
| public SpecUpdateResult createOrUpdateAndStartSupervisor( |
There was a problem hiding this comment.
It seems error prone to have 2 flavors of the createOrUpdateAndStartSupervisor method.
Please try to merge them and pass some appropriate value for the skip restart flag if needed.
| synchronized (lock) { | ||
| Preconditions.checkState(started, "SupervisorManager not started"); | ||
| try { | ||
| byte[] specAsBytes = jsonMapper.writeValueAsBytes(spec); |
There was a problem hiding this comment.
Maybe rename this method from shouldUpdateSupervisor to shouldRestartSupervisor to avoid ambiguity.
There was a problem hiding this comment.
Alternatively, add a new method shouldRestartSupervisor and leave shouldUpdateSupervisor unchanged.
Description
Restarting a supervisor spec after submission sometimes isn't always needed and can be expensive for large supervisors. The current skip restart logic is very bare-bones (doing effectively a byte-level check between the 2 specs) instead of allowing each spec to override its own restart requirements, often causing some false positive restarts. This diff can also be affected by false-positive diffs like ioConfig.taskCount being updated while auto-scaling is configured, whitespace changes, etc.
This updates the restart-required checking logic by allowing supervisors to implement their own restart requirement checks (plus some sane defaults) along with proper comparators, and stop treating each supervisor spec as an opaque ball of text. Also adds builders to keep things clean.
Notably, this doesn't change the spec persistence behavior: byte-level differing spec updates are persisted into the DB, but may not necessarily invoke a supervisor restart.
Release note
Key changed/added classes in this PR
MyFooOurBarTheirBazThis PR has: