From 5ae61f3f891efd769e685ff9a160d3d047292199 Mon Sep 17 00:00:00 2001 From: Siddharth Kothari Date: Thu, 11 Jun 2026 21:11:34 +0530 Subject: [PATCH 1/2] feat: add Elasticsearch Serverless support for meta indices and API key auth Detect serverless clusters via build_flavor and remap dot-prefixed meta indices to rs_* names, since Serverless rejects user-created hidden indices. Add ES_API_KEY upstream auth, strip unsupported index settings on create, and use _cat/indices for index size where _stats is unavailable. Co-authored-by: Cursor --- Dockerfile | 2 +- docs/env-vars.md | 4 +- model/permission/permission.go | 2 +- model/reindex/dao.go | 69 ++++++++- model/reindex/util.go | 62 ++++++++- model/reindex/util_test.go | 58 ++++++++ plugins/analytics/analytics.go | 11 ++ plugins/analytics/dao.go | 14 +- plugins/analytics/util.go | 4 +- plugins/applycache/applycache.go | 3 +- plugins/applycache/dao.go | 2 +- plugins/auth/auth.go | 3 + plugins/auth/dao.go | 3 +- plugins/auth/handlers.go | 1 + plugins/auth/middleware.go | 2 + plugins/cache/cache.go | 1 + plugins/cache/dao.go | 2 +- plugins/logs/dao.go | 3 +- plugins/logs/logs.go | 2 + plugins/nodes/dao.go | 2 +- plugins/nodes/nodes.go | 3 +- plugins/openai/dao.go | 6 +- plugins/openai/openai.go | 7 +- plugins/permissions/dao.go | 2 +- plugins/permissions/permissions.go | 1 + plugins/pipelines/analytics.go | 2 +- plugins/pipelines/dao.go | 8 +- plugins/pipelines/es_stage.go | 1 + plugins/pipelines/logs.go | 2 +- plugins/pipelines/middleware.go | 2 +- plugins/pipelines/pipelines.go | 17 ++- plugins/reindexer/dao.go | 24 +--- plugins/rules/dao.go | 2 +- plugins/rules/middleware.go | 2 +- plugins/rules/rules.go | 1 + plugins/rules/util.go | 1 + plugins/searchgrader/dao.go | 2 +- plugins/searchgrader/middleware.go | 2 +- plugins/searchgrader/searchgrader.go | 2 + plugins/searchrelevancy/dao.go | 2 +- plugins/searchrelevancy/searchrelevancy.go | 1 + plugins/storedquery/dao.go | 2 +- plugins/storedquery/middleware.go | 2 +- plugins/storedquery/storedquery.go | 1 + plugins/suggestions/dao.go | 8 +- plugins/suggestions/dao_es7.go | 2 +- plugins/suggestions/handlers.go | 1 + plugins/suggestions/suggestions.go | 1 + plugins/sync/dao.go | 2 +- plugins/sync/sync.go | 1 + plugins/synonyms/dao.go | 2 +- plugins/synonyms/synonyms.go | 2 + plugins/uibuilder/dao.go | 4 +- plugins/uibuilder/uibuilder.go | 3 + plugins/users/dao.go | 2 +- plugins/users/middleware.go | 3 +- plugins/users/users.go | 1 + util/esauth.go | 78 +++++++++++ util/esauth_test.go | 78 +++++++++++ util/esclient.go | 33 ++++- util/meta_index.go | 155 +++++++++++++++++++++ util/meta_index_test.go | 49 +++++++ 62 files changed, 683 insertions(+), 87 deletions(-) create mode 100644 model/reindex/util_test.go create mode 100644 util/esauth.go create mode 100644 util/esauth_test.go create mode 100644 util/meta_index.go create mode 100644 util/meta_index_test.go diff --git a/Dockerfile b/Dockerfile index 021ebccd..1e7795c1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.26.0 AS builder +FROM golang:1.26.4 AS builder ARG VERSION=9.2.0 ENV VERSION="${VERSION}" diff --git a/docs/env-vars.md b/docs/env-vars.md index 5fe12920..85f33036 100644 --- a/docs/env-vars.md +++ b/docs/env-vars.md @@ -2,7 +2,9 @@ Plugins might require certain environment variables to be in order initialize the components it needs for its functioning. Those variables can be declared in any file. The path to that file must be provided via the `--env` flag. -**Note:** `ES_CLUSTER_URL` is used by all the plugins that are interacting with elasticsearch. `USERNAME` and `PASSWORD` are temporary entry point master credentials in order to test the plugins. +**Note:** `ES_CLUSTER_URL` is used by all the plugins that are interacting with elasticsearch. Basic auth credentials may be embedded in `ES_CLUSTER_URL` (for example `https://user:pass@host:443`). Alternatively, set `ES_API_KEY` to use API key auth for upstream elasticsearch (for example Elasticsearch Serverless); `ES_CLUSTER_URL` must not contain credentials when `ES_API_KEY` is set. `USERNAME` and `PASSWORD` are temporary entry point master credentials in order to test the plugins. + +**Meta index naming:** ReactiveSearch stores its metadata in dot-prefixed indices (e.g. `.pipelines`, `.users`). Elasticsearch Serverless does not allow creating dot-prefixed indices, so when a Serverless cluster is detected (via `build_flavor` from `GET /`), meta indices are automatically created with the `rs_` prefix instead (e.g. `rs_pipelines`). Set `RS_META_INDEX_PREFIX` to force an alternate prefix on any cluster (the prefix must not start with `_`, `-` or `+`). On Serverless, platform-managed index settings (`index.hidden`, shard/replica counts) are also stripped from index creation requests. List of specific env vars required by respective plugins are listed below: diff --git a/model/permission/permission.go b/model/permission/permission.go index 755d0b31..a4633292 100644 --- a/model/permission/permission.go +++ b/model/permission/permission.go @@ -494,7 +494,7 @@ func (p *Permission) CanAccessIndex(name string) (bool, error) { if suggestionsIndex == "" { suggestionsIndex = ".suggestions" } - indices = append(indices, suggestionsIndex) + indices = append(indices, util.MetaIndexName(suggestionsIndex)) } for _, pattern := range indices { matched, err := util.ValidateIndex(pattern, name) diff --git a/model/reindex/dao.go b/model/reindex/dao.go index c7eed67d..f7cd7f02 100644 --- a/model/reindex/dao.go +++ b/model/reindex/dao.go @@ -522,11 +522,78 @@ func GetAliasIndexMap(ctx context.Context) (map[string]string, error) { return res, nil } +// GetIndexStoreSize returns the primary store size in bytes for an index or alias. +// It uses _cat/indices, which is supported on Elasticsearch Serverless where +// _stats is unavailable. +func GetIndexStoreSize(ctx context.Context, indexName string) (int64, error) { + index := indexName + aliasesIndexMap, err := GetAliasIndexMap(ctx) + if err != nil { + return 0, err + } + if indexNameFromMap, ok := aliasesIndexMap[indexName]; ok { + index = indexNameFromMap + } + + if !util.IsServerless() { + stats, err := util.GetClient7().IndexStats(index).Do(ctx) + if err == nil { + if val, ok := stats.Indices[index]; ok { + return val.Primaries.Store.SizeInBytes, nil + } + } + } + + return indexStoreSizeFromCat(ctx, index) +} + +func indexStoreSizeFromCat(ctx context.Context, index string) (int64, error) { + v := url.Values{} + v.Set("format", "json") + v.Set("bytes", "b") + + requestOptions := es7.PerformRequestOptions{ + Method: "GET", + Path: "/_cat/indices/" + index, + Params: v, + } + response, err := util.GetClient7().PerformRequest(ctx, requestOptions) + if err != nil { + return 0, err + } + if response.StatusCode > 300 { + return 0, errors.New(string(response.Body)) + } + + var indices []AliasedIndices + if err := json.Unmarshal(response.Body, &indices); err != nil { + return 0, err + } + if len(indices) == 0 { + return 0, fmt.Errorf(`index "%s" not found`, index) + } + + storeSize := indices[0].PriStoreSize + if storeSize == "" { + storeSize = indices[0].StoreSize + } + if isNumericStoreSize(storeSize) { + return strconv.ParseInt(storeSize, 10, 64) + } + return ParseStoreSize(storeSize) +} + func isTaskCompleted(ctx context.Context, taskID string) (bool, error) { isCompleted := false url := util.GetESURL() + "/_tasks/" + taskID - response, err := http.Get(url) + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + log.Errorln(logTag, " Get task status error", err) + return isCompleted, err + } + util.ApplyESAuth(req) + response, err := util.HTTPClient().Do(req) if err != nil { log.Errorln(logTag, " Get task status error", err) return isCompleted, err diff --git a/model/reindex/util.go b/model/reindex/util.go index c0b25845..fc37afd3 100644 --- a/model/reindex/util.go +++ b/model/reindex/util.go @@ -8,8 +8,10 @@ import ( "strconv" "strings" "sync" + "unicode" "github.com/appbaseio/reactivesearch-api/middleware/classify" + "github.com/appbaseio/reactivesearch-api/util" log "github.com/sirupsen/logrus" ) @@ -70,6 +72,62 @@ var CurrentlyReIndexingProcessMutex = sync.RWMutex{} // IndexStoreSize to decide whether to use async or sync re-indexing const IndexStoreSize = int64(100000000) +// ParseStoreSize converts Elasticsearch _cat/indices size strings (e.g. "1.2kb", +// "100mb") or plain byte counts to bytes. A dash or empty value is treated as zero. +func ParseStoreSize(size string) (int64, error) { + size = strings.TrimSpace(strings.ToLower(size)) + if size == "" || size == "-" { + return 0, nil + } + + multiplier := int64(1) + units := []struct { + suffix string + mult int64 + }{ + {"pb", 1024 * 1024 * 1024 * 1024 * 1024}, + {"tb", 1024 * 1024 * 1024 * 1024}, + {"gb", 1024 * 1024 * 1024}, + {"mb", 1024 * 1024}, + {"kb", 1024}, + {"b", 1}, + } + for _, unit := range units { + if strings.HasSuffix(size, unit.suffix) { + size = strings.TrimSuffix(size, unit.suffix) + multiplier = unit.mult + break + } + } + + size = strings.TrimSpace(size) + if size == "" { + return 0, fmt.Errorf("invalid store size") + } + + value, err := strconv.ParseFloat(size, 64) + if err != nil { + return 0, fmt.Errorf("invalid store size %q: %w", size, err) + } + if value < 0 { + return 0, fmt.Errorf("invalid store size %q", size) + } + + return int64(value * float64(multiplier)), nil +} + +func isNumericStoreSize(size string) bool { + if size == "" { + return false + } + for _, r := range size { + if !unicode.IsDigit(r) { + return false + } + } + return true +} + // reindexedName calculates from the name the number of times an index has been // reindexed to generate the successive name for the index. For example: for an // index named "twitter", the funtion returns "twitter_reindexed_1", and for an @@ -155,7 +213,7 @@ func getSearchRelevancyIndex() string { if searchRelevancyIndex == "" { searchRelevancyIndex = ".searchrelevancy" } - return searchRelevancyIndex + return util.MetaIndexName(searchRelevancyIndex) } // Returns the index name for synonyms @@ -164,5 +222,5 @@ func getSynonymsIndex() string { if synonymsIndex == "" { synonymsIndex = ".rs-synonyms" } - return synonymsIndex + return util.MetaIndexName(synonymsIndex) } diff --git a/model/reindex/util_test.go b/model/reindex/util_test.go new file mode 100644 index 00000000..3b30e57a --- /dev/null +++ b/model/reindex/util_test.go @@ -0,0 +1,58 @@ +package reindex + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestParseStoreSize(t *testing.T) { + Convey("ParseStoreSize", t, func() { + Convey("should treat empty and dash as zero", func() { + size, err := ParseStoreSize("") + So(err, ShouldBeNil) + So(size, ShouldEqual, 0) + + size, err = ParseStoreSize("-") + So(err, ShouldBeNil) + So(size, ShouldEqual, 0) + }) + + Convey("should parse human-readable sizes", func() { + size, err := ParseStoreSize("1.2kb") + So(err, ShouldBeNil) + So(size, ShouldEqual, int64(1228)) + + size, err = ParseStoreSize("100mb") + So(err, ShouldBeNil) + So(size, ShouldEqual, int64(100*1024*1024)) + + size, err = ParseStoreSize("2gb") + So(err, ShouldBeNil) + So(size, ShouldEqual, int64(2*1024*1024*1024)) + }) + + Convey("should parse plain byte counts", func() { + size, err := ParseStoreSize("512") + So(err, ShouldBeNil) + So(size, ShouldEqual, 512) + + size, err = ParseStoreSize("100b") + So(err, ShouldBeNil) + So(size, ShouldEqual, 100) + }) + + Convey("should reject invalid values", func() { + _, err := ParseStoreSize("not-a-size") + So(err, ShouldNotBeNil) + }) + }) +} + +func TestIsNumericStoreSize(t *testing.T) { + Convey("isNumericStoreSize", t, func() { + So(isNumericStoreSize("12345"), ShouldBeTrue) + So(isNumericStoreSize("1.2kb"), ShouldBeFalse) + So(isNumericStoreSize(""), ShouldBeFalse) + }) +} diff --git a/plugins/analytics/analytics.go b/plugins/analytics/analytics.go index 9d7b3eea..d140f6b0 100644 --- a/plugins/analytics/analytics.go +++ b/plugins/analytics/analytics.go @@ -132,6 +132,17 @@ func (a *Analytics) InitFunc() error { recentDocumentsIndex = defaultRecentSearchesEsIndex } + // resolve cluster-safe meta index names (e.g. serverless) + analyticsIndex = util.MetaIndexName(analyticsIndex) + logsIndex = util.MetaIndexName(logsIndex) + userSessionIndex = util.MetaIndexName(userSessionIndex) + analyticsInsightsIndex = util.MetaIndexName(analyticsInsightsIndex) + usersIndex = util.MetaIndexName(usersIndex) + savedSearchedIndex = util.MetaIndexName(savedSearchedIndex) + favoritesIndex = util.MetaIndexName(favoritesIndex) + preferencesIndex = util.MetaIndexName(preferencesIndex) + recentDocumentsIndex = util.MetaIndexName(recentDocumentsIndex) + // initialize the dao var err error a.es, err = initPlugin(analyticsIndex, logsIndex, usersIndex, userSessionIndex, analyticsInsightsIndex, savedSearchedIndex, favoritesIndex, mapping, analyticsMapping, diff --git a/plugins/analytics/dao.go b/plugins/analytics/dao.go index 8de5fde7..4b56d0e1 100644 --- a/plugins/analytics/dao.go +++ b/plugins/analytics/dao.go @@ -96,7 +96,7 @@ func initPlugin(analyticsAlias, logsIndex, usersIndex, userSessionIndex, insight replicas := util.GetReplicas() // Analytics index does not exists, create a new one if !isAnalyticsIndexExist { - settings := fmt.Sprintf(analyticsMapping, analyticsAlias, getAnalyticsMappings(), util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(analyticsMapping, analyticsAlias, getAnalyticsMappings(), util.HiddenIndexSettings(), replicas)) analyticsIndex := analyticsAlias + `-000001` _, err = util.GetClient7().CreateIndex(analyticsIndex). Body(settings). @@ -142,11 +142,11 @@ func initPlugin(analyticsAlias, logsIndex, usersIndex, userSessionIndex, insight log.Println(logTag, ": successfully created index named", analyticsAlias) } - settings := fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas)) // User session index does not exists, create a new one if !isUserSessionIndexExist { - settings := fmt.Sprintf(userSessionMapping, getUserSessionMappings(), util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(userSessionMapping, getUserSessionMappings(), util.HiddenIndexSettings(), replicas)) _, err = util.GetClient7().CreateIndex(userSessionIndex).Body(settings).Do(ctx) if err != nil { return nil, fmt.Errorf("error while creating index named %s: %v", userSessionIndex, err) @@ -183,7 +183,7 @@ func initPlugin(analyticsAlias, logsIndex, usersIndex, userSessionIndex, insight // If preferences index doesn't exist, create it. if !isPreferencesIndexExist { - prefsSettings := fmt.Sprintf(preferencesMapping, util.HiddenIndexSettings(), replicas) + prefsSettings := util.AdaptIndexBody(fmt.Sprintf(preferencesMapping, util.HiddenIndexSettings(), replicas)) _, err = util.GetClient7().CreateIndex(preferencesIndex).Body(prefsSettings).Do(ctx) if err != nil { return nil, fmt.Errorf("error while creating index named %s: %v", preferencesIndex, err) @@ -3043,7 +3043,7 @@ func (es *elasticsearch) rolloverIndexJob(alias string) { } json.Unmarshal([]byte(rolloverConfiguration), &rolloverConditions) - settingsString := fmt.Sprintf(`{%s "index.number_of_shards": 2, "index.number_of_replicas": %d}`, util.HiddenIndexSettings(), util.GetReplicas()) + settingsString := util.AdaptIndexBody(fmt.Sprintf(`{%s "index.number_of_shards": 2, "index.number_of_replicas": %d}`, util.HiddenIndexSettings(), util.GetReplicas())) settings := make(map[string]interface{}) json.Unmarshal([]byte(settingsString), &settings) rolloverService, err := es7.NewIndicesRolloverService(util.GetClient7()). @@ -3787,7 +3787,7 @@ func createRecentSearchesIndex(indexWithSuffix, indexConfig string) (*recentDocu mappings = fmt.Sprintf(mappings, mappingForType) - settings := fmt.Sprintf(indexConfig, util.HiddenIndexSettings(), replicas, mappings) + settings := util.AdaptIndexBody(fmt.Sprintf(indexConfig, util.HiddenIndexSettings(), replicas, mappings)) // index does not exists, create a new one _, err = util.GetClient7().CreateIndex(indexWithSuffix).Body(settings).Do(context.Background()) @@ -3796,7 +3796,7 @@ func createRecentSearchesIndex(indexWithSuffix, indexConfig string) (*recentDocu } // Use fallback method to create the index without the mappings - fallbackSettings := fmt.Sprintf(indexConfig, util.HiddenIndexSettings(), replicas, "{}") + fallbackSettings := util.AdaptIndexBody(fmt.Sprintf(indexConfig, util.HiddenIndexSettings(), replicas, "{}")) _, fallbackErr := util.GetClient7().CreateIndex(indexWithSuffix).Body(fallbackSettings).Do(context.Background()) if fallbackErr != nil { return nil, fmt.Errorf("error while creating index named %s: %v", indexWithSuffix, err) diff --git a/plugins/analytics/util.go b/plugins/analytics/util.go index b0eb2681..93cbedc9 100644 --- a/plugins/analytics/util.go +++ b/plugins/analytics/util.go @@ -915,7 +915,7 @@ func GetAnalyticsIndex() string { if analyticsIndex == "" { analyticsIndex = defaultAnalyticsEsIndex } - return analyticsIndex + return util.MetaIndexName(analyticsIndex) } func GetDocumentSuggestionsIndex() string { @@ -923,7 +923,7 @@ func GetDocumentSuggestionsIndex() string { if recentDocumentsIndex == "" { recentDocumentsIndex = defaultRecentSearchesEsIndex } - return recentDocumentsIndex + return util.MetaIndexName(recentDocumentsIndex) } func getQueryTypeByID(id string, request querytranslate.RSQuery) *querytranslate.QueryType { diff --git a/plugins/applycache/applycache.go b/plugins/applycache/applycache.go index 76b51d1f..6621174b 100644 --- a/plugins/applycache/applycache.go +++ b/plugins/applycache/applycache.go @@ -5,6 +5,7 @@ import ( "github.com/appbaseio/reactivesearch-api/middleware" "github.com/appbaseio/reactivesearch-api/plugins" + "github.com/appbaseio/reactivesearch-api/util" ) const ( @@ -42,7 +43,7 @@ func Instance() *Cache { // InitFunc is a part of Plugin interface that gets executed only once, and initializes // the dao, i.e. elasticsearch before the plugin is operational. func (c *Cache) InitFunc() error { - indexPrefix := cacheEsIndex + indexPrefix := util.MetaIndexName(cacheEsIndex) // initialize the dao var err error diff --git a/plugins/applycache/dao.go b/plugins/applycache/dao.go index 6dcffcb2..a53ff8a4 100644 --- a/plugins/applycache/dao.go +++ b/plugins/applycache/dao.go @@ -28,7 +28,7 @@ func initPlugin(rulesIndex, mapping string) (*elasticsearch, error) { } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, cacheMapping, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, cacheMapping, util.HiddenIndexSettings(), replicas)) // Meta index does not exists, create a new one _, err = util.GetClient7().CreateIndex(rulesIndex).Body(settings).Do(ctx) diff --git a/plugins/auth/auth.go b/plugins/auth/auth.go index 415ccdaf..679e9a51 100644 --- a/plugins/auth/auth.go +++ b/plugins/auth/auth.go @@ -78,14 +78,17 @@ func (a *Auth) InitFunc() error { if userIndex == "" { userIndex = defaultUsersEsIndex } + userIndex = util.MetaIndexName(userIndex) permissionIndex := os.Getenv(envPermissionsEsIndex) if permissionIndex == "" { permissionIndex = defaultPermissionsEsIndex } + permissionIndex = util.MetaIndexName(permissionIndex) publicKeyIndex := os.Getenv(envPublicKeyEsIndex) if publicKeyIndex == "" { publicKeyIndex = defaultPublicKeyEsIndex } + publicKeyIndex = util.MetaIndexName(publicKeyIndex) var err error // initialize the dao diff --git a/plugins/auth/dao.go b/plugins/auth/dao.go index 3d108df0..585d5e4f 100644 --- a/plugins/auth/dao.go +++ b/plugins/auth/dao.go @@ -52,7 +52,7 @@ func (es *elasticsearch) createIndex(indexName, mapping string) (bool, error) { replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas)) // Meta index does not exists, create a new one _, err = util.GetClient7().CreateIndex(indexName). Body(settings). @@ -88,6 +88,7 @@ func (es *elasticsearch) getPublicKey(ctx context.Context) (publicKey, error) { if publicKeyIndex == "" { publicKeyIndex = defaultPublicKeyEsIndex } + publicKeyIndex = util.MetaIndexName(publicKeyIndex) return es.getPublicKeyEs7(ctx, publicKeyIndex, publicKeyDocID) } diff --git a/plugins/auth/handlers.go b/plugins/auth/handlers.go index 5ecf5f1a..7b98a5bb 100644 --- a/plugins/auth/handlers.go +++ b/plugins/auth/handlers.go @@ -67,6 +67,7 @@ func (a *Auth) setPublicKey() http.HandlerFunc { if publicKeyIndex == "" { publicKeyIndex = defaultPublicKeyEsIndex } + publicKeyIndex = util.MetaIndexName(publicKeyIndex) jwtRsaPublicKey, err := getJWTPublickKey(body) if err != nil { diff --git a/plugins/auth/middleware.go b/plugins/auth/middleware.go index 740e7e4c..e13a4dbc 100644 --- a/plugins/auth/middleware.go +++ b/plugins/auth/middleware.go @@ -19,6 +19,7 @@ import ( "github.com/appbaseio/reactivesearch-api/model/trackplugin" "github.com/appbaseio/reactivesearch-api/model/user" "github.com/appbaseio/reactivesearch-api/plugins/telemetry" + "github.com/appbaseio/reactivesearch-api/util" "github.com/dgrijalva/jwt-go" "github.com/dgrijalva/jwt-go/request" "github.com/gorilla/mux" @@ -51,6 +52,7 @@ func classifyIndices(h http.HandlerFunc) http.HandlerFunc { if publicKeyIndex == "" { publicKeyIndex = defaultPublicKeyEsIndex } + publicKeyIndex = util.MetaIndexName(publicKeyIndex) ctx := index.NewContext(req.Context(), []string{publicKeyIndex}) req = req.WithContext(ctx) h(w, req) diff --git a/plugins/cache/cache.go b/plugins/cache/cache.go index 79388deb..ceb6cebf 100644 --- a/plugins/cache/cache.go +++ b/plugins/cache/cache.go @@ -49,6 +49,7 @@ func (c *Cache) InitFunc() error { if cacheIndex == "" { cacheIndex = defaultCacheEsIndex } + cacheIndex = util.MetaIndexName(cacheIndex) // initialize the dao var err error c.es, err = initPlugin(cacheIndex, mapping) diff --git a/plugins/cache/dao.go b/plugins/cache/dao.go index eacfaa6e..e74b7d27 100644 --- a/plugins/cache/dao.go +++ b/plugins/cache/dao.go @@ -29,7 +29,7 @@ func initPlugin(cacheIndex, mapping string) (*elasticsearch, error) { } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas)) // Meta index does not exists, create a new one _, err = util.GetClient7().CreateIndex(cacheIndex).Body(settings).Do(ctx) diff --git a/plugins/logs/dao.go b/plugins/logs/dao.go index d0843d88..1851d45f 100644 --- a/plugins/logs/dao.go +++ b/plugins/logs/dao.go @@ -50,6 +50,7 @@ func initPlugin(alias, config string) (*elasticsearch, error) { mappings := fmt.Sprintf(`{"_doc": %s}`, LogsMappings) settings = fmt.Sprintf(config, alias, util.HiddenIndexSettings(), replicas, mappings) } + settings = util.AdaptIndexBody(settings) // Meta index doesn't exist, create one indexName := alias + `-000001` // this works for ES6 client as well @@ -153,7 +154,7 @@ func (es *elasticsearch) rolloverIndexJob(alias string) { rolloverConfiguration = fmt.Sprintf(rolloverConfig, "30d", 1000000, "10gb") } json.Unmarshal([]byte(rolloverConfiguration), &rolloverConditions) - settingsString := fmt.Sprintf(`{%s "index.number_of_shards": 2, "index.number_of_replicas": %d}`, util.HiddenIndexSettings(), util.GetReplicas()) + settingsString := util.AdaptIndexBody(fmt.Sprintf(`{%s "index.number_of_shards": 2, "index.number_of_replicas": %d}`, util.HiddenIndexSettings(), util.GetReplicas())) settings := make(map[string]interface{}) json.Unmarshal([]byte(settingsString), &settings) diff --git a/plugins/logs/logs.go b/plugins/logs/logs.go index 31332d76..8ed5fdc0 100644 --- a/plugins/logs/logs.go +++ b/plugins/logs/logs.go @@ -6,6 +6,7 @@ import ( "github.com/appbaseio/reactivesearch-api/middleware" "github.com/appbaseio/reactivesearch-api/plugins" + "github.com/appbaseio/reactivesearch-api/util" "github.com/natefinch/lumberjack" "github.com/robfig/cron" log "github.com/sirupsen/logrus" @@ -112,6 +113,7 @@ func (l *Logs) InitFunc() error { if indexName == "" { indexName = defaultLogsEsIndex } + indexName = util.MetaIndexName(indexName) // initialize the elasticsearch client var err error diff --git a/plugins/nodes/dao.go b/plugins/nodes/dao.go index e2b4a613..81c7d08a 100644 --- a/plugins/nodes/dao.go +++ b/plugins/nodes/dao.go @@ -31,7 +31,7 @@ func initPlugin(indexName, mapping string) (*elasticsearch, error) { } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas)) // Create a new meta index _, err = util.GetClient7().CreateIndex(indexName). diff --git a/plugins/nodes/nodes.go b/plugins/nodes/nodes.go index 19852b40..2c392010 100644 --- a/plugins/nodes/nodes.go +++ b/plugins/nodes/nodes.go @@ -5,6 +5,7 @@ import ( "github.com/appbaseio/reactivesearch-api/middleware" "github.com/appbaseio/reactivesearch-api/plugins" + "github.com/appbaseio/reactivesearch-api/util" log "github.com/sirupsen/logrus" ) @@ -39,7 +40,7 @@ func (n *nodes) Name() string { func (n *nodes) InitFunc() error { log.Println(logTag, ": initializing plugin") - indexName := defaultNodesIndex + indexName := util.MetaIndexName(defaultNodesIndex) // initialize the dao var err error diff --git a/plugins/openai/dao.go b/plugins/openai/dao.go index 355df641..50858e0a 100644 --- a/plugins/openai/dao.go +++ b/plugins/openai/dao.go @@ -28,7 +28,7 @@ func initPlugin(openAIIndex, mapping string, actualMapping string) (*elasticsear } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas, actualMapping) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas, actualMapping)) // Meta index does not exists, create a new one _, err = util.GetClient7().CreateIndex(openAIIndex).Body(settings).Do(ctx) @@ -61,7 +61,7 @@ func initAnalyticsPlugin(openAIAnalyticsIndex, mapping string) (*analyticsElasti } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas)) // Meta index does not exists, create a new one _, err = util.GetClient7().CreateIndex(openAIAnalyticsIndex).Body(settings).Do(ctx) @@ -121,7 +121,7 @@ func initFAQPluginES(faqIndex, mapping string) (*FAQElasticsearch, error) { } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas)) // Meta index does not exists, create a new one _, err = util.GetClient7().CreateIndex(faqIndex).Body(settings).Do(ctx) diff --git a/plugins/openai/openai.go b/plugins/openai/openai.go index caa60c3e..458a6207 100644 --- a/plugins/openai/openai.go +++ b/plugins/openai/openai.go @@ -89,6 +89,7 @@ func (r *OpenAI) InitFunc() error { if openAIIndex == "" { openAIIndex = defaultOpenAIEsIndex } + openAIIndex = util.MetaIndexName(openAIIndex) // initialize the dao var err error @@ -97,7 +98,7 @@ func (r *OpenAI) InitFunc() error { return err } - settings := fmt.Sprintf(updatedMappingWithSettings, util.HiddenIndexSettings(), util.GetReplicas(), updatedMapping) + settings := util.AdaptIndexBody(fmt.Sprintf(updatedMappingWithSettings, util.HiddenIndexSettings(), util.GetReplicas(), updatedMapping)) migration := MappingsMigration{ NewMapping: settings, es: r.es.(*elasticsearch), @@ -105,13 +106,13 @@ func (r *OpenAI) InitFunc() error { util.AddMigrationScript(migration) // Initialize the analytics plugin as well - r.analyticsEs, err = initAnalyticsPlugin(defaultAIAnalyticsIndex, analyticsMapping) + r.analyticsEs, err = initAnalyticsPlugin(util.MetaIndexName(defaultAIAnalyticsIndex), analyticsMapping) if err != nil { return err } // Initialize the FAQ index on ES - r.faqEs, err = initFAQPluginES(defaultFAQIndex, FAQMapping) + r.faqEs, err = initFAQPluginES(util.MetaIndexName(defaultFAQIndex), FAQMapping) if err != nil { return err } diff --git a/plugins/permissions/dao.go b/plugins/permissions/dao.go index b41a791c..41c24e15 100644 --- a/plugins/permissions/dao.go +++ b/plugins/permissions/dao.go @@ -33,7 +33,7 @@ func initPlugin(indexName, mapping string) (*elasticsearch, error) { } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas)) // Create a new meta index _, err = util.GetClient7().CreateIndex(indexName). diff --git a/plugins/permissions/permissions.go b/plugins/permissions/permissions.go index 54292daa..4e1850ea 100644 --- a/plugins/permissions/permissions.go +++ b/plugins/permissions/permissions.go @@ -47,6 +47,7 @@ func (p *permissions) InitFunc() error { if indexName == "" { indexName = defaultPermissionsEsIndex } + indexName = util.MetaIndexName(indexName) // initialize the dao var err error diff --git a/plugins/pipelines/analytics.go b/plugins/pipelines/analytics.go index a580bae8..c7b11cc0 100644 --- a/plugins/pipelines/analytics.go +++ b/plugins/pipelines/analytics.go @@ -800,7 +800,7 @@ func (es *invocationElasticsearch) rolloverIndexJob(alias string) { rolloverConfiguration = fmt.Sprintf(rolloverConfig, "7d", 10000000, "10gb") } json.Unmarshal([]byte(rolloverConfiguration), &rolloverConditions) - settingsString := fmt.Sprintf(`{%s "index.number_of_shards": 3, "index.number_of_replicas": %d}`, util.HiddenIndexSettings(), util.GetReplicas()) + settingsString := util.AdaptIndexBody(fmt.Sprintf(`{%s "index.number_of_shards": 3, "index.number_of_replicas": %d}`, util.HiddenIndexSettings(), util.GetReplicas())) settings := make(map[string]interface{}) json.Unmarshal([]byte(settingsString), &settings) diff --git a/plugins/pipelines/dao.go b/plugins/pipelines/dao.go index d289a3d7..46a2a690 100644 --- a/plugins/pipelines/dao.go +++ b/plugins/pipelines/dao.go @@ -44,7 +44,7 @@ func initPlugin(pipelinesIndex, mapping string) (*elasticsearch, error) { } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, pipelinesMapping, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, pipelinesMapping, util.HiddenIndexSettings(), replicas)) // Meta index does not exists, create a new one _, err = util.GetClient7().CreateIndex(pipelinesIndex).Body(settings).Do(ctx) @@ -79,7 +79,7 @@ func initInvocationIndex(invocationAlias, _ string) (*invocationElasticsearch, e } replicas := util.GetReplicas() - settings := fmt.Sprintf(invocationConfig, invocationAlias, util.HiddenIndexSettings(), replicas, pipelineInvocationMapping) + settings := util.AdaptIndexBody(fmt.Sprintf(invocationConfig, invocationAlias, util.HiddenIndexSettings(), replicas, pipelineInvocationMapping)) // Create the index name to match the name regex for rollover invocationIndex := invocationAlias + `-000001` @@ -156,7 +156,7 @@ func initLogIndex(logsAlias, mapping string) (*logsElasticsearch, error) { } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, logsAlias, util.HiddenIndexSettings(), replicas, pipelineLogsMapping) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, logsAlias, util.HiddenIndexSettings(), replicas, pipelineLogsMapping)) // Create the index name to match the name regex for logsIndex := logsAlias + `-000001` @@ -227,7 +227,7 @@ func initVarIndex(varIndex, mapping string) (*varElasticsearch, error) { } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, pipelineVarMapping, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, pipelineVarMapping, util.HiddenIndexSettings(), replicas)) // Meta index does not exists, create a new one _, err = util.GetClient7().CreateIndex(varIndex).Body(settings).Do(ctx) diff --git a/plugins/pipelines/es_stage.go b/plugins/pipelines/es_stage.go index b49d4632..5cc378d9 100644 --- a/plugins/pipelines/es_stage.go +++ b/plugins/pipelines/es_stage.go @@ -568,6 +568,7 @@ func executeElasticsearchStage( for k := range requestHeader { esRequest.Header.Set(k, requestHeader.Get(k)) } + util.ApplyESAuth(esRequest) // perform Request log.Debugln("Pipeline Elasticsearch: REQUEST METHOD", esRequest.Method) diff --git a/plugins/pipelines/logs.go b/plugins/pipelines/logs.go index a24d9410..423ba1f6 100644 --- a/plugins/pipelines/logs.go +++ b/plugins/pipelines/logs.go @@ -245,7 +245,7 @@ func (es *logsElasticsearch) rolloverIndexJob(alias string) { rolloverConfiguration = fmt.Sprintf(rolloverConfig, "30d", 1000000, "10gb") } json.Unmarshal([]byte(rolloverConfiguration), &rolloverConditions) - settingsString := fmt.Sprintf(`{%s "index.number_of_shards": 2, "index.number_of_replicas": %d}`, util.HiddenIndexSettings(), util.GetReplicas()) + settingsString := util.AdaptIndexBody(fmt.Sprintf(`{%s "index.number_of_shards": 2, "index.number_of_replicas": %d}`, util.HiddenIndexSettings(), util.GetReplicas())) settings := make(map[string]interface{}) json.Unmarshal([]byte(settingsString), &settings) diff --git a/plugins/pipelines/middleware.go b/plugins/pipelines/middleware.go index 0da55457..7cce9c6d 100644 --- a/plugins/pipelines/middleware.go +++ b/plugins/pipelines/middleware.go @@ -103,7 +103,7 @@ func (route *ESPipelineRoutes) classifyRouteACL() func(h http.HandlerFunc) http. func classifyIndices(h http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { - ctx := index.NewContext(req.Context(), []string{defaultPipelinesEsIndex}) + ctx := index.NewContext(req.Context(), []string{util.MetaIndexName(defaultPipelinesEsIndex)}) req = req.WithContext(ctx) h(w, req) } diff --git a/plugins/pipelines/pipelines.go b/plugins/pipelines/pipelines.go index f0859f04..0f19d99b 100644 --- a/plugins/pipelines/pipelines.go +++ b/plugins/pipelines/pipelines.go @@ -119,6 +119,7 @@ func (p *Pipelines) InitFunc() error { if indexPrefix == "" { indexPrefix = defaultPipelinesEsIndex } + indexPrefix = util.MetaIndexName(indexPrefix) // initialize the dao var err error @@ -128,19 +129,19 @@ func (p *Pipelines) InitFunc() error { } // Initialize the dao for invocation - p.invocationEs, err = initInvocationIndex(defaultPipelineInvocationIndex, mapping) + p.invocationEs, err = initInvocationIndex(util.MetaIndexName(defaultPipelineInvocationIndex), mapping) if err != nil { return err } // Init the logs index - p.logEs, err = initLogIndex(defaultPipelinesLogEsIndex, logsConfig) + p.logEs, err = initLogIndex(util.MetaIndexName(defaultPipelinesLogEsIndex), logsConfig) if err != nil { return err } // Init the vars index - p.varEs, err = initVarIndex(defaultPipelineVarsIndex, mapping) + p.varEs, err = initVarIndex(util.MetaIndexName(defaultPipelineVarsIndex), mapping) if err != nil { return err } @@ -214,15 +215,17 @@ func (p *Pipelines) InitFunc() error { p.pipelineSchema = schema // Initiate the logs rollover cronjob + pipelineLogsIndex := util.MetaIndexName(defaultPipelinesLogEsIndex) + pipelineInvocationsIndex := util.MetaIndexName(defaultPipelineInvocationIndex) cronjob := cron.New() - cronjob.AddFunc("@midnight", func() { p.logEs.rolloverIndexJob(defaultPipelinesLogEsIndex) }) + cronjob.AddFunc("@midnight", func() { p.logEs.rolloverIndexJob(pipelineLogsIndex) }) // in addition, run every hour, keeping original midnight job as well - cronjob.AddFunc("@hourly", func() { p.logEs.rolloverIndexJob(defaultPipelinesLogEsIndex) }) + cronjob.AddFunc("@hourly", func() { p.logEs.rolloverIndexJob(pipelineLogsIndex) }) // Initiate the invocations rollover cronjob - cronjob.AddFunc("@midnight", func() { p.invocationEs.rolloverIndexJob(defaultPipelineInvocationIndex) }) + cronjob.AddFunc("@midnight", func() { p.invocationEs.rolloverIndexJob(pipelineInvocationsIndex) }) // in addition, run every hour, keeping original midnight job as well - cronjob.AddFunc("@hourly", func() { p.invocationEs.rolloverIndexJob(defaultPipelineInvocationIndex) }) + cronjob.AddFunc("@hourly", func() { p.invocationEs.rolloverIndexJob(pipelineInvocationsIndex) }) cronjob.Start() diff --git a/plugins/reindexer/dao.go b/plugins/reindexer/dao.go index 23380cae..ba415493 100644 --- a/plugins/reindexer/dao.go +++ b/plugins/reindexer/dao.go @@ -2,32 +2,10 @@ package reindexer import ( "context" - "errors" "github.com/appbaseio/reactivesearch-api/model/reindex" - "github.com/appbaseio/reactivesearch-api/util" ) func getIndexSize(ctx context.Context, indexName string) (int64, error) { - var res int64 - index := indexName - aliasesIndexMap, err := reindex.GetAliasIndexMap(ctx) - if err != nil { - return res, err - } - if indexNameFromMap, ok := aliasesIndexMap[indexName]; ok { - index = indexNameFromMap - } - - stats, err := util.GetClient7().IndexStats(indexName).Do(ctx) - if err != nil { - return res, err - } - - if val, ok := stats.Indices[index]; ok { - res = val.Primaries.Store.SizeInBytes - return res, nil - } - - return res, errors.New(`Invalid index name`) + return reindex.GetIndexStoreSize(ctx, indexName) } diff --git a/plugins/rules/dao.go b/plugins/rules/dao.go index 8b2c4708..85aae2f4 100644 --- a/plugins/rules/dao.go +++ b/plugins/rules/dao.go @@ -28,7 +28,7 @@ func initPlugin(rulesIndex, mapping string) (*elasticsearch, error) { } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, rulesMapping, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, rulesMapping, util.HiddenIndexSettings(), replicas)) // Meta index does not exists, create a new one _, err = util.GetClient7().CreateIndex(rulesIndex).Body(settings).Do(ctx) diff --git a/plugins/rules/middleware.go b/plugins/rules/middleware.go index 00ca089c..c8e7e66d 100644 --- a/plugins/rules/middleware.go +++ b/plugins/rules/middleware.go @@ -88,7 +88,7 @@ func classifyCategory(h http.HandlerFunc) http.HandlerFunc { func classifyIndices(h http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { - ctx := index.NewContext(req.Context(), []string{defaultRulesEsIndex}) + ctx := index.NewContext(req.Context(), []string{util.MetaIndexName(defaultRulesEsIndex)}) req = req.WithContext(ctx) h(w, req) } diff --git a/plugins/rules/rules.go b/plugins/rules/rules.go index 898514fd..70692fd1 100644 --- a/plugins/rules/rules.go +++ b/plugins/rules/rules.go @@ -63,6 +63,7 @@ func (r *Rules) InitFunc() error { if indexPrefix == "" { indexPrefix = defaultRulesEsIndex } + indexPrefix = util.MetaIndexName(indexPrefix) // initialize the dao var err error diff --git a/plugins/rules/util.go b/plugins/rules/util.go index c4033a44..b95ce882 100644 --- a/plugins/rules/util.go +++ b/plugins/rules/util.go @@ -1384,6 +1384,7 @@ func (r *Rules) runScript(scriptContext ScriptContext, script string, executeRSA req.Header.Set(key, value) } } + util.ApplyESAuth(req) res, err := util.HTTPClient().Do(req) if err != nil { diff --git a/plugins/searchgrader/dao.go b/plugins/searchgrader/dao.go index b81867b9..d607c9c4 100644 --- a/plugins/searchgrader/dao.go +++ b/plugins/searchgrader/dao.go @@ -28,7 +28,7 @@ func initPlugin(searchGraderIndex, mapping string) (*elasticsearch, error) { } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, replicas)) // Meta index does not exists, create a new one _, err = util.GetClient7().CreateIndex(searchGraderIndex).Body(settings).Do(ctx) diff --git a/plugins/searchgrader/middleware.go b/plugins/searchgrader/middleware.go index a8e562ac..19653775 100644 --- a/plugins/searchgrader/middleware.go +++ b/plugins/searchgrader/middleware.go @@ -64,7 +64,7 @@ func classifyCategory(h http.HandlerFunc) http.HandlerFunc { func classifyIndices(h http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { - ctx := index.NewContext(req.Context(), []string{defaultSearchgraderEsIndex}) + ctx := index.NewContext(req.Context(), []string{util.MetaIndexName(defaultSearchgraderEsIndex)}) req = req.WithContext(ctx) h(w, req) } diff --git a/plugins/searchgrader/searchgrader.go b/plugins/searchgrader/searchgrader.go index a671ddcc..0f0025f2 100644 --- a/plugins/searchgrader/searchgrader.go +++ b/plugins/searchgrader/searchgrader.go @@ -6,6 +6,7 @@ import ( "github.com/appbaseio/reactivesearch-api/middleware" "github.com/appbaseio/reactivesearch-api/plugins" + "github.com/appbaseio/reactivesearch-api/util" ) const ( @@ -47,6 +48,7 @@ func (s *SearchGrader) InitFunc() error { if searchgraderIndex == "" { searchgraderIndex = defaultSearchgraderEsIndex } + searchgraderIndex = util.MetaIndexName(searchgraderIndex) // initialize the dao var err error s.es, err = initPlugin(searchgraderIndex, mapping) diff --git a/plugins/searchrelevancy/dao.go b/plugins/searchrelevancy/dao.go index 9e85c678..78f1d130 100644 --- a/plugins/searchrelevancy/dao.go +++ b/plugins/searchrelevancy/dao.go @@ -31,7 +31,7 @@ func initPlugin(searchRelevancyIndex string) (*elasticsearch, error) { mappingData := mapping - settings := fmt.Sprintf(indexSettingMapping, mappingData, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(indexSettingMapping, mappingData, util.HiddenIndexSettings(), replicas)) // Meta index does not exists, create a new one _, err = util.GetClient7().CreateIndex(searchRelevancyIndex).Body(settings).Do(ctx) diff --git a/plugins/searchrelevancy/searchrelevancy.go b/plugins/searchrelevancy/searchrelevancy.go index 38f6c286..6849aa11 100644 --- a/plugins/searchrelevancy/searchrelevancy.go +++ b/plugins/searchrelevancy/searchrelevancy.go @@ -52,6 +52,7 @@ func (a *SearchRelevancy) InitFunc() error { if searchRelevancyIndex == "" { searchRelevancyIndex = defaultSearchRelevancyEsIndex } + searchRelevancyIndex = util.MetaIndexName(searchRelevancyIndex) // initialize the dao var err error diff --git a/plugins/storedquery/dao.go b/plugins/storedquery/dao.go index d424458c..7edd6e24 100644 --- a/plugins/storedquery/dao.go +++ b/plugins/storedquery/dao.go @@ -29,7 +29,7 @@ func initPlugin(index, mapping string) (*elasticsearch, error) { } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas)) // Meta index does not exists, create a new one _, err = util.GetClient7().CreateIndex(index).Body(settings).Do(ctx) diff --git a/plugins/storedquery/middleware.go b/plugins/storedquery/middleware.go index 89719b4e..ab14d66a 100644 --- a/plugins/storedquery/middleware.go +++ b/plugins/storedquery/middleware.go @@ -57,7 +57,7 @@ func classifyCategory(h http.HandlerFunc) http.HandlerFunc { func classifyIndices(h http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { - ctx := index.NewContext(req.Context(), []string{defaultStoredQueryEsIndex}) + ctx := index.NewContext(req.Context(), []string{util.MetaIndexName(defaultStoredQueryEsIndex)}) req = req.WithContext(ctx) h(w, req) } diff --git a/plugins/storedquery/storedquery.go b/plugins/storedquery/storedquery.go index 70fed43e..5dda0781 100644 --- a/plugins/storedquery/storedquery.go +++ b/plugins/storedquery/storedquery.go @@ -49,6 +49,7 @@ func (s *StoredQuery) InitFunc() error { if indexPrefix == "" { indexPrefix = defaultStoredQueryEsIndex } + indexPrefix = util.MetaIndexName(indexPrefix) // initialize the dao var err error diff --git a/plugins/suggestions/dao.go b/plugins/suggestions/dao.go index 1336ac7b..e6b0d6fc 100644 --- a/plugins/suggestions/dao.go +++ b/plugins/suggestions/dao.go @@ -70,7 +70,7 @@ func createSuggestionsIndex(indexWithSuffix, indexConfigEs6, indexConfigEs7 stri default: indexConfig = indexConfigEs7 } - settings := fmt.Sprintf(indexConfig, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(indexConfig, util.HiddenIndexSettings(), replicas)) // index does not exists, create a new one _, err = util.GetClient7().CreateIndex(indexWithSuffix).Body(settings).Do(ctx) @@ -256,7 +256,7 @@ func (es *elasticsearch) getPopularSuggestionsPreferences(ctx context.Context) ( // Returns the custom events from analytics index func getCustomEvents() ([]string, error) { var customEvents []string - var indexName = ".analytics" + var indexName = util.MetaIndexName(".analytics") // Fetch analytics mapping to find the custom events response, err := util.GetIndexMapping(indexName, context.Background()) if err != nil { @@ -447,7 +447,7 @@ func getSuggestionsIndex() string { if suggestionsIndex == "" { suggestionsIndex = defaultSuggestionsEsIndex } - return suggestionsIndex + return util.MetaIndexName(suggestionsIndex) } func syncAnalyticsToSuggestions(s *suggestions, indexToUse string) (interface{}, error) { @@ -745,7 +745,7 @@ func GetFAQSuggestions(config querytranslate.FAQSuggestionsOptions, value string // Execute search against ES res, searchErr := util.GetClient7().Search(). - Index(".ai_faqs"). + Index(util.MetaIndexName(".ai_faqs")). Query(query). Size(size). Do(context.Background()) diff --git a/plugins/suggestions/dao_es7.go b/plugins/suggestions/dao_es7.go index 3982c1d2..ffb547e4 100644 --- a/plugins/suggestions/dao_es7.go +++ b/plugins/suggestions/dao_es7.go @@ -53,7 +53,7 @@ func (es *elasticsearch) querySuggestionsEs7(ctx context.Context, preferences Po field := event + ".keyword" aggr.SubAggregation(event, es7.NewTermsAggregation().Field(field)) } - aggrResult, err := util.GetClient7().Search(".analytics"). + aggrResult, err := util.GetClient7().Search(util.MetaIndexName(".analytics")). Query(query). Aggregation("top-terms", aggr). Size(0). diff --git a/plugins/suggestions/handlers.go b/plugins/suggestions/handlers.go index 478ccf40..7eb9f172 100644 --- a/plugins/suggestions/handlers.go +++ b/plugins/suggestions/handlers.go @@ -338,6 +338,7 @@ func (rx *suggestions) getPopularSuggestionsPreferences() http.HandlerFunc { if suggestionsIndex == "" { suggestionsIndex = defaultSuggestionsEsIndex } + suggestionsIndex = util.MetaIndexName(suggestionsIndex) var finalResponse map[string]interface{} diff --git a/plugins/suggestions/suggestions.go b/plugins/suggestions/suggestions.go index f2006afd..51690a5c 100644 --- a/plugins/suggestions/suggestions.go +++ b/plugins/suggestions/suggestions.go @@ -241,6 +241,7 @@ func (r *suggestions) InitFunc() error { if indexPreferencesSuffix == "" { indexPreferencesSuffix = defaultSuggestionsPreferencesIndex } + indexPreferencesSuffix = util.MetaIndexName(indexPreferencesSuffix) var err error var exists bool diff --git a/plugins/sync/dao.go b/plugins/sync/dao.go index c8fa774e..3d71933d 100644 --- a/plugins/sync/dao.go +++ b/plugins/sync/dao.go @@ -30,7 +30,7 @@ func initPlugin(rulesIndex, mapping string) (*elasticsearch, error) { } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas)) // Meta index does not exists, create a new one _, err = util.GetClient7().CreateIndex(rulesIndex).Body(settings).Do(ctx) diff --git a/plugins/sync/sync.go b/plugins/sync/sync.go index 539250d7..5355b155 100644 --- a/plugins/sync/sync.go +++ b/plugins/sync/sync.go @@ -50,6 +50,7 @@ func (p *Sync) InitFunc() error { if indexName == "" { indexName = defaultSyncPreferencesIndex } + indexName = util.MetaIndexName(indexName) // initialize the dao var err error diff --git a/plugins/synonyms/dao.go b/plugins/synonyms/dao.go index 8cd6e14f..8065b3fb 100644 --- a/plugins/synonyms/dao.go +++ b/plugins/synonyms/dao.go @@ -28,7 +28,7 @@ func initPlugin(synonymsIndex, mapping string) (*elasticsearch, error) { } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, replicas)) // Meta index does not exists, create a new one _, err = util.GetClient7().CreateIndex(synonymsIndex).Body(settings).Do(ctx) diff --git a/plugins/synonyms/synonyms.go b/plugins/synonyms/synonyms.go index 1cfd5a30..1efb2464 100644 --- a/plugins/synonyms/synonyms.go +++ b/plugins/synonyms/synonyms.go @@ -7,6 +7,7 @@ import ( "github.com/appbaseio/reactivesearch-api/plugins" "github.com/appbaseio/reactivesearch-api/middleware" + "github.com/appbaseio/reactivesearch-api/util" "github.com/go-playground/validator/v10" ) @@ -50,6 +51,7 @@ func (s *Synonyms) InitFunc() error { if synonymsIndex == "" { synonymsIndex = defaultSynonymsEsIndex } + synonymsIndex = util.MetaIndexName(synonymsIndex) // initialize the dao var err error diff --git a/plugins/uibuilder/dao.go b/plugins/uibuilder/dao.go index a2ea49d9..aa76ebb7 100644 --- a/plugins/uibuilder/dao.go +++ b/plugins/uibuilder/dao.go @@ -28,7 +28,7 @@ func initPlugin(indexName, mapping string) (*elasticsearch, error) { } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas)) // Meta index does not exists, create a new one _, err = util.GetClient7().CreateIndex(indexName).Body(settings).Do(ctx) @@ -56,7 +56,7 @@ func createSearchBoxIndex(indexName string, indexConfig string) (*elasticsearch, replicas := util.GetReplicas() - settings := fmt.Sprintf(indexConfig, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(indexConfig, util.HiddenIndexSettings(), replicas)) // index does not exists, create a new one _, err = util.GetClient7().CreateIndex(indexName).Body(settings).Do(ctx) diff --git a/plugins/uibuilder/uibuilder.go b/plugins/uibuilder/uibuilder.go index 0577561c..771e194e 100644 --- a/plugins/uibuilder/uibuilder.go +++ b/plugins/uibuilder/uibuilder.go @@ -65,10 +65,12 @@ func (e *UIBuilder) InitFunc() error { if preferencesIndex == "" { preferencesIndex = defaultUIBuilderPreferencesIndex } + preferencesIndex = util.MetaIndexName(preferencesIndex) searchboxIndex := os.Getenv(envSearchBoxIndex) if searchboxIndex == "" { searchboxIndex = defaultSearchBoxIndex } + searchboxIndex = util.MetaIndexName(searchboxIndex) var err error e.esFeaturedSuggestions, _, err = createSearchBoxIndex(searchboxIndex, mapping) if err != nil { @@ -79,6 +81,7 @@ func (e *UIBuilder) InitFunc() error { if featuredSuggestionsIndex == "" { featuredSuggestionsIndex = defaultFeaturedSuggestionsIndex } + featuredSuggestionsIndex = util.MetaIndexName(featuredSuggestionsIndex) _, featuredSuggestionsIndexExists, err := createSearchBoxIndex(featuredSuggestionsIndex, featuredSuggestionsMapping) if err != nil { return err diff --git a/plugins/users/dao.go b/plugins/users/dao.go index 4bbe6ada..cfa843b5 100644 --- a/plugins/users/dao.go +++ b/plugins/users/dao.go @@ -48,7 +48,7 @@ func initPlugin(indexName, mapping string) (*elasticsearch, error) { } replicas := util.GetReplicas() - settings := fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas) + settings := util.AdaptIndexBody(fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas)) // Meta index does not exists, create a new one _, err = util.GetClient7().CreateIndex(indexName). Body(settings). diff --git a/plugins/users/middleware.go b/plugins/users/middleware.go index cdcddfda..23f7a154 100644 --- a/plugins/users/middleware.go +++ b/plugins/users/middleware.go @@ -15,6 +15,7 @@ import ( "github.com/appbaseio/reactivesearch-api/plugins/auth" "github.com/appbaseio/reactivesearch-api/plugins/logs" "github.com/appbaseio/reactivesearch-api/plugins/telemetry" + "github.com/appbaseio/reactivesearch-api/util" ) type chain struct { @@ -52,7 +53,7 @@ func classifyCategory(h http.HandlerFunc) http.HandlerFunc { func classifyIndices(h http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { - ctx := index.NewContext(req.Context(), []string{defaultUsersEsIndex}) + ctx := index.NewContext(req.Context(), []string{util.MetaIndexName(defaultUsersEsIndex)}) req = req.WithContext(ctx) h(w, req) } diff --git a/plugins/users/users.go b/plugins/users/users.go index 341640b1..e4100a5c 100644 --- a/plugins/users/users.go +++ b/plugins/users/users.go @@ -48,6 +48,7 @@ func (u *Users) InitFunc() error { if indexName == "" { indexName = defaultUsersEsIndex } + indexName = util.MetaIndexName(indexName) // initialize the dao var err error diff --git a/util/esauth.go b/util/esauth.go new file mode 100644 index 00000000..e6a19bec --- /dev/null +++ b/util/esauth.go @@ -0,0 +1,78 @@ +package util + +import ( + "fmt" + "net/http" + "net/url" + "os" + "strings" + "sync" + + log "github.com/sirupsen/logrus" +) + +const envESAPIKey = "ES_API_KEY" + +var ( + esAuthOnce sync.Once + esAPIKey string + esAuthErr error +) + +func loadESAuthConfig() { + esAPIKey = strings.TrimSpace(os.Getenv(envESAPIKey)) + if esAPIKey == "" { + return + } + + if hasURLCredentials(os.Getenv("ES_CLUSTER_URL")) { + esAuthErr = fmt.Errorf("ES_API_KEY cannot be used when ES_CLUSTER_URL contains username and password credentials") + } +} + +// validateESAuthConfig validates elasticsearch auth configuration. +func validateESAuthConfig() error { + esAuthOnce.Do(loadESAuthConfig) + return esAuthErr +} + +// UsesESAPIKey reports whether upstream ES auth uses an API key. +func UsesESAPIKey() bool { + if err := validateESAuthConfig(); err != nil { + log.Fatal("Error encountered: ", err) + } + return esAPIKey != "" +} + +// ESAuthHeaders returns request headers for API key auth when configured. +func ESAuthHeaders() http.Header { + headers := make(http.Header) + if UsesESAPIKey() { + headers.Set("Authorization", "ApiKey "+esAPIKey) + } + return headers +} + +// ApplyESAuth sets upstream elasticsearch authorization on an HTTP request. +func ApplyESAuth(req *http.Request) { + if UsesESAPIKey() { + req.Header.Set("Authorization", "ApiKey "+esAPIKey) + } +} + +func hasURLCredentials(rawURL string) bool { + parsedURL, err := url.Parse(rawURL) + if err != nil { + return strings.Contains(rawURL, "@") + } + return parsedURL.User != nil +} + +func stripURLCredentials(rawURL string) (string, error) { + parsedURL, err := url.Parse(rawURL) + if err != nil { + return "", err + } + parsedURL.User = nil + return parsedURL.String(), nil +} diff --git a/util/esauth_test.go b/util/esauth_test.go new file mode 100644 index 00000000..c5431c48 --- /dev/null +++ b/util/esauth_test.go @@ -0,0 +1,78 @@ +package util + +import ( + "net/http" + "sync" + "testing" +) + +func resetESAuthForTest(t *testing.T) { + t.Helper() + esAuthOnce = sync.Once{} + esAPIKey = "" + esAuthErr = nil +} + +func TestValidateESAuthConfig_APIKeyWithURLCredentials(t *testing.T) { + resetESAuthForTest(t) + t.Setenv("ES_CLUSTER_URL", "https://user:pass@cluster.example.com:443") + t.Setenv("ES_API_KEY", "abc123") + + err := validateESAuthConfig() + if err == nil { + t.Fatal("expected error when ES_API_KEY is set with URL credentials") + } +} + +func TestValidateESAuthConfig_APIKeyOnly(t *testing.T) { + resetESAuthForTest(t) + t.Setenv("ES_CLUSTER_URL", "https://cluster.example.com:443") + t.Setenv("ES_API_KEY", "abc123") + + if err := validateESAuthConfig(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !UsesESAPIKey() { + t.Fatal("expected API key auth to be enabled") + } +} + +func TestGetESURL_APIKeyModeReturnsCleanURL(t *testing.T) { + resetESAuthForTest(t) + t.Setenv("ES_CLUSTER_URL", "https://cluster.example.com:443") + t.Setenv("ES_API_KEY", "abc123") + + got := GetESURL() + want := "https://cluster.example.com:443" + if got != want { + t.Fatalf("GetESURL() = %q, want %q", got, want) + } +} + +func TestGetESURL_BasicAuthModePreservesEscapedCredentials(t *testing.T) { + resetESAuthForTest(t) + t.Setenv("ES_CLUSTER_URL", "https://user:pass@cluster.example.com:443") + t.Setenv("ES_API_KEY", "") + + got := GetESURL() + want := "https://user:pass@cluster.example.com:443" + if got != want { + t.Fatalf("GetESURL() = %q, want %q", got, want) + } +} + +func TestApplyESAuth_SetsAuthorizationHeader(t *testing.T) { + resetESAuthForTest(t) + t.Setenv("ES_CLUSTER_URL", "https://cluster.example.com:443") + t.Setenv("ES_API_KEY", "encoded-key") + + req, err := http.NewRequest(http.MethodGet, "https://cluster.example.com:443", nil) + if err != nil { + t.Fatal(err) + } + + ApplyESAuth(req) + if got := req.Header.Get("Authorization"); got != "ApiKey encoded-key" { + t.Fatalf("Authorization header = %q, want %q", got, "ApiKey encoded-key") + } +} diff --git a/util/esclient.go b/util/esclient.go index ac42938b..c767fed4 100644 --- a/util/esclient.go +++ b/util/esclient.go @@ -53,7 +53,9 @@ func GetClient7() *es7.Client { return client7 } -// GetESURL returns elasticsearch url with escaped auth +// GetESURL returns the elasticsearch cluster URL. When ES_API_KEY is set, the URL +// is returned without credentials and auth is applied via request headers instead. +// Otherwise, basic auth credentials embedded in ES_CLUSTER_URL are escaped in place. func GetESURL() string { esURL := os.Getenv("ES_CLUSTER_URL") @@ -61,6 +63,18 @@ func GetESURL() string { log.Fatal("Error encountered: ", fmt.Errorf("ES_CLUSTER_URL must be set in the environment variables")) } + if err := validateESAuthConfig(); err != nil { + log.Fatal("Error encountered: ", err) + } + + if esAPIKey != "" { + cleanURL, err := stripURLCredentials(esURL) + if err != nil { + log.Fatal("Error encountered: ", fmt.Errorf("error parsing ES_CLUSTER_URL: %v", err)) + } + return cleanURL + } + if strings.Contains(esURL, "@") { splitIndex := strings.LastIndex(esURL, "@") protocolWithCredentials := strings.Split(esURL[0:splitIndex], "://") @@ -185,6 +199,10 @@ func GetClusterType() *ClusterType { // HiddenIndexSettings to set plugin indices as hidden index func HiddenIndexSettings() string { + // Serverless rejects the `index.hidden` setting on index creation + if IsServerless() { + return "" + } esVersion, _ := v.NewVersion(GetSemanticVersion()) hiddenIndexVersion, _ := v.NewVersion("7.7.0") if esVersion.GreaterThanOrEqual(hiddenIndexVersion) { @@ -204,6 +222,10 @@ func isSniffingEnabled() bool { } func initClient7() { + if err := validateESAuthConfig(); err != nil { + log.Fatal("Error encountered: ", err) + } + var err error // Initialize the ES v7 client @@ -211,7 +233,7 @@ func initClient7() { wrappedLoggerDebug := &WrapKitLoggerDebug{*loggerT} wrappedLoggerError := &WrapKitLoggerError{*loggerT} - client7, err = es7.NewClient( + clientOptions := []es7.ClientOptionFunc{ es7.SetURL(GetESURL()), es7.SetRetrier(NewRetrier()), es7.SetSniff(isSniffingEnabled()), @@ -220,7 +242,12 @@ func initClient7() { es7.SetErrorLog(wrappedLoggerError), es7.SetInfoLog(wrappedLoggerDebug), es7.SetTraceLog(wrappedLoggerDebug), - ) + } + if UsesESAPIKey() { + clientOptions = append(clientOptions, es7.SetHeaders(ESAuthHeaders())) + } + + client7, err = es7.NewClient(clientOptions...) if err != nil { log.Fatal("Error encountered: ", fmt.Errorf("error while initializing elastic v7 client: %v", err)) } diff --git a/util/meta_index.go b/util/meta_index.go new file mode 100644 index 00000000..5d9c49eb --- /dev/null +++ b/util/meta_index.go @@ -0,0 +1,155 @@ +package util + +import ( + "context" + "encoding/json" + "net/http" + "os" + "strings" + "sync" + + es7 "github.com/olivere/elastic/v7" + log "github.com/sirupsen/logrus" +) + +// envMetaIndexPrefix allows forcing an alternate prefix for the internal +// (meta) indices, e.g. RS_META_INDEX_PREFIX=rs_ maps `.pipelines` to +// `rs_pipelines`. When unset, the prefix is applied automatically on +// Elasticsearch Serverless clusters where dot-prefixed index names are +// not allowed. +const envMetaIndexPrefix = "RS_META_INDEX_PREFIX" + +// defaultServerlessMetaIndexPrefix is the prefix automatically applied to +// meta index names on Elasticsearch Serverless. Note: index names cannot +// start with `_`, `-` or `+`, so the prefix must start with a letter. +const defaultServerlessMetaIndexPrefix = "rs_" + +var ( + serverlessOnce sync.Once + serverlessFlag bool +) + +// IsServerless reports whether the upstream cluster is an Elasticsearch +// Serverless project (detected via `version.build_flavor` from GET /). +// The result is cached after the first successful lookup. +func IsServerless() bool { + serverlessOnce.Do(func() { + requestOptions := es7.PerformRequestOptions{ + Method: "GET", + Path: "/", + } + response, err := GetClient7().PerformRequest(context.Background(), requestOptions) + if err != nil { + log.Warnln("error while detecting cluster build flavor: ", err) + return + } + if response.StatusCode != http.StatusOK { + log.Warnln("error while detecting cluster build flavor, got non OK status: ", response.StatusCode) + return + } + + statusMap := make(map[string]interface{}) + if unmarshalErr := json.Unmarshal(response.Body, &statusMap); unmarshalErr != nil { + log.Warnln("error while detecting cluster build flavor: ", unmarshalErr) + return + } + + versionMap, ok := statusMap["version"].(map[string]interface{}) + if !ok { + return + } + buildFlavor, _ := versionMap["build_flavor"].(string) + serverlessFlag = buildFlavor == "serverless" + if serverlessFlag { + log.Infoln("Detected Elasticsearch Serverless cluster, meta indices will use the '" + metaIndexPrefix() + "' prefix instead of the '.' prefix") + } + }) + return serverlessFlag +} + +// metaIndexPrefix returns the prefix to use in place of the leading dot +// for meta index names. +func metaIndexPrefix() string { + if prefix := strings.TrimSpace(os.Getenv(envMetaIndexPrefix)); prefix != "" { + return prefix + } + return defaultServerlessMetaIndexPrefix +} + +// metaIndexRenamingActive reports whether meta index names should be +// rewritten: either explicitly requested via RS_META_INDEX_PREFIX or +// automatically on Elasticsearch Serverless. +func metaIndexRenamingActive() bool { + if strings.TrimSpace(os.Getenv(envMetaIndexPrefix)) != "" { + return true + } + return IsServerless() +} + +// MetaIndexName resolves the actual name to use for an internal (meta) +// index. Dot-prefixed defaults like `.pipelines` are rewritten to a +// cluster-safe name (e.g. `rs_pipelines`) on Elasticsearch Serverless or +// when RS_META_INDEX_PREFIX is set. Names that don't begin with a dot +// (e.g. user-provided overrides) are returned unchanged. +func MetaIndexName(name string) string { + if !strings.HasPrefix(name, ".") { + return name + } + if !metaIndexRenamingActive() { + return name + } + return metaIndexPrefix() + strings.TrimPrefix(name, ".") +} + +// settings that Elasticsearch Serverless rejects (or that are pointless +// there) when creating an index. +var serverlessUnsupportedSettings = []string{ + "index.hidden", + "index.number_of_shards", + "index.number_of_replicas", + "index.auto_expand_replicas", + "hidden", + "number_of_shards", + "number_of_replicas", + "auto_expand_replicas", +} + +// AdaptIndexBody adapts an index creation body (or bare settings object) +// for the target cluster. On Elasticsearch Serverless it strips settings +// that are managed by the platform and rejected on index creation, such +// as shard/replica counts and `index.hidden`. On other clusters the body +// is returned unchanged. +func AdaptIndexBody(body string) string { + if !IsServerless() { + return body + } + + var parsed map[string]interface{} + if err := json.Unmarshal([]byte(body), &parsed); err != nil { + log.Warnln("error while adapting index body for serverless, returning unchanged: ", err) + return body + } + + stripUnsupportedSettings(parsed) + if settings, ok := parsed["settings"].(map[string]interface{}); ok { + stripUnsupportedSettings(settings) + } + + adapted, err := json.Marshal(parsed) + if err != nil { + log.Warnln("error while adapting index body for serverless, returning unchanged: ", err) + return body + } + return string(adapted) +} + +func stripUnsupportedSettings(settings map[string]interface{}) { + for _, key := range serverlessUnsupportedSettings { + delete(settings, key) + } + if index, ok := settings["index"].(map[string]interface{}); ok { + for _, key := range serverlessUnsupportedSettings { + delete(index, key) + } + } +} diff --git a/util/meta_index_test.go b/util/meta_index_test.go new file mode 100644 index 00000000..eb8a7118 --- /dev/null +++ b/util/meta_index_test.go @@ -0,0 +1,49 @@ +package util + +import ( + "os" + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestMetaIndexName(t *testing.T) { + Convey("With RS_META_INDEX_PREFIX set", t, func() { + os.Setenv(envMetaIndexPrefix, "rs_") + defer os.Unsetenv(envMetaIndexPrefix) + + Convey("dot-prefixed defaults are rewritten", func() { + So(MetaIndexName(".pipelines"), ShouldEqual, "rs_pipelines") + So(MetaIndexName(".pipeline_logs"), ShouldEqual, "rs_pipeline_logs") + So(MetaIndexName(".rs-synonyms"), ShouldEqual, "rs_rs-synonyms") + }) + + Convey("non-dot names are returned unchanged", func() { + So(MetaIndexName("custom_index"), ShouldEqual, "custom_index") + So(MetaIndexName("rs_pipelines"), ShouldEqual, "rs_pipelines") + }) + }) + + Convey("With a custom prefix", t, func() { + os.Setenv(envMetaIndexPrefix, "meta_") + defer os.Unsetenv(envMetaIndexPrefix) + + So(MetaIndexName(".users"), ShouldEqual, "meta_users") + }) +} + +func TestAdaptIndexBodyParsing(t *testing.T) { + Convey("stripUnsupportedSettings removes serverless-rejected settings", t, func() { + settings := map[string]interface{}{ + "index.hidden": true, + "index.number_of_shards": 3, + "index.number_of_replicas": 1, + "index.max_result_window": 100000, + } + stripUnsupportedSettings(settings) + So(settings, ShouldNotContainKey, "index.hidden") + So(settings, ShouldNotContainKey, "index.number_of_shards") + So(settings, ShouldNotContainKey, "index.number_of_replicas") + So(settings, ShouldContainKey, "index.max_result_window") + }) +} From ef22702090c0235975c04e6c5dc552d2b8f56ebb Mon Sep 17 00:00:00 2001 From: Siddharth Kothari Date: Thu, 11 Jun 2026 22:10:22 +0530 Subject: [PATCH 2/2] chore (build): upgrade RS API to v9.3.0 --- Dockerfile | 2 +- Makefile | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 1e7795c1..e6f9e7cd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM golang:1.26.4 AS builder -ARG VERSION=9.2.0 +ARG VERSION=9.3.0 ENV VERSION="${VERSION}" # Default value diff --git a/Makefile b/Makefile index 77e6e9f5..393cc550 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ GC=go build BUILD_DIR=build -DEFAULT_VERSION=9.2.0 +DEFAULT_VERSION=9.3.0 VERSION := $(or $(VERSION),$(DEFAULT_VERSION)) cmd: build