Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,12 @@ public void afterBulk(final long executionId, final List<IndexBulkItemResult> re
}

handleSuccess(successful);
// 50% failure rate forces a rebuild of the BulkProcessor.
// Guard: skip rebuild when the batch was empty — an empty response list
// with lastBatchSize == 0 is not an error condition.
// 50% failure rate guard: log a warning so the failure is observable.
// No explicit rebuild needed — ReindexThread creates a fresh processor per batch,
// so the next batch will automatically start with a clean processor.
if (lastBatchSize > 0 && (totalResponses == 0 || ((float) successful.size() / totalResponses < .5))) {
ReindexThread.rebuildBulkIndexer();
Logger.warn(this.getClass(),
"High bulk-index failure rate detected (>50%) — next batch will use a fresh processor.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@
import com.dotcms.concurrent.DotSubmitter;
import com.dotcms.content.elasticsearch.business.ContentletIndexAPI;
import com.dotcms.content.elasticsearch.util.ESReindexationProcessStatus;
import com.dotcms.content.index.domain.IndexBulkProcessor;
import com.dotcms.content.model.annotation.IndexLibraryIndependent;
import com.dotcms.notifications.bean.NotificationLevel;
import com.dotcms.notifications.bean.NotificationType;
import com.dotcms.notifications.business.NotificationAPI;
import com.dotcms.shutdown.ShutdownCoordinator;
import com.dotcms.util.I18NMessage;
import com.dotmarketing.business.*;
import com.dotmarketing.business.APILocator;
import com.dotmarketing.business.CacheLocator;
import com.dotmarketing.business.Role;
import com.dotmarketing.business.RoleAPI;
import com.dotmarketing.business.UserAPI;
import com.dotmarketing.db.DbConnectionFactory;
import com.dotmarketing.db.HibernateUtil;
import com.dotmarketing.exception.DotDataException;
Expand All @@ -24,16 +30,12 @@
import com.liferay.portal.language.LanguageException;
import com.liferay.portal.model.User;
import io.vavr.Lazy;
import org.apache.felix.framework.OSGISystem;
import com.dotcms.content.index.domain.IndexBulkProcessor;

import java.sql.SQLException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.felix.framework.OSGISystem;
Comment thread
fabrizzio-dotCMS marked this conversation as resolved.

/**
 * This thread is in charge of re-indexing the contentlet information placed in the
Expand Down Expand Up @@ -69,6 +71,7 @@
* @version 3.3
* @since Mar 22, 2012
*/
@IndexLibraryIndependent
public class ReindexThread {

private enum ThreadState {
Expand Down Expand Up @@ -118,19 +121,12 @@ private enum ThreadState {
"WAIT_BEFORE_PAUSE_SECONDS", 0);


private AtomicReference<ThreadState> state = new AtomicReference<>(ThreadState.STOPPED);
private final AtomicReference<ThreadState> state = new AtomicReference<>(ThreadState.STOPPED);


private final static String REINDEX_THREAD_PAUSED = "REINDEX_THREAD_PAUSED";
private final static Lazy<SystemCache> cache = Lazy.of(() -> CacheLocator.getSystemCache());

private final static AtomicBoolean rebuildBulkIndexer = new AtomicBoolean(false);

public static void rebuildBulkIndexer() {
Logger.warn(ReindexThread.class, "--- ReindexThread BulkProcessor needs to be Rebuilt");
ReindexThread.rebuildBulkIndexer.set(true);
}

private ReindexThread() {

this(APILocator.getReindexQueueAPI(), APILocator.getNotificationAPI(),
Expand Down Expand Up @@ -174,24 +170,12 @@ long totalESPuts() {
}


private IndexBulkProcessor closeBulkProcessor(final IndexBulkProcessor bulkProcessor)
throws InterruptedException {
if (bulkProcessor != null) {
try {
bulkProcessor.close();
} catch (final Exception e) {
Logger.warnAndDebug(ReindexThread.class, "Error closing bulk processor: " + e.getMessage(), e);
}
}
rebuildBulkIndexer.set(false);
return null;
}


private IndexBulkProcessor finalizeReIndex(IndexBulkProcessor bulkProcessor)
/**
* Handles queue-drained logic: attempts index switchover and pauses the thread
* if no full reindex is in progress.
*/
private void finalizeReIndex()
throws InterruptedException, LanguageException, DotDataException, SQLException {
bulkProcessor = closeBulkProcessor(bulkProcessor);

// Don't perform switchover operations during shutdown
if (!ShutdownCoordinator.isRequestDraining()) {
switchOverIfNeeded();
Expand All @@ -201,8 +185,6 @@ private IndexBulkProcessor finalizeReIndex(IndexBulkProcessor bulkProcessor)
} else {
Logger.debug(this, "Skipping reindex finalization due to shutdown in progress");
}
return bulkProcessor;

}


Expand All @@ -212,13 +194,15 @@ private IndexBulkProcessor finalizeReIndex(IndexBulkProcessor bulkProcessor)
* Elastic index. If that's not possible, a notification containing the content identifier will
* be sent to the user via the Notifications API to take care of the problem as soon as
* possible.
*
* <p><strong>Thread-safety:</strong> each batch gets its own {@link BulkProcessorListener}
* and {@link IndexBulkProcessor}. The processor is closed (blocking until {@code afterBulk}
* completes) before the next batch starts, so there is no shared mutable state between
* consecutive batches and no TOCTOU race on the processor reference.</p>
*/
private void runReindexLoop() {
IndexBulkProcessor bulkProcessor = null;
BulkProcessorListener bulkProcessorListener = null;
while (state.get() != ThreadState.STOPPED) {
try {
// Check for shutdown before doing any database operations
if (ShutdownCoordinator.isRequestDraining()) {
Logger.info(this, "Shutdown detected, stopping reindex operations");
break;
Expand All @@ -227,34 +211,29 @@ private void runReindexLoop() {
final Map<String, ReindexEntry> workingRecords = queueApi.findContentToReindex();

if (workingRecords.isEmpty()) {
bulkProcessor = finalizeReIndex(bulkProcessor);
}

if (!workingRecords.isEmpty()) {
// Check again before processing records
finalizeReIndex();
} else {
if (ShutdownCoordinator.isRequestDraining()) {
Logger.info(this, "Shutdown detected during record processing, stopping reindex operations");
break;
}

Logger.debug(this,
"Found " + workingRecords + " index items to process");

if (bulkProcessor == null || rebuildBulkIndexer.get()) {
closeBulkProcessor(bulkProcessor);
bulkProcessorListener = new BulkProcessorListener();
bulkProcessor = indexAPI.createBulkProcessor(bulkProcessorListener);
}
bulkProcessorListener.workingRecords.putAll(workingRecords);
indexAPI.appendToBulkProcessor(bulkProcessor, workingRecords.values());
contentletsIndexed += bulkProcessorListener.getContentletsIndexed();
// otherwise, reindex normally

Logger.debug(this, "Found " + workingRecords + " index items to process");

// Fresh listener per batch: each afterBulk callback resolves against its own
// immutable workingRecords snapshot — no race with the next putAll.
final BulkProcessorListener batchListener = new BulkProcessorListener();
batchListener.workingRecords.putAll(workingRecords);
try (final IndexBulkProcessor batchProcessor =
indexAPI.createBulkProcessor(batchListener)) {
indexAPI.appendToBulkProcessor(batchProcessor, workingRecords.values());
} // close() blocks until afterBulk completes before the next batch starts
contentletsIndexed += batchListener.getContentletsIndexed();
}

} catch (Throwable ex) {
// Check if this is a shutdown-related exception
if (isShutdownRelated(ex) || ShutdownCoordinator.isRequestDraining() ||
ex instanceof com.dotcms.shutdown.ShutdownException) {
if (isShutdownRelated(ex) || ShutdownCoordinator.isRequestDraining()
|| ex instanceof com.dotcms.shutdown.ShutdownException) {
Logger.debug(this, "ReindexThread stopping due to shutdown: " + ex.getMessage());
break;
}
Expand All @@ -265,15 +244,6 @@ private void runReindexLoop() {
}
sleep();
}

// Clean up bulk processor on exit
try {
if (bulkProcessor != null) {
closeBulkProcessor(bulkProcessor);
}
} catch (Exception e) {
Logger.debug(this, "Exception while closing bulk processor during shutdown: " + e.getMessage());
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.dotcms.api.system.event.Visibility;
import com.dotcms.api.system.event.verifier.RoleVerifier;
import com.dotcms.content.elasticsearch.business.ContentletIndexAPI;
import com.dotcms.content.index.domain.IndexBulkListener;
import com.dotcms.content.index.domain.IndexBulkProcessor;
import com.dotcms.notifications.bean.NotificationLevel;
import com.dotcms.notifications.bean.NotificationType;
import com.dotcms.notifications.business.NotificationAPI;
Expand All @@ -18,17 +20,35 @@
import com.dotmarketing.util.Config;
import com.liferay.portal.model.User;
import com.liferay.portal.util.WebKeys;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletContext;
import org.junit.After;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import javax.servlet.ServletContext;
import java.util.Locale;
import java.util.UUID;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* Test for {@link ReindexThread}
Expand Down Expand Up @@ -117,16 +137,115 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
}
}

/**
 * Verifies that {@code runReindexLoop} creates a <em>fresh</em>
 * {@link BulkProcessorListener} for every batch, eliminating the TOCTOU race
 * where {@code putAll(batch N+1)} could interleave with {@code afterBulk(batch N)}
 * reading {@code workingRecords} on the BulkProcessor callback thread.
 *
 * <p>The test runs the actual loop (via reflection) against fully mocked dependencies
 * and uses a {@link CountDownLatch} to let exactly two batches complete before
 * stopping the thread.</p>
 *
 * <p>Flow: stub the queue to alternate two non-empty batches → run the private
 * loop on a background thread → await two {@code appendToBulkProcessor} calls →
 * stop the thread → capture every listener handed to {@code createBulkProcessor}
 * and assert the first two are distinct objects holding disjoint record sets.</p>
 */
@Test
public void testPerBatchListenerIsolation() throws Exception {
    // — mocks — (all collaborators are Mockito mocks; no real ES cluster involved)
    final ReindexQueueAPI queueApi = mock(ReindexQueueAPI.class);
    final ContentletIndexAPI indexApi = mock(ContentletIndexAPI.class);
    final IndexBulkProcessor mockProc = mock(IndexBulkProcessor.class);

    final ReindexEntry e1 = mock(ReindexEntry.class);
    final ReindexEntry e2 = mock(ReindexEntry.class);
    // Map.of(...) is immutable, so each batch snapshot cannot be mutated after creation.
    final Map<String, ReindexEntry> batch1 = Map.of("id1", e1);
    final Map<String, ReindexEntry> batch2 = Map.of("id2", e2);

    // Alternate batch1 / batch2 so the queue is never empty and
    // finalizeReIndex() (which uses static helpers) is never reached.
    final AtomicInteger callCount = new AtomicInteger();
    when(queueApi.findContentToReindex())
            .thenAnswer(inv -> (callCount.getAndIncrement() % 2 == 0) ? batch1 : batch2);

    // close() inside try-with-resources must be a no-op
    doAnswer(inv -> null).when(mockProc).close();
    when(indexApi.createBulkProcessor(any(IndexBulkListener.class))).thenReturn(mockProc);

    // Count down once per batch so the test thread knows when to stop
    final CountDownLatch twoBatches = new CountDownLatch(2);
    doAnswer(inv -> { twoBatches.countDown(); return null; })
            .when(indexApi).appendToBulkProcessor(any(), any());

    // — instantiate (sets the static singleton used by stopThread) —
    // NOTE(review): assumes a test-visible 5-arg ReindexThread constructor — confirm
    // against ReindexThread's declared constructors.
    final ReindexThread thread = new ReindexThread(
            queueApi,
            mock(NotificationAPI.class),
            mock(UserAPI.class),
            mock(RoleAPI.class),
            indexApi);
    setStateRunning(thread);

    // — run the private loop in a background thread —
    final Method runLoop = ReindexThread.class.getDeclaredMethod("runReindexLoop");
    runLoop.setAccessible(true);
    final Thread loopThread = new Thread(() -> {
        try {
            runLoop.invoke(thread);
        } catch (Exception ignored) {
            // InvocationTargetException is expected if stopThread races with invoke
        }
    });
    loopThread.start();

    // — wait for two complete batches, then stop —
    assertTrue("Two batches should complete within 5 s",
            twoBatches.await(5, TimeUnit.SECONDS));
    // stopThread() flips the loop state so runReindexLoop's while-condition fails.
    ReindexThread.stopThread();
    loopThread.join(3_000);
    assertFalse("runReindexLoop must exit after stopThread()", loopThread.isAlive());

    // — verify per-batch isolation —
    // Capture every listener ever passed to createBulkProcessor, in call order.
    final ArgumentCaptor<IndexBulkListener> captor =
            ArgumentCaptor.forClass(IndexBulkListener.class);
    verify(indexApi, atLeast(2)).createBulkProcessor(captor.capture());

    final List<IndexBulkListener> captured = captor.getAllValues();
    final BulkProcessorListener l0 = (BulkProcessorListener) captured.get(0);
    final BulkProcessorListener l1 = (BulkProcessorListener) captured.get(1);

    // Distinct listener instances per batch is the core fix under test.
    assertNotSame(
            "Each batch must receive a fresh listener — reusing listeners is the TOCTOU hazard",
            l0, l1);
    assertEquals("First listener must contain only batch-1 entries",
            Set.of("id1"), l0.workingRecords.keySet());
    assertEquals("Second listener must contain only batch-2 entries",
            Set.of("id2"), l1.workingRecords.keySet());
}

/**
 * Restores the {@code RoleVerifier} that was registered for {@link Visibility#ROLE}
 * before the test ran, so subsequent tests observe the pristine
 * {@code PayloadVerifierFactory} state.
 *
 * <p>Defect fixed: this span contained both the old unguarded body and the new
 * null-guarded body side by side (merge/diff residue), which declared the local
 * {@code payloadVerifierFactory} twice — a compile error — and registered the
 * verifier twice. Only the guarded version is kept: if {@code originalRoleVerifier}
 * was never captured (e.g. setup failed early), re-registering {@code null}
 * would corrupt the factory, so we skip restoration in that case.</p>
 */
@After
public void restore() {
    if (originalRoleVerifier != null) {
        final PayloadVerifierFactory payloadVerifierFactory = PayloadVerifierFactory.getInstance();
        payloadVerifierFactory.register(Visibility.ROLE, this.originalRoleVerifier);
    }
}




/**
 * Forces a {@link ReindexThread} instance into the {@code RUNNING} state via
 * reflection, since {@code ThreadState} is a private nested enum and the
 * {@code state} field is private.
 *
 * @param thread the instance whose {@code state} field is mutated
 * @throws Exception         on any reflective access failure
 * @throws IllegalStateException if no nested enum named {@code ThreadState} exists
 */
@SuppressWarnings({"unchecked", "rawtypes"})
private static void setStateRunning(final ReindexThread thread) throws Exception {
    // Locate the private nested enum first; fail fast if the class shape changed.
    Class<?> threadStateEnum = null;
    for (final Class<?> candidate : ReindexThread.class.getDeclaredClasses()) {
        final boolean matches = candidate.isEnum()
                && "ThreadState".equals(candidate.getSimpleName());
        if (matches) {
            threadStateEnum = candidate;
            break;
        }
    }
    if (threadStateEnum == null) {
        throw new IllegalStateException("ThreadState.RUNNING not found via reflection");
    }

    // Grab the AtomicReference<ThreadState> held by the instance and swap its value.
    final Field stateField = ReindexThread.class.getDeclaredField("state");
    stateField.setAccessible(true);
    final AtomicReference<Object> holder =
            (AtomicReference<Object>) stateField.get(thread);
    holder.set(Enum.valueOf((Class<Enum>) threadStateEnum, "RUNNING"));
}
}
Loading