NIFI-15570: Keep track of Content Claims where the last Claim in a Re… #10874
markap14 wants to merge 2 commits into apache:main
Conversation
|     return false;
| }
|
| private void truncate(final ContentClaim claim) {
The truncate method doesn't verify that the claimant count is still 0 before truncating. If a clone operation increments the claimant count while the truncation task is mid-flight, we could truncate content that is still referenced. Isn't this a concern?
Wondering if we could have a race condition:

1. `TruncateClaims.truncateClaims()` checks `claim.isTruncationCandidate()` and sees `true`
2. A clone operation calls `incrementClaimantCount()`, which sets `truncationCandidate = false` and increments the claimant count
3. `TruncateClaims.truncate()` proceeds to truncate the file anyway, corrupting the data for the newly cloned FlowFile

Or maybe this scenario is not possible for some reason that I missed?
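To make the suspected check-then-act race concrete, here is a minimal, hypothetical sketch of the pattern under discussion. The names `claimantCount`, `truncationCandidate`, `incrementClaimantCount`, and `tryTruncate` mirror the conversation, not NiFi's actual API; the point is that re-checking both conditions under the same lock the clone path uses closes the window between the check and the truncate.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the suspected race; not NiFi's actual classes.
public class ClaimRaceSketch {
    final AtomicInteger claimantCount = new AtomicInteger(0);
    volatile boolean truncationCandidate = true;

    // Clone path: taking a new reference must disqualify truncation.
    synchronized void incrementClaimantCount() {
        truncationCandidate = false;
        claimantCount.incrementAndGet();
    }

    // Truncation path: re-checks both conditions under the same lock,
    // so a clone cannot slip in between the check and the truncate.
    synchronized boolean tryTruncate() {
        if (!truncationCandidate || claimantCount.get() > 0) {
            return false;
        }
        // ... truncate the backing file here ...
        return true;
    }

    public static void main(String[] args) {
        ClaimRaceSketch claim = new ClaimRaceSketch();
        System.out.println(claim.tryTruncate()); // true: no claimants yet
        claim.incrementClaimantCount();
        System.out.println(claim.tryTruncate()); // false: a clone took a reference
    }
}
```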
Thanks for reviewing @pvillard31!
In short, no, that should not be possible. The only way we will ever queue up the ContentClaim for truncation is if the FlowFile Repository is synced to disk (typically on checkpoint, but also possible on every commit if the fsync property in nifi.properties is set to true) and the Content Claim has truncationCandidate = true. At that point, the FlowFile Repository owns the Content Claim, no Processor has access to it, and the Repository has determined that there are no longer any references to it. As a result, we only queue up the Content Claim for truncation if there is exactly one referencing FlowFile and that FlowFile is being removed. So there is no concern about the claimant count going back up.
As a side note, the integration test failure was caused by another commit and is now fixed if you rebase on main.
46becb2 to a62dfca
…source Claim can be truncated if it is large. Whenever FlowFile Repository is checkpointed, truncate any large Resource Claims when possible and necessary to avoid having a situation where a small FlowFile in a given Resource Claim prevents a large Content Claim from being cleaned up.
exceptionfactory left a comment
Thanks for the detailed work on this improvement @markap14. The basic concept makes sense, and the overall implementation looks straightforward, with very helpful tests at multiple levels.
I noted a handful of mostly minor suggestions and questions, but this looks close to completion.
| private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class);
| private final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = new ConcurrentHashMap<>();
| private final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000);
| private final BlockingQueue<ContentClaim> truncatableClaims = new LinkedBlockingQueue<>(100000);
Is there any particular reason for selecting 100,000 as the queue size? Is it related to the destructableClaims size, or just a reasonably high limit? If there is any particular reason, it would be helpful to add a comment for future reference.
No, no particular reason. Just wanted a big value that's small enough to not cause heap exhaustion.
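The heap-safety property behind that bounded capacity can be shown with a small generic sketch: a `LinkedBlockingQueue` constructed with a capacity never grows past it, and `offer` simply returns `false` when the queue is full rather than blocking or allocating. The tiny capacity of 2 here is only for demonstration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Demonstrates bounded-queue behavior: offer() fails fast when full,
// so a backlog cannot grow without limit and exhaust the heap.
public class BoundedQueueSketch {
    public static void main(String[] args) {
        BlockingQueue<String> truncatable = new LinkedBlockingQueue<>(2);
        System.out.println(truncatable.offer("claim-1")); // true
        System.out.println(truncatable.offer("claim-2")); // true
        System.out.println(truncatable.offer("claim-3")); // false: full, claim is skipped
    }
}
```

When an offer fails, the claim is simply not tracked for truncation on this pass, which is a safe degradation: the content still gets cleaned up through the normal destruction path.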
| logger.debug("Marking {} as truncatable", contentClaim);
| try {
|     if (!truncatableClaims.offer(contentClaim, 1, TimeUnit.MINUTES)) {
|         logger.debug("Unable to mark {} as truncatable because the queue is full.", contentClaim);
It seems like this would be better as an INFO level, and it would be useful to include the queue size.
| - logger.debug("Unable to mark {} as truncatable because the queue is full.", contentClaim);
| + logger.info("Unable to mark {} as truncatable because maximum queue size [{}] reached", contentClaim, truncatableClaims.size());
| .description("The maximum number of batches to generate. Each batch produces 10 FlowFiles (9 small + 1 large). "
|     + "Once this many batches have been generated, no more FlowFiles will be produced until the processor is stopped and restarted.")
Recommend using a multi-line string
| for (final FlowFileRecord ff : recovered) {
|     if (ff.getContentClaim() != null) {
Is it possible for all of the recovered records to have null Content Claims? It looks like it would be better to get a filtered list of non-null recovered records, assert that the list is not empty, and then check the truncation candidate status.
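A minimal sketch of that suggestion, using a stand-in record type (`FlowFileStub` here is hypothetical, not NiFi's `FlowFileRecord`): filter to records with non-null content claims first, fail loudly if the filtered list is empty, and only then inspect the truncation flag, so the assertion can never be silently skipped.

```java
import java.util.List;
import java.util.Objects;

// Sketch of the reviewer's suggestion; FlowFileStub stands in for FlowFileRecord.
public class RecoveredRecordsSketch {
    record FlowFileStub(String contentClaim, boolean truncationCandidate) {}

    static List<FlowFileStub> withClaims(List<FlowFileStub> recovered) {
        return recovered.stream()
            .filter(ff -> Objects.nonNull(ff.contentClaim()))
            .toList();
    }

    public static void main(String[] args) {
        List<FlowFileStub> recovered = List.of(
            new FlowFileStub(null, false),
            new FlowFileStub("claim-a", true));

        List<FlowFileStub> claimed = withClaims(recovered);
        // Guarantees the per-record checks below actually execute.
        if (claimed.isEmpty()) {
            throw new AssertionError("expected at least one record with a content claim");
        }
        claimed.forEach(ff -> System.out.println(ff.truncationCandidate()));
    }
}
```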
...re/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java (outdated, resolved)
...framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java (resolved)
| // If able, truncate those claims. Otherwise, save those claims in the Truncation Claim Manager to be truncated on the next run.
| // This prevents us from having a case where we could truncate a big claim but we don't because we're not yet running out of disk space,
| // but then we later start to run out of disk space and lose the opportunity to truncate that big claim.
| while (true) {
The `while (true)` construction always gives me pause, although the return clarifies expected behavior. Since this Runnable is executed on a schedule, is it necessary to have this loop, as opposed to just waiting for the next invocation from the scheduler?
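The trade-off in that question can be sketched generically: a drain-until-empty loop processes the entire backlog in one scheduled invocation and exits via `return` when the queue runs dry, whereas processing one item per run bounds latency by the schedule period times the backlog depth. This is an illustration of the pattern, not the PR's actual code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Drain-until-empty style: the loop terminates via return when poll()
// finds the queue empty, then the scheduler invokes it again later.
public class DrainLoopSketch {
    static int drainAll(BlockingQueue<String> queue) {
        int handled = 0;
        while (true) {
            final String claim = queue.poll();
            if (claim == null) {
                return handled; // queue empty; wait for next scheduled invocation
            }
            handled++; // ... truncate the claim here ...
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("claim-1");
        queue.add("claim-2");
        System.out.println(drainAll(queue)); // 2
        System.out.println(drainAll(queue)); // 0
    }
}
```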
| }
|
| if (!isArchiveClearedOnLastRun(container)) {
|     LOG.debug("Truncation is not active for container {} because the archive was not cleared on the last run.", container);
I generally avoid a trailing `.` at the end of log messages, but it appears in many of these logs; recommend removing it.
| // This is unlikely but can occur if the claim was truncatable and the underlying Resource Claim becomes
| // destructable. In this case, we may archive or delete the entire ResourceClaim. This is safe to ignore,
| // since it means the data is cleaned up anyway.
| LOG.debug("Failed to truncate {} because file does not exist.", claim, nsfe);
| - LOG.debug("Failed to truncate {} because file does not exist.", claim, nsfe);
| + LOG.debug("Failed to truncate {} because file [{}] does not exist", claim, path, nsfe);
| private static final int MAX_THRESHOLD = 100_000;
| private final Map<String, List<ContentClaim>> truncationClaims = new HashMap<>();
|
| public synchronized void addTruncationClaims(final String container, final List<ContentClaim> claim) {
Do these class methods need to be public?
Thanks for the feedback @exceptionfactory. Updated.
exceptionfactory left a comment
Thanks for addressing the feedback @markap14! The changes look good, I plan to merge soon pending successful automated builds.