Skip to content

Commit d9c9208

Browse files
committed
feat: multi-target index routing with catch-up lifecycle
- Add multi-target routing helpers and active-target resolution - Add per-target catch-up indexing lifecycle - Simplify catch-up execution and scope graduation to owning lane - Align with marker-based catch-up provider contract Made-with: Cursor
1 parent 144b835 commit d9c9208

13 files changed

Lines changed: 1960 additions & 14 deletions

File tree

oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252

5353
import com.codahale.metrics.MetricRegistry;
5454
import org.apache.commons.lang3.exception.ExceptionUtils;
55+
import org.apache.jackrabbit.JcrConstants;
5556
import org.apache.jackrabbit.api.stats.TimeSeries;
5657
import org.apache.jackrabbit.oak.api.CommitFailedException;
5758
import org.apache.jackrabbit.oak.api.PropertyState;
@@ -134,6 +135,8 @@ public class AsyncIndexUpdate implements Runnable, Closeable {
134135

135136
private final IndexEditorProvider provider;
136137

138+
private final CatchUpRunner catchUpRunner;
139+
137140
/**
138141
* Property name which stores the timestamp up to which the repository is
139142
* indexed
@@ -225,6 +228,7 @@ public AsyncIndexUpdate(@NotNull String name, @NotNull NodeStore store,
225228
this.lastIndexedTo = lastIndexedTo(name);
226229
this.store = requireNonNull(store);
227230
this.provider = requireNonNull(provider);
231+
this.catchUpRunner = new CatchUpRunner(store, provider, name);
228232
this.switchOnSync = switchOnSync;
229233
this.leaseTimeOut = DEFAULT_ASYNC_TIMEOUT;
230234
this.statisticsProvider = statsProvider;
@@ -709,6 +713,217 @@ private static NodeBuilder childBuilder(NodeBuilder nb, String path) {
709713
return nb;
710714
}
711715

716+
private static void setTrackingNodeType(NodeBuilder node) {
717+
if (!node.hasProperty(JcrConstants.JCR_PRIMARYTYPE)) {
718+
node.setProperty(JcrConstants.JCR_PRIMARYTYPE, "oak:Unstructured", Type.NAME);
719+
}
720+
}
721+
722+
/**
723+
* Detects targets newly added to {@code storeTargets} on existing indexes
724+
* and writes {@link CatchUpCapable#CATCH_UP_FROM_START} to the tracking node
725+
* for each such target.
726+
*
727+
* <p>New indexes (not present in {@code before}) are skipped — they are
728+
* handled by the normal reindex mechanism.</p>
729+
*
730+
* <p>When {@code before} is {@code MISSING_NODE} (first indexing run, no previous
731+
* checkpoint), the before-index is empty, so no index is treated as "existing" and
732+
* this method is a no-op — which is the correct behaviour.</p>
733+
*/
734+
void detectNewTargets(NodeBuilder builder, NodeState before, NodeState after) {
735+
NodeState afterIndex = after.getChildNode("oak:index");
736+
NodeState beforeIndex = before.getChildNode("oak:index");
737+
738+
for (String indexName : afterIndex.getChildNodeNames()) {
739+
NodeState afterDef = afterIndex.getChildNode(indexName);
740+
NodeState beforeDef = beforeIndex.getChildNode(indexName);
741+
742+
if (!beforeDef.exists()) {
743+
continue; // new index — reindex handles it
744+
}
745+
746+
if (!isIndexOnLane(afterDef)) {
747+
continue;
748+
}
749+
750+
PropertyState storeTargetsAfter = afterDef.getProperty("storeTargets");
751+
if (storeTargetsAfter == null) {
752+
continue;
753+
}
754+
755+
Set<String> previousTargets = new HashSet<>();
756+
PropertyState storeTargetsBefore = beforeDef.getProperty("storeTargets");
757+
if (storeTargetsBefore != null) {
758+
for (String t : storeTargetsBefore.getValue(Type.STRINGS)) {
759+
previousTargets.add(t);
760+
}
761+
} else {
762+
// Migration from legacy 'type' property: if the index had a 'type' property
763+
// before and it matches one of the new storeTargets, that target should NOT
764+
// be marked for catch-up since it was already being indexed.
765+
PropertyState typeBefore = beforeDef.getProperty("type");
766+
if (typeBefore != null) {
767+
previousTargets.add(typeBefore.getValue(Type.STRING));
768+
}
769+
}
770+
771+
// Check if tracking node exists in the BEFORE state to avoid re-creating
772+
// tracking properties for targets that have already graduated
773+
NodeState beforeTracking = beforeDef.getChildNode(CatchUpCapable.CATCH_UP_TRACKING_NODE);
774+
775+
NodeBuilder trackingBuilder = null;
776+
for (String target : storeTargetsAfter.getValue(Type.STRINGS)) {
777+
if (!previousTargets.contains(target)) {
778+
// Only mark for catch-up if there's no tracking property in the BEFORE state
779+
// (i.e., this is truly a new target, not one that has already graduated)
780+
if (!beforeTracking.hasProperty(target)) {
781+
if (trackingBuilder == null) {
782+
trackingBuilder = builder.child("oak:index").child(indexName)
783+
.child(CatchUpCapable.CATCH_UP_TRACKING_NODE);
784+
setTrackingNodeType(trackingBuilder);
785+
}
786+
log.info("[{}] New storeTarget '{}' on index '{}', marking for catch-up",
787+
name, target, indexName);
788+
trackingBuilder.setProperty(target, CatchUpCapable.CATCH_UP_FROM_START);
789+
}
790+
}
791+
}
792+
}
793+
}
794+
795+
/**
796+
* Graduates any catch-up target that has caught up to the current content state.
797+
* A target is considered caught up when the content diff between its tracking
798+
* checkpoint and the current beforeCheckpoint is empty (no content changes).
799+
* The tracking property is removed so the target enters the normal indexing flow.
800+
*/
801+
void graduateTargets(NodeBuilder builder, String beforeCheckpoint, NodeState after) {
802+
if (beforeCheckpoint == null) {
803+
return;
804+
}
805+
NodeState oakIndex = after.getChildNode("oak:index");
806+
for (String indexName : oakIndex.getChildNodeNames()) {
807+
NodeState indexDef = oakIndex.getChildNode(indexName);
808+
NodeState trackingNode = indexDef.getChildNode(CatchUpCapable.CATCH_UP_TRACKING_NODE);
809+
if (!trackingNode.exists()) {
810+
continue;
811+
}
812+
if (!isIndexOnLane(indexDef)) {
813+
continue;
814+
}
815+
NodeBuilder trackingBuilder = builder.child("oak:index").child(indexName)
816+
.child(CatchUpCapable.CATCH_UP_TRACKING_NODE);
817+
boolean anyRemaining = false;
818+
for (PropertyState prop : trackingNode.getProperties()) {
819+
String targetType = prop.getName();
820+
if (targetType.startsWith(":") || targetType.startsWith("jcr:")) {
821+
continue; // skip Oak internal properties and JCR properties
822+
}
823+
String trackingCheckpoint = prop.getValue(Type.STRING);
824+
825+
// Check if the target has caught up by comparing content states
826+
if (hasTargetCaughtUp(trackingCheckpoint, beforeCheckpoint)) {
827+
log.info("[{}] Graduating catch-up target '{}' on index '{}' — caught up from {} to {}",
828+
name, targetType, indexName, trackingCheckpoint, beforeCheckpoint);
829+
trackingBuilder.removeProperty(targetType);
830+
} else {
831+
anyRemaining = true;
832+
}
833+
}
834+
if (!anyRemaining) {
835+
trackingBuilder.remove();
836+
}
837+
}
838+
}
839+
840+
/**
841+
* Returns {@code true} if the given index definition belongs to this lane.
842+
* An index belongs to a lane when its {@code async} property contains the lane name.
843+
*/
844+
private boolean isIndexOnLane(NodeState indexDef) {
845+
PropertyState asyncProp = indexDef.getProperty(IndexConstants.ASYNC_PROPERTY_NAME);
846+
if (asyncProp == null) {
847+
return name == null;
848+
}
849+
if (name == null) {
850+
return false;
851+
}
852+
for (String value : asyncProp.getValue(Type.STRINGS)) {
853+
if (name.equals(value)) {
854+
return true;
855+
}
856+
}
857+
return false;
858+
}
859+
860+
/**
861+
* Checks if a catch-up target has caught up to the current content state.
862+
* Returns true if the content diff between the tracking checkpoint and the
863+
* current checkpoint is empty (no indexable content changes).
864+
*/
865+
private boolean hasTargetCaughtUp(String trackingCheckpoint, String currentCheckpoint) {
866+
if (CatchUpCapable.CATCH_UP_FROM_START.equals(trackingCheckpoint)) {
867+
return false; // hasn't started yet
868+
}
869+
if (trackingCheckpoint.equals(currentCheckpoint)) {
870+
return true; // exact match
871+
}
872+
873+
// Check if there are any content changes between the two checkpoints
874+
try {
875+
NodeState trackingState = store.retrieve(trackingCheckpoint);
876+
NodeState currentState = store.retrieve(currentCheckpoint);
877+
878+
if (trackingState == null || currentState == null) {
879+
log.debug("[{}] Cannot compare checkpoints {} and {} - one or both not found",
880+
name, trackingCheckpoint, currentCheckpoint);
881+
return false;
882+
}
883+
884+
// Compare the content states (excluding /oak:index and hidden node changes)
885+
return noVisibleChangesExcludingIndex(trackingState, currentState);
886+
} catch (Exception e) {
887+
log.warn("[{}] Failed to compare checkpoints {} and {}: {}",
888+
name, trackingCheckpoint, currentCheckpoint, e.getMessage());
889+
return false;
890+
}
891+
}
892+
893+
/**
894+
* Checks whether there are no visible content changes between the given states,
895+
* excluding changes to /oak:index (which are not indexable content).
896+
*/
897+
private static boolean noVisibleChangesExcludingIndex(NodeState before, NodeState after) {
898+
return after.compareAgainstBaseState(before, new NodeStateDiff() {
899+
@Override
900+
public boolean propertyAdded(PropertyState after) {
901+
return isHidden(after.getName());
902+
}
903+
@Override
904+
public boolean propertyChanged(PropertyState before, PropertyState after) {
905+
return isHidden(after.getName());
906+
}
907+
@Override
908+
public boolean propertyDeleted(PropertyState before) {
909+
return isHidden(before.getName());
910+
}
911+
@Override
912+
public boolean childNodeAdded(String name, NodeState after) {
913+
return isHidden(name) || "oak:index".equals(name);
914+
}
915+
@Override
916+
public boolean childNodeChanged(String name, NodeState before, NodeState after) {
917+
return isHidden(name) || "oak:index".equals(name)
918+
|| after.compareAgainstBaseState(before, this);
919+
}
920+
@Override
921+
public boolean childNodeDeleted(String name, NodeState before) {
922+
return isHidden(name) || "oak:index".equals(name);
923+
}
924+
});
925+
}
926+
712927
private void maybeCleanUpCheckpoints() {
713928
if (cleanupIntervalMinutes < 0) {
714929
log.debug("checkpoint cleanup skipped because cleanupIntervalMinutes set to: " + cleanupIntervalMinutes);
@@ -803,6 +1018,8 @@ protected boolean updateIndex(NodeState before, String beforeCheckpoint,
8031018
NodeBuilder builder = store.getRoot().builder();
8041019

8051020
markFailingIndexesAsCorrupt(builder);
1021+
detectNewTargets(builder, before, after);
1022+
graduateTargets(builder, beforeCheckpoint, after);
8061023

8071024
CommitInfo info = new CommitInfo(CommitInfo.OAK_UNKNOWN, CommitInfo.OAK_UNKNOWN,
8081025
Map.of(IndexConstants.CHECKPOINT_CREATION_TIME, afterTime));
@@ -859,6 +1076,17 @@ protected boolean updateIndex(NodeState before, String beforeCheckpoint,
8591076
checkpointToReleaseRef.set(beforeCheckpoint);
8601077
indexingFailed = false;
8611078

1079+
try {
1080+
// Run catch-up within the lane's lease scope to preserve mutual exclusion
1081+
// in a clustered environment. Do not move this call outside the lease boundary.
1082+
// Use store.getRoot() AFTER the merge to get the latest committed state including
1083+
// the tracking nodes that were just created by detectNewTargets().
1084+
NodeState currentRoot = store.getRoot();
1085+
catchUpRunner.run(currentRoot, after, afterCheckpoint);
1086+
} catch (Exception e) {
1087+
log.warn("[{}] catch-up run failed; will retry next cycle", name, e);
1088+
}
1089+
8621090
if (indexUpdate.isReindexingPerformed()) {
8631091
log.info("[{}] Reindexing completed for indexes: {} in {} ({} ms)",
8641092
name, indexUpdate.getReindexStats(),
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.jackrabbit.oak.plugins.index;
18+
19+
/**
 * Marker for index editor providers that can index a single target
 * independently of the others ("catch-up" indexing).
 *
 * <p>Adding a target to {@code storeTargets} of an existing index triggers a
 * catch-up: an {@code EditorDiff} is run for that target only, from its last
 * known checkpoint (or from scratch when the checkpoint is
 * {@link #CATCH_UP_FROM_START}) up to the current lane checkpoint. Other
 * targets are not affected.</p>
 *
 * <p>Progress is recorded on the {@value #CATCH_UP_TRACKING_NODE} child node of
 * the index definition. Every property of that node is named after a target
 * type, exactly as listed in {@code storeTargets}, and its value is the
 * checkpoint that target has been indexed to. A target without such a property
 * is in sync with the lane.</p>
 *
 * <p>The interface declares no methods: catch-up goes through the same
 * {@code getIndexEditor()} call as regular indexing, only with a different
 * target type and checkpoint handling.</p>
 */
public interface CatchUpCapable {

    /**
     * Name of the child node, below each index definition, that stores the
     * per-target catch-up state.
     */
    String CATCH_UP_TRACKING_NODE = "tracking";

    /**
     * Sentinel tracking value asking for a full traversal of the content for
     * one target (the per-target equivalent of {@code reindex=true}).
     */
    String CATCH_UP_FROM_START = "INITIAL";
}

0 commit comments

Comments
 (0)