diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index bbfa508e..11498ba7 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -1881,18 +1881,131 @@ void testDeleteAllReturnsFalseOnSQLException() throws Exception { class UpdateTests { @Test - @DisplayName("Should throw UnsupportedOperationException for update with condition") - void testUpdateWithCondition() { - Key key = new SingleValueKey("default", "1"); + @DisplayName("Should update document when condition matches (optimistic locking)") + void testUpdateWithConditionMatches() throws Exception { + // Use key "1" directly to match test data (not SingleValueKey which adds tenant prefix) + Key key = + new Key() { + @Override + public String toString() { + return "1"; + } + }; ObjectNode updatedNode = OBJECT_MAPPER.createObjectNode(); - updatedNode.put("_id", 1); - updatedNode.put("item", "Soap"); + updatedNode.put("item", "UpdatedSoap"); + updatedNode.put("price", 999); Document document = new JSONDocument(updatedNode); - Filter condition = Filter.eq("price", 10); - assertThrows( - UnsupportedOperationException.class, - () -> flatCollection.update(key, document, condition)); + // Condition: price = 10 (matches the existing document) + Filter condition = new Filter(Filter.Op.EQ, "price", 10); + + UpdateResult result = flatCollection.update(key, document, condition); + + assertEquals(1, result.getUpdatedCount(), "Should update 1 document when condition matches"); + + // Verify the update in database using direct JDBC + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"item\", \"price\" FROM \"%s\" WHERE \"id\" = '1'", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals("UpdatedSoap", rs.getString("item")); + assertEquals(999, rs.getInt("price")); + } + } + + @Test + @DisplayName("Should not update document when condition does not match (optimistic locking)") + void testUpdateWithConditionNotMatches() throws Exception { + Key key = + new Key() { + @Override + public String toString() { + return "1"; + } + }; + ObjectNode updatedNode = OBJECT_MAPPER.createObjectNode(); + updatedNode.put("item", "ShouldNotUpdate"); + updatedNode.put("price", 888); + Document document = new JSONDocument(updatedNode); + + // Condition: price = 999 (does NOT match the existing document which has price=10) + Filter condition = new Filter(Filter.Op.EQ, "price", 999); + + UpdateResult result = flatCollection.update(key, document, condition); + + assertEquals(0, result.getUpdatedCount(), "Should not update when condition doesn't match"); + + // Verify document is unchanged using direct JDBC + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"item\", \"price\" FROM \"%s\" WHERE \"id\" = '1'", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals("Soap", rs.getString("item"), "Item should remain unchanged"); + assertEquals(10, rs.getInt("price"), "Price should remain unchanged"); + } + } + + @Test + @DisplayName("Should update document without condition") + void testUpdateWithoutCondition() throws Exception { + Key key = + new Key() { + @Override + public String toString() { + return "1"; + } + }; + ObjectNode updatedNode = OBJECT_MAPPER.createObjectNode(); + updatedNode.put("item", "NoConditionUpdate"); + updatedNode.put("price", 777); + Document document = new JSONDocument(updatedNode); + + UpdateResult result = flatCollection.update(key, document, null); + + assertEquals(1, result.getUpdatedCount(), "Should update 1 document without condition"); + + // Verify the update using direct JDBC + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"item\", \"price\" FROM \"%s\" WHERE \"id\" = '1'", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals("NoConditionUpdate", rs.getString("item")); + assertEquals(777, rs.getInt("price")); + } + } + + @Test + @DisplayName("Should return 0 when updating non-existent key") + void testUpdateNonExistentKey() throws Exception { + Key key = + new Key() { + @Override + public String toString() { + return "non-existent-key"; + } + }; + ObjectNode updatedNode = OBJECT_MAPPER.createObjectNode(); + updatedNode.put("item", "Ghost"); + Document document = new JSONDocument(updatedNode); + + UpdateResult result = flatCollection.update(key, document, null); + + assertEquals(0, result.getUpdatedCount(), "Should return 0 for non-existent key"); } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 007776f9..ffc54f39 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -495,7 +495,152 @@ public BulkUpdateResult bulkUpdate(List bulkUpdateRequests) { @Override public UpdateResult update(Key key, Document document, Filter condition) throws IOException { - throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED); + return updateWithRetry(key, document, condition, false); + } + + private UpdateResult updateWithRetry( + Key key, Document document, Filter condition, boolean isRetry) throws IOException { + String tableName = tableIdentifier.getTableName(); + List skippedFields = new ArrayList<>(); + + try { + TypedDocument parsed = parseDocument(document, tableName, skippedFields); + + // If no fields to update (all skipped), return 0 + if (parsed.getColumns().isEmpty()) { + LOGGER.warn("No valid columns to update for key: {}", key); + return new UpdateResult(0); + } + + // Build UPDATE SQL + String pkColumn = getPKForTable(tableName); + String sql = buildUpdateSql(parsed.getColumns(), pkColumn, condition, tableName); + LOGGER.debug("Update SQL: {}", sql); + + int result = executeConditionalUpdate(sql, parsed, key, condition, tableName, pkColumn); + LOGGER.debug("Update result for key {}: {}", key, result); + return new UpdateResult(result); + + } catch (PSQLException e) { + if (!isRetry && shouldRefreshSchemaAndRetry(e.getSQLState())) { + LOGGER.info( + "Schema mismatch detected during update (SQLState: {}), refreshing schema and retrying. key: {}", + e.getSQLState(), + key); + schemaRegistry.invalidate(tableName); + return updateWithRetry(key, document, condition, true); + } + LOGGER.error("SQLException updating document. key: {} content: {}", key, document, e); + throw new IOException(e); + } catch (SQLException e) { + LOGGER.error("SQLException updating document. key: {} content: {}", key, document, e); + throw new IOException(e); + } + } + + /** + * Builds an UPDATE SQL with SET clause and WHERE clause including key and optional condition. + * + *

Generated SQL pattern: + * + *

{@code
+   * UPDATE table SET col1 = ?, col2 = ? WHERE pk_col = ? [AND ]
+   * }
+ */ + private String buildUpdateSql( + List columns, String pkColumn, Filter condition, String tableName) { + + String setClause = columns.stream().map(col -> col + " = ?").collect(Collectors.joining(", ")); + + StringBuilder sql = new StringBuilder(); + sql.append("UPDATE ") + .append(tableIdentifier) + .append(" SET ") + .append(setClause) + .append(" WHERE ") + .append(PostgresUtils.wrapFieldNamesWithDoubleQuotes(pkColumn)) + .append(" = ?"); + + // Add condition clause if present + if (condition != null) { + LegacyFilterToQueryFilterTransformer transformer = + new LegacyFilterToQueryFilterTransformer(schemaRegistry, tableName); + org.hypertrace.core.documentstore.query.Filter queryFilter = transformer.transform(condition); + + if (queryFilter != null) { + Query filterQuery = Query.builder().setFilter(queryFilter).build(); + PostgresQueryParser filterParser = + new PostgresQueryParser( + tableIdentifier, filterQuery, new FlatPostgresFieldTransformer()); + String filterClause = filterParser.buildFilterClause(); + // filterClause starts with "WHERE ", we need to append as "AND ..." + if (filterClause != null && !filterClause.isEmpty()) { + // Remove "WHERE " prefix and append as AND because the WHERE clause was already added + // before in the SQL + String conditionOnly = filterClause.replaceFirst("^\\s*WHERE\\s+", ""); + if (!conditionOnly.isEmpty()) { + sql.append(" AND ").append(conditionOnly); + } + } + } + } + + return sql.toString(); + } + + private int executeConditionalUpdate( + String sql, + TypedDocument parsed, + Key key, + Filter condition, + String tableName, + String pkColumn) + throws SQLException { + + try (Connection conn = client.getPooledConnection(); + PreparedStatement ps = conn.prepareStatement(sql)) { + + int index = 1; + + // Set column values (SET clause) + for (String column : parsed.getColumns()) { + setParameter( + conn, + ps, + index++, + parsed.getValue(column), + parsed.getType(column), + parsed.isArray(column)); + } + + // Set key parameter (WHERE pk = ?) + PostgresDataType pkType = getPrimaryKeyType(tableName, pkColumn); + setParameter(conn, ps, index++, key.toString(), pkType, false); + + // Set condition parameters if present + if (condition != null) { + LegacyFilterToQueryFilterTransformer transformer = + new LegacyFilterToQueryFilterTransformer(schemaRegistry, tableName); + org.hypertrace.core.documentstore.query.Filter queryFilter = + transformer.transform(condition); + + if (queryFilter != null) { + Query filterQuery = Query.builder().setFilter(queryFilter).build(); + PostgresQueryParser filterParser = + new PostgresQueryParser( + tableIdentifier, filterQuery, new FlatPostgresFieldTransformer()); + // Build filter to populate params + filterParser.buildFilterClause(); + Params filterParams = filterParser.getParamsBuilder().build(); + + for (Object param : filterParams.getObjectParams().values()) { + ps.setObject(index++, param); + } + } + } + + return ps.executeUpdate(); + } } @Override diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java index 6ff30b64..ee40f134 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java @@ -23,6 +23,7 @@ import org.hypertrace.core.documentstore.Document; import org.hypertrace.core.documentstore.JSONDocument; import org.hypertrace.core.documentstore.Key; +import org.hypertrace.core.documentstore.UpdateResult; import org.hypertrace.core.documentstore.expression.impl.DataType; import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; @@ -96,14 +97,12 @@ private PSQLException createPSQLException(PSQLState state) { } private void setupCommonMocks(Map schema) throws SQLException { - // Mock getColumnOrRefresh for each field in the document when(mockSchemaRegistry.getColumnOrRefresh(anyString(), anyString())) .thenAnswer( invocation -> { String columnName = invocation.getArgument(1); return Optional.ofNullable(schema.get(columnName)); }); - // Mock getPrimaryKeyColumn when(mockSchemaRegistry.getPrimaryKeyColumn(COLLECTION_NAME)).thenReturn(Optional.of("id")); when(mockClient.getPooledConnection()).thenReturn(mockConnection); when(mockConnection.prepareStatement(anyString())).thenReturn(mockPreparedStatement); @@ -128,7 +127,6 @@ void testCreateOrReplaceRetriesOnUndefinedColumn() throws Exception { Map schema = createBasicSchema(); setupCreateOrReplaceMocks(schema); - // First call throws UNDEFINED_COLUMN, second call succeeds PSQLException psqlException = createPSQLException(PSQLState.UNDEFINED_COLUMN); when(mockPreparedStatement.executeQuery()).thenThrow(psqlException).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(true); @@ -152,7 +150,6 @@ void testCreateOrReplaceRetriesOnDatatypeMismatch() throws Exception { Map schema = createBasicSchema(); setupCreateOrReplaceMocks(schema); - // First call throws DATATYPE_MISMATCH, second call succeeds PSQLException psqlException = createPSQLException(PSQLState.DATATYPE_MISMATCH); when(mockPreparedStatement.executeQuery()).thenThrow(psqlException).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(true); @@ -176,7 +173,6 @@ void testCreateOrReplaceThrowsOnNonRetryablePSQLException() throws Exception { Map schema = createBasicSchema(); setupCreateOrReplaceMocks(schema); - // Throw a non-retryable PSQLException (e.g., UNIQUE_VIOLATION) PSQLException psqlException = createPSQLException(PSQLState.UNIQUE_VIOLATION); when(mockPreparedStatement.executeQuery()).thenThrow(psqlException); @@ -197,7 +193,6 @@ void testCreateOrReplaceThrowsOnSQLException() throws Exception { Map schema = createBasicSchema(); setupCreateOrReplaceMocks(schema); - // Throw a generic SQLException (not PSQLException) SQLException sqlException = new SQLException("Connection lost"); when(mockPreparedStatement.executeQuery()).thenThrow(sqlException); @@ -218,7 +213,6 @@ void testCreateOrReplaceThrowsWhenRetryFails() throws Exception { Map schema = createBasicSchema(); setupCreateOrReplaceMocks(schema); - // Both calls throw UNDEFINED_COLUMN - first triggers retry, second should throw PSQLException psqlException = createPSQLException(PSQLState.UNDEFINED_COLUMN); when(mockPreparedStatement.executeQuery()).thenThrow(psqlException); @@ -247,7 +241,6 @@ void testUpsertRetriesOnUndefinedColumn() throws Exception { Map schema = createBasicSchema(); setupCommonMocks(schema); - // First call throws UNDEFINED_COLUMN, second call succeeds PSQLException psqlException = createPSQLException(PSQLState.UNDEFINED_COLUMN); when(mockPreparedStatement.executeQuery()).thenThrow(psqlException).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(true); @@ -271,7 +264,6 @@ void testUpsertRetriesOnDatatypeMismatch() throws Exception { Map schema = createBasicSchema(); setupCommonMocks(schema); - // First call throws DATATYPE_MISMATCH, second call succeeds PSQLException psqlException = createPSQLException(PSQLState.DATATYPE_MISMATCH); when(mockPreparedStatement.executeQuery()).thenThrow(psqlException).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(true); @@ -295,7 +287,6 @@ void testUpsertThrowsOnNonRetryablePSQLException() throws Exception { Map schema = createBasicSchema(); setupCommonMocks(schema); - // Throw a non-retryable PSQLException PSQLException psqlException = createPSQLException(PSQLState.UNIQUE_VIOLATION); when(mockPreparedStatement.executeQuery()).thenThrow(psqlException); @@ -315,7 +306,6 @@ void testUpsertThrowsOnSQLException() throws Exception { Map schema = createBasicSchema(); setupCommonMocks(schema); - // Throw a generic SQLException (not PSQLException) SQLException sqlException = new SQLException("Connection lost"); when(mockPreparedStatement.executeQuery()).thenThrow(sqlException); @@ -335,7 +325,6 @@ void testUpsertThrowsWhenRetryFails() throws Exception { Map schema = createBasicSchema(); setupCommonMocks(schema); - // Both calls throw UNDEFINED_COLUMN - first triggers retry, second should throw PSQLException psqlException = createPSQLException(PSQLState.UNDEFINED_COLUMN); when(mockPreparedStatement.executeQuery()).thenThrow(psqlException); @@ -349,4 +338,128 @@ void testUpsertThrowsWhenRetryFails() throws Exception { verify(mockPreparedStatement, times(2)).executeQuery(); } } + + @Nested + @DisplayName("update Exception Handling Tests") + class UpdateExceptionTests { + + @Test + @DisplayName("Should return UpdateResult(0) when all fields are unknown/skipped (L511-512)") + void testUpdateReturnsZeroWhenAllFieldsSkipped() throws Exception { + Key key = Key.from("test-key"); + Document document = + new JSONDocument("{\"unknown_field1\": \"value\", \"unknown_field2\": 123}"); + + Map schema = createBasicSchema(); + when(mockSchemaRegistry.getColumnOrRefresh(anyString(), anyString())) + .thenReturn(Optional.empty()); + + UpdateResult result = flatPostgresCollection.update(key, document, null); + + assertEquals(0, result.getUpdatedCount()); + verify(mockClient, never()).getPooledConnection(); + } + + @Test + @DisplayName("Should retry on UNDEFINED_COLUMN PSQLException and succeed") + void testUpdateRetriesOnUndefinedColumn() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + + Map schema = createBasicSchema(); + setupCommonMocks(schema); + + PSQLException psqlException = createPSQLException(PSQLState.UNDEFINED_COLUMN); + when(mockPreparedStatement.executeUpdate()).thenThrow(psqlException).thenReturn(1); + + doNothing().when(mockSchemaRegistry).invalidate(COLLECTION_NAME); + + UpdateResult result = flatPostgresCollection.update(key, document, null); + + assertEquals(1, result.getUpdatedCount()); + verify(mockSchemaRegistry, times(1)).invalidate(COLLECTION_NAME); + verify(mockPreparedStatement, times(2)).executeUpdate(); + } + + @Test + @DisplayName("Should retry on DATATYPE_MISMATCH PSQLException and succeed") + void testUpdateRetriesOnDatatypeMismatch() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + + Map schema = createBasicSchema(); + setupCommonMocks(schema); + + PSQLException psqlException = createPSQLException(PSQLState.DATATYPE_MISMATCH); + when(mockPreparedStatement.executeUpdate()).thenThrow(psqlException).thenReturn(1); + + doNothing().when(mockSchemaRegistry).invalidate(COLLECTION_NAME); + + UpdateResult result = flatPostgresCollection.update(key, document, null); + + assertEquals(1, result.getUpdatedCount()); + verify(mockSchemaRegistry, times(1)).invalidate(COLLECTION_NAME); + verify(mockPreparedStatement, times(2)).executeUpdate(); + } + + @Test + @DisplayName("Should throw IOException on non-retryable PSQLException") + void testUpdateThrowsOnNonRetryablePSQLException() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + + Map schema = createBasicSchema(); + setupCommonMocks(schema); + + PSQLException psqlException = createPSQLException(PSQLState.UNIQUE_VIOLATION); + when(mockPreparedStatement.executeUpdate()).thenThrow(psqlException); + + IOException thrown = + assertThrows(IOException.class, () -> flatPostgresCollection.update(key, document, null)); + + assertEquals(psqlException, thrown.getCause()); + verify(mockSchemaRegistry, never()).invalidate(anyString()); + } + + @Test + @DisplayName("Should throw IOException on generic SQLException (L535-537)") + void testUpdateThrowsOnSQLException() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + + Map schema = createBasicSchema(); + setupCommonMocks(schema); + + SQLException sqlException = new SQLException("Connection lost"); + when(mockPreparedStatement.executeUpdate()).thenThrow(sqlException); + + IOException thrown = + assertThrows(IOException.class, () -> flatPostgresCollection.update(key, document, null)); + + assertEquals(sqlException, thrown.getCause()); + verify(mockSchemaRegistry, never()).invalidate(anyString()); + } + + @Test + @DisplayName("Should throw IOException when retry also fails (L533-534)") + void testUpdateThrowsWhenRetryFails() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + + Map schema = createBasicSchema(); + setupCommonMocks(schema); + + PSQLException psqlException = createPSQLException(PSQLState.UNDEFINED_COLUMN); + when(mockPreparedStatement.executeUpdate()).thenThrow(psqlException); + + doNothing().when(mockSchemaRegistry).invalidate(COLLECTION_NAME); + + IOException thrown = + assertThrows(IOException.class, () -> flatPostgresCollection.update(key, document, null)); + + assertEquals(psqlException, thrown.getCause()); + verify(mockSchemaRegistry, times(1)).invalidate(COLLECTION_NAME); + verify(mockPreparedStatement, times(2)).executeUpdate(); + } + } }