Skip to content

Commit 4103e44

Browse files
committed
MLE-26918 Added fromView support for incremental write
The plan is to dump eval support and just offer fromLexicons and fromView, but need to test out fromView a bit first.
1 parent 06d2f80 commit 4103e44

File tree

4 files changed

+171
-0
lines changed

4 files changed

+171
-0
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public static class Builder {
5353
private String[] jsonExclusions;
5454
private String[] xmlExclusions;
5555
private Map<String, String> xmlNamespaces;
56+
private String schemaName;
57+
private String viewName;
5658

5759
/**
5860
* @param keyName the name of the MarkLogic metadata key that will hold the hash value; defaults to "incrementalWriteHash".
@@ -128,9 +130,36 @@ public Builder xmlNamespaces(Map<String, String> namespaces) {
128130
return this;
129131
}
130132

133+
/**
134+
* Configures the filter to use a TDE view for retrieving hash values instead of field range indexes.
135+
* This approach requires a TDE template to be deployed that extracts the URI and hash metadata.
136+
*
137+
* @param schemaName the schema name of the TDE view
138+
* @param viewName the view name of the TDE view
139+
* @return this builder
140+
*/
141+
public Builder fromView(String schemaName, String viewName) {
142+
boolean schemaEmpty = schemaName == null || schemaName.trim().isEmpty();
143+
boolean viewEmpty = viewName == null || viewName.trim().isEmpty();
144+
145+
if (schemaEmpty && !viewEmpty) {
146+
throw new IllegalArgumentException("Schema name cannot be null or empty when view name is provided");
147+
}
148+
if (!schemaEmpty && viewEmpty) {
149+
throw new IllegalArgumentException("View name cannot be null or empty when schema name is provided");
150+
}
151+
152+
this.schemaName = schemaName;
153+
this.viewName = viewName;
154+
return this;
155+
}
156+
131157
public IncrementalWriteFilter build() {
132158
validateJsonExclusions();
133159
validateXmlExclusions();
160+
if (schemaName != null && viewName != null) {
161+
return new IncrementalWriteViewFilter(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer, jsonExclusions, xmlExclusions, xmlNamespaces, schemaName, viewName);
162+
}
134163
if (useEvalQuery) {
135164
return new IncrementalWriteEvalFilter(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer, jsonExclusions, xmlExclusions, xmlNamespaces);
136165
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.client.datamovement.filter;
5+
6+
import com.marklogic.client.FailedRequestException;
7+
import com.marklogic.client.document.DocumentWriteOperation;
8+
import com.marklogic.client.document.DocumentWriteSet;
9+
import com.marklogic.client.row.RowTemplate;
10+
11+
import java.util.HashMap;
12+
import java.util.Map;
13+
import java.util.function.Consumer;
14+
15+
/**
16+
* Uses an Optic query with fromView to get the existing hash values for a set of URIs from a TDE view.
17+
* This implementation requires a TDE template to be deployed that extracts the URI and hash metadata.
18+
*
19+
* @since 8.1.0
20+
*/
21+
class IncrementalWriteViewFilter extends IncrementalWriteFilter {
22+
23+
private final String schemaName;
24+
private final String viewName;
25+
26+
IncrementalWriteViewFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
27+
Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer, String[] jsonExclusions,
28+
String[] xmlExclusions, Map<String, String> xmlNamespaces,
29+
String schemaName, String viewName) {
30+
super(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer, jsonExclusions, xmlExclusions, xmlNamespaces);
31+
this.schemaName = schemaName;
32+
this.viewName = viewName;
33+
}
34+
35+
@Override
36+
public DocumentWriteSet apply(Context context) {
37+
final String[] uris = context.getDocumentWriteSet().stream()
38+
.filter(op -> DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType()))
39+
.map(DocumentWriteOperation::getUri)
40+
.toArray(String[]::new);
41+
42+
RowTemplate rowTemplate = new RowTemplate(context.getDatabaseClient());
43+
44+
try {
45+
Map<String, String> existingHashes = rowTemplate.query(op ->
46+
op.fromView(schemaName, viewName)
47+
.where(op.in(op.col("uri"), op.xs.stringSeq(uris))),
48+
49+
rows -> {
50+
Map<String, String> map = new HashMap<>();
51+
rows.forEach(row -> {
52+
String uri = row.getString("uri");
53+
String existingHash = row.getString("hash");
54+
map.put(uri, existingHash);
55+
});
56+
return map;
57+
}
58+
);
59+
60+
return filterDocuments(context, uri -> existingHashes.get(uri));
61+
} catch (FailedRequestException e) {
62+
String message = "Unable to query for existing incremental write hashes from view " + schemaName + "." + viewName + "; cause: " + e.getMessage();
63+
throw new FailedRequestException(message, e.getFailedRequest());
64+
}
65+
}
66+
}

marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,57 @@ void binaryDocument() {
228228
"expected. Exclusions cannot be specified for them.");
229229
}
230230

231+
@Test
232+
void fromView() {
233+
filter = IncrementalWriteFilter.newBuilder()
234+
.fromView("javaClient", "incrementalWriteHash")
235+
.onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
236+
.build();
237+
238+
verifyIncrementalWriteWorks();
239+
}
240+
241+
@Test
242+
void emptyValuesForFromView() {
243+
filter = IncrementalWriteFilter.newBuilder()
244+
// Empty/null values are ignored, as long as both schema/view are empty/null. This makes life a little
245+
// easier for a connector in that the connector does not need to check for empty/null values.
246+
.fromView("", null)
247+
.onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
248+
.build();
249+
250+
verifyIncrementalWriteWorks();
251+
}
252+
253+
@Test
254+
void invalidSchemaArg() {
255+
IncrementalWriteFilter.Builder builder = IncrementalWriteFilter.newBuilder();
256+
IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> builder.fromView(null, "theView"));
257+
assertEquals("Schema name cannot be null or empty when view name is provided", ex.getMessage());
258+
}
259+
260+
@Test
261+
void invalidViewArg() {
262+
IncrementalWriteFilter.Builder builder = IncrementalWriteFilter.newBuilder();
263+
IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> builder.fromView("javaClient", null));
264+
assertEquals("View name cannot be null or empty when schema name is provided", ex.getMessage());
265+
}
266+
267+
@Test
268+
void invalidView() {
269+
filter = IncrementalWriteFilter.newBuilder()
270+
.fromView("javaClient", "this-view-doesnt-exist")
271+
.build();
272+
273+
writeTenDocuments();
274+
275+
assertNotNull(batchFailure.get());
276+
String message = batchFailure.get().getMessage();
277+
assertTrue(message.contains("Unable to query for existing incremental write hashes") && message.contains("SQL-TABLENOTFOUND"),
278+
"When the user tries to use the incremental write feature with an invalid view, " +
279+
"we should fail with a helpful error message. Actual message: " + message);
280+
}
281+
231282
private void verifyIncrementalWriteWorks() {
232283
writeTenDocuments();
233284
verifyDocumentsHasHashInMetadataKey();
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"template": {
3+
"description": "For incremental write that uses op.fromView instead of op.fromLexicons",
4+
"context": "/doc",
5+
"rows": [
6+
{
7+
"schemaName": "javaClient",
8+
"viewName": "incrementalWriteHash",
9+
"columns": [
10+
{
11+
"name": "uri",
12+
"scalarType": "string",
13+
"val": "xdmp:node-uri(.)"
14+
},
15+
{
16+
"name": "hash",
17+
"scalarType": "string",
18+
"val": "xdmp:node-metadata-value(., 'incrementalWriteHash')",
19+
"nullable": true
20+
}
21+
]
22+
}
23+
]
24+
}
25+
}

0 commit comments

Comments
 (0)