
[FLINK-38909] Fix Unable to delete S3 checkpoint due to presence of default file #27423

Open
llllssss94 wants to merge 3 commits into apache:master from llllssss94:FLINK-38090

Conversation

@llllssss94

What is the purpose of the change

This pull request fixes a critical bug (FLINK-38909) that causes checkpoint cleanup to fail with a PathIsNotEmptyDirectoryException. The root cause was an incorrect, non-recursive delete call on a checkpoint's storage location, which by design contains multiple files.

A completed Flink checkpoint always consists of multiple data files and a metadata file, grouped under a common path (exclusiveCheckpointDir). This logical location is never empty. Attempting to delete it with a non-recursive delete(path, false) command is fundamentally incorrect and guaranteed to fail on any compliant file system. This bug leads to orphaned checkpoint data and storage leaks.

This fix corrects the logic by using a recursive delete, ensuring that all files and objects associated with a checkpoint's location are properly removed, regardless of the underlying filesystem's architecture.

Brief change log

  • In FsCompletedCheckpointStorageLocation.disposeStorageLocation(), the filesystem call was changed to fs.delete(exclusiveCheckpointDir, true). This enables recursive deletion, ensuring the entire directory tree of a checkpoint is properly removed.
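To illustrate the contract the fix relies on, here is a JDK-only sketch (not Flink code; `deleteNonRecursive` and `deleteRecursive` are hypothetical helpers mirroring the semantics of `FileSystem.delete(path, recursive)`):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;

// JDK-only illustration of the delete contract: a non-recursive delete of a
// non-empty checkpoint directory fails, while a recursive delete removes the
// whole tree. The helper names are illustrative, not Flink API.
public class RecursiveDeleteSketch {

    // Mirrors fs.delete(dir, false): refuses to remove a non-empty directory.
    static boolean deleteNonRecursive(Path dir) {
        try {
            Files.delete(dir); // throws DirectoryNotEmptyException if non-empty
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Mirrors fs.delete(dir, true): deletes children first, then the directory.
    static boolean deleteRecursive(Path dir) throws IOException {
        try (var paths = Files.walk(dir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
        return !Files.exists(dir);
    }

    public static void main(String[] args) throws IOException {
        // Simulate a completed checkpoint: a chk-{id} dir with metadata and state.
        Path chk = Files.createTempDirectory("chk-1");
        Files.writeString(chk.resolve("_metadata"), "meta");
        Files.writeString(chk.resolve("operator-state"), "state");

        System.out.println(deleteNonRecursive(chk)); // false: directory not empty
        System.out.println(deleteRecursive(chk));    // true: whole tree removed
    }
}
```

Since a completed checkpoint directory always contains at least the metadata file, only the recursive variant can ever succeed on it.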

Verifying this change

This change added tests and can be verified as follows:

  • Added a new test case to FsCompletedCheckpointStorageLocationTest to specifically reproduce the bug and validate the fix. This test simulates a real, non-empty checkpoint by creating a storage location with subdirectories and files. It then calls the disposeStorageLocation() method and asserts that no exception is thrown and the location is completely removed.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes (This directly impacts the correctness of the checkpoint cleanup lifecycle.)
  • The S3 file system connector: yes (While the fix is in core Flink, the bug is most frequently observed on object storage systems, and this change ensures correct behavior on them.)

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot
Collaborator

flinkbot commented Jan 15, 2026

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

-        fs.delete(exclusiveCheckpointDir, false);
+        // Recursively delete the checkpoint directory and all its contents
+        fs.delete(exclusiveCheckpointDir, true);
Contributor
The change looks good.

I was wondering: if the deletion fails (e.g. for permission reasons), should we look to catch and log that error?

Author

Thanks!

I thought about this. Even before this change, errors from the underlying filesystem handler were already caught and logged, and those logs were sufficient to debug the issue.

If we find that permission-related failures need special handling, we can add a targeted catch for that specific error type. For now, I think the existing logging is sufficient.
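For context, a minimal sketch of the catch-and-log pattern being described (JDK-only; the class, method name, and logger are illustrative, not Flink's actual cleanup code):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.logging.Logger;

// Illustrative sketch: a failed disposal is logged and swallowed rather than
// propagated, matching the existing behavior described in the comment above.
public class LoggedDisposeSketch {
    private static final Logger LOG = Logger.getLogger(LoggedDisposeSketch.class.getName());

    // Hypothetical helper, not Flink API: recursively delete, log on failure.
    static boolean disposeQuietly(Path exclusiveCheckpointDir) {
        try (var paths = Files.walk(exclusiveCheckpointDir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            return true;
        } catch (Exception e) {
            // Log and continue: a cleanup failure should not fail the job.
            LOG.warning("Could not dispose " + exclusiveCheckpointDir + ": " + e);
            return false;
        }
    }
}
```

A permission-denied or missing-path error surfaces here as a warning log entry instead of an exception on the caller's path.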

The github-actions bot added the community-reviewed (PR has been reviewed by the community) label on Jan 16, 2026.
Contributor

@och5351 left a comment

In our environment, MinIO automatically generates a default.txt file by default, which prevents us from deleting the checkpoint directory after Flink creates checkpoints.

Since this appears to be a straightforward issue to resolve, I hope this PR will be merged soon.

-        fs.delete(exclusiveCheckpointDir, false);
+        // Recursively delete the checkpoint directory and all its contents
+        fs.delete(exclusiveCheckpointDir, true);
Contributor

LGTM too.

@gaborgsomogyi
Contributor

The recursive deletion alone makes sense, but are we sure that the shared dir remains when incremental checkpoints are enabled? I'm interested in all 3 restore modes with cross-job checkpoint dependencies. My main point is that adding recursive delete is desired, but we must know exactly that we don't try to delete complete checkpoints/savepoints that are currently protected only because the non-recursive delete fails on them.

/checkpoint-dir/
├── _metadata
├── shared/
│   ├── sst-file-A  ← Referenced by checkpoint 1 & 2
│   └── sst-file-B  ← Referenced by checkpoint 2 only

@gaborgsomogyi
Contributor

I've just tried and MinIO doesn't create "default.txt" file. Are you sure about this? MinIO is passive storage...

@och5351
Contributor

och5351 commented Feb 9, 2026

> I've just tried and MinIO doesn't create "default.txt" file. Are you sure about this? MinIO is passive storage...

Hi, @gaborgsomogyi!

Thanks for your review.
You're absolutely right. MinIO itself doesn't create a default file; it's just passive storage.
However, in our environment (with a webhook), when a PutObject action is detected, a default.txt file is created.

Although MinIO doesn't perform this action itself, similar situations could arise with other storage services. Therefore, I believe recursive deletion is necessary. However, as you mentioned, we should verify any potential side effects with incremental checkpoints.

Could you please re-review once we've verified the incremental checkpoint behavior and tested the new changes?

@gaborgsomogyi
Contributor

> You're absolutely right. MinIO itself doesn't create a default file; it's just passive storage.
> However, in our environment (with a webhook), when a PutObject action is detected, a default.txt file is created.

I strongly believe that Flink is responsible for deleting the files/directories that it has created. I can be convinced if we were talking about deleting Flink-generated garbage/unneeded files, and we can justify that we never delete shared things. This can be done by testing all 3 RestoreMode cases (CLAIM, NO_CLAIM, LEGACY) and listing all cases that call this recursive-delete code, together with the justification for why each is safe. It's important to highlight that I basically tend to feel that recursive delete is a good direction, but:

  • Random files created by an external system are not a business justification for taking on accidental-delete risk
  • Big tech companies have TB-scale states, and accidental deletes have serious consequences, so we need exact proofs
  • My current (unproven) understanding is that the existing codebase depends on the false (non-recursive) behavior, which works as a safety belt. Maybe the caller side(s) must be changed.

All in all, I can take another look when more proof is available here. Again, I'm not nitpicking; state cleanup is required and a good direction, but we must give exact proofs.

@llllssss94
Author

Hi @gaborgsomogyi,

Thanks for the detailed feedback — these are exactly the right questions to ask for a change like this.

Let me walk through each concern:

  1. "External system created random files is not business justification"

Totally agree. The webhook-created default.txt was the trigger that exposed the bug, but the fix itself doesn't rely on that justification. The real issue is that Flink's own checkpoint files (state files, metadata, etc.) are nested inside chk-{id}/, and recursive=false fails to clean them up on object storage like S3/MinIO.

  2. "Maybe caller side(s) must be changed" / "safety belt" concern

I traced all callers end-to-end:

disposeStorageLocation() is only called from
CompletedCheckpointDiscardObject.discard(), which is invoked in
two cases:

  • CheckpointsCleaner: when CheckpointProperties.discardOnSubsumed() = true
  • StandaloneCompletedCheckpointStore: on job shutdown when
    shouldBeDiscardedOnShutdown(jobStatus) = true

In both cases, the target path is always a Flink-created chk-{id} directory.

In NO_CLAIM mode → forUnclaimedSnapshot() → discardSubsumed=false
In LEGACY mode → forSavepoint() → discardSubsumed=false

In fact, both forUnclaimedSnapshot() and forSavepoint() set ALL
discard flags to false (discardSubsumed, discardFinished,
discardCancelled, discardFailed, discardSuspended), so the original
snapshot is protected across both CheckpointsCleaner and shutdown paths.

In CLAIM mode → Flink takes ownership, deletion is expected
So recursive=false is not acting as a safety belt here...

  3. "Exact proof that only Flink-created paths are deleted"

The exclusiveCheckpointDir passed into disposeStorageLocation() originates exclusively from FsCheckpointStorageLocation or PersistentMetadataCheckpointStorageLocation constructors — both of which receive a chk-{id} directory that Flink itself created via createMetadataOutputStream().

There is no code path where an externally-created or user-owned directory ends up as the target of disposeStorageLocation().

  4. Consistency point

Interestingly, disposeOnFailure() in the same codebase already uses recursive=true:

// FsCheckpointStorageLocation.java
fileSystem.delete(checkpointDirectory, true); // already recursive!
So disposeStorageLocation() using recursive=false was always an inconsistency.

On test reproducibility:

Flink's LocalFileSystem.delete(path, false) actually throws IOException("Directory is not empty") for non-empty directories — same behavior as S3AFileSystem. So the unit test FsCompletedCheckpointStorageLocationTest directly reproduces the bug without needing a MinIO setup, and fails when recursive=false is used.
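This behavior is easy to reproduce with the JDK alone (an analogous sketch, not Flink code: `java.nio.file.Files.delete` throws `DirectoryNotEmptyException` for a non-empty directory, mirroring the non-recursive `fs.delete(path, false)` failure described above):

```java
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;

// Reproduces the non-recursive delete failure on a non-empty checkpoint-like
// directory using only the JDK (analogous to fs.delete(dir, false)).
public class NonEmptyDeleteDemo {
    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("chk-42");
        Files.createDirectories(dir.resolve("shared"));
        Files.writeString(dir.resolve("shared").resolve("sst-file-A"), "data");

        try {
            Files.delete(dir); // non-recursive delete of a non-empty directory
            System.out.println("deleted");
        } catch (DirectoryNotEmptyException e) {
            System.out.println("DirectoryNotEmptyException");
        }
    }
}
```

The same contract shows up as `IOException("Directory is not empty")` in the local-filesystem case described above, so the unit test exercises the bug without any object-storage setup.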

The IT test covers all 3 RecoveryClaimModes (CLAIM, NO_CLAIM, LEGACY) to verify the full checkpoint lifecycle.

Hope this addresses the concerns — happy to dig deeper into any specific part!

