diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 456df867..6db4925a 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -127,6 +127,20 @@ jobs: --health-retries 15 --health-start-period 60s + meilisearch: + image: getmeili/meilisearch:v1.12 + env: + MEILI_NO_ANALYTICS: true + MEILI_EXPERIMENTAL_VECTOR_STORE: true + ports: + - 7700:7700 + # Health check for Meilisearch + options: >- + --health-cmd "curl -s http://127.0.0.1:7700/health || exit 1" + --health-interval 10s + --health-timeout 5s + --health-retries 10 + steps: - uses: actions/checkout@v4 diff --git a/acronis-db-bench/acronis-db-bench.go b/acronis-db-bench/acronis-db-bench.go index d69803e7..41ff5672 100644 --- a/acronis-db-bench/acronis-db-bench.go +++ b/acronis-db-bench/acronis-db-bench.go @@ -10,6 +10,7 @@ import ( // List of database drivers _ "github.com/acronis/perfkit/db/es" // es drivers + _ "github.com/acronis/perfkit/db/ms" // meilisearch drivers _ "github.com/acronis/perfkit/db/sql" // sql drivers // List of test groups diff --git a/acronis-db-bench/engine/tests.go b/acronis-db-bench/engine/tests.go index 6a96f8c1..a35550b3 100644 --- a/acronis-db-bench/engine/tests.go +++ b/acronis-db-bench/engine/tests.go @@ -85,7 +85,7 @@ func (t *TestDesc) getDBs() string { var ( // ALL is a list of all supported databases - ALL = []db.DialectName{db.POSTGRES, db.MYSQL, db.MSSQL, db.SQLITE, db.CLICKHOUSE, db.CASSANDRA, db.ELASTICSEARCH, db.OPENSEARCH} + ALL = []db.DialectName{db.POSTGRES, db.MYSQL, db.MSSQL, db.SQLITE, db.CLICKHOUSE, db.CASSANDRA, db.ELASTICSEARCH, db.OPENSEARCH, db.MEILISEARCH} // RELATIONAL is a list of all supported relational databases RELATIONAL = []db.DialectName{db.POSTGRES, db.MYSQL, db.MSSQL, db.SQLITE} // PMWSA is a list of all supported databases except ClickHouse diff --git a/acronis-db-bench/engine/workers.go b/acronis-db-bench/engine/workers.go index a94fd826..35e31a6e 100644 --- a/acronis-db-bench/engine/workers.go +++ 
b/acronis-db-bench/engine/workers.go @@ -455,7 +455,7 @@ func TestInsertGeneric(b *benchmark.Benchmark, testDesc *TestDesc) { if txErr := sess.Transact(func(tx db.DatabaseAccessor) error { for i := 0; i < batch; i++ { - columns, values, err := worker.Randomizer.GenFakeData(colConfs, false) + columns, values, err := worker.Randomizer.GenFakeData(colConfs, db.WithAutoInc(dialectName)) if err != nil { return err } diff --git a/acronis-db-bench/go.mod b/acronis-db-bench/go.mod index 5249a26f..acfca0ba 100644 --- a/acronis-db-bench/go.mod +++ b/acronis-db-bench/go.mod @@ -18,7 +18,7 @@ require ( github.com/denisenkom/go-mssqldb v0.12.3 github.com/google/uuid v1.6.0 github.com/lib/pq v1.10.9 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.11.1 ) require ( @@ -41,6 +41,7 @@ require ( github.com/goccy/go-json v0.10.3 // indirect github.com/gocql/gocql v1.6.0 // indirect github.com/gocraft/dbr/v2 v2.7.6 // indirect + github.com/golang-jwt/jwt/v5 v5.3.1 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang/snappy v0.0.4 // indirect @@ -51,6 +52,7 @@ require ( github.com/klauspost/compress v1.17.11 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/mattn/go-sqlite3 v1.14.22 // indirect + github.com/meilisearch/meilisearch-go v0.36.1 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect github.com/opensearch-project/opensearch-go/v4 v4.2.0 // indirect diff --git a/acronis-db-bench/go.sum b/acronis-db-bench/go.sum index 4a838634..fd430645 100644 --- a/acronis-db-bench/go.sum +++ b/acronis-db-bench/go.sum @@ -54,6 +54,8 @@ github.com/gocql/gocql v1.6.0/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJr github.com/gocraft/dbr/v2 v2.7.6 h1:ASHKFgCbTLODbb9f756Cl8VAlnvQLKqIzx9E1Cfb7eo= github.com/gocraft/dbr/v2 v2.7.6/go.mod 
h1:8IH98S8M8J0JSEiYk0MPH26ZDUKemiQ/GvmXL5jo+Uw= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY= +github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= @@ -99,6 +101,8 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/meilisearch/meilisearch-go v0.36.1 h1:mJTCJE5g7tRvaqKco6DfqOuJEjX+rRltDEnkEC02Y0M= +github.com/meilisearch/meilisearch-go v0.36.1/go.mod h1:hWcR0MuWLSzHfbz9GGzIr3s9rnXLm1jqkmHkJPbUSvM= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= @@ -129,8 +133,8 @@ github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= 
-github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U= github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= diff --git a/acronis-db-bench/tenants-cache/tenant_generator.go b/acronis-db-bench/tenants-cache/tenant_generator.go index 845d680b..5687f29d 100644 --- a/acronis-db-bench/tenants-cache/tenant_generator.go +++ b/acronis-db-bench/tenants-cache/tenant_generator.go @@ -307,12 +307,6 @@ engine = MergeTree() ORDER BY (tenant_id, cti_entity_uuid);`, TableNameCtiProvis // Init initializes tenants cache and creates tables if needed func (tc *TenantsCache) Init(database db.Database) error { - var dialect = database.DialectName() - if dialect != db.CLICKHOUSE && dialect != db.CASSANDRA && dialect != db.MYSQL && dialect != db.POSTGRES && dialect != db.SQLITE { - tc.logger.Error("unsupported dialect: %s", dialect) - return fmt.Errorf("unsupported dialect: %s", dialect) - } - var eventData []tenantStructureData if err := json.Unmarshal(tenantStructure, &eventData); err != nil { return fmt.Errorf("error unmarshalling tenant_structure.json: %v", err) @@ -321,6 +315,18 @@ func (tc *TenantsCache) Init(database db.Database) error { tc.logger.Trace("tenants probablity config: %v", eventData) tc.tenantStructureRandomizer = newTenantStructureRandomizer(eventData) + // Document-store engines (ES, OpenSearch, Meilisearch) don't use SQL tenant tables + var dialect = database.DialectName() + if dialect == db.ELASTICSEARCH || dialect == db.OPENSEARCH || dialect == db.MEILISEARCH { + tc.logger.Trace("skipping tenant SQL tables for %s", dialect) + return nil + } + + if dialect != 
db.CLICKHOUSE && dialect != db.CASSANDRA && dialect != db.MYSQL && dialect != db.POSTGRES && dialect != db.SQLITE { + tc.logger.Error("unsupported dialect: %s", dialect) + return fmt.Errorf("unsupported dialect: %s", dialect) + } + tc.logger.Trace("init") tc.CreateTables(database) tc.PopulateUuidsFromDB(database) diff --git a/db/db.go b/db/db.go index b007a191..382b842b 100644 --- a/db/db.go +++ b/db/db.go @@ -22,6 +22,7 @@ const ( CASSANDRA DialectName = "cassandra" // CASSANDRA is the Cassandra driver name ELASTICSEARCH DialectName = "elasticsearch" // ELASTICSEARCH is the Elasticsearch driver name OPENSEARCH DialectName = "opensearch" // OPENSEARCH is the OpenSearch driver name + MEILISEARCH DialectName = "meilisearch" // MEILISEARCH is the Meilisearch driver name ) // Special conditions for searching @@ -1986,6 +1987,7 @@ func GetDatabases() []DBType { ret = append(ret, DBType{Driver: CASSANDRA, Symbol: "A", Name: "Cassandra"}) ret = append(ret, DBType{Driver: ELASTICSEARCH, Symbol: "E", Name: "Elasticsearch"}) ret = append(ret, DBType{Driver: OPENSEARCH, Symbol: "O", Name: "OpenSearch"}) + ret = append(ret, DBType{Driver: MEILISEARCH, Symbol: "R", Name: "Meilisearch"}) return ret } diff --git a/db/go.mod b/db/go.mod index de7a543a..20bcc4f3 100644 --- a/db/go.mod +++ b/db/go.mod @@ -7,7 +7,6 @@ toolchain go1.21.4 require ( github.com/ClickHouse/clickhouse-go/v2 v2.26.0 github.com/MichaelS11/go-cql-driver v0.1.1 - github.com/acronis/perfkit/logger v0.0.0-20250402063049-20a8306d6a02 github.com/denisenkom/go-mssqldb v0.12.3 github.com/elastic/go-elasticsearch/v8 v8.15.0 github.com/fergusstrange/embedded-postgres v1.27.0 @@ -17,21 +16,23 @@ require ( github.com/google/uuid v1.6.0 github.com/lib/pq v1.10.9 github.com/mattn/go-sqlite3 v1.14.22 + github.com/meilisearch/meilisearch-go v0.36.1 github.com/opensearch-project/opensearch-go/v4 v4.2.0 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.11.1 go.uber.org/atomic v1.11.0 ) require ( 
filippo.io/edwards25519 v1.1.0 // indirect github.com/ClickHouse/ch-go v0.61.5 // indirect - github.com/andybalholm/brotli v1.1.0 // indirect + github.com/andybalholm/brotli v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect github.com/go-faster/city v1.0.1 // indirect github.com/go-faster/errors v0.7.1 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/golang-jwt/jwt/v5 v5.3.1 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang/snappy v0.0.4 // indirect diff --git a/db/go.sum b/db/go.sum index ebdd5828..2d25545d 100644 --- a/db/go.sum +++ b/db/go.sum @@ -11,10 +11,8 @@ github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20O github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/MichaelS11/go-cql-driver v0.1.1 h1:ntFKov/39Tl36HckP4tzld3XMeyDYHHO00MiZNdoL1A= github.com/MichaelS11/go-cql-driver v0.1.1/go.mod h1:rMwGk5bMWiYI/If6r6dbqEfZG6nQLvqJHTplv5yTDaw= -github.com/acronis/perfkit/logger v0.0.0-20250402063049-20a8306d6a02 h1:0O57X12c2IOUEGEM9o05ODUaByqAj4Ri5XrhciEkDz0= -github.com/acronis/perfkit/logger v0.0.0-20250402063049-20a8306d6a02/go.mod h1:4D15/pV290YlzVA7cxS0Cg+s0QCXDOd0Iiq8rUyJo9U= -github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= -github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod 
h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= @@ -48,6 +46,8 @@ github.com/gocql/gocql v1.6.0/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJr github.com/gocraft/dbr/v2 v2.7.6 h1:ASHKFgCbTLODbb9f756Cl8VAlnvQLKqIzx9E1Cfb7eo= github.com/gocraft/dbr/v2 v2.7.6/go.mod h1:8IH98S8M8J0JSEiYk0MPH26ZDUKemiQ/GvmXL5jo+Uw= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY= +github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= @@ -84,6 +84,8 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/meilisearch/meilisearch-go v0.36.1 h1:mJTCJE5g7tRvaqKco6DfqOuJEjX+rRltDEnkEC02Y0M= +github.com/meilisearch/meilisearch-go v0.36.1/go.mod h1:hWcR0MuWLSzHfbz9GGzIr3s9rnXLm1jqkmHkJPbUSvM= github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/opensearch-project/opensearch-go/v4 v4.2.0 h1:uaBexfVdeSU15yOUPYF+IY059koVP0oNQPyoSde6N/A= @@ -108,8 +110,8 @@ github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U= github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -126,6 +128,8 @@ github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23n github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 h1:nIPpBwaJSVYIxUFsDv3M8ofmx9yWTog9BfvIu0q41lo= github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8/go.mod h1:HUYIGzjTL3rfEspMxjDjgmT5uz5wzYJKVo23qUhYTos= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/db/helpers.go b/db/helpers.go index 926a5d45..f94133a7 100644 --- a/db/helpers.go +++ b/db/helpers.go @@ -207,7 +207,7 @@ func PrintStack() { // WithAutoInc returns true if DBDriver should support 
'autoinc' field as current time nanoseconds func WithAutoInc(name DialectName) bool { switch name { - case CASSANDRA: + case CASSANDRA, MEILISEARCH: return true default: return false diff --git a/db/ms/insert.go b/db/ms/insert.go new file mode 100644 index 00000000..e69988d2 --- /dev/null +++ b/db/ms/insert.go @@ -0,0 +1,57 @@ +package ms + +import ( + "fmt" + "time" +) + +// @cpt-perfkit-db-algo-ms-adapter-bulk-insert +func (g *msGateway) BulkInsert(tableName string, rows [][]interface{}, columnNames []string) error { + if len(rows) == 0 { + return nil + } + + var documents []map[string]interface{} + + for _, row := range rows { + if len(row) != len(columnNames) { + return fmt.Errorf("row length doesn't match column names length") + } + + doc := make(map[string]interface{}) + vectors := make(map[string]interface{}) + + for i, col := range row { + // Vector columns are stored in _vectors under the embedder name (column name) + if vec, ok := col.([]float32); ok { + vectors[columnNames[i]] = vec + } else { + doc[columnNames[i]] = col + } + } + + if len(vectors) > 0 { + doc["_vectors"] = vectors + } + + documents = append(documents, doc) + } + + index := g.client.Index(tableName) + + taskInfo, err := index.AddDocuments(documents, nil) + if err != nil { + return fmt.Errorf("failed to add documents to meilisearch index %s: %v", tableName, err) + } + + task, err := g.client.WaitForTask(taskInfo.TaskUID, 100*time.Millisecond) + if err != nil { + return fmt.Errorf("failed to wait for meilisearch task %d: %v", taskInfo.TaskUID, err) + } + + if task.Status == "failed" { + return fmt.Errorf("meilisearch bulk insert task %d failed: %s", task.UID, task.Error.Message) + } + + return nil +} diff --git a/db/ms/insert_test.go b/db/ms/insert_test.go new file mode 100644 index 00000000..716c8b32 --- /dev/null +++ b/db/ms/insert_test.go @@ -0,0 +1,154 @@ +package ms + +import ( + "time" + + "github.com/acronis/perfkit/db" +) + +func (suite *TestingSuite) TestInsert() { + d, s, c := 
suite.makeTestSession() + defer logDbTime(suite.T(), c) + defer cleanup(suite.T(), d) + + var toInsert [][]interface{} + toInsert = append(toInsert, []interface{}{ + int64(1), + "00000000-0000-0000-0000-000000000001", + "test", + "policy_a", + "resource_a", + int64(100), + }) + toInsert = append(toInsert, []interface{}{ + int64(2), + "00000000-0000-0000-0000-000000000002", + "test_type", + "policy_b", + "resource_b", + int64(200), + }) + + columnNames := []string{"id", "uuid", "type", "policy_name", "resource_name", "score"} + + if err := s.BulkInsert("ms_perf_table", toInsert, columnNames); err != nil { + suite.T().Error(err) + return + } + + // Meilisearch indexes asynchronously; wait for indexing + time.Sleep(2 * time.Second) + + rows, err := s.Select("ms_perf_table", &db.SelectCtrl{ + Fields: []string{"id", "uuid", "type", "policy_name", "score"}, + Order: []string{"desc(score)"}, + }) + if err != nil { + suite.T().Error(err) + return + } + defer rows.Close() + + var count int + for rows.Next() { + var id, score int64 + var uuid, testType, policyName string + + if scanErr := rows.Scan(&id, &uuid, &testType, &policyName, &score); scanErr != nil { + suite.T().Error(scanErr) + return + } + suite.T().Log("row", id, uuid, testType, policyName, score) + count++ + } + + suite.Equal(2, count, "expected 2 rows") +} + +func (suite *TestingSuite) TestSelectWithFilter() { + d, s, c := suite.makeTestSession() + defer logDbTime(suite.T(), c) + defer cleanup(suite.T(), d) + + var toInsert [][]interface{} + toInsert = append(toInsert, []interface{}{int64(10), "uuid-a", "typeA", "policy_x", "res_x", int64(50)}) + toInsert = append(toInsert, []interface{}{int64(20), "uuid-b", "typeB", "policy_y", "res_y", int64(150)}) + toInsert = append(toInsert, []interface{}{int64(30), "uuid-c", "typeA", "policy_z", "res_z", int64(250)}) + + columnNames := []string{"id", "uuid", "type", "policy_name", "resource_name", "score"} + + if err := s.BulkInsert("ms_perf_table", toInsert, 
columnNames); err != nil { + suite.T().Error(err) + return + } + + time.Sleep(2 * time.Second) + + rows, err := s.Select("ms_perf_table", &db.SelectCtrl{ + Fields: []string{"id", "type", "score"}, + Where: map[string][]string{ + "type": {"typeA"}, + }, + Order: []string{"asc(id)"}, + }) + if err != nil { + suite.T().Error(err) + return + } + defer rows.Close() + + var count int + for rows.Next() { + var id, score int64 + var tp string + + if scanErr := rows.Scan(&id, &tp, &score); scanErr != nil { + suite.T().Error(scanErr) + return + } + suite.T().Log("row", id, tp, score) + suite.Equal("typeA", tp) + count++ + } + + suite.Equal(2, count, "expected 2 rows matching type=typeA") +} + +func (suite *TestingSuite) TestSelectCount() { + d, s, c := suite.makeTestSession() + defer logDbTime(suite.T(), c) + defer cleanup(suite.T(), d) + + var toInsert [][]interface{} + toInsert = append(toInsert, []interface{}{int64(1), "u1", "t1", "p1", "r1", int64(10)}) + toInsert = append(toInsert, []interface{}{int64(2), "u2", "t2", "p2", "r2", int64(20)}) + toInsert = append(toInsert, []interface{}{int64(3), "u3", "t1", "p3", "r3", int64(30)}) + + columnNames := []string{"id", "uuid", "type", "policy_name", "resource_name", "score"} + + if err := s.BulkInsert("ms_perf_table", toInsert, columnNames); err != nil { + suite.T().Error(err) + return + } + + time.Sleep(2 * time.Second) + + rows, err := s.Select("ms_perf_table", &db.SelectCtrl{ + Fields: []string{"COUNT(0)"}, + }) + if err != nil { + suite.T().Error(err) + return + } + defer rows.Close() + + suite.True(rows.Next()) + + var count int64 + if scanErr := rows.Scan(&count); scanErr != nil { + suite.T().Error(scanErr) + return + } + + suite.Equal(int64(3), count, "expected count of 3") +} diff --git a/db/ms/maintenance.go b/db/ms/maintenance.go new file mode 100644 index 00000000..40c03ca4 --- /dev/null +++ b/db/ms/maintenance.go @@ -0,0 +1,233 @@ +package ms + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + 
// vectorStoreRequestTimeout bounds each HTTP call made by enableVectorStore so
// that an unresponsive server cannot block table creation forever.
const vectorStoreRequestTimeout = 10 * time.Second

// enableVectorStore enables the vectorStore experimental feature via the REST API.
// The meilisearch-go SDK v0.36.1 removed SetVectorStore since newer Meilisearch
// versions promoted it out of experimental, but older servers still need it.
//
// host is the server base URL (scheme://host[:port]); a trailing slash is
// tolerated. The function is best effort on the read side: if the feature list
// cannot be fetched or parsed, or has no "vectorStore" entry, it returns nil
// and changes nothing. A PATCH is sent only when the entry is present and not
// already true; a failure of that PATCH (transport error or non-200 status) is
// returned as an error.
//
// NOTE(review): no Authorization header is set on either request, so on a
// server protected by an API key the GET is presumably rejected and the
// function silently does nothing - confirm whether the key should be passed.
func enableVectorStore(host string) error {
	endpoint := strings.TrimSuffix(host, "/") + "/experimental-features"
	client := &http.Client{Timeout: vectorStoreRequestTimeout}

	// Check current state
	getResp, err := client.Get(endpoint) //nolint:gosec // host is from config
	if err != nil {
		return nil // endpoint might not exist in newer versions
	}
	defer getResp.Body.Close()

	body, err := io.ReadAll(getResp.Body)
	if err != nil {
		return fmt.Errorf("failed to read experimental features response: %w", err)
	}

	var features map[string]interface{}
	if err = json.Unmarshal(body, &features); err != nil {
		return nil // cannot parse, skip
	}

	vs, present := features["vectorStore"]
	if !present {
		return nil // field doesn't exist, vector store is built-in
	}
	if enabled, isBool := vs.(bool); isBool && enabled {
		return nil // already enabled
	}

	// Enable vectorStore
	payload := []byte(`{"vectorStore": true}`)
	req, err := http.NewRequest(http.MethodPatch, endpoint, bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	patchResp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to enable vector store: %w", err)
	}
	defer patchResp.Body.Close()

	if patchResp.StatusCode != http.StatusOK {
		// The body is only used to enrich the error message, so a read failure is ignored.
		respBody, _ := io.ReadAll(patchResp.Body)
		return fmt.Errorf("failed to enable vector store, status %d: %s", patchResp.StatusCode, string(respBody))
	}

	return nil
}
(d *msDatabase) TableExists(tableName string) (bool, error) { + _, err := d.client.GetIndex(tableName) + if err != nil { + // Meilisearch returns an error when the index doesn't exist + return false, nil + } + return true, nil +} + +// CreateTable creates a Meilisearch index with primary key and filterable/sortable attributes +func (d *msDatabase) CreateTable(tableName string, tableDefinition *db.TableDefinition, tableMigrationDDL string) error { + // Determine primary key from table definition + var primaryKey string + for _, row := range tableDefinition.TableRows { + if row.IsPrimaryKey() { + primaryKey = row.GetName() + break + } + } + + if primaryKey == "" && len(tableDefinition.PrimaryKey) > 0 { + primaryKey = tableDefinition.PrimaryKey[0] + } + + if primaryKey == "" { + primaryKey = "id" + } + + taskInfo, err := d.client.CreateIndex(&meilisearch.IndexConfig{ + Uid: tableName, + PrimaryKey: primaryKey, + }) + if err != nil { + return fmt.Errorf("failed to create meilisearch index %s: %v", tableName, err) + } + + task, err := d.client.WaitForTask(taskInfo.TaskUID, 100*time.Millisecond) + if err != nil { + return fmt.Errorf("failed to wait for meilisearch create index task: %v", err) + } + + if task.Status == "failed" { + return fmt.Errorf("meilisearch create index task failed: %s", task.Error.Message) + } + + // Configure filterable and sortable attributes from column definitions + var filterableAttrs []interface{} + var sortableAttrs []string + embedders := make(map[string]meilisearch.Embedder) + + for _, row := range tableDefinition.TableRows { + // Vector columns are configured as embedders, not filterable/sortable + switch row.GetType() { + case db.DataTypeVector3Float32: + embedders[row.GetName()] = meilisearch.Embedder{ + Source: meilisearch.UserProvidedEmbedderSource, + Dimensions: 3, + } + continue + case db.DataTypeVector768Float32: + embedders[row.GetName()] = meilisearch.Embedder{ + Source: meilisearch.UserProvidedEmbedderSource, + Dimensions: 768, + 
} + continue + } + + // All non-vector columns are made filterable for benchmark flexibility + filterableAttrs = append(filterableAttrs, interface{}(row.GetName())) + + // Numeric and datetime types are sortable (including primary key) + switch row.GetType() { + case db.DataTypeId, db.DataTypeInt, db.DataTypeBigInt, db.DataTypeBigIntAutoInc, + db.DataTypeSmallInt, db.DataTypeTinyInt, db.DataTypeDateTime, db.DataTypeDateTime6, + db.DataTypeTimestamp, db.DataTypeTimestamp6: + sortableAttrs = append(sortableAttrs, row.GetName()) + } + } + + index := d.client.Index(tableName) + + // Configure vector embedders if any vector columns exist + if len(embedders) > 0 { + // Enable vector store experimental feature if needed + if err = enableVectorStore(d.host); err != nil { + return fmt.Errorf("failed to enable vector store for index %s: %v", tableName, err) + } + + taskInfo, err = index.UpdateEmbedders(embedders) + if err != nil { + return fmt.Errorf("failed to update embedders for index %s: %v", tableName, err) + } + + task, err = d.client.WaitForTask(taskInfo.TaskUID, 100*time.Millisecond) + if err != nil { + return fmt.Errorf("failed to wait for embedders task: %v", err) + } + + if task.Status == "failed" { + return fmt.Errorf("update embedders task failed: %s", task.Error.Message) + } + } + + if len(filterableAttrs) > 0 { + taskInfo, err = index.UpdateFilterableAttributes(&filterableAttrs) + if err != nil { + return fmt.Errorf("failed to update filterable attributes for index %s: %v", tableName, err) + } + + task, err = d.client.WaitForTask(taskInfo.TaskUID, 100*time.Millisecond) + if err != nil { + return fmt.Errorf("failed to wait for filterable attributes task: %v", err) + } + + if task.Status == "failed" { + return fmt.Errorf("update filterable attributes task failed: %s", task.Error.Message) + } + } + + if len(sortableAttrs) > 0 { + taskInfo, err = index.UpdateSortableAttributes(&sortableAttrs) + if err != nil { + return fmt.Errorf("failed to update sortable 
attributes for index %s: %v", tableName, err) + } + + task, err = d.client.WaitForTask(taskInfo.TaskUID, 100*time.Millisecond) + if err != nil { + return fmt.Errorf("failed to wait for sortable attributes task: %v", err) + } + + if task.Status == "failed" { + return fmt.Errorf("update sortable attributes task failed: %s", task.Error.Message) + } + } + + return nil +} + +// DropTable deletes a Meilisearch index +func (d *msDatabase) DropTable(tableName string) error { + taskInfo, err := d.client.DeleteIndex(tableName) + if err != nil { + return fmt.Errorf("failed to delete meilisearch index %s: %v", tableName, err) + } + + task, err := d.client.WaitForTask(taskInfo.TaskUID, 100*time.Millisecond) + if err != nil { + return fmt.Errorf("failed to wait for meilisearch delete index task: %v", err) + } + + if task.Status == "failed" { + // Ignore "index not found" errors for idempotent cleanup + if strings.Contains(task.Error.Message, "not found") { + return nil + } + return fmt.Errorf("meilisearch delete index task failed: %s", task.Error.Message) + } + + return nil +} diff --git a/db/ms/maintenance_test.go b/db/ms/maintenance_test.go new file mode 100644 index 00000000..7a851f44 --- /dev/null +++ b/db/ms/maintenance_test.go @@ -0,0 +1,63 @@ +package ms + +import ( + "time" + + "github.com/acronis/perfkit/db" +) + +func (suite *TestingSuite) TestMeilisearchSchemaInit() { + dbo, err := db.Open(db.Config{ + ConnString: suite.ConnString, + MaxOpenConns: 16, + MaxConnLifetime: 1000 * time.Millisecond, + }) + + if err != nil { + suite.T().Error("db create", err) + return + } + + var exists bool + if exists, err = dbo.TableExists("ms_perf_table"); err != nil { + suite.T().Error(err) + return + } else if exists { + // Clean up from previous test run + if err = dbo.DropTable("ms_perf_table"); err != nil { + suite.T().Error("drop leftover table", err) + return + } + } + + var tableSpec = testTableDefinition() + + if err = dbo.CreateTable("ms_perf_table", tableSpec, ""); err != 
nil { + suite.T().Error("create table", err) + return + } + + if exists, err = dbo.TableExists("ms_perf_table"); err != nil { + suite.T().Error(err) + return + } else if !exists { + suite.T().Error("table not exists after create") + return + } + + if err = dbo.DropTable("ms_perf_table"); err != nil { + suite.T().Error("drop table", err) + return + } + + // After drop, give Meilisearch time to process + time.Sleep(1 * time.Second) + + if exists, err = dbo.TableExists("ms_perf_table"); err != nil { + suite.T().Error(err) + return + } else if exists { + suite.T().Error("table still exists after drop") + return + } +} diff --git a/db/ms/ms.go b/db/ms/ms.go new file mode 100644 index 00000000..fd5df456 --- /dev/null +++ b/db/ms/ms.go @@ -0,0 +1,210 @@ +// Package ms provides an implementation of the db.Database interface for Meilisearch. +// @cpt-perfkit-db-component-ms-adapter +package ms + +import ( + "context" + "fmt" + "net/url" + + "github.com/meilisearch/meilisearch-go" + "go.uber.org/atomic" + + "github.com/acronis/perfkit/db" +) + +// @cpt-perfkit-db-algo-ms-adapter-register +// nolint: gochecknoinits // remove init() when we will have a better way to register connectors +func init() { + for _, msNameStyle := range []string{"ms", "meilisearch"} { + if err := db.Register(msNameStyle, &msConnector{}); err != nil { + panic(err) + } + } +} + +type msConnector struct{} + +// @cpt-perfkit-db-algo-ms-adapter-connect +func (c *msConnector) ConnectionPool(cfg db.Config) (db.Database, error) { + host, apiKey, err := parseMeilisearchURI(cfg.ConnString) + if err != nil { + return nil, fmt.Errorf("db: meilisearch: %v", err) + } + + var opts []meilisearch.Option + if apiKey != "" { + opts = append(opts, meilisearch.WithAPIKey(apiKey)) + } + + client := meilisearch.New(host, opts...) 
// parseMeilisearchURI parses a Meilisearch connection URI.
// Supported formats:
//   - ms://host:port?apiKey=KEY
//   - meilisearch://host:port?apiKey=KEY
//
// The returned host is a base URL of the form scheme://host[:port], where the
// scheme is "https" when the query contains tls=true and "http" otherwise.
// apiKey is the value of the apiKey query parameter, or "" when absent.
//
// An error is returned when cs cannot be parsed as a URL or when it carries no
// host part, so that a malformed connection string is reported here rather
// than surfacing later as a failed health check against "http://".
func parseMeilisearchURI(cs string) (host string, apiKey string, err error) {
	u, err := url.Parse(cs)
	if err != nil {
		return "", "", fmt.Errorf("cannot parse connection url %v, err: %w", cs, err)
	}

	if u.Host == "" {
		return "", "", fmt.Errorf("cannot parse connection url %v, err: missing host", cs)
	}

	query := u.Query()

	scheme := "http"
	if query.Get("tls") == "true" {
		scheme = "https"
	}

	return fmt.Sprintf("%s://%s", scheme, u.Host), query.Get("apiKey"), nil
}
db.MEILISEARCH, v.PkgVersion, nil +} + +func (d *msDatabase) GetInfo(version string) (ret []string, dbInfo *db.Info, err error) { + ret = append(ret, "Meilisearch") + return ret, nil, nil +} + +func (d *msDatabase) ApplyMigrations(tableName, tableMigrationSQL string) error { + return nil +} + +func (d *msDatabase) IndexExists(indexName string, tableName string) (bool, error) { + return true, nil +} + +func (d *msDatabase) CreateIndex(indexName string, tableName string, columns []string, indexType db.IndexType) error { + return nil +} + +func (d *msDatabase) DropIndex(indexName string, tableName string) error { + return nil +} + +func (d *msDatabase) ReadConstraints() ([]db.Constraint, error) { + return nil, nil +} + +func (d *msDatabase) AddConstraints(constraints []db.Constraint) error { + return nil +} + +func (d *msDatabase) DropConstraints(constraints []db.Constraint) error { + return nil +} + +func (d *msDatabase) CreateSequence(sequenceName string) error { + return nil +} + +func (d *msDatabase) DropSequence(sequenceName string) error { + return nil +} + +func (d *msDatabase) GetTablesSchemaInfo(tableNames []string) ([]string, error) { + return nil, nil +} + +func (d *msDatabase) GetTablesVolumeInfo(tableNames []string) ([]string, error) { + return nil, nil +} + +func (d *msDatabase) Context(ctx context.Context, explain bool) *db.Context { + return &db.Context{ + Ctx: ctx, + Explain: explain, + BeginTime: atomic.NewInt64(0), + PrepareTime: atomic.NewInt64(0), + ExecTime: atomic.NewInt64(0), + QueryTime: atomic.NewInt64(0), + DeallocTime: atomic.NewInt64(0), + CommitTime: atomic.NewInt64(0), + } +} + +func (d *msDatabase) Session(c *db.Context) db.Session { + return &msSession{ + msGateway: msGateway{ + client: d.client, + ctx: c, + logTime: d.logTime, + readRowsLogger: d.readRowsLogger, + }, + } +} + +func (d *msDatabase) RawSession() interface{} { + return d.client +} + +func (d *msDatabase) Stats() *db.Stats { + return nil +} + +func (d *msDatabase) Close() 
error { + return nil +} diff --git a/db/ms/ms_test.go b/db/ms/ms_test.go new file mode 100644 index 00000000..3f596ae9 --- /dev/null +++ b/db/ms/ms_test.go @@ -0,0 +1,151 @@ +package ms + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/acronis/perfkit/db" +) + +const msConnString = "ms://localhost:7700" + +type TestingSuite struct { + suite.Suite + ConnString string +} + +func TestDatabaseSuiteMeilisearch(t *testing.T) { + suite.Run(t, &TestingSuite{ConnString: msConnString}) +} + +type testLogger struct { + t *testing.T +} + +func newTestLogger(t *testing.T) db.Logger { + return &testLogger{t: t} +} + +func (l *testLogger) Log(format string, args ...interface{}) { + l.t.Logf(format, args...) +} + +func testTableDefinition() *db.TableDefinition { + return &db.TableDefinition{ + TableRows: []db.TableRow{ + db.TableRowItem{Name: "id", Type: db.DataTypeId, PrimaryKey: true, Indexed: true}, + db.TableRowItem{Name: "uuid", Type: db.DataTypeUUID, Indexed: true}, + db.TableRowItem{Name: "type", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "policy_name", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "resource_name", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "score", Type: db.DataTypeInt, Indexed: true}, + }, + } +} + +func (suite *TestingSuite) makeTestSession() (db.Database, db.Session, *db.Context) { + var logger = newTestLogger(suite.T()) + + dbo, err := db.Open(db.Config{ + ConnString: suite.ConnString, + MaxOpenConns: 16, + MaxConnLifetime: 1000 * time.Millisecond, + QueryLogger: logger, + ReadRowsLogger: logger, + LogOperationsTime: true, + }) + + require.NoError(suite.T(), err, "making test msSession") + + var tableSpec = testTableDefinition() + + // Clean up if left over from a previous run + if exists, _ := dbo.TableExists("ms_perf_table"); exists { + _ = dbo.DropTable("ms_perf_table") + } + + if err = 
dbo.CreateTable("ms_perf_table", tableSpec, ""); err != nil { + require.NoError(suite.T(), err, "init scheme") + } + + var c = dbo.Context(context.Background(), false) + s := dbo.Session(c) + + return dbo, s, c +} + +func logDbTime(t *testing.T, c *db.Context) { + t.Helper() + db.DumpExecutionTime(newTestLogger(t), c) +} + +func cleanup(t *testing.T, dbo db.Database) { + t.Helper() + + exists, err := dbo.TableExists("ms_perf_table") + if err != nil { + t.Error("check table exists", err) + return + } + + if !exists { + return + } + + if err := dbo.DropTable("ms_perf_table"); err != nil { + t.Error("drop table", err) + return + } +} + +func TestParseMeilisearchURI(t *testing.T) { + tests := []struct { + name string + uri string + wantHost string + wantAPIKey string + wantErr bool + }{ + { + name: "basic uri", + uri: "ms://localhost:7700", + wantHost: "http://localhost:7700", + wantAPIKey: "", + }, + { + name: "with api key", + uri: "ms://localhost:7700?apiKey=masterKey123", + wantHost: "http://localhost:7700", + wantAPIKey: "masterKey123", + }, + { + name: "with tls", + uri: "meilisearch://meili.example.com:443?tls=true&apiKey=secret", + wantHost: "https://meili.example.com:443", + wantAPIKey: "secret", + }, + { + name: "meilisearch scheme", + uri: "meilisearch://127.0.0.1:7700", + wantHost: "http://127.0.0.1:7700", + wantAPIKey: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + host, apiKey, err := parseMeilisearchURI(tt.uri) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tt.wantHost, host) + require.Equal(t, tt.wantAPIKey, apiKey) + } + }) + } +} diff --git a/db/ms/query.go b/db/ms/query.go new file mode 100644 index 00000000..f090952b --- /dev/null +++ b/db/ms/query.go @@ -0,0 +1,17 @@ +package ms + +import "github.com/acronis/perfkit/db" + +// @cpt-perfkit-db-dod-ms-adapter-unsupported-ops + +func (g *msGateway) Exec(format string, args ...interface{}) (db.Result, error) { + 
// msRows stores Meilisearch search results and provides db.Rows interface.
//
// data holds one map per hit (field name -> decoded JSON value), idx is the
// 1-based position of the current row (0 means Next has not been called yet)
// and requestedColumns fixes the order in which Scan fills its destinations.
type msRows struct {
	data []map[string]interface{}
	idx  int

	requestedColumns []string
}

// Next advances the cursor to the next row and reports whether there is one.
func (r *msRows) Next() bool {
	if r.idx < len(r.data) {
		r.idx++
		return true
	}
	return false
}

// Err returns any error encountered during iteration. The whole result set
// is held in memory, so iteration itself never fails.
func (r *msRows) Err() error {
	return nil
}

// Scan copies the columns in the current row into the values pointed at by
// dest. Destinations are matched positionally against requestedColumns.
//
// Supported destination types are *string, *int64, *bool, *[]byte,
// *[]string, *time.Time (RFC 3339 with optional nanoseconds) and *[]float32.
// A field that is missing from the row or is JSON null leaves its
// destination untouched.
//
// An error is returned when the number of destinations differs from the
// number of requested columns, when Scan is called while the cursor is not
// positioned on a row (no successful Next yet), when a destination is not a
// pointer or has an unsupported type, or when a stored value cannot be
// converted to the destination type.
func (r *msRows) Scan(dest ...interface{}) error {
	if len(dest) != len(r.requestedColumns) {
		return fmt.Errorf("number of columns in the result set does not match the number of destination fields")
	}

	// Without a preceding successful Next the cursor is at 0 and indexing
	// data[idx-1] would panic; report it as an error instead.
	if r.idx < 1 || r.idx > len(r.data) {
		return fmt.Errorf("msRows.Scan() called without a preceding successful Next()")
	}

	row := r.data[r.idx-1]

	for i := range dest {
		dv := reflect.ValueOf(dest[i])
		if dv.Kind() != reflect.Ptr {
			return fmt.Errorf("internal error: msRows.Scan() - non-pointer passed to Scan: %v", dest)
		}

		fieldName := r.requestedColumns[i]
		val := row[fieldName]

		// Missing or null field: keep whatever the destination already holds.
		if val == nil {
			continue
		}

		switch d := dest[i].(type) {
		case *string:
			strVal, ok := val.(string)
			if !ok {
				return fmt.Errorf("%s: not equal type in struct 'string', in map '%T'", fieldName, val)
			}
			*d = strVal
		case *int64:
			// Numbers arrive either as json.Number or as float64, depending
			// on how the hit was decoded.
			switch numberType := val.(type) {
			case json.Number:
				var err error
				*d, err = numberType.Int64()
				if err != nil {
					return fmt.Errorf("%s: failed to cast jsonNumber to int64 '%T': %v", fieldName, numberType, err)
				}
			case float64:
				*d = int64(numberType)
			default:
				return fmt.Errorf("%s: not equal type, in map '%T'", fieldName, val)
			}
		case *bool:
			boolVal, ok := val.(bool)
			if !ok {
				return fmt.Errorf("%s: not equal type in struct 'bool', in map '%T'", fieldName, val)
			}
			*d = boolVal
		case *[]byte:
			strVal, ok := val.(string)
			if !ok {
				return fmt.Errorf("%s: not equal type in struct '[]byte', in map '%T'", fieldName, val)
			}
			*d = []byte(strVal)
		case *[]string:
			switch v := val.(type) {
			case string:
				// A scalar string is presented as a one-element slice.
				*d = []string{v}
			case []interface{}:
				strSlc := make([]string, len(v))
				for j, elem := range v {
					strSlc[j] = fmt.Sprint(elem)
				}
				*d = strSlc
			default:
				return fmt.Errorf("%s: not equal type in struct '[]string', in map '%T'", fieldName, val)
			}
		case *time.Time:
			strVal, ok := val.(string)
			if !ok {
				return fmt.Errorf("%s: not equal type in struct 'time.Time', in map '%T'", fieldName, val)
			}
			var err error
			*d, err = time.Parse(time.RFC3339Nano, strVal)
			if err != nil {
				return fmt.Errorf("%s: failed to cast string to time.Time '%T': %v", fieldName, strVal, err)
			}
		case *[]float32:
			sliceVal, ok := val.([]interface{})
			if !ok {
				return fmt.Errorf("%s: not equal type in struct '[]float32', in map '%T'", fieldName, val)
			}
			result := make([]float32, 0, len(sliceVal))
			for _, elem := range sliceVal {
				switch n := elem.(type) {
				case json.Number:
					f64, err := n.Float64()
					if err != nil {
						return fmt.Errorf("%s: failed to cast jsonNumber to float64 '%T': %v", fieldName, n, err)
					}
					result = append(result, float32(f64))
				case float64:
					result = append(result, float32(n))
				default:
					return fmt.Errorf("%s: unsupported element type in vector '%T'", fieldName, elem)
				}
			}
			*d = result
		default:
			return fmt.Errorf("unsupported type to convert (type=%T)", d)
		}
	}

	return nil
}

// Close closes the rows iterator. There is nothing to release, so it always
// succeeds.
func (r *msRows) Close() error {
	return nil
}
struct { + rows *msRows + + logTime bool + readRowsLogger db.Logger + printed int +} + +const maxRowsToPrint = 10 + +// Next advances the cursor to the next row +func (r *wrappedRows) Next() bool { + return r.rows.Next() +} + +// Err returns any error that was encountered during iteration +func (r *wrappedRows) Err() error { + return r.rows.Err() +} + +// Scan copies columns and logs the scanned values +func (r *wrappedRows) Scan(dest ...interface{}) error { + since := time.Now() + err := r.rows.Scan(dest...) + + if r.readRowsLogger != nil { + if r.printed >= maxRowsToPrint { + return err + } else if r.printed == maxRowsToPrint { + r.readRowsLogger.Log("... truncated ...") + r.printed++ + return err + } + + values := db.DumpRecursive(dest, " ") + if r.logTime { + dur := time.Since(since) + r.readRowsLogger.Log("Row: %s -- %s", values, fmt.Sprintf("parse duration: %v", dur)) + } else { + r.readRowsLogger.Log("Row: %s", values) + } + r.printed++ + } + + return err +} + +// Close closes the rows iterator +func (r *wrappedRows) Close() error { + return r.rows.Close() +} diff --git a/db/ms/rows_test.go b/db/ms/rows_test.go new file mode 100644 index 00000000..ab6721b4 --- /dev/null +++ b/db/ms/rows_test.go @@ -0,0 +1,261 @@ +package ms + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMsRowsScanString(t *testing.T) { + rows := &msRows{ + data: []map[string]interface{}{ + {"name": "alice"}, + }, + requestedColumns: []string{"name"}, + } + + require.True(t, rows.Next()) + + var name string + require.NoError(t, rows.Scan(&name)) + assert.Equal(t, "alice", name) + + assert.False(t, rows.Next()) + assert.NoError(t, rows.Close()) +} + +func TestMsRowsScanInt64FromFloat64(t *testing.T) { + // JSON unmarshals numbers as float64 by default + rows := &msRows{ + data: []map[string]interface{}{ + {"id": float64(42)}, + }, + requestedColumns: []string{"id"}, + } + + require.True(t, 
rows.Next()) + + var id int64 + require.NoError(t, rows.Scan(&id)) + assert.Equal(t, int64(42), id) +} + +func TestMsRowsScanInt64FromJsonNumber(t *testing.T) { + rows := &msRows{ + data: []map[string]interface{}{ + {"id": json.Number("99")}, + }, + requestedColumns: []string{"id"}, + } + + require.True(t, rows.Next()) + + var id int64 + require.NoError(t, rows.Scan(&id)) + assert.Equal(t, int64(99), id) +} + +func TestMsRowsScanBool(t *testing.T) { + rows := &msRows{ + data: []map[string]interface{}{ + {"active": true}, + }, + requestedColumns: []string{"active"}, + } + + require.True(t, rows.Next()) + + var active bool + require.NoError(t, rows.Scan(&active)) + assert.True(t, active) +} + +func TestMsRowsScanBytes(t *testing.T) { + rows := &msRows{ + data: []map[string]interface{}{ + {"data": "binary_content"}, + }, + requestedColumns: []string{"data"}, + } + + require.True(t, rows.Next()) + + var data []byte + require.NoError(t, rows.Scan(&data)) + assert.Equal(t, []byte("binary_content"), data) +} + +func TestMsRowsScanStringSlice(t *testing.T) { + rows := &msRows{ + data: []map[string]interface{}{ + {"tags": []interface{}{"tag1", "tag2", "tag3"}}, + }, + requestedColumns: []string{"tags"}, + } + + require.True(t, rows.Next()) + + var tags []string + require.NoError(t, rows.Scan(&tags)) + assert.Equal(t, []string{"tag1", "tag2", "tag3"}, tags) +} + +func TestMsRowsScanStringSliceFromString(t *testing.T) { + rows := &msRows{ + data: []map[string]interface{}{ + {"tags": "single_tag"}, + }, + requestedColumns: []string{"tags"}, + } + + require.True(t, rows.Next()) + + var tags []string + require.NoError(t, rows.Scan(&tags)) + assert.Equal(t, []string{"single_tag"}, tags) +} + +func TestMsRowsScanTime(t *testing.T) { + now := time.Now().UTC().Truncate(time.Nanosecond) + timeStr := now.Format(time.RFC3339Nano) + + rows := &msRows{ + data: []map[string]interface{}{ + {"created_at": timeStr}, + }, + requestedColumns: []string{"created_at"}, + } + + require.True(t, 
rows.Next()) + + var ts time.Time + require.NoError(t, rows.Scan(&ts)) + assert.Equal(t, timeStr, ts.Format(time.RFC3339Nano)) +} + +func TestMsRowsScanNilValue(t *testing.T) { + rows := &msRows{ + data: []map[string]interface{}{ + {"name": nil}, + }, + requestedColumns: []string{"name"}, + } + + require.True(t, rows.Next()) + + var name string + require.NoError(t, rows.Scan(&name)) + assert.Equal(t, "", name) // stays zero value +} + +func TestMsRowsScanMultipleColumns(t *testing.T) { + rows := &msRows{ + data: []map[string]interface{}{ + {"id": float64(1), "name": "alice", "active": true}, + {"id": float64(2), "name": "bob", "active": false}, + }, + requestedColumns: []string{"id", "name", "active"}, + } + + require.True(t, rows.Next()) + var id int64 + var name string + var active bool + require.NoError(t, rows.Scan(&id, &name, &active)) + assert.Equal(t, int64(1), id) + assert.Equal(t, "alice", name) + assert.True(t, active) + + require.True(t, rows.Next()) + require.NoError(t, rows.Scan(&id, &name, &active)) + assert.Equal(t, int64(2), id) + assert.Equal(t, "bob", name) + assert.False(t, active) + + assert.False(t, rows.Next()) +} + +func TestMsRowsScanColumnCountMismatch(t *testing.T) { + rows := &msRows{ + data: []map[string]interface{}{ + {"id": float64(1), "name": "alice"}, + }, + requestedColumns: []string{"id", "name"}, + } + + require.True(t, rows.Next()) + + var id int64 + err := rows.Scan(&id) // only 1 dest for 2 columns + require.Error(t, err) + assert.Contains(t, err.Error(), "does not match") +} + +func TestMsRowsScanNonPointer(t *testing.T) { + rows := &msRows{ + data: []map[string]interface{}{ + {"name": "alice"}, + }, + requestedColumns: []string{"name"}, + } + + require.True(t, rows.Next()) + + err := rows.Scan("not_a_pointer") // non-pointer + require.Error(t, err) + assert.Contains(t, err.Error(), "non-pointer") +} + +func TestMsRowsScanTypeMismatch(t *testing.T) { + rows := &msRows{ + data: []map[string]interface{}{ + {"name": float64(42)}, 
// float64, not string + }, + requestedColumns: []string{"name"}, + } + + require.True(t, rows.Next()) + + var name string + err := rows.Scan(&name) + require.Error(t, err) + assert.Contains(t, err.Error(), "not equal type") +} + +func TestMsRowsScanFloat32Slice(t *testing.T) { + rows := &msRows{ + data: []map[string]interface{}{ + {"embedding": []interface{}{float64(0.5), float64(10), float64(6)}}, + }, + requestedColumns: []string{"embedding"}, + } + + require.True(t, rows.Next()) + + var embedding []float32 + require.NoError(t, rows.Scan(&embedding)) + assert.Equal(t, []float32{0.5, 10, 6}, embedding) +} + +func TestMsRowsScanFloat32SliceFromJsonNumber(t *testing.T) { + rows := &msRows{ + data: []map[string]interface{}{ + {"embedding": []interface{}{json.Number("1.5"), json.Number("2.5"), json.Number("3.5")}}, + }, + requestedColumns: []string{"embedding"}, + } + + require.True(t, rows.Next()) + + var embedding []float32 + require.NoError(t, rows.Scan(&embedding)) + assert.Equal(t, []float32{1.5, 2.5, 3.5}, embedding) +} + +func TestMsRowsErr(t *testing.T) { + rows := &msRows{} + assert.NoError(t, rows.Err()) +} diff --git a/db/ms/select.go b/db/ms/select.go new file mode 100644 index 00000000..836bd1f5 --- /dev/null +++ b/db/ms/select.go @@ -0,0 +1,273 @@ +package ms + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + + "github.com/meilisearch/meilisearch-go" + + "github.com/acronis/perfkit/db" +) + +// @cpt-perfkit-db-algo-ms-adapter-build-filter +func buildMeilisearchFilter(where map[string][]string) (string, error) { + if len(where) == 0 { + return "", nil + } + + var parts []string + + for _, c := range db.SortFields(where) { + if c.Col == "" { + return "", fmt.Errorf("empty condition field") + } + + for _, v := range c.Vals { + if v == db.SpecialConditionIsNull { + parts = append(parts, fmt.Sprintf("%s IS NULL", c.Col)) + continue + } + if v == db.SpecialConditionIsNotNull { + parts = append(parts, fmt.Sprintf("%s IS NOT NULL", c.Col)) + 
// quoteFilterValue returns the value quoted for Meilisearch filter expressions.
// Values that look like plain decimal numbers are left unquoted; everything
// else is wrapped in single quotes with embedded single quotes escaped by a
// backslash.
//
// The empty string is returned unchanged, as before.
// NOTE(review): an empty value yields a filter such as "field = " — confirm
// whether callers rely on this or whether it should become ''.
func quoteFilterValue(val string) string {
	if val == "" {
		return val
	}

	if isPlainDecimal(val) {
		return val
	}

	return fmt.Sprintf("'%s'", strings.ReplaceAll(val, "'", "\\'"))
}

// isPlainDecimal reports whether val consists of an optional leading sign,
// ASCII digits and at most one dot, and contains at least one digit. The
// digit requirement keeps strings such as "-", "+" or "." from being treated
// as numbers and emitted unquoted.
func isPlainDecimal(val string) bool {
	seenDigit, seenDot := false, false

	for i, ch := range val {
		switch {
		case ch == '-' || ch == '+':
			// A sign is only valid as the very first character.
			if i != 0 {
				return false
			}
		case ch == '.':
			if seenDot {
				return false
			}
			seenDot = true
		case ch >= '0' && ch <= '9':
			seenDigit = true
		default:
			return false
		}
	}

	return seenDigit
}
+ + return sortParams, vs, nil +} + +// @cpt-perfkit-db-algo-ms-adapter-select +func (g *msGateway) Select(tableName string, sc *db.SelectCtrl) (db.Rows, error) { + if sc == nil { + return &db.EmptyRows{}, nil + } + + // Handle COUNT query + if len(sc.Fields) == 1 && sc.Fields[0] == "COUNT(0)" { + return g.selectCount(tableName, sc) + } + + filter, err := buildMeilisearchFilter(sc.Where) + if err != nil { + return nil, fmt.Errorf("failed to build filter: %v", err) + } + + sortParams, vs, err := buildSortParams(sc.Order) + if err != nil { + return nil, fmt.Errorf("failed to build sort: %v", err) + } + + var limit int64 = 20 + if sc.Page.Limit > 0 { + limit = sc.Page.Limit + } + + searchReq := &meilisearch.SearchRequest{ + Filter: filter, + Sort: sortParams, + Limit: limit, + Offset: sc.Page.Offset, + AttributesToRetrieve: sc.Fields, + } + + if vs != nil { + searchReq.Vector = vs.vector + searchReq.Hybrid = &meilisearch.SearchRequestHybrid{ + Embedder: vs.field, + } + searchReq.RetrieveVectors = true + } + + index := g.client.Index(tableName) + + resp, err := index.Search("", searchReq) + if err != nil { + return nil, fmt.Errorf("failed to search meilisearch index %s: %v", tableName, err) + } + + if resp.Hits.Len() == 0 { + return &db.EmptyRows{}, nil + } + + var data []map[string]interface{} + for _, hit := range resp.Hits { + m := make(map[string]interface{}) + for k, raw := range hit { + var v interface{} + if err := json.Unmarshal(raw, &v); err != nil { + m[k] = string(raw) + } else { + m[k] = v + } + } + + // Extract vectors from _vectors field if present + if vectorsRaw, ok := m["_vectors"]; ok { + if vectorsMap, ok := vectorsRaw.(map[string]interface{}); ok { + for embedderName, embedderData := range vectorsMap { + if embedderObj, ok := embedderData.(map[string]interface{}); ok { + if embeddings, ok := embedderObj["embeddings"]; ok { + if embList, ok := embeddings.([]interface{}); ok && len(embList) > 0 { + m[embedderName] = embList[0] + } + } + } + } + } + 
delete(m, "_vectors") + } + + data = append(data, m) + } + + rows := &msRows{data: data, requestedColumns: sc.Fields} + + if g.readRowsLogger != nil { + return &wrappedRows{ + rows: rows, + logTime: g.logTime, + readRowsLogger: g.readRowsLogger, + }, nil + } + + return rows, nil +} + +func (g *msGateway) selectCount(tableName string, sc *db.SelectCtrl) (db.Rows, error) { + filter, err := buildMeilisearchFilter(sc.Where) + if err != nil { + return nil, fmt.Errorf("failed to build filter for count: %v", err) + } + + searchReq := &meilisearch.SearchRequest{ + Filter: filter, + Limit: 0, + } + + index := g.client.Index(tableName) + + resp, err := index.Search("", searchReq) + if err != nil { + return nil, fmt.Errorf("failed to count meilisearch index %s: %v", tableName, err) + } + + return &db.CountRows{Count: resp.EstimatedTotalHits}, nil +} diff --git a/db/ms/select_test.go b/db/ms/select_test.go new file mode 100644 index 00000000..94c948a2 --- /dev/null +++ b/db/ms/select_test.go @@ -0,0 +1,204 @@ +package ms + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/acronis/perfkit/db" +) + +func TestQuoteFilterValue(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + {"numeric integer", "42", "42"}, + {"numeric negative", "-5", "-5"}, + {"numeric float", "3.14", "3.14"}, + {"string value", "hello", "'hello'"}, + {"string with spaces", "hello world", "'hello world'"}, + {"uuid", "00000000-0000-0000-0000-000000000001", "'00000000-0000-0000-0000-000000000001'"}, + {"string with single quote", "it's", "'it\\'s'"}, + {"empty string", "", ""}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := quoteFilterValue(tt.input) + assert.Equal(t, tt.expected, got) + }) + } +} + +func TestBuildMeilisearchFilter(t *testing.T) { + tests := []struct { + name string + where map[string][]string + expected string + expectError bool + errContains string + 
}{ + { + name: "nil where", + where: nil, + expected: "", + }, + { + name: "empty where", + where: map[string][]string{}, + expected: "", + }, + { + name: "single equality", + where: map[string][]string{"status": {"active"}}, + expected: "status = 'active'", + }, + { + name: "numeric equality", + where: map[string][]string{"id": {"42"}}, + expected: "id = 42", + }, + { + name: "less than", + where: map[string][]string{"age": {"lt(30)"}}, + expected: "age < 30", + }, + { + name: "greater than or equal", + where: map[string][]string{"score": {"ge(100)"}}, + expected: "score >= 100", + }, + { + name: "less than or equal", + where: map[string][]string{"score": {"le(50)"}}, + expected: "score <= 50", + }, + { + name: "not equal", + where: map[string][]string{"type": {"ne(deleted)"}}, + expected: "type != 'deleted'", + }, + { + name: "is null", + where: map[string][]string{"email": {db.SpecialConditionIsNull}}, + expected: "email IS NULL", + }, + { + name: "is not null", + where: map[string][]string{"email": {db.SpecialConditionIsNotNull}}, + expected: "email IS NOT NULL", + }, + { + name: "multiple conditions on same field", + where: map[string][]string{"age": {"gt(18)", "lt(65)"}}, + expected: "age > 18 AND age < 65", + }, + { + name: "like unsupported", + where: map[string][]string{"name": {"like(john)"}}, + expectError: true, + errContains: "like functions are not supported", + }, + { + name: "hlike unsupported", + where: map[string][]string{"name": {"hlike(john)"}}, + expectError: true, + errContains: "like functions are not supported", + }, + { + name: "empty field name", + where: map[string][]string{"": {"value"}}, + expectError: true, + errContains: "empty condition field", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := buildMeilisearchFilter(tt.where) + if tt.expectError { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errContains) + } else { + require.NoError(t, err) + assert.Equal(t, tt.expected, got) 
+ } + }) + } +} + +func TestBuildSortParams(t *testing.T) { + tests := []struct { + name string + order []string + expectedSort []string + expectVector bool + vectorField string + vectorValues []float32 + expectError bool + errContains string + }{ + { + name: "empty order", + order: nil, + expectedSort: nil, + }, + { + name: "asc single", + order: []string{"asc(created_at)"}, + expectedSort: []string{"created_at:asc"}, + }, + { + name: "desc single", + order: []string{"desc(score)"}, + expectedSort: []string{"score:desc"}, + }, + { + name: "multiple sort fields", + order: []string{"asc(name)", "desc(id)"}, + expectedSort: []string{"name:asc", "id:desc"}, + }, + { + name: "nearest vector search", + order: []string{"nearest(embedding;L2;[1,2,3])"}, + expectVector: true, + vectorField: "embedding", + vectorValues: []float32{1, 2, 3}, + }, + { + name: "nearest with wrong arg count", + order: []string{"nearest(embedding;L2)"}, + expectError: true, + errContains: "nearest expects 3 arguments", + }, + { + name: "sort and nearest mutually exclusive", + order: []string{"asc(id)", "nearest(embedding;L2;[1,2,3])"}, + expectError: true, + errContains: "mutually exclusive", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, vs, err := buildSortParams(tt.order) + if tt.expectError { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errContains) + } else { + require.NoError(t, err) + if tt.expectVector { + require.NotNil(t, vs) + assert.Equal(t, tt.vectorField, vs.field) + assert.Equal(t, tt.vectorValues, vs.vector) + } else { + assert.Nil(t, vs) + assert.Equal(t, tt.expectedSort, got) + } + } + }) + } +} diff --git a/db/ms/sequence.go b/db/ms/sequence.go new file mode 100644 index 00000000..e48632d6 --- /dev/null +++ b/db/ms/sequence.go @@ -0,0 +1,7 @@ +package ms + +// GetNextVal returns the next value from a sequence. +// Meilisearch does not support sequences; this is a no-op stub. 
+func (s *msSession) GetNextVal(sequenceName string) (uint64, error) { + return 0, nil +} diff --git a/db/ms/stmt.go b/db/ms/stmt.go new file mode 100644 index 00000000..394b70bc --- /dev/null +++ b/db/ms/stmt.go @@ -0,0 +1,7 @@ +package ms + +import "github.com/acronis/perfkit/db" + +func (g *msGateway) Prepare(query string) (db.Stmt, error) { + return nil, nil +} diff --git a/db/ms/update.go b/db/ms/update.go new file mode 100644 index 00000000..140d923c --- /dev/null +++ b/db/ms/update.go @@ -0,0 +1,8 @@ +package ms + +import "github.com/acronis/perfkit/db" + +// Update is a no-op implementation for Meilisearch. +func (g *msGateway) Update(tableName string, c *db.UpdateCtrl) (int64, error) { + return 0, nil +} diff --git a/db/ms/vector_test.go b/db/ms/vector_test.go new file mode 100644 index 00000000..4f503306 --- /dev/null +++ b/db/ms/vector_test.go @@ -0,0 +1,105 @@ +package ms + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/acronis/perfkit/db" +) + +func testVectorTableDefinition() *db.TableDefinition { + return &db.TableDefinition{ + TableRows: []db.TableRow{ + db.TableRowItem{Name: "id", Type: db.DataTypeInt, PrimaryKey: true, Indexed: true}, + db.TableRowItem{Name: "embedding", Type: db.DataTypeVector3Float32, Indexed: true}, + db.TableRowItem{Name: "text", Type: db.DataTypeVarChar, Indexed: true}, + }, + } +} + +func (suite *TestingSuite) makeVectorTestSession() (db.Database, db.Session, *db.Context) { + var logger = newTestLogger(suite.T()) + + dbo, err := db.Open(db.Config{ + ConnString: suite.ConnString, + MaxOpenConns: 16, + MaxConnLifetime: 1000 * time.Millisecond, + QueryLogger: logger, + ReadRowsLogger: logger, + LogOperationsTime: true, + }) + + require.NoError(suite.T(), err, "making test msSession") + + var tableSpec = testVectorTableDefinition() + + // Clean up if left over from a previous run + if exists, _ := dbo.TableExists("ms_vector_table"); exists { + _ = 
dbo.DropTable("ms_vector_table") + } + + if err = dbo.CreateTable("ms_vector_table", tableSpec, ""); err != nil { + require.NoError(suite.T(), err, "init vector scheme") + } + + var c = dbo.Context(context.Background(), false) + s := dbo.Session(c) + + return dbo, s, c +} + +func vectorCleanup(t *testing.T, dbo db.Database) { + t.Helper() + + if err := dbo.DropTable("ms_vector_table"); err != nil { + t.Error("drop vector table", err) + } +} + +func (suite *TestingSuite) TestVectorSearch() { + d, s, c := suite.makeVectorTestSession() + defer logDbTime(suite.T(), c) + defer vectorCleanup(suite.T(), d) + + if err := s.BulkInsert("ms_vector_table", [][]interface{}{ + {int64(1), "text1", []float32{0.5, 10, 6}}, + {int64(2), "text2", []float32{-0.5, 10, 10}}, + }, []string{"id", "text", "embedding"}); err != nil { + suite.T().Error(err) + return + } + + // Wait for async indexing + time.Sleep(3 * time.Second) + + rows, err := s.Select("ms_vector_table", + &db.SelectCtrl{ + Fields: []string{"text", "embedding"}, + Order: []string{"nearest(embedding;L2;[3,1,2])"}, + }) + if err != nil { + suite.T().Error(err) + return + } + defer rows.Close() + + var count int + for rows.Next() { + var text string + var embedding []float32 + if scanErr := rows.Scan(&text, &embedding); scanErr != nil { + suite.T().Error(scanErr) + return + } + suite.T().Log("row", text, fmt.Sprintf("%v", embedding)) + suite.NotEmpty(text) + suite.Len(embedding, 3) + count++ + } + + suite.Equal(2, count, "expected 2 rows from vector search") +} diff --git a/db/testing/docker/meilisearch/docker-compose.yaml b/db/testing/docker/meilisearch/docker-compose.yaml new file mode 100644 index 00000000..e5c750e3 --- /dev/null +++ b/db/testing/docker/meilisearch/docker-compose.yaml @@ -0,0 +1,24 @@ +version: '3.4' + +x-logging: + &logging + driver: gelf + options: + gelf-address: udp://${GELF_IP}:${GELF_PORT} + +services: + meilisearch: + image: getmeili/meilisearch:v1.12 + environment: + - MEILI_ENV=development + - 
MEILI_NO_ANALYTICS=true + - MEILI_LOG_LEVEL=INFO + - MEILI_EXPERIMENTAL_VECTOR_STORE=true + logging: *logging + volumes: + - meili-data:/meili_data + ports: + - 7700:7700 + +volumes: + meili-data: diff --git a/go.work.sum b/go.work.sum index 6c93f46b..e6b9cd79 100644 --- a/go.work.sum +++ b/go.work.sum @@ -441,6 +441,7 @@ github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8w github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/substrait-io/substrait-go v1.1.0 h1:wUXoXV/ESMXgaOWu/05kvMI/UBUyhtaTRfkT5p1b5Ck= github.com/substrait-io/substrait-go v1.1.0/go.mod h1:LHzL5E0VL620yw4kBQCP+sQPmxhepPTQMDJQRbOe/T4= github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 h1:kdXcSzyDtseVEc4yCz2qF8ZrQvIDBJLl4S1c3GCXmoI= @@ -530,8 +531,6 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= -golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= -golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea h1:vLCWI/yYrdEHyN2JzIzPO3aaQJHQdp89IZBA/+azVC4= golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=