2121import org .apache .nifi .controller .repository .claim .ContentClaim ;
2222import org .apache .nifi .controller .repository .claim .ResourceClaim ;
2323import org .apache .nifi .controller .repository .claim .ResourceClaimManager ;
24+ import org .apache .nifi .controller .repository .claim .StandardContentClaim ;
2425import org .apache .nifi .flowfile .attributes .CoreAttributes ;
26+ import org .apache .nifi .processor .DataUnit ;
2527import org .apache .nifi .repository .schema .FieldCache ;
2628import org .apache .nifi .util .FormatUtils ;
2729import org .apache .nifi .util .NiFiProperties ;
@@ -98,6 +100,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
98100 private final List <File > flowFileRepositoryPaths = new ArrayList <>();
99101 private final ScheduledExecutorService checkpointExecutor ;
100102 private final int maxCharactersToCache ;
103+ private final long truncationThreshold ;
101104
102105 private volatile Collection <SerializedRepositoryRecord > recoveredRecords = null ;
103106 private final Set <ResourceClaim > orphanedResourceClaims = Collections .synchronizedSet (new HashSet <>());
@@ -132,6 +135,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
132135 // before the data is destroyed, it's okay because the data will be unknown to the Content Repository, so it will be destroyed
133136 // on restart.
134137 private final ConcurrentMap <Integer , BlockingQueue <ResourceClaim >> claimsAwaitingDestruction = new ConcurrentHashMap <>();
138+ private final ConcurrentMap <Integer , BlockingQueue <ContentClaim >> claimsAwaitingTruncation = new ConcurrentHashMap <>();
135139
136140 /**
137141 * default no args constructor for service loading only.
@@ -143,6 +147,7 @@ public WriteAheadFlowFileRepository() {
143147 nifiProperties = null ;
144148 retainOrphanedFlowFiles = true ;
145149 maxCharactersToCache = 0 ;
150+ truncationThreshold = Long .MAX_VALUE ;
146151 }
147152
148153 public WriteAheadFlowFileRepository (final NiFiProperties nifiProperties ) {
@@ -153,6 +158,8 @@ public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
153158 retainOrphanedFlowFiles = orphanedFlowFileProperty == null || Boolean .parseBoolean (orphanedFlowFileProperty );
154159
155160 this .maxCharactersToCache = nifiProperties .getIntegerProperty (FLOWFILE_REPO_CACHE_SIZE , DEFAULT_CACHE_SIZE );
161+ final long maxAppendableClaimLength = DataUnit .parseDataSize (nifiProperties .getMaxAppendableClaimSize (), DataUnit .B ).longValue ();
162+ truncationThreshold = Math .min (1_000_000 , maxAppendableClaimLength );
156163
157164 final String directoryName = nifiProperties .getProperty (FLOWFILE_REPOSITORY_DIRECTORY_PREFIX );
158165 flowFileRepositoryPaths .add (new File (directoryName ));
@@ -445,33 +452,48 @@ protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecord
445452 // The below code is not entirely thread-safe, but we are OK with that because the results aren't really harmful.
446453 // Specifically, if two different threads call updateRepository with DELETE records for the same Content Claim,
447454 // it's quite possible for claimant count to be 0 below, which results in two different threads adding the Content
448- // Claim to the 'claimsAwaitDestruction ' map. As a result, we can call #markDestructable with the same ContentClaim
455+ // Claim to the 'claimsAwaitingDestruction ' map. As a result, we can call #markDestructable with the same ContentClaim
449456 // multiple times, and the #markDestructable method is not necessarily idempotent.
450457 // However, the result of this is that the FileSystem Repository may end up trying to remove the content multiple times.
451458 // This does not, however, cause problems, as ContentRepository should handle this
452459 // This does indicate that some refactoring should probably be performed, though, as this is not a very clean interface.
453- final Set <ResourceClaim > claimsToAdd = new HashSet <>();
460+ final Set <ResourceClaim > destructableClaims = new HashSet <>();
461+ final Set <ContentClaim > truncatableClaims = new HashSet <>();
454462
455463 final Set <String > swapLocationsAdded = new HashSet <>();
456464 final Set <String > swapLocationsRemoved = new HashSet <>();
457465
458466 for (final RepositoryRecord record : repositoryRecords ) {
459467 updateClaimCounts (record );
460468
469+ final ContentClaim contentClaim = record .getCurrentClaim ();
470+ final boolean truncationCandidate = contentClaim != null && contentClaim .isTruncationCandidate ();
471+ final boolean claimChanged = !Objects .equals (record .getOriginalClaim (), contentClaim );
461472 if (record .getType () == RepositoryRecordType .DELETE ) {
462- // For any DELETE record that we have, if claim is destructible, mark it so
463- if (record .getCurrentClaim () != null && isDestructable (record .getCurrentClaim ())) {
464- claimsToAdd .add (record .getCurrentClaim ().getResourceClaim ());
473+ // For any DELETE record that we have, if claim is destructible or truncatable, mark it so
474+ if (isDestructable (contentClaim )) {
475+ destructableClaims .add (contentClaim .getResourceClaim ());
476+ } else if (truncationCandidate ) {
477+ truncatableClaims .add (contentClaim );
465478 }
466479
467- // If the original claim is different than the current claim and the original claim is destructible, mark it so
468- if (record .getOriginalClaim () != null && !record .getOriginalClaim ().equals (record .getCurrentClaim ()) && isDestructable (record .getOriginalClaim ())) {
469- claimsToAdd .add (record .getOriginalClaim ().getResourceClaim ());
480+ // If the original claim is different than the current claim and the original claim is destructible
481+ // or truncatable, mark it so
482+ if (claimChanged ) {
483+ if (isDestructable (record .getOriginalClaim ())) {
484+ destructableClaims .add (record .getOriginalClaim ().getResourceClaim ());
485+ } else if (record .getOriginalClaim () != null && record .getOriginalClaim ().isTruncationCandidate ()) {
486+ truncatableClaims .add (record .getOriginalClaim ());
487+ }
470488 }
471489 } else if (record .getType () == RepositoryRecordType .UPDATE ) {
472490 // if we have an update, and the original is no longer needed, mark original as destructible
473- if (record .getOriginalClaim () != null && record .getCurrentClaim () != record .getOriginalClaim () && isDestructable (record .getOriginalClaim ())) {
474- claimsToAdd .add (record .getOriginalClaim ().getResourceClaim ());
491+ if (claimChanged ) {
492+ if (isDestructable (record .getOriginalClaim ())) {
493+ destructableClaims .add (record .getOriginalClaim ().getResourceClaim ());
494+ } else if (record .getOriginalClaim () != null && record .getOriginalClaim ().isTruncationCandidate ()) {
495+ truncatableClaims .add (record .getOriginalClaim ());
496+ }
475497 }
476498 } else if (record .getType () == RepositoryRecordType .SWAP_OUT ) {
477499 final String swapLocation = record .getSwapLocation ();
@@ -484,13 +506,16 @@ protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecord
484506 }
485507 }
486508
487- // Once the content claim counts have been updated for all records, collect any transient claims that are eligible for destruction
509+ // Once the content claim counts have been updated for all records, collect any transient
510+ // claims that are eligible for destruction or truncation
488511 for (final RepositoryRecord record : repositoryRecords ) {
489512 final List <ContentClaim > transientClaims = record .getTransientClaims ();
490513 if (transientClaims != null ) {
491514 for (final ContentClaim transientClaim : transientClaims ) {
492515 if (isDestructable (transientClaim )) {
493- claimsToAdd .add (transientClaim .getResourceClaim ());
516+ destructableClaims .add (transientClaim .getResourceClaim ());
517+ } else if (transientClaim .isTruncationCandidate ()) {
518+ truncatableClaims .add (transientClaim );
494519 }
495520 }
496521 }
@@ -504,19 +529,15 @@ protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecord
504529 }
505530 }
506531
507- if (!claimsToAdd .isEmpty ()) {
508- // Get / Register a Set<ContentClaim> for the given Partition Index
509- final Integer partitionKey = Integer .valueOf (partitionIndex );
510- BlockingQueue <ResourceClaim > claimQueue = claimsAwaitingDestruction .get (partitionKey );
511- if (claimQueue == null ) {
512- claimQueue = new LinkedBlockingQueue <>();
513- final BlockingQueue <ResourceClaim > existingClaimQueue = claimsAwaitingDestruction .putIfAbsent (partitionKey , claimQueue );
514- if (existingClaimQueue != null ) {
515- claimQueue = existingClaimQueue ;
516- }
517- }
532+ if (!destructableClaims .isEmpty ()) {
533+ // Get / Register a Set<ResourceClaim> for the given Partition Index
534+ final BlockingQueue <ResourceClaim > claimQueue = claimsAwaitingDestruction .computeIfAbsent (partitionIndex , key -> new LinkedBlockingQueue <>());
535+ claimQueue .addAll (destructableClaims );
536+ }
518537
519- claimQueue .addAll (claimsToAdd );
538+ if (!truncatableClaims .isEmpty ()) {
539+ final BlockingQueue <ContentClaim > claimQueue = claimsAwaitingTruncation .computeIfAbsent (partitionIndex , key -> new LinkedBlockingQueue <>());
540+ claimQueue .addAll (truncatableClaims );
520541 }
521542 }
522543
@@ -566,16 +587,24 @@ private static String getLocationSuffix(final String swapLocation) {
566587
// Per-partition sync callback: drains the claims queued for partitionIndex and hands them to
// markDestructable / claimManager.markTruncatable.
// NOTE(review): this span is a rendered diff - lines whose number is followed by '-' look like the
// removed version and lines followed by '+' the added version; confirm against the real source file.
567588 @ Override
568589 public void onSync (final int partitionIndex ) {
// Removed version: returned immediately when no destruction queue was registered for this partition.
569- final BlockingQueue <ResourceClaim > claimQueue = claimsAwaitingDestruction .get (partitionIndex );
570- if (claimQueue == null ) {
571- return ;
// Added version: a missing destruction queue no longer ends the method, so the truncation queue
// further down is still processed.
590+ final BlockingQueue <ResourceClaim > destructionClaimQueue = claimsAwaitingDestruction .get (partitionIndex );
591+ if (destructionClaimQueue != null ) {
// Drain into a HashSet so that equal ResourceClaims queued more than once are marked only once here.
592+ final Set <ResourceClaim > claimsToDestroy = new HashSet <>();
593+ destructionClaimQueue .drainTo (claimsToDestroy );
594+
595+ for (final ResourceClaim claim : claimsToDestroy ) {
596+ markDestructable (claim );
597+ }
572598 }
573599
574- final Set <ResourceClaim > claimsToDestroy = new HashSet <>();
575- claimQueue .drainTo (claimsToDestroy );
// Added version: same drain-then-mark sequence for the ContentClaims queued in claimsAwaitingTruncation.
600+ final BlockingQueue <ContentClaim > truncationClaimQueue = claimsAwaitingTruncation .get (partitionIndex );
601+ if (truncationClaimQueue != null ) {
602+ final Set <ContentClaim > claimsToTruncate = new HashSet <>();
603+ truncationClaimQueue .drainTo (claimsToTruncate );
576604
577- for (final ResourceClaim claim : claimsToDestroy ) {
578- markDestructable (claim );
// Truncation is delegated to claimManager rather than to a local helper.
605+ for (final ContentClaim claim : claimsToTruncate ) {
606+ claimManager .markTruncatable (claim );
607+ }
579608 }
581610
@@ -589,6 +618,15 @@ public void onGlobalSync() {
589618 markDestructable (claim );
590619 }
591620 }
621+
622+ for (final BlockingQueue <ContentClaim > claimQueue : claimsAwaitingTruncation .values ()) {
623+ final Set <ContentClaim > claimsToTruncate = new HashSet <>();
624+ claimQueue .drainTo (claimsToTruncate );
625+
626+ for (final ContentClaim claim : claimsToTruncate ) {
627+ claimManager .markTruncatable (claim );
628+ }
629+ }
592630 }
593631
594632 /**
@@ -723,6 +761,10 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException
723761 queueMap .put (queue .getIdentifier (), queue );
724762 }
725763
764+ final Set <StandardContentClaim > truncationEligibleClaims = new HashSet <>();
765+ final Set <ContentClaim > forbiddenTruncationClaims = new HashSet <>();
766+ final Map <ResourceClaim , ContentClaim > latestContentClaimByResourceClaim = new HashMap <>();
767+
726768 final List <SerializedRepositoryRecord > dropRecords = new ArrayList <>();
727769 int numFlowFilesMissingQueue = 0 ;
728770 long maxId = 0 ;
@@ -748,6 +790,15 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException
748790 }
749791
750792 final ContentClaim claim = record .getContentClaim ();
793+
794+ // Track the latest Content Claim for each Resource Claim so that we can determine which claims are eligible for truncation.
795+ if (claim != null ) {
796+ final ContentClaim latestContentClaim = latestContentClaimByResourceClaim .get (claim .getResourceClaim ());
797+ if (latestContentClaim == null || claim .getOffset () > latestContentClaim .getOffset ()) {
798+ latestContentClaimByResourceClaim .put (claim .getResourceClaim (), claim );
799+ }
800+ }
801+
751802 final FlowFileQueue flowFileQueue = queueMap .get (queueId );
752803 final boolean orphaned = flowFileQueue == null ;
753804 if (orphaned ) {
@@ -777,6 +828,18 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException
777828
778829 continue ;
779830 } else if (claim != null ) {
831+ // If the claim exceeds the max appendable claim length on its own and doesn't start the Resource Claim,
832+ // we will consider it to be eligible for truncation. However, if there are multiple FlowFiles sharing the
833+ // same claim, we cannot truncate it because doing so would affect the other FlowFiles.
834+ if (claim .getOffset () > 0 && claim .getLength () > truncationThreshold && claim instanceof final StandardContentClaim scc ) {
835+ if (forbiddenTruncationClaims .contains (claim ) || truncationEligibleClaims .contains (scc )) {
836+ truncationEligibleClaims .remove (scc );
837+ forbiddenTruncationClaims .add (scc );
838+ } else {
839+ truncationEligibleClaims .add (scc );
840+ }
841+ }
842+
780843 claimManager .incrementClaimantCount (claim .getResourceClaim ());
781844 }
782845
@@ -786,6 +849,16 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException
786849 // If recoveredRecords has been populated it need to be nulled out now because it is no longer useful and can be garbage collected.
787850 recoveredRecords = null ;
788851
852+ // If any Content Claim was determined to be truncatable, mark it as such now.
853+ for (final StandardContentClaim eligible : truncationEligibleClaims ) {
854+ final ContentClaim latestForResource = latestContentClaimByResourceClaim .get (eligible .getResourceClaim ());
855+ if (!Objects .equals (eligible , latestForResource )) {
856+ continue ;
857+ }
858+
859+ eligible .setTruncationCandidate (true );
860+ }
861+
789862 // Set the AtomicLong to 1 more than the max ID so that calls to #getNextFlowFileSequence() will
790863 // return the appropriate number.
791864 flowFileSequenceGenerator .set (maxId + 1 );
@@ -852,7 +925,7 @@ public long getNextFlowFileSequence() {
852925 }
853926
// Returns one less than the current value of flowFileSequenceGenerator.
// NOTE(review): rendered diff - the '-' line is the removed signature and the '+' line its replacement,
// which drops the `throws IOException` clause; the visible body only reads the generator.
854927 @ Override
855- public long getMaxFlowFileIdentifier () throws IOException {
928+ public long getMaxFlowFileIdentifier () {
856929 // flowFileSequenceGenerator is 1 more than the MAX so that we can call #getAndIncrement on the AtomicLong
857930 return flowFileSequenceGenerator .get () - 1 ;
858931 }
0 commit comments