Skip to content

refactor: make supervisor diff+restart algorithm smarter on submission#19541

Open
jtuglu1 wants to merge 3 commits into
apache:masterfrom
jtuglu1:fix-supervisor-restart-algo-on-submission
Open

refactor: make supervisor diff+restart algorithm smarter on submission#19541
jtuglu1 wants to merge 3 commits into
apache:masterfrom
jtuglu1:fix-supervisor-restart-algo-on-submission

Conversation

@jtuglu1

@jtuglu1 jtuglu1 commented Jun 1, 2026

Copy link
Copy Markdown
Contributor

Description

Restarting a supervisor spec after submission sometimes isn't always needed and can be expensive for large supervisors. The current skip restart logic is very bare-bones (doing effectively a byte-level check between the 2 specs) instead of allowing each spec to override its own restart requirements, often causing some false positive restarts. This diff can also be affected by false-positive diffs like ioConfig.taskCount being updated while auto-scaling is configured, whitespace changes, etc.

This updates the restart-required checking logic by allowing supervisors to implement their own restart requirement checks (plus some sane defaults) along with proper comparators, and stop treating each supervisor spec as an opaque ball of text. Also adds builders to keep things clean.

Notably, this doesn't change the spec persistence behavior: byte-level differing spec updates are persisted into the DB, but may not necessarily invoke a supervisor restart.

Release note


Key changed/added classes in this PR
  • MyFoo
  • OurBar
  • TheirBaz

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@jtuglu1 jtuglu1 force-pushed the fix-supervisor-restart-algo-on-submission branch 2 times, most recently from 1edda80 to b18c3b3 Compare June 1, 2026 23:08
@jtuglu1 jtuglu1 force-pushed the fix-supervisor-restart-algo-on-submission branch 2 times, most recently from d773ffa to 82dc32b Compare June 2, 2026 02:25
@jtuglu1 jtuglu1 force-pushed the fix-supervisor-restart-algo-on-submission branch from 82dc32b to fa855ef Compare June 2, 2026 03:19
@kfaraz kfaraz requested a review from Copilot June 2, 2026 07:33

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors supervisor “diff + restart” behavior so that supervisor specs can decide whether a changed submission actually requires a restart, while still persisting byte-level spec changes to metadata storage even when a restart is skipped. It introduces a SupervisorSpec.requireRestart(old) hook, implements smarter restart logic for seekable-stream supervisors, and adds IOConfig/spec builders plus equality semantics to support structured comparisons rather than opaque JSON byte diffs.

Changes:

  • Add SupervisorSpec.requireRestart(SupervisorSpec old) (default: restart on any non-identical change) and use it from SupervisorManager.shouldUpdateSupervisor.
  • Persist spec updates even when restart is skipped via SupervisorManager.updateSupervisorSpecWithoutRestart, wired into SupervisorResource POST handling.
  • Introduce/extend builders and equals/hashCode implementations across seekable-stream specs/configs (Kafka/Kinesis/RabbitStream) and update tests accordingly.

Reviewed changes

Copilot reviewed 34 out of 34 changed files in this pull request and generated no comments.

Show a summary per file
File Description
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java Adds requireRestart(old) hook to let specs participate in restart decisions.
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java Splits “spec changed” vs “restart required”, adds no-restart persistence path.
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java Persists changed specs even when restart is skipped.
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java Implements structured restart decision + adds toBuilder() and spec equality/hash.
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java Adds equals/hashCode and toBuilder() for IOConfig comparisons/copying.
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIngestionSpec.java Adds equals/hashCode so spec equality can be structured.
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SupervisorIOConfigBuilder.java Enhances builder (copyFromBase, more fields) and adds a default IOConfig builder.
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/LagAggregator.java Makes default lag aggregator comparable across serde (stateless equality).
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java Adds equals/hashCode for structured comparisons.
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java Adds toBuilder() to support restart comparison/copy patterns.
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java Adds equals/hashCode and toBuilder() for IOConfig comparisons/copying.
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java Adds equals/hashCode to ensure tuning changes affect comparisons.
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisIOConfigBuilder.java New builder for Kinesis IOConfig with copyFrom/build support.
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java Adds toBuilder() to support restart comparison/copy patterns.
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java Adds equals/hashCode and toBuilder() for IOConfig comparisons/copying.
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java Adds equals/hashCode to ensure tuning changes affect comparisons.
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java Expands builder (copyFrom + additional fields), relies on base bounded/serverPriority fields.
extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java Adds toBuilder() to support restart comparison/copy patterns.
extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java Adds equals/hashCode and toBuilder() for IOConfig comparisons/copying.
extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfig.java Adds equals/hashCode to ensure tuning changes affect comparisons.
extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamIOConfigBuilder.java New builder for RabbitStream IOConfig with copyFrom/build support.
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java Updates test spec to support toBuilder() and uses IOConfig builders.
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java Replaces inline IOConfig anonymous classes with builders for stable equality.
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java Adds requireRestart tests; updates IOConfig construction to builder pattern.
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java Adds equality/hashCode tests for IOConfig + related value types.
indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java Adjusts POST expectations to include no-restart persistence call.
indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java Adds tests for updateSupervisorSpecWithoutRestart and introduces versioned test spec.
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java Updates tests to use new Kinesis IOConfig builder.
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java Adds equality/hashCode tests and builder-based construction.
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java Updates sampler tests to use Kinesis IOConfig builder.
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java Updates tests to use Kafka IOConfig builder.
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java Adds equality/hashCode tests and builder-based construction.
extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java Updates tests to use RabbitStream IOConfig builder.
extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfigTest.java Adds equality/hashCode tests and builder-based construction.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@FrankChen021 FrankChen021 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 1
P2 2
P3 0
Total 3
Severity Findings
P0 0
P1 1
P2 2
P3 0
Total 3

Reviewed 34 of 34 changed files.


This is an automated review by Codex GPT-5.5

@FrankChen021 FrankChen021 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 0
P2 2
P3 0
Total 2

Reviewed 36 of 36 changed files.


This is an automated review by Codex GPT-5.5

@cecemei cecemei left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall looks good to me, left a few minor comments

synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
// Persist whenever the spec actually changed (or is new) — independent of whether a restart is

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is now createOrUpdateAndStartSupervisor(spec, false) ? also we dont need shouldUpdateSupervisor anymore?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldUpdateSupervisor is still used in CompactionSupervisorManager

Perhaps we can remove the usage there and then remove shouldUpdateSupervisor entirely?

* taskCount change under autoscaling) is persisted without recreating the supervisor; otherwise the
* supervisor is stopped and recreated (the only behavior when the flag is false).
*/
public SpecUpdateOutcome createOrUpdateAndStartSupervisor(SupervisorSpec spec, boolean skipRestartIfUnmodified)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe we need another forceRestart? since supervisor might not restart when spec is modified but it didnt require so?

@jtuglu1 jtuglu1 Jun 3, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

skipRestartIfUnModified=false -> should just mean we restart irrespective of semantic differences in the spec (e.g. current behavior)?


return Response.ok(ImmutableMap.of("id", spec.getId(), "restarted", true)).build();
final boolean restarted = outcome == SupervisorManager.SpecUpdateOutcome.RESTARTED;
return Response.ok(ImmutableMap.of("id", spec.getId(), "restarted", restarted)).build();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: my hunch is that maybe we should have a modified boolean field as well since sometimes it modifies but doesnt restart?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this makes sense. There's currently no distinction between whether a supervisor was unchanged and restarted vs changed and restarted

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 – this makes sense.

@jtuglu1 jtuglu1 force-pushed the fix-supervisor-restart-algo-on-submission branch from 19edbf8 to 5168809 Compare June 3, 2026 22:30

@FrankChen021 FrankChen021 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.

Reviewed 38 of 38 changed files.


This is an automated review by Codex GPT-5.5

@jtuglu1 jtuglu1 added this to the 38.0.0 milestone Jun 5, 2026
@jtuglu1 jtuglu1 requested a review from cecemei June 5, 2026 22:23

@kfaraz kfaraz left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, @jtuglu1 ! Overall makes sense, left some suggestions.

* @param old the currently-running supervisor spec
* @return true if the supervisor must be restarted to apply this spec
*/
default boolean requireRestart(SupervisorSpec old)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would make more sense to invoke this method on the currently running (i.e. old) supervisor spec and pass the new one as an argument, since the question we are asking is "should we restart the current supervisor?".
This would also align this method to validateSpecUpdateTo(), which takes the new proposed spec as an argument.

*/
public SupervisorIOConfigBuilder<?, ?> toBuilder()
{
return new SupervisorIOConfigBuilder.DefaultSupervisorIOConfigBuilder().copyFromBase(this);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If sub-classes must always override this method, we should make it abstract.

return new LagStats(maxLag, totalLag, avgLag);
}

// Stateless: all instances are equal. Needed because Jackson creates fresh instances on

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, you can make the constructor private and add a static @JsonCreator method which just returns the DEFAULT instance.

*/
public Builder<?> toBuilder()
{
throw new UnsupportedOperationException("No builder is available for this supervisor spec.");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this method be abstract instead?

Comment on lines +432 to +440
this.taskStorage = spec.taskStorage;
this.taskMaster = spec.taskMaster;
this.indexerMetadataStorageCoordinator = spec.indexerMetadataStorageCoordinator;
this.indexTaskClientFactory = spec.indexTaskClientFactory;
this.mapper = spec.mapper;
this.emitter = spec.emitter;
this.monitorSchedulerConfig = spec.monitorSchedulerConfig;
this.rowIngestionMetersFactory = spec.rowIngestionMetersFactory;
this.supervisorStateManagerConfig = spec.supervisorStateManagerConfig;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the builder ever use these dependencies?
The output of the build() method will only ever be used for an equality check which should not involve the services anyway. So the build() method could simply pass these as null, atleast for now.

Comment on lines +338 to +341
if (!isSpecChangedAndValidate(spec)) {
return false;
}
return spec.requireRestart(currentSupervisor.rhs);

@kfaraz kfaraz Jun 12, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use this for readability:

Suggested change
if (!isSpecChangedAndValidate(spec)) {
return false;
}
return spec.requireRestart(currentSupervisor.rhs);
return isSpecChangedAndValidate(spec) && spec.requireRestart(currentSupervisor.rhs);

* Result of applying a submitted supervisor spec. {@code modified} means the persisted spec changed;
* {@code restarted} means the running supervisor was stopped and recreated.
*/
public static final class SpecUpdateResult

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please pull this into a separate file of its own named SupervisorSpecUpdateResult.

* taskCount change under autoscaling) is persisted without recreating the supervisor; otherwise the
* supervisor is stopped and recreated (the only behavior when the flag is false).
*/
public SpecUpdateResult createOrUpdateAndStartSupervisor(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems error prone to have 2 flavors of the createOrUpdateAndStartSupervisor method.
Please try to merge them and pass some appropriate value for the skip restart flag if needed.

synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
try {
byte[] specAsBytes = jsonMapper.writeValueAsBytes(spec);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename this method from shouldUpdateSupervisor to shouldRestartSupervisor to avoid ambiguity.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, add a new method shouldRestartSupervisor and leave shouldUpdateSupervisor unchanged.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants