Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

import com.codahale.metrics.MetricRegistry;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.jackrabbit.JcrConstants;
import org.apache.jackrabbit.api.stats.TimeSeries;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
Expand Down Expand Up @@ -134,6 +135,8 @@ public class AsyncIndexUpdate implements Runnable, Closeable {

private final IndexEditorProvider provider;

private final CatchUpRunner catchUpRunner;

/**
* Property name which stores the timestamp upto which the repository is
* indexed
Expand Down Expand Up @@ -225,6 +228,7 @@ public AsyncIndexUpdate(@NotNull String name, @NotNull NodeStore store,
this.lastIndexedTo = lastIndexedTo(name);
this.store = requireNonNull(store);
this.provider = requireNonNull(provider);
this.catchUpRunner = new CatchUpRunner(store, provider, name);
this.switchOnSync = switchOnSync;
this.leaseTimeOut = DEFAULT_ASYNC_TIMEOUT;
this.statisticsProvider = statsProvider;
Expand Down Expand Up @@ -709,6 +713,217 @@ private static NodeBuilder childBuilder(NodeBuilder nb, String path) {
return nb;
}

private static void setTrackingNodeType(NodeBuilder node) {
if (!node.hasProperty(JcrConstants.JCR_PRIMARYTYPE)) {
node.setProperty(JcrConstants.JCR_PRIMARYTYPE, "oak:Unstructured", Type.NAME);
}
}

/**
* Detects targets newly added to {@code storeTargets} on existing indexes
* and writes {@link CatchUpCapable#CATCH_UP_FROM_START} to the tracking node
* for each such target.
*
* <p>New indexes (not present in {@code before}) are skipped — they are
* handled by the normal reindex mechanism.</p>
*
* <p>When {@code before} is {@code MISSING_NODE} (first indexing run, no previous
* checkpoint), the before-index is empty, so no index is treated as "existing" and
* this method is a no-op — which is the correct behaviour.</p>
*/
void detectNewTargets(NodeBuilder builder, NodeState before, NodeState after) {
NodeState afterIndex = after.getChildNode("oak:index");
NodeState beforeIndex = before.getChildNode("oak:index");

for (String indexName : afterIndex.getChildNodeNames()) {
NodeState afterDef = afterIndex.getChildNode(indexName);
NodeState beforeDef = beforeIndex.getChildNode(indexName);

if (!beforeDef.exists()) {
continue; // new index — reindex handles it
}

if (!isIndexOnLane(afterDef)) {
continue;
}

PropertyState storeTargetsAfter = afterDef.getProperty("storeTargets");
if (storeTargetsAfter == null) {
continue;
}

Set<String> previousTargets = new HashSet<>();
PropertyState storeTargetsBefore = beforeDef.getProperty("storeTargets");
if (storeTargetsBefore != null) {
for (String t : storeTargetsBefore.getValue(Type.STRINGS)) {
previousTargets.add(t);
}
} else {
// Migration from legacy 'type' property: if the index had a 'type' property
// before and it matches one of the new storeTargets, that target should NOT
// be marked for catch-up since it was already being indexed.
PropertyState typeBefore = beforeDef.getProperty("type");
if (typeBefore != null) {
previousTargets.add(typeBefore.getValue(Type.STRING));
}
}

// Check if tracking node exists in the BEFORE state to avoid re-creating
// tracking properties for targets that have already graduated
NodeState beforeTracking = beforeDef.getChildNode(CatchUpCapable.CATCH_UP_TRACKING_NODE);

NodeBuilder trackingBuilder = null;
for (String target : storeTargetsAfter.getValue(Type.STRINGS)) {
if (!previousTargets.contains(target)) {
// Only mark for catch-up if there's no tracking property in the BEFORE state
// (i.e., this is truly a new target, not one that has already graduated)
if (!beforeTracking.hasProperty(target)) {
if (trackingBuilder == null) {
trackingBuilder = builder.child("oak:index").child(indexName)
.child(CatchUpCapable.CATCH_UP_TRACKING_NODE);
setTrackingNodeType(trackingBuilder);
}
log.info("[{}] New storeTarget '{}' on index '{}', marking for catch-up",
name, target, indexName);
trackingBuilder.setProperty(target, CatchUpCapable.CATCH_UP_FROM_START);
}
}
}
}
}

/**
* Graduates any catch-up target that has caught up to the current content state.
* A target is considered caught up when the content diff between its tracking
* checkpoint and the current beforeCheckpoint is empty (no content changes).
* The tracking property is removed so the target enters the normal indexing flow.
*/
void graduateTargets(NodeBuilder builder, String beforeCheckpoint, NodeState after) {
if (beforeCheckpoint == null) {
return;
}
NodeState oakIndex = after.getChildNode("oak:index");
for (String indexName : oakIndex.getChildNodeNames()) {
NodeState indexDef = oakIndex.getChildNode(indexName);
NodeState trackingNode = indexDef.getChildNode(CatchUpCapable.CATCH_UP_TRACKING_NODE);
if (!trackingNode.exists()) {
continue;
}
if (!isIndexOnLane(indexDef)) {
continue;
}
NodeBuilder trackingBuilder = builder.child("oak:index").child(indexName)
.child(CatchUpCapable.CATCH_UP_TRACKING_NODE);
boolean anyRemaining = false;
for (PropertyState prop : trackingNode.getProperties()) {
String targetType = prop.getName();
if (targetType.startsWith(":") || targetType.startsWith("jcr:")) {
continue; // skip Oak internal properties and JCR properties
}
String trackingCheckpoint = prop.getValue(Type.STRING);

// Check if the target has caught up by comparing content states
if (hasTargetCaughtUp(trackingCheckpoint, beforeCheckpoint)) {
log.info("[{}] Graduating catch-up target '{}' on index '{}' — caught up from {} to {}",
name, targetType, indexName, trackingCheckpoint, beforeCheckpoint);
trackingBuilder.removeProperty(targetType);
} else {
anyRemaining = true;
}
}
if (!anyRemaining) {
trackingBuilder.remove();
}
}
}

/**
* Returns {@code true} if the given index definition belongs to this lane.
* An index belongs to a lane when its {@code async} property contains the lane name.
*/
private boolean isIndexOnLane(NodeState indexDef) {
PropertyState asyncProp = indexDef.getProperty(IndexConstants.ASYNC_PROPERTY_NAME);
if (asyncProp == null) {
return name == null;
}
if (name == null) {
return false;
}
for (String value : asyncProp.getValue(Type.STRINGS)) {
if (name.equals(value)) {
return true;
}
}
return false;
}

/**
* Checks if a catch-up target has caught up to the current content state.
* Returns true if the content diff between the tracking checkpoint and the
* current checkpoint is empty (no indexable content changes).
*/
private boolean hasTargetCaughtUp(String trackingCheckpoint, String currentCheckpoint) {
if (CatchUpCapable.CATCH_UP_FROM_START.equals(trackingCheckpoint)) {
return false; // hasn't started yet
}
if (trackingCheckpoint.equals(currentCheckpoint)) {
return true; // exact match
}

// Check if there are any content changes between the two checkpoints
try {
NodeState trackingState = store.retrieve(trackingCheckpoint);
NodeState currentState = store.retrieve(currentCheckpoint);

if (trackingState == null || currentState == null) {
log.debug("[{}] Cannot compare checkpoints {} and {} - one or both not found",
name, trackingCheckpoint, currentCheckpoint);
return false;
}

// Compare the content states (excluding /oak:index and hidden node changes)
return noVisibleChangesExcludingIndex(trackingState, currentState);
} catch (Exception e) {
log.warn("[{}] Failed to compare checkpoints {} and {}: {}",
name, trackingCheckpoint, currentCheckpoint, e.getMessage());
return false;
}
}

/**
* Checks whether there are no visible content changes between the given states,
* excluding changes to /oak:index (which are not indexable content).
*/
private static boolean noVisibleChangesExcludingIndex(NodeState before, NodeState after) {
return after.compareAgainstBaseState(before, new NodeStateDiff() {
@Override
public boolean propertyAdded(PropertyState after) {
return isHidden(after.getName());
}
@Override
public boolean propertyChanged(PropertyState before, PropertyState after) {
return isHidden(after.getName());
}
@Override
public boolean propertyDeleted(PropertyState before) {
return isHidden(before.getName());
}
@Override
public boolean childNodeAdded(String name, NodeState after) {
return isHidden(name) || "oak:index".equals(name);
}
@Override
public boolean childNodeChanged(String name, NodeState before, NodeState after) {
return isHidden(name) || "oak:index".equals(name)
|| after.compareAgainstBaseState(before, this);
}
@Override
public boolean childNodeDeleted(String name, NodeState before) {
return isHidden(name) || "oak:index".equals(name);
}
});
}

private void maybeCleanUpCheckpoints() {
if (cleanupIntervalMinutes < 0) {
log.debug("checkpoint cleanup skipped because cleanupIntervalMinutes set to: " + cleanupIntervalMinutes);
Expand Down Expand Up @@ -803,6 +1018,8 @@ protected boolean updateIndex(NodeState before, String beforeCheckpoint,
NodeBuilder builder = store.getRoot().builder();

markFailingIndexesAsCorrupt(builder);
detectNewTargets(builder, before, after);
graduateTargets(builder, beforeCheckpoint, after);

CommitInfo info = new CommitInfo(CommitInfo.OAK_UNKNOWN, CommitInfo.OAK_UNKNOWN,
Map.of(IndexConstants.CHECKPOINT_CREATION_TIME, afterTime));
Expand Down Expand Up @@ -859,6 +1076,17 @@ protected boolean updateIndex(NodeState before, String beforeCheckpoint,
checkpointToReleaseRef.set(beforeCheckpoint);
indexingFailed = false;

try {
// Run catch-up within the lane's lease scope to preserve mutual exclusion
// in a clustered environment. Do not move this call outside the lease boundary.
// Use store.getRoot() AFTER the merge to get the latest committed state including
// the tracking nodes that were just created by detectNewTargets().
NodeState currentRoot = store.getRoot();
catchUpRunner.run(currentRoot, after, afterCheckpoint);
} catch (Exception e) {
log.warn("[{}] catch-up run failed; will retry next cycle", name, e);
}

if (indexUpdate.isReindexingPerformed()) {
log.info("[{}] Reindexing completed for indexes: {} in {} ({} ms)",
name, indexUpdate.getReindexStats(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.index;

/**
* Implemented by index editor providers that support per-target catch-up indexing.
*
* <p>When a new target is added to {@code storeTargets} on an existing index,
* the catch-up mechanism runs an {@code EditorDiff} from the last known checkpoint
* (or {@link #CATCH_UP_FROM_START} for a full traversal) to the current lane
* checkpoint, without affecting any other target.</p>
*
* <p>The tracking state is kept under a {@value #CATCH_UP_TRACKING_NODE} child
* node on the index definition. Each property on that node names a target type
* (as it appears in {@code storeTargets}) and holds the checkpoint up to which
* that target has been indexed. A missing property means the target is in sync
* with the lane.</p>
*/
public interface CatchUpCapable {

/**
* Sentinel value stored on the tracking node to request a full traversal
* (equivalent to {@code reindex=true} but scoped to a single target).
*/
String CATCH_UP_FROM_START = "INITIAL";

/**
* Name of the child node under each index definition that holds
* per-target catch-up state.
*/
String CATCH_UP_TRACKING_NODE = "tracking";

// Marker interface - providers that implement this support catch-up indexing.
// Catch-up uses the same getIndexEditor() method as normal indexing, just with
// a different targetType and checkpoint management.
}
Loading