Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/elasticsearch/configure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (m *mockESClientForConfigure) IndexExists(_ string) (bool, error) {
return false, fmt.Errorf("not implemented")
}

func (m *mockESClientForConfigure) RestoreSnapshot(_, _, _ string) error {
func (m *mockESClientForConfigure) RestoreSnapshot(_, _, _ string, _ bool) error {
return fmt.Errorf("not implemented")
}

Expand Down
62 changes: 62 additions & 0 deletions cmd/elasticsearch/describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package elasticsearch

import (
"encoding/json"
"fmt"
"os"

"github.com/spf13/cobra"
"github.com/stackvista/stackstate-backup-cli/cmd/cmdutils"
"github.com/stackvista/stackstate-backup-cli/internal/app"
"github.com/stackvista/stackstate-backup-cli/internal/foundation/config"
"github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward"
)

var describeSnapshotName string

func describeCmd(globalFlags *config.CLIGlobalFlags) *cobra.Command {
cmd := &cobra.Command{
Use: "describe",
Short: "Show detailed information about an Elasticsearch snapshot",
Run: func(_ *cobra.Command, _ []string) {
cmdutils.Run(globalFlags, runDescribeSnapshot, cmdutils.StorageIsRequired)
},
}

cmd.Flags().StringVarP(&describeSnapshotName, "snapshot", "s", "", "Snapshot name to describe")
_ = cmd.MarkFlagRequired("snapshot")

return cmd
}

func runDescribeSnapshot(appCtx *app.Context) error {
serviceName := appCtx.Config.Elasticsearch.Service.Name
remotePort := appCtx.Config.Elasticsearch.Service.Port

pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, remotePort, appCtx.Logger)
if err != nil {
return err
}
defer close(pf.StopChan)

esClient, err := appCtx.NewESClient(pf.LocalPort)
if err != nil {
return fmt.Errorf("failed to create Elasticsearch client: %w", err)
}

repository := appCtx.Config.Elasticsearch.Restore.Repository
appCtx.Logger.Infof("Fetching snapshot '%s' from repository '%s'...", describeSnapshotName, repository)

snapshot, err := esClient.GetSnapshot(repository, describeSnapshotName)
if err != nil {
return fmt.Errorf("failed to get snapshot: %w", err)
}

data, err := json.MarshalIndent(snapshot, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal snapshot: %w", err)
}

_, err = fmt.Fprintln(os.Stdout, string(data))
return err
}
28 changes: 28 additions & 0 deletions cmd/elasticsearch/describe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package elasticsearch

import (
"testing"

"github.com/stackvista/stackstate-backup-cli/internal/foundation/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDescribeCmd_Unit(t *testing.T) {
flags := config.NewCLIGlobalFlags()
flags.Namespace = testNamespace
flags.ConfigMapName = testConfigMapName
cmd := describeCmd(flags)

assert.Equal(t, "describe", cmd.Use)
assert.Equal(t, "Show detailed information about an Elasticsearch snapshot", cmd.Short)
assert.NotNil(t, cmd.Run)

snapshotFlag := cmd.Flags().Lookup("snapshot")
require.NotNil(t, snapshotFlag)
assert.Equal(t, "s", snapshotFlag.Shorthand)

// Verify snapshot flag is required
annotations := snapshotFlag.Annotations
require.Contains(t, annotations, "cobra_annotation_bash_completion_one_required_flag")
}
1 change: 1 addition & 0 deletions cmd/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func Cmd(globalFlags *config.CLIGlobalFlags) *cobra.Command {

cmd.AddCommand(listCmd(globalFlags))
cmd.AddCommand(listIndicesCmd(globalFlags))
cmd.AddCommand(describeCmd(globalFlags))
cmd.AddCommand(restoreCmd(globalFlags))
cmd.AddCommand(checkAndFinalizeCmd(globalFlags))
cmd.AddCommand(configureCmd(globalFlags))
Expand Down
2 changes: 1 addition & 1 deletion cmd/elasticsearch/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (m *mockESClient) IndexExists(_ string) (bool, error) {
return false, fmt.Errorf("not implemented")
}

func (m *mockESClient) RestoreSnapshot(_, _, _ string) error {
func (m *mockESClient) RestoreSnapshot(_, _, _ string, _ bool) error {
return fmt.Errorf("not implemented")
}

Expand Down
50 changes: 49 additions & 1 deletion cmd/elasticsearch/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
useLatest bool
runBackground bool
skipConfirmation bool
allowPartial bool
)

func restoreCmd(globalFlags *config.CLIGlobalFlags) *cobra.Command {
Expand All @@ -45,6 +46,7 @@ func restoreCmd(globalFlags *config.CLIGlobalFlags) *cobra.Command {
cmd.Flags().BoolVar(&useLatest, "latest", false, "Restore from the most recent snapshot (mutually exclusive with --snapshot)")
cmd.Flags().BoolVar(&runBackground, "background", false, "Run restore in background without waiting for completion")
cmd.Flags().BoolVarP(&skipConfirmation, "yes", "y", false, "Skip confirmation prompt")
cmd.Flags().BoolVar(&allowPartial, "allow-partial", false, "Allow restoring from a PARTIAL snapshot without extra confirmation")
cmd.MarkFlagsMutuallyExclusive("snapshot", "latest")
cmd.MarkFlagsOneRequired("snapshot", "latest")
return cmd
Expand Down Expand Up @@ -81,13 +83,25 @@ func runRestore(appCtx *app.Context) error {
appCtx.Logger.Successf("Latest snapshot found: %s", selectedSnapshot)
}

// Fetch snapshot details to check state
snapshotDetails, err := esClient.GetSnapshot(repository, selectedSnapshot)
if err != nil {
return fmt.Errorf("failed to get snapshot details: %w", err)
}

// Validate snapshot state
if err := validateSnapshotState(snapshotDetails, appCtx, skipConfirmation, allowPartial); err != nil {
return err
}

// Confirm with user before starting destructive operation
if !skipConfirmation {
appCtx.Logger.Println()
appCtx.Logger.Warningf("WARNING: Restoring from snapshot will DELETE all existing STS indices!")
appCtx.Logger.Warningf("This operation cannot be undone.")
appCtx.Logger.Println()
appCtx.Logger.Infof("Snapshot to restore: %s", selectedSnapshot)
appCtx.Logger.Infof("Snapshot state: %s", snapshotDetails.State)
appCtx.Logger.Infof("Namespace: %s", appCtx.Namespace)
appCtx.Logger.Println()

Expand Down Expand Up @@ -119,8 +133,9 @@ func runRestore(appCtx *app.Context) error {

// Trigger async restore
appCtx.Logger.Println()
isPartial := snapshotDetails.State == "PARTIAL"
appCtx.Logger.Infof("Triggering restore for snapshot: %s", selectedSnapshot)
if err := esClient.RestoreSnapshot(repository, selectedSnapshot, appCtx.Config.Elasticsearch.Restore.IndicesPattern); err != nil {
if err := esClient.RestoreSnapshot(repository, selectedSnapshot, appCtx.Config.Elasticsearch.Restore.IndicesPattern, isPartial); err != nil {
return fmt.Errorf("failed to trigger restore: %w", err)
}
appCtx.Logger.Successf("Restore triggered successfully")
Expand Down Expand Up @@ -152,6 +167,39 @@ func getLatestSnapshot(esClient es.Interface, repository string) (string, error)
return snapshots[0].Snapshot, nil
}

// validateSnapshotState checks the snapshot state and handles PARTIAL snapshots
func validateSnapshotState(snapshot *es.Snapshot, appCtx *app.Context, skipConfirm, allowPartialFlag bool) error {
switch snapshot.State {
case es.StatusSuccess:
return nil
case "PARTIAL":
failureCount := len(snapshot.Failures)
if allowPartialFlag {
appCtx.Logger.Warningf("Snapshot '%s' is PARTIAL (%d shard failure(s)), proceeding due to --allow-partial flag",
snapshot.Snapshot, failureCount)
return nil
}
if skipConfirm {
return fmt.Errorf("snapshot '%s' is PARTIAL with %d shard failure(s); "+
"use --allow-partial together with --yes to restore a partial snapshot non-interactively",
snapshot.Snapshot, failureCount)
}
// Interactive mode: warn and ask for explicit confirmation
appCtx.Logger.Println()
appCtx.Logger.Warningf("WARNING: Snapshot '%s' is in PARTIAL state!", snapshot.Snapshot)
appCtx.Logger.Warningf(" %d shard(s) failed out of %d total (%d successful)",
snapshot.Shards.Failed, snapshot.Shards.Total, snapshot.Shards.Successful)
appCtx.Logger.Warningf("Restoring this snapshot will result in incomplete data for the failed shards.")
appCtx.Logger.Println()
if !restore.PromptForConfirmation() {
return fmt.Errorf("restore operation cancelled by user")
}
return nil
default:
return fmt.Errorf("snapshot '%s' is in %s state and cannot be restored", snapshot.Snapshot, snapshot.State)
}
}

// deleteAllSTSIndices deletes all STS indices including datastream rollover if needed
func deleteAllSTSIndices(esClient es.Interface, appCtx *app.Context) error {
appCtx.Logger.Infof("Fetching current Elasticsearch indices...")
Expand Down
94 changes: 92 additions & 2 deletions cmd/elasticsearch/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"testing"
"time"

"github.com/stackvista/stackstate-backup-cli/internal/app"
"github.com/stackvista/stackstate-backup-cli/internal/clients/elasticsearch"
"github.com/stackvista/stackstate-backup-cli/internal/foundation/config"
"github.com/stackvista/stackstate-backup-cli/internal/foundation/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -60,7 +62,7 @@ func (m *mockESClientForRestore) IndexExists(index string) (bool, error) {
return exists, nil
}

func (m *mockESClientForRestore) RestoreSnapshot(_, snapshotName, _ string) error {
func (m *mockESClientForRestore) RestoreSnapshot(_, snapshotName, _ string, _ bool) error {
if m.restoreErr != nil {
return m.restoreErr
}
Expand Down Expand Up @@ -348,7 +350,7 @@ func TestMockESClientForRestore(t *testing.T) {
}

// Test restore
err := mockClient.RestoreSnapshot("backup-repo", "test-snapshot", "sts_*")
err := mockClient.RestoreSnapshot("backup-repo", "test-snapshot", "sts_*", false)
if tt.expectRestoreOK {
assert.NoError(t, err)
assert.Equal(t, "test-snapshot", mockClient.restoredSnapshot)
Expand Down Expand Up @@ -430,6 +432,94 @@ func TestRestoreSnapshot_Integration(t *testing.T) {
assert.Equal(t, 3, len(snapshot.Indices))
}

// TestRestoreCmd_AllowPartialFlag tests that the allow-partial flag is registered
func TestRestoreCmd_AllowPartialFlag(t *testing.T) {
flags := config.NewCLIGlobalFlags()
flags.Namespace = testNamespace
flags.ConfigMapName = testConfigMapName
cmd := restoreCmd(flags)

allowPartialFlag := cmd.Flags().Lookup("allow-partial")
require.NotNil(t, allowPartialFlag)
assert.Equal(t, "false", allowPartialFlag.DefValue)
}

// TestValidateSnapshotState tests snapshot state validation logic
func TestValidateSnapshotState(t *testing.T) {
appCtx := &app.Context{
Logger: logger.New(false, false),
}

tests := []struct {
name string
state string
failures []elasticsearch.SnapshotFailure
shards struct{ Total, Failed, Successful int }
skipConfirm bool
allowPartial bool
expectErr bool
errContains string
}{
{
name: "SUCCESS state passes",
state: "SUCCESS",
},
{
name: "PARTIAL with --allow-partial passes",
state: "PARTIAL",
failures: []elasticsearch.SnapshotFailure{{Index: "idx1", Reason: "conn refused"}},
allowPartial: true,
},
{
name: "PARTIAL with --yes but no --allow-partial fails",
state: "PARTIAL",
failures: []elasticsearch.SnapshotFailure{{Index: "idx1", Reason: "conn refused"}},
skipConfirm: true,
expectErr: true,
errContains: "use --allow-partial together with --yes",
},
{
name: "FAILED state is rejected",
state: "FAILED",
expectErr: true,
errContains: "FAILED state and cannot be restored",
},
{
name: "IN_PROGRESS state is rejected",
state: "IN_PROGRESS",
expectErr: true,
errContains: "IN_PROGRESS state and cannot be restored",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
snapshot := &elasticsearch.Snapshot{
Snapshot: "test-snapshot",
State: tt.state,
Failures: tt.failures,
Shards: struct {
Total int `json:"total"`
Failed int `json:"failed"`
Successful int `json:"successful"`
}{
Total: tt.shards.Total,
Failed: tt.shards.Failed,
Successful: tt.shards.Successful,
},
}

err := validateSnapshotState(snapshot, appCtx, tt.skipConfirm, tt.allowPartial)
if tt.expectErr {
require.Error(t, err)
assert.Contains(t, err.Error(), tt.errContains)
} else {
assert.NoError(t, err)
}
})
}
}

// TestRestoreConstants tests the restore command constants
func TestRestoreConstants(t *testing.T) {
assert.Equal(t, 30, defaultMaxIndexDeleteAttempts)
Expand Down
35 changes: 23 additions & 12 deletions internal/clients/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,29 @@ type IndexInfo struct {
DatasetSize string `json:"dataset.size"`
}

// SnapshotFailure represents a shard-level failure in an Elasticsearch snapshot
type SnapshotFailure struct {
Index string `json:"index"`
IndexUUID string `json:"index_uuid"`
ShardID int `json:"shard_id"`
Reason string `json:"reason"`
NodeID string `json:"node_id"`
Status string `json:"status"`
}

// Snapshot represents an Elasticsearch snapshot
type Snapshot struct {
Snapshot string `json:"snapshot"`
UUID string `json:"uuid"`
Repository string `json:"repository"`
State string `json:"state"`
StartTime string `json:"start_time"`
StartTimeMillis int64 `json:"start_time_in_millis"`
EndTime string `json:"end_time"`
EndTimeMillis int64 `json:"end_time_in_millis"`
DurationInMillis int64 `json:"duration_in_millis"`
Indices []string `json:"indices"`
Failures []string `json:"failures"`
Snapshot string `json:"snapshot"`
UUID string `json:"uuid"`
Repository string `json:"repository"`
State string `json:"state"`
StartTime string `json:"start_time"`
StartTimeMillis int64 `json:"start_time_in_millis"`
EndTime string `json:"end_time"`
EndTimeMillis int64 `json:"end_time_in_millis"`
DurationInMillis int64 `json:"duration_in_millis"`
Indices []string `json:"indices"`
Failures []SnapshotFailure `json:"failures"`
Shards struct {
Total int `json:"total"`
Failed int `json:"failed"`
Expand Down Expand Up @@ -350,9 +360,10 @@ func (c *Client) ConfigureSLMPolicy(name, schedule, snapshotName, repository, in
// RestoreSnapshot restores a snapshot from a repository asynchronously
// The restore is triggered and returns immediately (waitForCompletion=false)
// Use GetRestoreStatus to check the progress of the restore operation
func (c *Client) RestoreSnapshot(repository, snapshotName, indicesPattern string) error {
func (c *Client) RestoreSnapshot(repository, snapshotName, indicesPattern string, partial bool) error {
body := map[string]interface{}{
"indices": indicesPattern,
"partial": partial,
}

bodyJSON, err := json.Marshal(body)
Expand Down
2 changes: 1 addition & 1 deletion internal/clients/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func TestClient_RestoreSnapshot(t *testing.T) {
require.NoError(t, err)

// Execute test
err = client.RestoreSnapshot(tt.repository, tt.snapshotName, tt.indicesPattern)
err = client.RestoreSnapshot(tt.repository, tt.snapshotName, tt.indicesPattern, false)

// Assertions
if tt.expectError {
Expand Down
Loading
Loading