diff --git a/nifi-framework-bundle/nifi-framework-extensions/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedWriteAheadEventStore.java b/nifi-framework-bundle/nifi-framework-extensions/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedWriteAheadEventStore.java index 749ca1d04e99..f242a01976c7 100644 --- a/nifi-framework-bundle/nifi-framework-extensions/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedWriteAheadEventStore.java +++ b/nifi-framework-bundle/nifi-framework-extensions/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedWriteAheadEventStore.java @@ -24,6 +24,8 @@ import org.apache.nifi.provenance.serialization.EventFileCompressor; import org.apache.nifi.provenance.store.iterator.AggregateEventIterator; import org.apache.nifi.provenance.store.iterator.EventIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -37,9 +39,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; public class PartitionedWriteAheadEventStore extends PartitionedEventStore { + private static final Logger logger = LoggerFactory.getLogger(PartitionedWriteAheadEventStore.class); + private static final long REINDEX_PROGRESS_LOG_INTERVAL_MINUTES = 1; private final BlockingQueue filesToCompress; private final List partitions; private final RepositoryConfiguration repoConfig; @@ -125,14 +131,35 @@ public void reindexLatestEvents(final EventIndex eventIndex) { } executor.shutdown(); + + final long startNanos = System.nanoTime(); + final long warnIntervalNanos = TimeUnit.MINUTES.toNanos(REINDEX_PROGRESS_LOG_INTERVAL_MINUTES); + long nextWarnAtNanos = warnIntervalNanos; + for (final Future future : futures) { - try { - future.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Failed to re-index events because Thread was interrupted", e); - } catch (ExecutionException e) { - throw new RuntimeException("Failed to re-index events", e); + while (true) { + try { + future.get(REINDEX_PROGRESS_LOG_INTERVAL_MINUTES, TimeUnit.MINUTES); + break; + } catch (final TimeoutException e) { + final long elapsedNanos = System.nanoTime() - startNanos; + if (elapsedNanos >= nextWarnAtNanos) { + long pending = 0; + for (final Future f : futures) { + if (!f.isDone()) { + pending++; + } + } + logger.info("Provenance re-indexing still in progress after {} seconds; {} of {} partitions still pending", + TimeUnit.NANOSECONDS.toSeconds(elapsedNanos), pending, numPartitions); + nextWarnAtNanos = elapsedNanos + warnIntervalNanos; + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Failed to re-index events because Thread was interrupted", e); + } catch (final ExecutionException e) { + throw new RuntimeException("Failed to re-index events", e); + } } } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index d1a51c9bed46..b565bf2e8aaf 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -74,6 +74,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -89,6 +90,7 @@ public class FileSystemRepository implements ContentRepository { public static final long MIN_CLEANUP_INTERVAL_MILLIS = TimeUnit.SECONDS.toMillis(1L); public static final long DEFAULT_CLEANUP_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(1L); public static final String ARCHIVE_DIR_NAME = "archive"; + private static final long ARCHIVE_SCAN_PROGRESS_LOG_INTERVAL_MINUTES = 1L; // 100 MB cap for the configurable NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE property to prevent // unnecessarily large resource claim files public static final String APPENDABLE_CLAIM_LENGTH_CAP = "100 MB"; @@ -332,15 +334,39 @@ private synchronized void initializeRepository() throws IOException { executor.shutdown(); - // Wait for all futures to complete + // Wait for all futures to complete, logging a periodic warning if the archive scan is taking a long time. + // On busy installs the archive directories can hold large numbers of files. + final long startNanos = System.nanoTime(); + final long warnIntervalNanos = TimeUnit.MINUTES.toNanos(ARCHIVE_SCAN_PROGRESS_LOG_INTERVAL_MINUTES); + long nextWarnAtNanos = warnIntervalNanos; + for (final Future future : futures) { - try { - future.get(); - } catch (final ExecutionException | InterruptedException e) { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } else { - throw new RuntimeException(e); + while (true) { + try { + future.get(ARCHIVE_SCAN_PROGRESS_LOG_INTERVAL_MINUTES, TimeUnit.MINUTES); + break; + } catch (final TimeoutException e) { + final long elapsedNanos = System.nanoTime() - startNanos; + if (elapsedNanos >= nextWarnAtNanos) { + long pending = 0; + for (final Future f : futures) { + if (!f.isDone()) { + pending++; + } + } + LOG.info("Content repository archive directory scan still in progress after {} seconds; {} of {} containers still pending", + TimeUnit.NANOSECONDS.toSeconds(elapsedNanos), pending, futures.size()); + nextWarnAtNanos = elapsedNanos + warnIntervalNanos; + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Failed to scan archive directories because Thread was interrupted", e); + } catch (final ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } else { + throw new RuntimeException("Failed to scan archive directories", e); + } } } }