diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 1595799c..cbe680ce 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -1,7 +1,11 @@ package org.hypertrace.core.documentstore; import static org.hypertrace.core.documentstore.utils.Utils.readFileFromResource; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -10,10 +14,13 @@ import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException; +import org.hypertrace.core.documentstore.model.exception.SchemaMismatchException; import org.hypertrace.core.documentstore.postgres.PostgresDatastore; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -93,7 +100,11 @@ private static void createFlatCollectionSchema() { + "\"sales\" JSONB," + "\"numbers\" INTEGER[]," + "\"scores\" DOUBLE PRECISION[]," - + "\"flags\" BOOLEAN[]" + + "\"flags\" BOOLEAN[]," + + "\"big_number\" BIGINT," + + "\"rating\" REAL," + + "\"created_date\" DATE," + + "\"weight\" DOUBLE PRECISION" + ");", FLAT_COLLECTION_NAME); @@ -200,42 +211,484 @@ void testUpsertAndReturn() { class CreateTests { @Test 
- @DisplayName("Should throw UnsupportedOperationException for create") - void testCreate() { + @DisplayName("Should create a new document with all field types") + void testCreateNewDocument() throws Exception { ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("_id", 300); + objectNode.put("id", "new-doc-100"); objectNode.put("item", "Brand New Item"); + objectNode.put("price", 999); + objectNode.put("quantity", 50); + objectNode.put("in_stock", true); + objectNode.set("tags", OBJECT_MAPPER.createArrayNode().add("electronics").add("sale")); + + // Add JSONB field + ObjectNode propsNode = OBJECT_MAPPER.createObjectNode(); + propsNode.put("color", "blue"); + propsNode.put("weight", 2.5); + propsNode.put("warranty", true); + objectNode.set("props", propsNode); + Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "300"); + Key key = new SingleValueKey("default", "new-doc-100"); + + CreateResult result = flatCollection.create(key, document); + + assertTrue(result.isSucceed()); + + // Verify the data was inserted + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT * FROM \"%s\" WHERE \"id\" = 'new-doc-100'", FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals("Brand New Item", rs.getString("item")); + assertEquals(999, rs.getInt("price")); + assertEquals(50, rs.getInt("quantity")); + assertTrue(rs.getBoolean("in_stock")); + + // Verify tags array + java.sql.Array tagsArray = rs.getArray("tags"); + assertNotNull(tagsArray); + String[] tags = (String[]) tagsArray.getArray(); + assertEquals(2, tags.length); + assertEquals("electronics", tags[0]); + assertEquals("sale", tags[1]); + + // Verify JSONB props + String propsJson = rs.getString("props"); + assertNotNull(propsJson); + JsonNode propsResult = 
OBJECT_MAPPER.readTree(propsJson); + assertEquals("blue", propsResult.get("color").asText()); + assertEquals(2.5, propsResult.get("weight").asDouble(), 0.01); + assertTrue(propsResult.get("warranty").asBoolean()); + } + } - assertThrows(UnsupportedOperationException.class, () -> flatCollection.create(key, document)); + @Test + @DisplayName("Should throw DuplicateDocumentException when creating with existing key") + void testCreateDuplicateDocument() throws Exception { + // First create succeeds + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "dup-doc-200"); + objectNode.put("item", "First Item"); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey("default", "dup-doc-200"); + + flatCollection.create(key, document); + + // Second create with same key should fail + ObjectNode objectNode2 = OBJECT_MAPPER.createObjectNode(); + objectNode2.put("id", "dup-doc-200"); + objectNode2.put("item", "Second Item"); + Document document2 = new JSONDocument(objectNode2); + + assertThrows(DuplicateDocumentException.class, () -> flatCollection.create(key, document2)); } @Test - @DisplayName("Should throw UnsupportedOperationException for createOrReplace") - void testCreateOrReplace() { + @DisplayName("Should create document with JSONB field") + void testCreateWithJsonbField() throws Exception { ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("_id", 200); - objectNode.put("item", "NewMirror"); + objectNode.put("id", "jsonb-doc-300"); + objectNode.put("item", "Item with Props"); + ObjectNode propsNode = OBJECT_MAPPER.createObjectNode(); + propsNode.put("color", "blue"); + propsNode.put("size", "large"); + objectNode.set("props", propsNode); Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "200"); + Key key = new SingleValueKey("default", "jsonb-doc-300"); + + CreateResult result = flatCollection.create(key, document); + + 
assertTrue(result.isSucceed()); + + // Verify JSONB data + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT props->>'color' as color FROM \"%s\" WHERE \"id\" = 'jsonb-doc-300'", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals("blue", rs.getString("color")); + } + } - assertThrows( - UnsupportedOperationException.class, () -> flatCollection.createOrReplace(key, document)); + @Test + @DisplayName("Should skip unknown fields and insert known fields") + void testCreateSkipsUnknownFields() throws Exception { + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "unknown-field-doc-400"); + objectNode.put("item", "Item"); + objectNode.put("unknown_column", "should be skipped"); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey("default", "unknown-field-doc-400"); + + CreateResult result = flatCollection.create(key, document); + + assertTrue(result.isSucceed()); + + // Verify only known columns were inserted + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT * FROM \"%s\" WHERE \"id\" = 'unknown-field-doc-400'", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals("Item", rs.getString("item")); + } } @Test - @DisplayName("Should throw UnsupportedOperationException for createOrReplaceAndReturn") - void testCreateOrReplaceAndReturn() { + @DisplayName("Should return skipped fields in CreateResult when columns are missing") + void testCreateReturnsSkippedFieldsInResult() throws Exception { ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("_id", 200); - objectNode.put("item", 
"NewMirror"); + objectNode.put("id", "skipped-fields-doc-500"); + objectNode.put("item", "Valid Item"); + objectNode.put("price", 100); + objectNode.put("nonexistent_field1", "value1"); + objectNode.put("nonexistent_field2", "value2"); Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "200"); + Key key = new SingleValueKey("default", "skipped-fields-doc-500"); + + CreateResult result = flatCollection.create(key, document); + + assertTrue(result.isSucceed()); + assertTrue(result.isPartial()); + assertNotNull(result.getSkippedFields()); + assertEquals(2, result.getSkippedFields().size()); + assertTrue( + result + .getSkippedFields() + .containsAll(List.of("nonexistent_field1", "nonexistent_field2"))); + + // Verify the valid fields were inserted + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT * FROM \"%s\" WHERE \"id\" = 'skipped-fields-doc-500'", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals("Valid Item", rs.getString("item")); + assertEquals(100, rs.getInt("price")); + } + } - assertThrows( - UnsupportedOperationException.class, - () -> flatCollection.createOrReplaceAndReturn(key, document)); + @Test + @DisplayName("Should return empty skipped fields when all columns exist") + void testCreateReturnsEmptySkippedFieldsWhenAllColumnsExist() throws Exception { + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "all-valid-doc-600"); + objectNode.put("item", "Valid Item"); + objectNode.put("price", 200); + objectNode.put("quantity", 10); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey("default", "all-valid-doc-600"); + + CreateResult result = flatCollection.create(key, document); + + assertTrue(result.isSucceed()); + assertFalse(result.isPartial()); 
+ assertTrue(result.getSkippedFields().isEmpty()); + } + + @Test + @DisplayName("Should return failure when all fields are unknown (parsed.isEmpty)") + void testCreateFailsWhenAllFieldsAreUnknown() throws Exception { + // Document with only unknown fields - no valid columns will be found + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("completely_unknown_field1", "value1"); + objectNode.put("completely_unknown_field2", "value2"); + objectNode.put("another_nonexistent_column", 123); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey("default", "all-unknown-doc-700"); + + CreateResult result = flatCollection.create(key, document); + + // Should fail because no valid columns found (parsed.isEmpty() == true) + assertFalse(result.isSucceed()); + assertEquals(3, result.getSkippedFields().size()); + assertTrue( + result + .getSkippedFields() + .containsAll( + List.of( + "completely_unknown_field1", + "completely_unknown_field2", + "another_nonexistent_column"))); + + // Verify no row was inserted + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT COUNT(*) FROM \"%s\" WHERE \"id\" = 'all-unknown-doc-700'", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + } + } + + @Test + @DisplayName("Should refresh schema and retry on UNDEFINED_COLUMN error") + void testCreateRefreshesSchemaOnUndefinedColumnError() throws Exception { + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + + // Step 1: Add a temporary column and do a create to cache the schema + String addColumnSQL = + String.format("ALTER TABLE \"%s\" ADD COLUMN \"temp_col\" TEXT", FLAT_COLLECTION_NAME); + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = 
conn.prepareStatement(addColumnSQL)) { + ps.execute(); + LOGGER.info("Added temporary column 'temp_col' to table"); + } + + // Step 2: Create a document with the temp column to cache the schema + ObjectNode objectNode1 = OBJECT_MAPPER.createObjectNode(); + objectNode1.put("id", "cache-schema-doc"); + objectNode1.put("item", "Item to cache schema"); + objectNode1.put("temp_col", "temp value"); + flatCollection.create( + new SingleValueKey("default", "cache-schema-doc"), new JSONDocument(objectNode1)); + LOGGER.info("Schema cached with temp_col"); + + // Step 3: DROP the column - now the cached schema is stale + String dropColumnSQL = + String.format("ALTER TABLE \"%s\" DROP COLUMN \"temp_col\"", FLAT_COLLECTION_NAME); + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = conn.prepareStatement(dropColumnSQL)) { + ps.execute(); + LOGGER.info("Dropped temp_col - schema cache is now stale"); + } + + // Step 4: Try to create with the dropped column + // Schema registry still thinks temp_col exists, so it will include it in INSERT + // INSERT will fail with UNDEFINED_COLUMN, triggering handlePSQLExceptionForCreate + // which will refresh schema and retry + ObjectNode objectNode2 = OBJECT_MAPPER.createObjectNode(); + objectNode2.put("id", "retry-doc-800"); + objectNode2.put("item", "Item after schema refresh"); + objectNode2.put("temp_col", "this column no longer exists"); + Document document = new JSONDocument(objectNode2); + Key key = new SingleValueKey("default", "retry-doc-800"); + + CreateResult result = flatCollection.create(key, document); + + // Should succeed - temp_col will be skipped (either via retry or schema refresh) + assertTrue(result.isSucceed()); + // The dropped column should be skipped + assertTrue(result.getSkippedFields().contains("temp_col")); + + // Verify the valid fields were inserted + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT * 
FROM \"%s\" WHERE \"id\" = 'retry-doc-800'", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals("Item after schema refresh", rs.getString("item")); + } + } + + @Test + @DisplayName("Should skip column with unparseable value and add to skippedFields") + void testCreateSkipsUnparseableValues() throws Exception { + // Try to insert a string value into an integer column with wrong type + // The unparseable column should be skipped, not throw an exception + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "datatype-mismatch-doc-900"); + objectNode.put("item", "Valid Item"); + objectNode.put("price", "not_a_number_at_all"); // price is INTEGER, this will fail parsing + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey("default", "datatype-mismatch-doc-900"); + + CreateResult result = flatCollection.create(key, document); + + // Should succeed with the valid columns, skipping the unparseable one + assertTrue(result.isSucceed()); + assertTrue(result.isPartial()); + assertEquals(1, result.getSkippedFields().size()); + assertTrue(result.getSkippedFields().contains("price")); + + // Verify the valid fields were inserted + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT * FROM \"%s\" WHERE \"id\" = 'datatype-mismatch-doc-900'", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals("Valid Item", rs.getString("item")); + // price should be null since it was skipped + assertEquals(0, rs.getInt("price")); + assertTrue(rs.wasNull()); + } + } + + @Test + @DisplayName("Should handle all scalar data types including string parsing and nulls") + void testCreateWithAllDataTypes() throws Exception { + // Test 1: All types with native values (number nodes, boolean nodes, 
etc.) + ObjectNode nativeTypesNode = OBJECT_MAPPER.createObjectNode(); + nativeTypesNode.put("id", "native-types-doc"); + nativeTypesNode.put("item", "Native Types"); // TEXT + nativeTypesNode.put("price", 100); // INTEGER (number node) + nativeTypesNode.put("big_number", 9223372036854775807L); // BIGINT (number node) + nativeTypesNode.put("rating", 4.5f); // REAL (number node) + nativeTypesNode.put("weight", 123.456789); // DOUBLE PRECISION (number node) + nativeTypesNode.put("in_stock", true); // BOOLEAN (boolean node) + nativeTypesNode.put("date", "2024-01-15T10:30:00Z"); // TIMESTAMPTZ (textual) + nativeTypesNode.put("created_date", "2024-01-15"); // DATE (textual) + nativeTypesNode.putObject("props").put("key", "value"); // JSONB + + CreateResult result1 = + flatCollection.create( + new SingleValueKey("default", "native-types-doc"), new JSONDocument(nativeTypesNode)); + assertTrue(result1.isSucceed()); + + // Test 2: String representations of numbers (covers parseInt, parseLong, etc.) 
+ ObjectNode stringTypesNode = OBJECT_MAPPER.createObjectNode(); + stringTypesNode.put("id", "string-types-doc"); + stringTypesNode.put("item", "String Types"); + stringTypesNode.put("price", "200"); // INTEGER from string + stringTypesNode.put("big_number", "1234567890123"); // BIGINT from string + stringTypesNode.put("rating", "3.75"); // REAL from string + stringTypesNode.put("weight", "987.654"); // DOUBLE PRECISION from string + stringTypesNode.put("in_stock", "true"); // BOOLEAN from string + + CreateResult result2 = + flatCollection.create( + new SingleValueKey("default", "string-types-doc"), new JSONDocument(stringTypesNode)); + assertTrue(result2.isSucceed()); + + // Test 3: TIMESTAMPTZ from epoch milliseconds + long epochMillis = 1705315800000L; + ObjectNode epochNode = OBJECT_MAPPER.createObjectNode(); + epochNode.put("id", "epoch-doc"); + epochNode.put("item", "Epoch Timestamp"); + epochNode.put("date", epochMillis); // TIMESTAMPTZ from number + + CreateResult result3 = + flatCollection.create( + new SingleValueKey("default", "epoch-doc"), new JSONDocument(epochNode)); + assertTrue(result3.isSucceed()); + + // Test 4: Null values (covers setParameter null handling) + ObjectNode nullNode = OBJECT_MAPPER.createObjectNode(); + nullNode.put("id", "null-doc"); + nullNode.put("item", "Null Values"); + nullNode.putNull("price"); + nullNode.putNull("date"); + nullNode.putNull("in_stock"); + + CreateResult result4 = + flatCollection.create( + new SingleValueKey("default", "null-doc"), new JSONDocument(nullNode)); + assertTrue(result4.isSucceed()); + + // Verify all inserts + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT * FROM \"%s\" WHERE \"id\" IN ('native-types-doc', 'string-types-doc', 'epoch-doc', 'null-doc') ORDER BY \"id\"", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + + // 
epoch-doc + assertTrue(rs.next()); + assertEquals(epochMillis, rs.getTimestamp("date").getTime()); + + // native-types-doc + assertTrue(rs.next()); + assertEquals(100, rs.getInt("price")); + assertEquals(9223372036854775807L, rs.getLong("big_number")); + assertEquals(4.5f, rs.getFloat("rating"), 0.01f); + assertEquals(123.456789, rs.getDouble("weight"), 0.0001); + assertTrue(rs.getBoolean("in_stock")); + + // null-doc + assertTrue(rs.next()); + rs.getInt("price"); + assertTrue(rs.wasNull()); + + // string-types-doc + assertTrue(rs.next()); + assertEquals(200, rs.getInt("price")); + assertEquals(1234567890123L, rs.getLong("big_number")); + assertEquals(3.75f, rs.getFloat("rating"), 0.01f); + } + } + + @Test + @DisplayName("Should handle array types and single-to-array conversion") + void testCreateWithArrayTypes() throws Exception { + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + + // Test 1: Proper arrays + ObjectNode arrayNode = OBJECT_MAPPER.createObjectNode(); + arrayNode.put("id", "array-doc"); + arrayNode.put("item", "Array Types"); + arrayNode.putArray("tags").add("tag1").add("tag2"); // TEXT[] + arrayNode.putArray("numbers").add(10).add(20); // INTEGER[] + arrayNode.putArray("scores").add(1.5).add(2.5); // DOUBLE PRECISION[] + arrayNode.putArray("flags").add(true).add(false); // BOOLEAN[] + + CreateResult result1 = + flatCollection.create( + new SingleValueKey("default", "array-doc"), new JSONDocument(arrayNode)); + assertTrue(result1.isSucceed()); + + // Test 2: Single values auto-converted to arrays + ObjectNode singleNode = OBJECT_MAPPER.createObjectNode(); + singleNode.put("id", "single-to-array-doc"); + singleNode.put("item", "Single to Array"); + singleNode.put("tags", "single-tag"); // TEXT[] from single value + singleNode.put("numbers", 42); // INTEGER[] from single value + + CreateResult result2 = + flatCollection.create( + new SingleValueKey("default", "single-to-array-doc"), new JSONDocument(singleNode)); + 
assertTrue(result2.isSucceed()); + + // Verify + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT * FROM \"%s\" WHERE \"id\" IN ('array-doc', 'single-to-array-doc') ORDER BY \"id\"", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + + // array-doc + assertTrue(rs.next()); + assertEquals(2, ((String[]) rs.getArray("tags").getArray()).length); + assertEquals(2, ((Integer[]) rs.getArray("numbers").getArray()).length); + + // single-to-array-doc + assertTrue(rs.next()); + String[] tags = (String[]) rs.getArray("tags").getArray(); + assertEquals(1, tags.length); + assertEquals("single-tag", tags[0]); + } } } @@ -396,4 +849,116 @@ void testBulkOperationOnArrayValue() throws IOException { () -> flatCollection.bulkOperationOnArrayValue(request)); } } + + @Nested + @DisplayName("Strict Mode (bestEffortWrites=false)") + class StrictModeTests { + + private Datastore strictDatastore; + private Collection strictCollection; + + @BeforeEach + void setupStrictModeDatastore() { + // Create a datastore with bestEffortWrites=false (strict mode) + String postgresConnectionUrl = + String.format("jdbc:postgresql://localhost:%s/", postgres.getMappedPort(5432)); + + Map strictConfig = new HashMap<>(); + strictConfig.put("url", postgresConnectionUrl); + strictConfig.put("user", "postgres"); + strictConfig.put("password", "postgres"); + // Configure strict mode via customParams + Map customParams = new HashMap<>(); + customParams.put("bestEffortWrites", "false"); + strictConfig.put("customParams", customParams); + + strictDatastore = + DatastoreProvider.getDatastore("Postgres", ConfigFactory.parseMap(strictConfig)); + strictCollection = + strictDatastore.getCollectionForType(FLAT_COLLECTION_NAME, DocumentType.FLAT); + } + + @Test + @DisplayName("Should throw SchemaMismatchException when column not in schema (strict mode)") + void testStrictModeThrowsOnUnknownColumn() { + ObjectNode objectNode 
= OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "strict-unknown-col-doc"); + objectNode.put("item", "Valid Item"); + objectNode.put("unknown_column", "this should fail"); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey("default", "strict-unknown-col-doc"); + + SchemaMismatchException exception = + assertThrows(SchemaMismatchException.class, () -> strictCollection.create(key, document)); + + assertTrue(exception.getMessage().contains("unknown_column")); + assertTrue(exception.getMessage().contains("not found in schema")); + } + + @Test + @DisplayName( + "Should throw SchemaMismatchException when value type doesn't match schema (strict mode)") + void testStrictModeThrowsOnTypeMismatch() { + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "strict-type-mismatch-doc"); + objectNode.put("item", "Valid Item"); + objectNode.put("price", "not_a_number_at_all"); // price is INTEGER + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey("default", "strict-type-mismatch-doc"); + + SchemaMismatchException exception = + assertThrows(SchemaMismatchException.class, () -> strictCollection.create(key, document)); + + assertTrue(exception.getMessage().contains("price")); + assertTrue(exception.getMessage().contains("Failed to parse value")); + } + + @Test + @DisplayName("Should succeed in strict mode when all fields match schema") + void testStrictModeSucceedsWithValidDocument() throws Exception { + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "strict-valid-doc"); + objectNode.put("item", "Valid Item"); + objectNode.put("price", 100); + objectNode.put("quantity", 5); + objectNode.put("in_stock", true); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey("default", "strict-valid-doc"); + + CreateResult result = strictCollection.create(key, document); + + assertTrue(result.isSucceed()); + 
assertFalse(result.isPartial()); + assertTrue(result.getSkippedFields().isEmpty()); + + // Verify data was inserted + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT * FROM \"%s\" WHERE \"id\" = 'strict-valid-doc'", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals("Valid Item", rs.getString("item")); + assertEquals(100, rs.getInt("price")); + } + } + + @Test + @DisplayName("Should throw SchemaMismatchException on first unknown field (strict mode)") + void testStrictModeFailsFastOnFirstUnknownField() { + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "strict-multi-unknown-doc"); + objectNode.put("unknown_field_1", "value1"); + objectNode.put("unknown_field_2", "value2"); + objectNode.put("item", "Valid Item"); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey("default", "strict-multi-unknown-doc"); + + // Should throw on the first unknown field encountered + assertThrows(SchemaMismatchException.class, () -> strictCollection.create(key, document)); + } + } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/CreateResult.java b/document-store/src/main/java/org/hypertrace/core/documentstore/CreateResult.java index 38934c7f..7b0f7736 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/CreateResult.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/CreateResult.java @@ -1,16 +1,24 @@ package org.hypertrace.core.documentstore; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.Getter; + /* * Represent the result object for CREATE operation of document store APIs. 
* */ +@AllArgsConstructor +@Getter public class CreateResult { - private boolean succeed; + private boolean isSucceed; + private boolean onRetry; + private List skippedFields; - public CreateResult(boolean succeed) { - this.succeed = succeed; + public CreateResult(boolean isSucceed) { + this.isSucceed = isSucceed; } - public boolean isSucceed() { - return succeed; + public boolean isPartial() { + return !skippedFields.isEmpty(); } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java index 475e55be..b52e16e9 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java @@ -18,4 +18,9 @@ public interface ColumnMetadata { * @return whether this column can be set to NULL */ boolean isNullable(); + + /** + * @return whether this column is an array type + */ + boolean isArray(); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/exception/SchemaMismatchException.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/exception/SchemaMismatchException.java new file mode 100644 index 00000000..b7f3afca --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/exception/SchemaMismatchException.java @@ -0,0 +1,22 @@ +package org.hypertrace.core.documentstore.model.exception; + +import java.io.IOException; + +/** + * Exception thrown when a document field doesn't match the expected schema. This can occur when: + * + *
+ * <ul>
+ *   <li>A field in the document doesn't exist in the schema
+ *   <li>A field's value type doesn't match the expected schema type
+ */ +public class SchemaMismatchException extends IOException { + + public SchemaMismatchException(String message) { + super(message); + } + + public SchemaMismatchException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 747b63bd..2ae0f914 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -1,8 +1,21 @@ package org.hypertrace.core.documentstore.postgres; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import org.hypertrace.core.documentstore.BulkArrayValueUpdateRequest; @@ -15,12 +28,18 @@ import org.hypertrace.core.documentstore.Filter; import org.hypertrace.core.documentstore.Key; import org.hypertrace.core.documentstore.UpdateResult; +import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException; +import org.hypertrace.core.documentstore.model.exception.SchemaMismatchException; import org.hypertrace.core.documentstore.model.options.QueryOptions; import org.hypertrace.core.documentstore.model.options.UpdateOptions; import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate; +import 
org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; import org.hypertrace.core.documentstore.postgres.query.v1.transformer.FlatPostgresFieldTransformer; import org.hypertrace.core.documentstore.query.Query; +import org.postgresql.util.PSQLException; +import org.postgresql.util.PSQLState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,17 +53,29 @@ public class FlatPostgresCollection extends PostgresCollection { private static final Logger LOGGER = LoggerFactory.getLogger(FlatPostgresCollection.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); private static final String WRITE_NOT_SUPPORTED = "Write operations are not supported for flat collections yet!"; + private static final String BEST_EFFORT_WRITES_CONFIG = "bestEffortWrites"; private final PostgresLazyilyLoadedSchemaRegistry schemaRegistry; + /** + * When true (default), fields that don't match the schema are skipped. 
When false (strict mode), + * all fields must be present in the schema with correct types (any fields present in the doc that + * are not present in the schema would make this check fail) + */ + private final boolean bestEffortWrites; + FlatPostgresCollection( final PostgresClient client, final String collectionName, final PostgresLazyilyLoadedSchemaRegistry schemaRegistry) { super(client, collectionName); this.schemaRegistry = schemaRegistry; + this.bestEffortWrites = + Boolean.parseBoolean( + client.getCustomParameters().getOrDefault(BEST_EFFORT_WRITES_CONFIG, "true")); } @Override @@ -148,7 +179,7 @@ public void drop() { @Override public CreateResult create(Key key, Document document) throws IOException { - throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED); + return createWithRetry(key, document, false); } @Override @@ -188,4 +219,277 @@ public CloseableIterator bulkUpdate( throws IOException { throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED); } + + /*isRetry: Whether this is a retry attempt*/ + private CreateResult createWithRetry(Key key, Document document, boolean isRetry) + throws IOException { + String tableName = tableIdentifier.getTableName(); + + List skippedFields = new ArrayList<>(); + + try { + TypedDocument parsed = parseDocument(document, tableName, skippedFields); + // if there are no valid columns in the document + if (parsed.isEmpty()) { + LOGGER.warn("No valid columns found in the document for table: {}", tableName); + return new CreateResult(false, isRetry, skippedFields); + } + + String sql = buildInsertSql(parsed.getColumns()); + LOGGER.debug("Insert SQL: {}", sql); + + int result = executeUpdate(sql, parsed); + LOGGER.debug("Create result: {}", result); + return new CreateResult(result > 0, isRetry, skippedFields); + + } catch (PSQLException e) { + if (PSQLState.UNIQUE_VIOLATION.getState().equals(e.getSQLState())) { + throw new DuplicateDocumentException(); + } + return handlePSQLExceptionForCreate(e, key, document, 
+        tableName, isRetry);
+    } catch (SQLException e) {
+      LOGGER.error("SQLException creating document. key: {} content: {}", key, document, e);
+      throw new IOException(e);
+    }
+  }
+
+  private TypedDocument parseDocument(
+      Document document, String tableName, List<String> skippedColumns) throws IOException {
+    JsonNode jsonNode = MAPPER.readTree(document.toJson());
+    TypedDocument typedDocument = new TypedDocument();
+
+    Iterator<Entry<String, JsonNode>> fields = jsonNode.fields();
+    while (fields.hasNext()) {
+      Entry<String, JsonNode> field = fields.next();
+      String fieldName = field.getKey();
+      JsonNode fieldValue = field.getValue();
+
+      Optional<PostgresColumnMetadata> columnMetadata =
+          schemaRegistry.getColumnOrRefresh(tableName, fieldName);
+
+      if (columnMetadata.isEmpty()) {
+        if (!bestEffortWrites) {
+          throw new SchemaMismatchException(
+              "Column '" + fieldName + "' not found in schema for table: " + tableName);
+        }
+        LOGGER.warn("Could not find column metadata for column: {}, skipping it", fieldName);
+        skippedColumns.add(fieldName);
+        continue;
+      }
+
+      PostgresDataType type = columnMetadata.get().getPostgresType();
+      boolean isArray = columnMetadata.get().isArray();
+
+      try {
+        Object value = extractValue(fieldValue, type, isArray);
+        typedDocument.add("\"" + fieldName + "\"", value, type, isArray);
+      } catch (Exception e) {
+        if (!bestEffortWrites) {
+          throw new SchemaMismatchException(
+              "Failed to parse value for column '"
+                  + fieldName
+                  + "' with type "
+                  + type
+                  + ": "
+                  + e.getMessage(),
+              e);
+        }
+        LOGGER.warn(
+            "Could not parse value for column: {} with type: {}, skipping it. Error: {}",
+            fieldName,
+            type,
+            e.getMessage());
+        skippedColumns.add(fieldName);
+      }
+    }
+
+    return typedDocument;
+  }
+
+  private int executeUpdate(String sql, TypedDocument parsed) throws SQLException {
+    try (Connection conn = client.getPooledConnection();
+        PreparedStatement ps = conn.prepareStatement(sql)) {
+      int index = 1;
+      for (String column : parsed.getColumns()) {
+        setParameter(
+            conn,
+            ps,
+            index++,
+            parsed.getValue(column),
+            parsed.getType(column),
+            parsed.isArray(column));
+      }
+      return ps.executeUpdate();
+    }
+  }
+
+  private CreateResult handlePSQLExceptionForCreate(
+      PSQLException e, Key key, Document document, String tableName, boolean isRetry)
+      throws IOException {
+    if (!isRetry && shouldRefreshSchemaAndRetry(e.getSQLState())) {
+      LOGGER.info(
+          "Schema mismatch detected (SQLState: {}), refreshing schema and retrying. key: {}",
+          e.getSQLState(),
+          key);
+      schemaRegistry.invalidate(tableName);
+      return createWithRetry(key, document, true);
+    }
+    LOGGER.error("SQLException creating document. key: {} content: {}", key, document, e);
+    throw new IOException(e);
+  }
+
+  /**
+   * Returns true if the SQL state indicates a schema mismatch, i.e. the column does not exist or
+   * the data type is mismatched.
+   */
+  private boolean shouldRefreshSchemaAndRetry(String sqlState) {
+    return PSQLState.UNDEFINED_COLUMN.getState().equals(sqlState)
+        || PSQLState.DATATYPE_MISMATCH.getState().equals(sqlState);
+  }
+
+  /**
+   * Typed document contains field information along with the field type. Uses LinkedHashMaps keyed
+   * by column name. LinkedHashMap preserves insertion order for consistent parameter binding.
+   */
+  private static class TypedDocument {
+    private final Map<String, Object> values = new LinkedHashMap<>();
+    private final Map<String, PostgresDataType> types = new LinkedHashMap<>();
+    private final Map<String, Boolean> arrays = new LinkedHashMap<>();
+
+    void add(String column, Object value, PostgresDataType type, boolean isArray) {
+      values.put(column, value);
+      types.put(column, type);
+      arrays.put(column, isArray);
+    }
+
+    boolean isEmpty() {
+      return values.isEmpty();
+    }
+
+    List<String> getColumns() {
+      return new ArrayList<>(values.keySet());
+    }
+
+    Object getValue(String column) {
+      return values.get(column);
+    }
+
+    PostgresDataType getType(String column) {
+      return types.get(column);
+    }
+
+    boolean isArray(String column) {
+      return arrays.getOrDefault(column, false);
+    }
+  }
+
+  private String buildInsertSql(List<String> columns) {
+    String columnList = String.join(", ", columns);
+    String placeholders = String.join(", ", columns.stream().map(c -> "?").toArray(String[]::new));
+    return String.format(
+        "INSERT INTO %s (%s) VALUES (%s)", tableIdentifier, columnList, placeholders);
+  }
+
+  private Object extractValue(JsonNode node, PostgresDataType type, boolean isArray) {
+    if (node == null || node.isNull()) {
+      return null;
+    }
+
+    if (isArray) {
+      if (!node.isArray()) {
+        node = MAPPER.createArrayNode().add(node);
+      }
+      List<Object> values = new ArrayList<>();
+      for (JsonNode element : node) {
+        values.add(extractScalarValue(element, type));
+      }
+      return values.toArray();
+    }
+
+    return extractScalarValue(node, type);
+  }
+
+  private Object extractScalarValue(JsonNode node, PostgresDataType type) {
+    switch (type) {
+      case INTEGER:
+        return node.isNumber() ? node.intValue() : Integer.parseInt(node.asText());
+      case BIGINT:
+        return node.isNumber() ? node.longValue() : Long.parseLong(node.asText());
+      case REAL:
+        return node.isNumber() ? node.floatValue() : Float.parseFloat(node.asText());
+      case DOUBLE_PRECISION:
+        return node.isNumber() ? node.doubleValue() : Double.parseDouble(node.asText());
+      case BOOLEAN:
+        return node.isBoolean() ?
node.booleanValue() : Boolean.parseBoolean(node.asText()); + case TIMESTAMPTZ: + if (node.isTextual()) { + return Timestamp.from(Instant.parse(node.asText())); + } else if (node.isNumber()) { + return new Timestamp(node.longValue()); + } + return null; + case DATE: + if (node.isTextual()) { + return Date.valueOf(node.asText()); + } + return null; + case JSONB: + return node.toString(); + default: + return node.asText(); + } + } + + private void setParameter( + Connection conn, + PreparedStatement ps, + int index, + Object value, + PostgresDataType type, + boolean isArray) + throws SQLException { + if (value == null) { + ps.setObject(index, null); + return; + } + + if (isArray) { + Object[] arrayValues = (value instanceof Object[]) ? (Object[]) value : new Object[] {value}; + java.sql.Array sqlArray = conn.createArrayOf(type.getSqlType(), arrayValues); + ps.setArray(index, sqlArray); + return; + } + + switch (type) { + case INTEGER: + ps.setInt(index, (Integer) value); + break; + case BIGINT: + ps.setLong(index, (Long) value); + break; + case REAL: + ps.setFloat(index, (Float) value); + break; + case DOUBLE_PRECISION: + ps.setDouble(index, (Double) value); + break; + case BOOLEAN: + ps.setBoolean(index, (Boolean) value); + break; + case TEXT: + ps.setString(index, (String) value); + break; + case TIMESTAMPTZ: + ps.setTimestamp(index, (Timestamp) value); + break; + case DATE: + ps.setDate(index, (java.sql.Date) value); + break; + case JSONB: + ps.setObject(index, value, Types.OTHER); + break; + default: + ps.setString(index, value.toString()); + } + } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java index 24b10e07..8e35937b 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java +++ 
b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java @@ -37,10 +37,16 @@ public Map fetch(String tableName) { String columnName = rs.getString("column_name"); String udtName = rs.getString("udt_name"); boolean isNullable = "YES".equalsIgnoreCase(rs.getString("is_nullable")); + boolean isArray = udtName != null && udtName.startsWith("_"); + String baseType = isArray ? udtName.substring(1) : udtName; metadataMap.put( columnName, new PostgresColumnMetadata( - columnName, mapToCanonicalType(udtName), mapToPostgresType(udtName), isNullable)); + columnName, + mapToCanonicalType(baseType), + mapToPostgresType(baseType), + isNullable, + isArray)); } } return metadataMap; diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java index 3a2d4540..ecc6ca6a 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java @@ -15,6 +15,7 @@ public class PostgresColumnMetadata implements ColumnMetadata { private final DataType canonicalType; @Getter private final PostgresDataType postgresType; private final boolean nullable; + private final boolean isArray; @Override public String getName() { @@ -30,4 +31,9 @@ public DataType getCanonicalType() { public boolean isNullable() { return nullable; } + + @Override + public boolean isArray() { + return isArray; + } } diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcherTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcherTest.java index 6fa32d91..7c373eb8 100644 --- 
a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcherTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcherTest.java @@ -300,6 +300,70 @@ void fetchThrowsRuntimeExceptionOnSqlException() throws SQLException { assertTrue(exception.getCause() instanceof SQLException); } + @Test + void fetchMapsTextArrayToStringArray() throws SQLException { + setupSingleColumnResult("col", "_text", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.STRING, meta.getCanonicalType()); + assertEquals(PostgresDataType.TEXT, meta.getPostgresType()); + assertTrue(meta.isArray()); + } + + @Test + void fetchMapsInt4ArrayToIntegerArray() throws SQLException { + setupSingleColumnResult("col", "_int4", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.INTEGER, meta.getCanonicalType()); + assertEquals(PostgresDataType.INTEGER, meta.getPostgresType()); + assertTrue(meta.isArray()); + } + + @Test + void fetchMapsInt8ArrayToLongArray() throws SQLException { + setupSingleColumnResult("col", "_int8", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.LONG, meta.getCanonicalType()); + assertEquals(PostgresDataType.BIGINT, meta.getPostgresType()); + assertTrue(meta.isArray()); + } + + @Test + void fetchMapsFloat8ArrayToDoubleArray() throws SQLException { + setupSingleColumnResult("col", "_float8", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.DOUBLE, meta.getCanonicalType()); + assertEquals(PostgresDataType.DOUBLE_PRECISION, meta.getPostgresType()); + assertTrue(meta.isArray()); + } + + @Test + void fetchMapsBoolArrayToBooleanArray() throws SQLException { + setupSingleColumnResult("col", "_bool", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + 
assertEquals(DataType.BOOLEAN, meta.getCanonicalType()); + assertEquals(PostgresDataType.BOOLEAN, meta.getPostgresType()); + assertTrue(meta.isArray()); + } + + @Test + void fetchReturnsIsArrayFalseForNonArrayTypes() throws SQLException { + setupSingleColumnResult("col", "text", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertFalse(meta.isArray()); + } + private void setupSingleColumnResult(String colName, String udtName, String isNullable) throws SQLException { when(resultSet.next()).thenReturn(true, false);