Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -1881,18 +1881,131 @@ void testDeleteAllReturnsFalseOnSQLException() throws Exception {
class UpdateTests {

@Test
@DisplayName("Should throw UnsupportedOperationException for update with condition")
void testUpdateWithCondition() {
Key key = new SingleValueKey("default", "1");
@DisplayName("Should update document when condition matches (optimistic locking)")
void testUpdateWithConditionMatches() throws Exception {
// Use key "1" directly to match test data (not SingleValueKey which adds tenant prefix)
Key key =
new Key() {
@Override
public String toString() {
return "1";
}
};
ObjectNode updatedNode = OBJECT_MAPPER.createObjectNode();
updatedNode.put("_id", 1);
updatedNode.put("item", "Soap");
updatedNode.put("item", "UpdatedSoap");
updatedNode.put("price", 999);
Document document = new JSONDocument(updatedNode);
Filter condition = Filter.eq("price", 10);

assertThrows(
UnsupportedOperationException.class,
() -> flatCollection.update(key, document, condition));
// Condition: price = 10 (matches the existing document)
Filter condition = new Filter(Filter.Op.EQ, "price", 10);

UpdateResult result = flatCollection.update(key, document, condition);

assertEquals(1, result.getUpdatedCount(), "Should update 1 document when condition matches");

// Verify the update in database using direct JDBC
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
try (Connection conn = pgDatastore.getPostgresClient();
PreparedStatement ps =
conn.prepareStatement(
String.format(
"SELECT \"item\", \"price\" FROM \"%s\" WHERE \"id\" = '1'",
FLAT_COLLECTION_NAME));
ResultSet rs = ps.executeQuery()) {
assertTrue(rs.next());
assertEquals("UpdatedSoap", rs.getString("item"));
assertEquals(999, rs.getInt("price"));
}
}

@Test
@DisplayName("Should not update document when condition does not match (optimistic locking)")
void testUpdateWithConditionNotMatches() throws Exception {
Key key =
new Key() {
@Override
public String toString() {
return "1";
}
};
ObjectNode updatedNode = OBJECT_MAPPER.createObjectNode();
updatedNode.put("item", "ShouldNotUpdate");
updatedNode.put("price", 888);
Document document = new JSONDocument(updatedNode);

// Condition: price = 999 (does NOT match the existing document which has price=10)
Filter condition = new Filter(Filter.Op.EQ, "price", 999);

UpdateResult result = flatCollection.update(key, document, condition);

assertEquals(0, result.getUpdatedCount(), "Should not update when condition doesn't match");

// Verify document is unchanged using direct JDBC
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
try (Connection conn = pgDatastore.getPostgresClient();
PreparedStatement ps =
conn.prepareStatement(
String.format(
"SELECT \"item\", \"price\" FROM \"%s\" WHERE \"id\" = '1'",
FLAT_COLLECTION_NAME));
ResultSet rs = ps.executeQuery()) {
assertTrue(rs.next());
assertEquals("Soap", rs.getString("item"), "Item should remain unchanged");
assertEquals(10, rs.getInt("price"), "Price should remain unchanged");
}
}

@Test
@DisplayName("Should update document without condition")
void testUpdateWithoutCondition() throws Exception {
Key key =
new Key() {
@Override
public String toString() {
return "1";
}
};
ObjectNode updatedNode = OBJECT_MAPPER.createObjectNode();
updatedNode.put("item", "NoConditionUpdate");
updatedNode.put("price", 777);
Document document = new JSONDocument(updatedNode);

UpdateResult result = flatCollection.update(key, document, null);

assertEquals(1, result.getUpdatedCount(), "Should update 1 document without condition");

// Verify the update using direct JDBC
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
try (Connection conn = pgDatastore.getPostgresClient();
PreparedStatement ps =
conn.prepareStatement(
String.format(
"SELECT \"item\", \"price\" FROM \"%s\" WHERE \"id\" = '1'",
FLAT_COLLECTION_NAME));
ResultSet rs = ps.executeQuery()) {
assertTrue(rs.next());
assertEquals("NoConditionUpdate", rs.getString("item"));
assertEquals(777, rs.getInt("price"));
}
}

@Test
@DisplayName("Should return 0 when updating non-existent key")
void testUpdateNonExistentKey() throws Exception {
Key key =
new Key() {
@Override
public String toString() {
return "non-existent-key";
}
};
ObjectNode updatedNode = OBJECT_MAPPER.createObjectNode();
updatedNode.put("item", "Ghost");
Document document = new JSONDocument(updatedNode);

UpdateResult result = flatCollection.update(key, document, null);

assertEquals(0, result.getUpdatedCount(), "Should return 0 for non-existent key");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,152 @@ public BulkUpdateResult bulkUpdate(List<BulkUpdateRequest> bulkUpdateRequests) {

@Override
public UpdateResult update(Key key, Document document, Filter condition) throws IOException {
throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED);
return updateWithRetry(key, document, condition, false);
}

private UpdateResult updateWithRetry(
Key key, Document document, Filter condition, boolean isRetry) throws IOException {
String tableName = tableIdentifier.getTableName();
List<String> skippedFields = new ArrayList<>();

try {
TypedDocument parsed = parseDocument(document, tableName, skippedFields);

// If no fields to update (all skipped), return 0
if (parsed.getColumns().isEmpty()) {
LOGGER.warn("No valid columns to update for key: {}", key);
return new UpdateResult(0);
}

// Build UPDATE SQL
String pkColumn = getPKForTable(tableName);
String sql = buildUpdateSql(parsed.getColumns(), pkColumn, condition, tableName);
LOGGER.debug("Update SQL: {}", sql);

int result = executeConditionalUpdate(sql, parsed, key, condition, tableName, pkColumn);
LOGGER.debug("Update result for key {}: {}", key, result);
return new UpdateResult(result);

} catch (PSQLException e) {
if (!isRetry && shouldRefreshSchemaAndRetry(e.getSQLState())) {
LOGGER.info(
"Schema mismatch detected during update (SQLState: {}), refreshing schema and retrying. key: {}",
e.getSQLState(),
key);
schemaRegistry.invalidate(tableName);
return updateWithRetry(key, document, condition, true);
}
LOGGER.error("SQLException updating document. key: {} content: {}", key, document, e);
throw new IOException(e);
} catch (SQLException e) {
LOGGER.error("SQLException updating document. key: {} content: {}", key, document, e);
throw new IOException(e);
}
}

/**
* Builds an UPDATE SQL with SET clause and WHERE clause including key and optional condition.
*
* <p>Generated SQL pattern:
*
* <pre>{@code
* UPDATE table SET col1 = ?, col2 = ? WHERE pk_col = ? [AND <condition>]
* }</pre>
*/
private String buildUpdateSql(
List<String> columns, String pkColumn, Filter condition, String tableName) {

String setClause = columns.stream().map(col -> col + " = ?").collect(Collectors.joining(", "));

StringBuilder sql = new StringBuilder();
sql.append("UPDATE ")
.append(tableIdentifier)
.append(" SET ")
.append(setClause)
.append(" WHERE ")
.append(PostgresUtils.wrapFieldNamesWithDoubleQuotes(pkColumn))
.append(" = ?");

// Add condition clause if present
if (condition != null) {
LegacyFilterToQueryFilterTransformer transformer =
new LegacyFilterToQueryFilterTransformer(schemaRegistry, tableName);
org.hypertrace.core.documentstore.query.Filter queryFilter = transformer.transform(condition);

if (queryFilter != null) {
Query filterQuery = Query.builder().setFilter(queryFilter).build();
PostgresQueryParser filterParser =
new PostgresQueryParser(
tableIdentifier, filterQuery, new FlatPostgresFieldTransformer());
String filterClause = filterParser.buildFilterClause();
// filterClause starts with "WHERE ", we need to append as "AND ..."
if (filterClause != null && !filterClause.isEmpty()) {
// Remove "WHERE " prefix and append as AND because the WHERE clause was already added
// before in the SQL
String conditionOnly = filterClause.replaceFirst("^\\s*WHERE\\s+", "");
if (!conditionOnly.isEmpty()) {
sql.append(" AND ").append(conditionOnly);
}
}
}
}

return sql.toString();
}

private int executeConditionalUpdate(
String sql,
TypedDocument parsed,
Key key,
Filter condition,
String tableName,
String pkColumn)
throws SQLException {

try (Connection conn = client.getPooledConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {

int index = 1;

// Set column values (SET clause)
for (String column : parsed.getColumns()) {
setParameter(
conn,
ps,
index++,
parsed.getValue(column),
parsed.getType(column),
parsed.isArray(column));
}

// Set key parameter (WHERE pk = ?)
PostgresDataType pkType = getPrimaryKeyType(tableName, pkColumn);
setParameter(conn, ps, index++, key.toString(), pkType, false);

// Set condition parameters if present
if (condition != null) {
LegacyFilterToQueryFilterTransformer transformer =
new LegacyFilterToQueryFilterTransformer(schemaRegistry, tableName);
org.hypertrace.core.documentstore.query.Filter queryFilter =
transformer.transform(condition);

if (queryFilter != null) {
Query filterQuery = Query.builder().setFilter(queryFilter).build();
PostgresQueryParser filterParser =
new PostgresQueryParser(
tableIdentifier, filterQuery, new FlatPostgresFieldTransformer());
// Build filter to populate params
filterParser.buildFilterClause();
Params filterParams = filterParser.getParamsBuilder().build();

for (Object param : filterParams.getObjectParams().values()) {
ps.setObject(index++, param);
}
}
}

return ps.executeUpdate();
}
}

@Override
Expand Down
Loading
Loading