Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM golang:1.26.0 AS builder
FROM golang:1.26.4 AS builder

ARG VERSION=9.2.0
ARG VERSION=9.3.0
ENV VERSION="${VERSION}"

# Default value
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
GC=go build

BUILD_DIR=build
DEFAULT_VERSION=9.2.0
DEFAULT_VERSION=9.3.0
VERSION := $(or $(VERSION),$(DEFAULT_VERSION))

cmd: build
Expand Down
4 changes: 3 additions & 1 deletion docs/env-vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

Plugins might require certain environment variables to be in order initialize the components it needs for its functioning. Those variables can be declared in any file. The path to that file must be provided via the `--env` flag.

**Note:** `ES_CLUSTER_URL` is used by all the plugins that are interacting with elasticsearch. `USERNAME` and `PASSWORD` are temporary entry point master credentials in order to test the plugins.
**Note:** `ES_CLUSTER_URL` is used by all the plugins that are interacting with elasticsearch. Basic auth credentials may be embedded in `ES_CLUSTER_URL` (for example `https://user:pass@host:443`). Alternatively, set `ES_API_KEY` to use API key auth for upstream elasticsearch (for example Elasticsearch Serverless); `ES_CLUSTER_URL` must not contain credentials when `ES_API_KEY` is set. `USERNAME` and `PASSWORD` are temporary entry point master credentials in order to test the plugins.

**Meta index naming:** ReactiveSearch stores its metadata in dot-prefixed indices (e.g. `.pipelines`, `.users`). Elasticsearch Serverless does not allow creating dot-prefixed indices, so when a Serverless cluster is detected (via `build_flavor` from `GET /`), meta indices are automatically created with the `rs_` prefix instead (e.g. `rs_pipelines`). Set `RS_META_INDEX_PREFIX` to force an alternate prefix on any cluster (the prefix must not start with `_`, `-` or `+`). On Serverless, platform-managed index settings (`index.hidden`, shard/replica counts) are also stripped from index creation requests.

List of specific env vars required by respective plugins are listed below:

Expand Down
2 changes: 1 addition & 1 deletion model/permission/permission.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func (p *Permission) CanAccessIndex(name string) (bool, error) {
if suggestionsIndex == "" {
suggestionsIndex = ".suggestions"
}
indices = append(indices, suggestionsIndex)
indices = append(indices, util.MetaIndexName(suggestionsIndex))
}
for _, pattern := range indices {
matched, err := util.ValidateIndex(pattern, name)
Expand Down
69 changes: 68 additions & 1 deletion model/reindex/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,11 +522,78 @@ func GetAliasIndexMap(ctx context.Context) (map[string]string, error) {
return res, nil
}

// GetIndexStoreSize returns the primary store size in bytes for an index or alias.
// It uses _cat/indices, which is supported on Elasticsearch Serverless where
// _stats is unavailable.
func GetIndexStoreSize(ctx context.Context, indexName string) (int64, error) {
index := indexName
aliasesIndexMap, err := GetAliasIndexMap(ctx)
if err != nil {
return 0, err
}
if indexNameFromMap, ok := aliasesIndexMap[indexName]; ok {
index = indexNameFromMap
}

if !util.IsServerless() {
stats, err := util.GetClient7().IndexStats(index).Do(ctx)
if err == nil {
if val, ok := stats.Indices[index]; ok {
return val.Primaries.Store.SizeInBytes, nil
}
}
}

return indexStoreSizeFromCat(ctx, index)
}

func indexStoreSizeFromCat(ctx context.Context, index string) (int64, error) {
v := url.Values{}
v.Set("format", "json")
v.Set("bytes", "b")

requestOptions := es7.PerformRequestOptions{
Method: "GET",
Path: "/_cat/indices/" + index,
Params: v,
}
response, err := util.GetClient7().PerformRequest(ctx, requestOptions)
if err != nil {
return 0, err
}
if response.StatusCode > 300 {
return 0, errors.New(string(response.Body))
}

var indices []AliasedIndices
if err := json.Unmarshal(response.Body, &indices); err != nil {
return 0, err
}
if len(indices) == 0 {
return 0, fmt.Errorf(`index "%s" not found`, index)
}

storeSize := indices[0].PriStoreSize
if storeSize == "" {
storeSize = indices[0].StoreSize
}
if isNumericStoreSize(storeSize) {
return strconv.ParseInt(storeSize, 10, 64)
}
return ParseStoreSize(storeSize)
}

func isTaskCompleted(ctx context.Context, taskID string) (bool, error) {
isCompleted := false
url := util.GetESURL() + "/_tasks/" + taskID

response, err := http.Get(url)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
log.Errorln(logTag, " Get task status error", err)
return isCompleted, err
}
util.ApplyESAuth(req)
response, err := util.HTTPClient().Do(req)
if err != nil {
log.Errorln(logTag, " Get task status error", err)
return isCompleted, err
Expand Down
62 changes: 60 additions & 2 deletions model/reindex/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"strconv"
"strings"
"sync"
"unicode"

"github.com/appbaseio/reactivesearch-api/middleware/classify"
"github.com/appbaseio/reactivesearch-api/util"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -70,6 +72,62 @@ var CurrentlyReIndexingProcessMutex = sync.RWMutex{}
// IndexStoreSize to decide whether to use async or sync re-indexing
const IndexStoreSize = int64(100000000)

// ParseStoreSize converts Elasticsearch _cat/indices size strings (e.g. "1.2kb",
// "100mb") or plain byte counts to bytes. A dash or empty value is treated as zero.
func ParseStoreSize(size string) (int64, error) {
size = strings.TrimSpace(strings.ToLower(size))
if size == "" || size == "-" {
return 0, nil
}

multiplier := int64(1)
units := []struct {
suffix string
mult int64
}{
{"pb", 1024 * 1024 * 1024 * 1024 * 1024},
{"tb", 1024 * 1024 * 1024 * 1024},
{"gb", 1024 * 1024 * 1024},
{"mb", 1024 * 1024},
{"kb", 1024},
{"b", 1},
}
for _, unit := range units {
if strings.HasSuffix(size, unit.suffix) {
size = strings.TrimSuffix(size, unit.suffix)
multiplier = unit.mult
break
}
}

size = strings.TrimSpace(size)
if size == "" {
return 0, fmt.Errorf("invalid store size")
}

value, err := strconv.ParseFloat(size, 64)
if err != nil {
return 0, fmt.Errorf("invalid store size %q: %w", size, err)
}
if value < 0 {
return 0, fmt.Errorf("invalid store size %q", size)
}

return int64(value * float64(multiplier)), nil
}

func isNumericStoreSize(size string) bool {
if size == "" {
return false
}
for _, r := range size {
if !unicode.IsDigit(r) {
return false
}
}
return true
}

// reindexedName calculates from the name the number of times an index has been
// reindexed to generate the successive name for the index. For example: for an
// index named "twitter", the funtion returns "twitter_reindexed_1", and for an
Expand Down Expand Up @@ -155,7 +213,7 @@ func getSearchRelevancyIndex() string {
if searchRelevancyIndex == "" {
searchRelevancyIndex = ".searchrelevancy"
}
return searchRelevancyIndex
return util.MetaIndexName(searchRelevancyIndex)
}

// Returns the index name for synonyms
Expand All @@ -164,5 +222,5 @@ func getSynonymsIndex() string {
if synonymsIndex == "" {
synonymsIndex = ".rs-synonyms"
}
return synonymsIndex
return util.MetaIndexName(synonymsIndex)
}
58 changes: 58 additions & 0 deletions model/reindex/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package reindex

import (
"testing"

. "github.com/smartystreets/goconvey/convey"
)

func TestParseStoreSize(t *testing.T) {
Convey("ParseStoreSize", t, func() {
Convey("should treat empty and dash as zero", func() {
size, err := ParseStoreSize("")
So(err, ShouldBeNil)
So(size, ShouldEqual, 0)

size, err = ParseStoreSize("-")
So(err, ShouldBeNil)
So(size, ShouldEqual, 0)
})

Convey("should parse human-readable sizes", func() {
size, err := ParseStoreSize("1.2kb")
So(err, ShouldBeNil)
So(size, ShouldEqual, int64(1228))

size, err = ParseStoreSize("100mb")
So(err, ShouldBeNil)
So(size, ShouldEqual, int64(100*1024*1024))

size, err = ParseStoreSize("2gb")
So(err, ShouldBeNil)
So(size, ShouldEqual, int64(2*1024*1024*1024))
})

Convey("should parse plain byte counts", func() {
size, err := ParseStoreSize("512")
So(err, ShouldBeNil)
So(size, ShouldEqual, 512)

size, err = ParseStoreSize("100b")
So(err, ShouldBeNil)
So(size, ShouldEqual, 100)
})

Convey("should reject invalid values", func() {
_, err := ParseStoreSize("not-a-size")
So(err, ShouldNotBeNil)
})
})
}

func TestIsNumericStoreSize(t *testing.T) {
Convey("isNumericStoreSize", t, func() {
So(isNumericStoreSize("12345"), ShouldBeTrue)
So(isNumericStoreSize("1.2kb"), ShouldBeFalse)
So(isNumericStoreSize(""), ShouldBeFalse)
})
}
11 changes: 11 additions & 0 deletions plugins/analytics/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,17 @@ func (a *Analytics) InitFunc() error {
recentDocumentsIndex = defaultRecentSearchesEsIndex
}

// resolve cluster-safe meta index names (e.g. serverless)
analyticsIndex = util.MetaIndexName(analyticsIndex)
logsIndex = util.MetaIndexName(logsIndex)
userSessionIndex = util.MetaIndexName(userSessionIndex)
analyticsInsightsIndex = util.MetaIndexName(analyticsInsightsIndex)
usersIndex = util.MetaIndexName(usersIndex)
savedSearchedIndex = util.MetaIndexName(savedSearchedIndex)
favoritesIndex = util.MetaIndexName(favoritesIndex)
preferencesIndex = util.MetaIndexName(preferencesIndex)
recentDocumentsIndex = util.MetaIndexName(recentDocumentsIndex)

// initialize the dao
var err error
a.es, err = initPlugin(analyticsIndex, logsIndex, usersIndex, userSessionIndex, analyticsInsightsIndex, savedSearchedIndex, favoritesIndex, mapping, analyticsMapping,
Expand Down
14 changes: 7 additions & 7 deletions plugins/analytics/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func initPlugin(analyticsAlias, logsIndex, usersIndex, userSessionIndex, insight
replicas := util.GetReplicas()
// Analytics index does not exists, create a new one
if !isAnalyticsIndexExist {
settings := fmt.Sprintf(analyticsMapping, analyticsAlias, getAnalyticsMappings(), util.HiddenIndexSettings(), replicas)
settings := util.AdaptIndexBody(fmt.Sprintf(analyticsMapping, analyticsAlias, getAnalyticsMappings(), util.HiddenIndexSettings(), replicas))
analyticsIndex := analyticsAlias + `-000001`
_, err = util.GetClient7().CreateIndex(analyticsIndex).
Body(settings).
Expand Down Expand Up @@ -142,11 +142,11 @@ func initPlugin(analyticsAlias, logsIndex, usersIndex, userSessionIndex, insight
log.Println(logTag, ": successfully created index named", analyticsAlias)
}

settings := fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas)
settings := util.AdaptIndexBody(fmt.Sprintf(mapping, util.HiddenIndexSettings(), replicas))

// User session index does not exists, create a new one
if !isUserSessionIndexExist {
settings := fmt.Sprintf(userSessionMapping, getUserSessionMappings(), util.HiddenIndexSettings(), replicas)
settings := util.AdaptIndexBody(fmt.Sprintf(userSessionMapping, getUserSessionMappings(), util.HiddenIndexSettings(), replicas))
_, err = util.GetClient7().CreateIndex(userSessionIndex).Body(settings).Do(ctx)
if err != nil {
return nil, fmt.Errorf("error while creating index named %s: %v", userSessionIndex, err)
Expand Down Expand Up @@ -183,7 +183,7 @@ func initPlugin(analyticsAlias, logsIndex, usersIndex, userSessionIndex, insight

// If preferences index doesn't exist, create it.
if !isPreferencesIndexExist {
prefsSettings := fmt.Sprintf(preferencesMapping, util.HiddenIndexSettings(), replicas)
prefsSettings := util.AdaptIndexBody(fmt.Sprintf(preferencesMapping, util.HiddenIndexSettings(), replicas))
_, err = util.GetClient7().CreateIndex(preferencesIndex).Body(prefsSettings).Do(ctx)
if err != nil {
return nil, fmt.Errorf("error while creating index named %s: %v", preferencesIndex, err)
Expand Down Expand Up @@ -3043,7 +3043,7 @@ func (es *elasticsearch) rolloverIndexJob(alias string) {
}

json.Unmarshal([]byte(rolloverConfiguration), &rolloverConditions)
settingsString := fmt.Sprintf(`{%s "index.number_of_shards": 2, "index.number_of_replicas": %d}`, util.HiddenIndexSettings(), util.GetReplicas())
settingsString := util.AdaptIndexBody(fmt.Sprintf(`{%s "index.number_of_shards": 2, "index.number_of_replicas": %d}`, util.HiddenIndexSettings(), util.GetReplicas()))
settings := make(map[string]interface{})
json.Unmarshal([]byte(settingsString), &settings)
rolloverService, err := es7.NewIndicesRolloverService(util.GetClient7()).
Expand Down Expand Up @@ -3787,7 +3787,7 @@ func createRecentSearchesIndex(indexWithSuffix, indexConfig string) (*recentDocu

mappings = fmt.Sprintf(mappings, mappingForType)

settings := fmt.Sprintf(indexConfig, util.HiddenIndexSettings(), replicas, mappings)
settings := util.AdaptIndexBody(fmt.Sprintf(indexConfig, util.HiddenIndexSettings(), replicas, mappings))

// index does not exists, create a new one
_, err = util.GetClient7().CreateIndex(indexWithSuffix).Body(settings).Do(context.Background())
Expand All @@ -3796,7 +3796,7 @@ func createRecentSearchesIndex(indexWithSuffix, indexConfig string) (*recentDocu
}

// Use fallback method to create the index without the mappings
fallbackSettings := fmt.Sprintf(indexConfig, util.HiddenIndexSettings(), replicas, "{}")
fallbackSettings := util.AdaptIndexBody(fmt.Sprintf(indexConfig, util.HiddenIndexSettings(), replicas, "{}"))
_, fallbackErr := util.GetClient7().CreateIndex(indexWithSuffix).Body(fallbackSettings).Do(context.Background())
if fallbackErr != nil {
return nil, fmt.Errorf("error while creating index named %s: %v", indexWithSuffix, err)
Expand Down
4 changes: 2 additions & 2 deletions plugins/analytics/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,15 +915,15 @@ func GetAnalyticsIndex() string {
if analyticsIndex == "" {
analyticsIndex = defaultAnalyticsEsIndex
}
return analyticsIndex
return util.MetaIndexName(analyticsIndex)
}

func GetDocumentSuggestionsIndex() string {
recentDocumentsIndex := os.Getenv(envRecentSearchesEsIndex)
if recentDocumentsIndex == "" {
recentDocumentsIndex = defaultRecentSearchesEsIndex
}
return recentDocumentsIndex
return util.MetaIndexName(recentDocumentsIndex)
}

func getQueryTypeByID(id string, request querytranslate.RSQuery) *querytranslate.QueryType {
Expand Down
3 changes: 2 additions & 1 deletion plugins/applycache/applycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/appbaseio/reactivesearch-api/middleware"
"github.com/appbaseio/reactivesearch-api/plugins"
"github.com/appbaseio/reactivesearch-api/util"
)

const (
Expand Down Expand Up @@ -42,7 +43,7 @@ func Instance() *Cache {
// InitFunc is a part of Plugin interface that gets executed only once, and initializes
// the dao, i.e. elasticsearch before the plugin is operational.
func (c *Cache) InitFunc() error {
indexPrefix := cacheEsIndex
indexPrefix := util.MetaIndexName(cacheEsIndex)

// initialize the dao
var err error
Expand Down
2 changes: 1 addition & 1 deletion plugins/applycache/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func initPlugin(rulesIndex, mapping string) (*elasticsearch, error) {
}

replicas := util.GetReplicas()
settings := fmt.Sprintf(mapping, cacheMapping, util.HiddenIndexSettings(), replicas)
settings := util.AdaptIndexBody(fmt.Sprintf(mapping, cacheMapping, util.HiddenIndexSettings(), replicas))

// Meta index does not exists, create a new one
_, err = util.GetClient7().CreateIndex(rulesIndex).Body(settings).Do(ctx)
Expand Down
Loading
Loading