Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/src/org/labkey/api/exp/Lsid.java
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,9 @@ static public String namespaceLikeString(String namespace)
return "urn:lsid:%:" + namespace + ".%:%";
}

static public String namespaceFilter(String columnName, String namespace)
static public SQLFragment namespaceFilter(Enum<?> column, String namespace)
{
return columnName + " LIKE '" + namespaceLikeString(namespace) + "'";
return new SQLFragment().appendIdentifier(column.name()).append(" LIKE ?").add(namespaceLikeString(namespace));
}

/**
Expand Down
19 changes: 17 additions & 2 deletions api/src/org/labkey/api/exp/api/ExperimentService.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,23 @@ enum DataTypeForExclusion
List<? extends ExpRun> getExpRuns(Container container, @Nullable ExpProtocol parentProtocol, @Nullable ExpProtocol childProtocol);

List<? extends ExpRun> getExpRuns(Container container, @Nullable ExpProtocol parentProtocol, @Nullable ExpProtocol childProtocol, @NotNull Predicate<ExpRun> filterFn);

List<? extends ExpRun> getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate<ExpRun> filterFn, @NotNull Container container);

/**
* @param filterSQL optional additional WHERE predicates; callers doing keyset pagination should include
* {@code ER.RowId > minRowId} here
* @param limit max rows to return; pass {@code Table.ALL_ROWS} (-1) for no limit
* @return up to {@code limit} ExpRuns in {@code container} matching {@code filterSQL}, ordered by RowId
*/
List<? extends ExpRun> getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate<ExpRun> filterFn, @NotNull Container container, int limit);

/**
* @param modifiedSince optional upper-exclusive Modified cutoff; pass {@code null} to return all batches
* @param minRowId keyset cursor — only batches with RowId &gt; minRowId are returned; pass 0 for the first page
* @param limit max rows to return
* @return up to {@code limit} assay batches for {@code batchProtocol} in {@code container} with
* RowId &gt; minRowId (and Modified &gt; modifiedSince when non-null), ordered by RowId
*/
List<? extends ExpExperiment> getExpBatches(@NotNull Container container, @NotNull ExpProtocol batchProtocol, @Nullable Date modifiedSince, long minRowId, int limit);

List<? extends ExpRun> getExpRunsForJobId(long jobId);

Expand Down
107 changes: 90 additions & 17 deletions api/src/org/labkey/api/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import org.jetbrains.annotations.Nullable;
import org.json.JSONObject;
import org.labkey.api.data.ColumnInfo;
import org.labkey.api.data.CompareType;
import org.labkey.api.data.Container;
import org.labkey.api.data.ContainerManager;
import org.labkey.api.data.DbSchema;
import org.labkey.api.data.SQLFragment;
import org.labkey.api.data.SimpleFilter;
import org.labkey.api.data.Sort;
import org.labkey.api.data.TableInfo;
import org.labkey.api.data.TableSelector;
import org.labkey.api.data.dialect.SqlDialect;
import org.labkey.api.mbean.SearchMXBean;
import org.labkey.api.query.ExprColumn;
Expand Down Expand Up @@ -67,6 +70,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;

public interface SearchService extends SearchMXBean
{
Expand All @@ -75,6 +79,7 @@ public interface SearchService extends SearchMXBean
Logger _log = LogHelper.getLogger(SearchService.class, "Full text search service");

long DEFAULT_FILE_SIZE_LIMIT = 100L; // 100 MB
int INDEXING_LIMIT = 1_000;

/**
* Returns the max file size indexed
Expand Down Expand Up @@ -148,7 +153,7 @@ enum PRIORITY
/** Intended for content that needed indexing or reindexing because it was modified or created */
modified,
/** Highest priority. Used for removing documents from the index, which is generally faster than adding */
delete;
delete
}


Expand Down Expand Up @@ -239,7 +244,7 @@ interface TaskIndexingQueue
//
// plug in interfaces
//

interface ResourceResolver
{
default WebdavResource resolve(@NotNull String resourceIdentifier) { return null; }
Expand Down Expand Up @@ -288,7 +293,7 @@ public String getName()
{
return _name;
}

public String getDescription()
{
return _description;
Expand Down Expand Up @@ -326,7 +331,7 @@ public boolean isShowInAdvancedSearch()
// search
//


class SearchResult
{
public long totalHits;
Expand Down Expand Up @@ -425,7 +430,7 @@ public String normalizeHref(Path contextPath, Container c)
@Nullable SearchHit find(String docId) throws IOException;

String escapeTerm(String term);

List<SearchCategory> getSearchCategories();

//
Expand Down Expand Up @@ -464,7 +469,7 @@ public String normalizeHref(Path contextPath, Container c)

void waitForIdle() throws InterruptedException;


/** default implementation saving lastIndexed */
void setLastIndexedForPath(Path path, long indexed, long modified);

Expand All @@ -475,9 +480,9 @@ public String normalizeHref(Path contextPath, Container c)
void maintenance();

//
// configuration, plugins
// configuration, plugins
//

void addSearchCategory(SearchCategory category);
List<SearchCategory> getAllCategories();
List<SearchCategory> getCategories(String categories);
Expand All @@ -494,12 +499,16 @@ public String normalizeHref(Path contextPath, Container c)
interface DocumentProvider
{
/**
* Enumerate documents for full-text search. Unless it's known there will be a small number of documents
* added to the queue, add Runnable to the IndexTask that adds the Resources from the container to the queue.
* If there are potentially many documents for a container, add resources in batches of 1,000 or so to avoid
* a huge memory footprint.
* Enumerate documents for full-text search indexing. Do NOT fetch an unbounded result set into memory.
*
* <p>Use the <em>recursive-requeue pattern</em>: create an {@link IndexBatchCursor} from {@code minRowId = 0},
* call {@link IndexBatchCursor#createSelector} (for {@code TableSelector}-based callers) or build a
* raw SQL query with {@code RowId > minRowId ORDER BY RowId LIMIT} {@link SearchService#INDEXING_LIMIT},
* process the batch with {@link IndexBatchCursor#forEach}, then call {@link IndexBatchCursor#wasFull()}
* and requeue only if it returns {@code true}. This keeps the ResultSet closed between batches and
* interleaves indexing with other queue work.</p>
*
* @param modifiedSince when null, do a full reindex; otherwise incremental (either modified > modifiedSince, or modified > lastIndexed)
* @param modifiedSince when null, do a full reindex; otherwise incremental (either modified &gt; modifiedSince, or modified &gt; lastIndexed)
*/
void enumerateDocuments(TaskIndexingQueue adder, @Nullable Date modifiedSince);

Expand Down Expand Up @@ -540,18 +549,81 @@ interface DocumentParser
boolean detect(WebdavResource resource, String contentType, byte[] buf) throws IOException;
void parse(InputStream stream, ContentHandler handler) throws IOException, SAXException;
}



/**
* Keyset-pagination cursor for the recursive-requeue batch-indexing pattern.
* Tracks position (max RowId seen) and batch fullness across one round of indexing.
* Intended for use in {@link DocumentProvider#enumerateDocuments} implementations.
*/
class IndexBatchCursor
{
private long _maxRowId;
private int _count;

public IndexBatchCursor(long minRowId)
{
_maxRowId = minRowId;
}

/** Records {@code rowId} as processed. Throws if results are not strictly ascending by RowId. */
public void advance(long rowId)
{
if (rowId <= _maxRowId)
throw new IllegalStateException("Expected results strictly ordered by RowId but got " + rowId + " after " + _maxRowId);
_maxRowId = rowId;
_count++;
}

/** Returns the maximum RowId seen so far, suitable for passing as {@code minRowId} to the next batch. */
public long getMaxRowId()
{
return _maxRowId;
}

/**
* Adds {@code RowId > current max} to {@code filter} and returns a {@link TableSelector} ordered by RowId
* and limited to {@link #INDEXING_LIMIT} rows. Callers should pass the result to {@link #forEach} and
* then call {@link #wasFull} to decide whether to requeue.
*/
public TableSelector createSelector(TableInfo tableInfo, SimpleFilter filter)
{
filter.addCondition(FieldKey.fromParts("RowId"), _maxRowId, CompareType.GT);
TableSelector selector = new TableSelector(tableInfo, filter, new Sort("RowId"));
selector.setMaxRows(INDEXING_LIMIT);
return selector;
}

/**
* Iterates {@code batch}, calling {@link #advance} with the item's RowId and then invoking
* {@code action} on each item. {@code advance} is called unconditionally before each action so the
* count stays accurate even when the action throws.
*/
public <T> void forEach(List<? extends T> batch, ToLongFunction<T> rowIdOf, Consumer<T> action)
{
batch.forEach(item -> {
advance(rowIdOf.applyAsLong(item));
action.accept(item);
});
}

/** Returns {@code true} if the batch was full, meaning more rows may remain and a requeue is needed. */
public boolean wasFull()
{
return _count == INDEXING_LIMIT;
}
}

// an interface that enumerates documents in a container (not recursive)
void addDocumentProvider(DocumentProvider provider);

void addDocumentParser(DocumentParser parser);


//
// helpers
//


/**
* filter for documents modified since the provided date
Expand Down Expand Up @@ -592,7 +664,7 @@ public LastIndexedClause(TableInfo info, java.util.Date modifiedSince, String mo

// Incremental if modifiedSince is set and is more recent than 1967-10-04
boolean incremental = modifiedSince != null && modifiedSince.compareTo(oldDate) > 0;

// no filter
if (!incremental)
return;
Expand Down Expand Up @@ -776,4 +848,5 @@ public SearchOptions build()
}
}
}

}
44 changes: 28 additions & 16 deletions assay/src/org/labkey/assay/AssayManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -710,33 +710,45 @@ public void indexAssayBatches(SearchService.TaskIndexingQueue queue, @Nullable D
for (ExpProtocol protocol : getAssayProtocols(queue.getContainer()))
{
if (shouldIndexProtocolBatches(protocol))
indexAssayBatches(queue, protocol, modifiedSince);
queue.addRunnable((q) -> indexAssayBatches(q, protocol, modifiedSince, 0));
}
}

private void indexAssayBatches(SearchService.TaskIndexingQueue queue, ExpProtocol protocol, @Nullable Date modifiedSince)
private void indexAssayBatches(SearchService.TaskIndexingQueue queue, ExpProtocol protocol,
@Nullable Date modifiedSince, long minRowId)
{
if (shouldIndexProtocolBatches(protocol))
{
for (ExpExperiment batch : protocol.getBatches(queue.getContainer()))
{
if (modifiedSince == null || modifiedSince.before(batch.getModified()))
indexAssayBatch(queue, batch);
}
}
List<? extends ExpExperiment> batches = ExperimentService.get().getExpBatches(
queue.getContainer(), protocol, modifiedSince, minRowId, SearchService.INDEXING_LIMIT);

SearchService.IndexBatchCursor tracker = new SearchService.IndexBatchCursor(minRowId);
tracker.forEach(batches, ExpExperiment::getRowId, b -> indexAssayBatch(queue, b));

if (tracker.wasFull())
queue.addRunnable((q) -> indexAssayBatches(q, protocol, modifiedSince, tracker.getMaxRowId()));
}

public void indexAssayRuns(SearchService.TaskIndexingQueue queue, @Nullable Date modifiedSince)
{
for (ExpProtocol protocol : getAssayProtocols(queue.getContainer()))
indexAssayRuns(queue, protocol, modifiedSince);
queue.addRunnable((q) -> indexAssayRuns(q, protocol, modifiedSince, 0));
}

private void indexAssayRuns(SearchService.TaskIndexingQueue queue, ExpProtocol protocol, @Nullable Date modifiedSince)
private void indexAssayRuns(SearchService.TaskIndexingQueue queue, ExpProtocol protocol,
@Nullable Date modifiedSince, long minRowId)
{
ExperimentService.get().getExpRuns(queue.getContainer(), protocol, null, run ->
modifiedSince == null || modifiedSince.before(run.getModified())
).forEach(r -> indexAssayRun(queue, r));
SQLFragment filterSQL = new SQLFragment("ER.ProtocolLSID = ? AND ER.RowId > ?")
.add(protocol.getLSID())
.add(minRowId);
if (modifiedSince != null)
filterSQL.append(" AND ER.Modified > ?").add(modifiedSince);

List<? extends ExpRun> runs = ExperimentService.get().getExpRuns(filterSQL, _ -> true, queue.getContainer(), SearchService.INDEXING_LIMIT);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that this uses ORDER BY RowId, but this contract isn't very explicit. I think maxRowIdProcessed is implicit final, I like the readability of making it explicit.


SearchService.IndexBatchCursor tracker = new SearchService.IndexBatchCursor(minRowId);
tracker.forEach(runs, ExpRun::getRowId, r -> indexAssayRun(queue, r));

if (tracker.wasFull())
queue.addRunnable((q) -> indexAssayRuns(q, protocol, modifiedSince, tracker.getMaxRowId()));
}

@Override
Expand All @@ -745,7 +757,7 @@ public void indexAssayRun(SearchService.TaskIndexingQueue queue, long expRunRowI
ExpRun expRun = ExperimentService.get().getExpRun(expRunRowId);
if (expRun == null)
return;

if (shouldIndexRun(expRun))
indexAssayRun(queue, ExperimentService.get().getExpRun(expRunRowId));
}
Expand Down
Loading
Loading