[FLINK-38909] Fix: Unable to delete S3 checkpoint due to presence of default file #27423
llllssss94 wants to merge 3 commits into apache:master
Conversation
```diff
 }
-fs.delete(exclusiveCheckpointDir, false);
+// Recursively delete the checkpoint directory and all its contents
+fs.delete(exclusiveCheckpointDir, true);
```
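The behavior this diff fixes can be reproduced outside Flink. Below is a minimal standalone sketch using the JDK's java.nio.file API as a stand-in for Flink's FileSystem.delete(path, recursive) (this is an illustrative analogy, not the Flink code): a non-recursive delete of a non-empty directory fails, while a child-first recursive walk succeeds.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class RecursiveDeleteDemo {

    // Non-recursive delete: Files.delete throws DirectoryNotEmptyException for a
    // non-empty directory, mirroring fs.delete(path, false) on S3A/LocalFileSystem.
    static boolean deleteNonRecursive(Path dir) {
        try {
            Files.delete(dir);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Recursive delete: remove children first, then the directory itself,
    // mirroring fs.delete(path, true).
    static boolean deleteRecursive(Path dir) throws IOException {
        try (Stream<Path> walk = Files.walk(dir)) {
            for (Path p : walk.sorted(Comparator.reverseOrder()).toArray(Path[]::new)) {
                Files.delete(p);
            }
        }
        return !Files.exists(dir);
    }

    public static void main(String[] args) throws IOException {
        // Simulate a completed checkpoint directory holding metadata plus an extra file.
        Path chkDir = Files.createTempDirectory("chk-42");
        Files.writeString(chkDir.resolve("_metadata"), "meta");
        Files.writeString(chkDir.resolve("default.txt"), "injected by storage");

        System.out.println("non-recursive delete succeeded: " + deleteNonRecursive(chkDir));
        System.out.println("recursive delete succeeded: " + deleteRecursive(chkDir));
    }
}
```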
The change looks good.
I was wondering: if the deletion fails (e.g. for permission reasons), should we look to catch and log that error?
Thanks!
I thought about this. Even before this change, errors from the underlying filesystem handler were already caught and logged, and those logs were sufficient to debug the issue.
If we find that permission-related failures need special handling, we can add a targeted catch for that specific error type. For now, I think the existing logging is sufficient.
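If such a targeted catch ever becomes necessary, it could look roughly like the sketch below. This is a hypothetical illustration using JDK types; tryDispose, the exception handling, and the log messages are assumptions for illustration, not Flink's actual API.

```java
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.Path;

public class DisposeWithLogging {

    // Hypothetical wrapper: attempt the delete, log failures (such as permission
    // errors) with a targeted message instead of propagating the exception.
    static boolean tryDispose(Path checkpointDir) {
        try {
            Files.delete(checkpointDir);
            return true;
        } catch (AccessDeniedException e) {
            System.err.println("Permission denied deleting " + checkpointDir + ": " + e);
            return false;
        } catch (IOException e) {
            System.err.println("Could not delete " + checkpointDir + ": " + e);
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // An empty directory deletes cleanly, so tryDispose returns true here.
        Path dir = Files.createTempDirectory("chk");
        System.out.println("disposed: " + tryDispose(dir));
    }
}
```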
In our environment, MinIO automatically generates a default.txt file by default, which prevents us from deleting the checkpoint directory after Flink creates checkpoints.
Since this appears to be a straightforward issue to resolve, I hope this PR will be merged soon.
```diff
 }
-fs.delete(exclusiveCheckpointDir, false);
+// Recursively delete the checkpoint directory and all its contents
+fs.delete(exclusiveCheckpointDir, true);
```
The recursive deletion alone makes sense, but are we sure that
I've just tried, and MinIO doesn't create a "default.txt" file. Are you sure about this? MinIO is passive storage...
Hi, @gaborgsomogyi! Thanks for your review. Although MinIO doesn't perform this action itself, similar situations could arise with other storage services. Therefore, I believe recursive deletion is necessary. However, as you mentioned, we should verify any potential side effects with incremental checkpoints. Could you please re-review this once we've verified the incremental checkpoint behavior and tested the new changes?
I strongly believe that Flink is responsible for deleting the files/directories that it has created. I could be convinced if we were talking about deleting Flink-generated garbage/unneeded files, and if we can justify that shared things are never deleted. This can be done by testing all 3 RestoreMode cases (CLAIM, NO_CLAIM, LEGACY) and listing all cases which call this recursive delete code, together with the justification of why each is safe. It's important to highlight that I basically tend to feel that recursive delete is a good direction, but:
All in all, I can look back when more proof is available here. Again, I'm not nitpicking: state cleanup is required and a good direction, but we must give exact proofs.
Hi @gaborgsomogyi, thanks for the detailed feedback; these are exactly the right questions to ask for a change like this. Let me walk through each concern:
Totally agree. The webhook-created default.txt was the trigger that exposed the bug, but the fix itself doesn't rely on that justification. The real issue is that Flink's own checkpoint files (state files, metadata, etc.) are nested inside chk-{id}/, and recursive=false fails to clean them up on object storage like S3/MinIO.
I traced all callers end-to-end: disposeStorageLocation() is only called from
In both cases, the target path is always a Flink-created chk-{id} directory:
- In NO_CLAIM mode → forUnclaimedSnapshot() → discardSubsumed=false
- In fact, both forUnclaimedSnapshot() and forSavepoint() set ALL
- In CLAIM mode → Flink takes ownership, deletion is expected
The exclusiveCheckpointDir passed into disposeStorageLocation() originates exclusively from FsCheckpointStorageLocation or PersistentMetadataCheckpointStorageLocation constructors — both of which receive a chk-{id} directory that Flink itself created via createMetadataOutputStream(). There is no code path where an externally-created or user-owned directory ends up as the target of disposeStorageLocation().
Interestingly, disposeOnFailure() in the same codebase already uses recursive=true (FsCheckpointStorageLocation.java).

On test reproducibility: Flink's LocalFileSystem.delete(path, false) actually throws IOException("Directory is not empty") for non-empty directories, the same behavior as S3AFileSystem. So the unit test FsCompletedCheckpointStorageLocationTest directly reproduces the bug without needing a MinIO setup, and fails when recursive=false is used. The IT test covers all 3 RecoveryClaimModes (CLAIM, NO_CLAIM, LEGACY) to verify the full checkpoint lifecycle.

Hope this addresses the concerns; happy to dig deeper into any specific part!
What is the purpose of the change
This pull request fixes a critical bug (FLINK-38909) that causes checkpoint cleanup to fail with a PathIsNotEmptyDirectoryException. The root cause was an incorrect, non-recursive delete call on a checkpoint's storage location, which by design contains multiple files.

A completed Flink checkpoint always consists of multiple data files and a metadata file, grouped under a common path (exclusiveCheckpointDir). This logical location is never empty. Attempting to delete it with a non-recursive delete(path, false) command is fundamentally incorrect and guaranteed to fail on any compliant file system. This bug leads to orphaned checkpoint data and storage leaks.

This fix corrects the logic by using a recursive delete, ensuring that all files and objects associated with a checkpoint's location are properly removed, regardless of the underlying filesystem's architecture.
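The shape of the fix can be illustrated with a small stub: the only behavioral change is the boolean flag passed to the filesystem's delete call. MiniFileSystem and the disposeStorageLocation method below are illustrative stand-ins, not Flink's actual classes.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class DisposeFlagDemo {

    // Minimal stand-in for Flink's FileSystem#delete(Path, boolean); illustrative only.
    interface MiniFileSystem {
        boolean delete(String path, boolean recursive);
    }

    // Sketch of the fixed dispose logic: the recursive flag is now true.
    static boolean disposeStorageLocation(MiniFileSystem fs, String exclusiveCheckpointDir) {
        // Recursively delete the checkpoint directory and all its contents
        return fs.delete(exclusiveCheckpointDir, true);
    }

    public static void main(String[] args) {
        // Record which flag value the delete call receives.
        AtomicBoolean sawRecursive = new AtomicBoolean(false);
        MiniFileSystem fs = (path, recursive) -> {
            sawRecursive.set(recursive);
            return true;
        };
        disposeStorageLocation(fs, "s3://bucket/checkpoints/chk-42");
        System.out.println("delete called with recursive=" + sawRecursive.get());
    }
}
```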
Brief change log
In FsCompletedCheckpointStorageLocation.disposeStorageLocation(), the filesystem call was changed to fs.delete(exclusiveCheckpointDir, true). This enables recursive deletion, ensuring the entire directory tree of a checkpoint is properly removed.

Verifying this change
This change added tests and can be verified as follows:
A new unit test in FsCompletedCheckpointStorageLocationTest specifically reproduces the bug and validates the fix. This test simulates a real, non-empty checkpoint by creating a storage location with subdirectories and files. It then calls the disposeStorageLocation() method and asserts that no exception is thrown and the location is completely removed.

Does this pull request potentially affect one of the following parts:
@Public(Evolving): no

Documentation