-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrestore.go
More file actions
167 lines (145 loc) · 5.37 KB
/
restore.go
File metadata and controls
167 lines (145 loc) · 5.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package clickhouse
import (
"fmt"
"sort"
"github.com/spf13/cobra"
"github.com/stackvista/stackstate-backup-cli/cmd/cmdutils"
"github.com/stackvista/stackstate-backup-cli/internal/app"
"github.com/stackvista/stackstate-backup-cli/internal/foundation/config"
"github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward"
"github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore"
"github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale"
)
// Restore command flags. Each variable is bound to a command-line flag in
// restoreCmd and is read by runRestore.
var (
// restoreSnapshotName is the value of --snapshot: the name of the specific
// snapshot/archive to restore.
restoreSnapshotName string
// restoreUseLatest is the value of --latest: when true, the most recent
// backup is looked up and restored instead of a named one.
restoreUseLatest bool
// restoreBackground is the value of --background: when true, the restore is
// triggered without waiting for it to complete.
restoreBackground bool
// restoreSkipConfirmation is the value of --yes/-y: when true, the
// interactive confirmation prompt is skipped.
restoreSkipConfirmation bool
)
// restoreCmd builds the "restore" subcommand for ClickHouse.
//
// The returned command registers the --snapshot, --latest, --background and
// --yes/-y flags and binds them to the package-level restore flag variables.
// --snapshot and --latest are mutually exclusive and one of them must be
// given. When executed, the command hands runRestore to cmdutils.Run together
// with globalFlags and cmdutils.MinioIsRequired.
func restoreCmd(globalFlags *config.CLIGlobalFlags) *cobra.Command {
	// Entry point invoked by cobra; positional arguments are not used.
	run := func(_ *cobra.Command, _ []string) {
		cmdutils.Run(globalFlags, runRestore, cmdutils.MinioIsRequired)
	}

	restoreCommand := &cobra.Command{
		Use:   "restore",
		Short: "Restore ClickHouse from a backup archive",
		Long:  `Restore ClickHouse data from a backup archive via ClickHouse Backup API. Waits for completion by default; use --background to run asynchronously.`,
		Run:   run,
	}

	// Flag definitions.
	flagSet := restoreCommand.Flags()
	flagSet.StringVar(&restoreSnapshotName, "snapshot", "", "Specific snapshot/archive name to restore (e.g., full_2025-11-18T11-45-04)")
	flagSet.BoolVar(&restoreUseLatest, "latest", false, "Restore from the most recent backup")
	flagSet.BoolVar(&restoreBackground, "background", false, "Run restore in background without waiting for completion")
	flagSet.BoolVarP(&restoreSkipConfirmation, "yes", "y", false, "Skip confirmation prompt")

	// The backup source must be chosen exactly one way.
	restoreCommand.MarkFlagsMutuallyExclusive("snapshot", "latest")
	restoreCommand.MarkFlagsOneRequired("snapshot", "latest")

	return restoreCommand
}
// runRestore is the entry point of the restore command.
//
// It performs these steps in order:
//  1. Picks the backup to restore: the --snapshot value, or, with --latest,
//     the name returned by getLatestBackupForRestore.
//  2. Rejects an empty backup name before anything destructive happens.
//  3. Unless --yes was given, prints a warning and asks for confirmation.
//  4. Scales down the workloads matched by the configured ClickHouse
//     scale-down label selector via scale.ScaleDownWithLock.
//  5. Runs executeRestore, waiting for completion unless --background is set.
//
// It returns an error if the latest backup cannot be determined, if the backup
// name is empty, if the user declines the confirmation, if scaling down fails,
// or if executeRestore fails.
func runRestore(appCtx *app.Context) error {
	// Determine which backup to restore
	backupName := restoreSnapshotName
	if restoreUseLatest {
		appCtx.Logger.Infof("Finding latest backup...")
		latest, err := getLatestBackupForRestore(appCtx)
		if err != nil {
			return err
		}
		backupName = latest
		appCtx.Logger.Infof("Using latest backup: %s", backupName)
	}

	// cobra's one-required flag group only checks that --snapshot or --latest
	// was set, not that the resulting value is non-empty (e.g. --snapshot "").
	// Fail here, before prompting and before scaling anything down, rather
	// than triggering a restore with an empty backup name.
	if backupName == "" {
		return fmt.Errorf("no backup name specified: use --snapshot <name> or --latest")
	}

	// Warn user and ask for confirmation
	if !restoreSkipConfirmation {
		appCtx.Logger.Println()
		appCtx.Logger.Warningf("WARNING: Restoring from backup will overwrite existing ClickHouse data!")
		appCtx.Logger.Warningf("This operation cannot be undone.")
		appCtx.Logger.Println()
		appCtx.Logger.Infof("Backup to restore: %s", backupName)
		appCtx.Logger.Infof("Namespace: %s", appCtx.Namespace)
		appCtx.Logger.Println()

		if !restore.PromptForConfirmation() {
			return fmt.Errorf("restore operation cancelled by user")
		}
	}

	// Scale down deployments/statefulsets before restore (with lock protection)
	appCtx.Logger.Println()
	scaleDownLabelSelector := appCtx.Config.Clickhouse.Restore.ScaleDownLabelSelector
	// The first return value of ScaleDownWithLock is not used in this function.
	_, err := scale.ScaleDownWithLock(scale.ScaleDownWithLockParams{
		K8sClient:     appCtx.K8sClient,
		Namespace:     appCtx.Namespace,
		LabelSelector: scaleDownLabelSelector,
		Datastore:     config.DatastoreClickhouse,
		AllSelectors:  appCtx.Config.GetAllScaleDownSelectors(),
		Log:           appCtx.Logger,
	})
	if err != nil {
		return err
	}

	// Execute restore workflow
	waitForComplete := !restoreBackground
	return executeRestore(appCtx, backupName, waitForComplete)
}
// executeRestore triggers a restore of backupName through the ClickHouse
// Backup API and, when requested, waits for it to finish.
//
// A port-forward to the configured backup service is opened for the duration
// of the call and is stopped on return. After the restore has been triggered:
//   - if waitForComplete is true, control passes to checkAndFinalize and its
//     result is returned;
//   - otherwise the status hint for a running restore is printed and nil is
//     returned.
//
// An error is returned when the port-forward cannot be set up, the ClickHouse
// client cannot be created, or the restore cannot be triggered.
func executeRestore(appCtx *app.Context, backupName string, waitForComplete bool) error {
	// Open a tunnel to the ClickHouse Backup API.
	forward, err := portforward.SetupPortForward(
		appCtx.K8sClient,
		appCtx.Namespace,
		appCtx.Config.Clickhouse.BackupService.Name,
		appCtx.Config.Clickhouse.BackupService.Port,
		appCtx.Logger,
	)
	if err != nil {
		return err
	}
	// Closing the stop channel ends the port-forward when this function returns.
	defer close(forward.StopChan)

	// Only the backup API port is supplied; the second port argument is 0.
	client, err := appCtx.NewCHClient(forward.LocalPort, 0)
	if err != nil {
		return fmt.Errorf("failed to create ClickHouse client: %w", err)
	}

	// Kick off the restore.
	appCtx.Logger.Println()
	appCtx.Logger.Infof("Triggering restore for backup: %s", backupName)

	opID, err := client.TriggerRestore(appCtx.Context, backupName)
	if err != nil {
		return fmt.Errorf("failed to trigger restore: %w", err)
	}
	appCtx.Logger.Successf("Restore triggered successfully (operation ID: %s)", opID)

	if waitForComplete {
		return checkAndFinalize(client, appCtx, opID, waitForComplete)
	}

	// Background mode: tell the user how to follow the running restore.
	restore.PrintAPIRunningRestoreStatus("clickhouse", opID, appCtx.Namespace, appCtx.Logger)
	return nil
}
// getLatestBackupForRestore returns the name of the backup with the greatest
// Created value among those listed by the ClickHouse Backup API.
//
// A port-forward to the configured backup service is opened for the duration
// of the call and is stopped on return. An error is returned when the
// port-forward cannot be set up, the ClickHouse client cannot be created, the
// listing fails, or the listing is empty.
func getLatestBackupForRestore(appCtx *app.Context) (string, error) {
	// Open a tunnel to the ClickHouse Backup API.
	forward, err := portforward.SetupPortForward(
		appCtx.K8sClient,
		appCtx.Namespace,
		appCtx.Config.Clickhouse.BackupService.Name,
		appCtx.Config.Clickhouse.BackupService.Port,
		appCtx.Logger,
	)
	if err != nil {
		return "", err
	}
	// Closing the stop channel ends the port-forward when this function returns.
	defer close(forward.StopChan)

	// Only the backup API port is supplied; the second port argument is 0.
	client, err := appCtx.NewCHClient(forward.LocalPort, 0)
	if err != nil {
		return "", fmt.Errorf("failed to create ClickHouse client: %w", err)
	}

	available, err := client.ListBackups(appCtx.Context)
	if err != nil {
		return "", fmt.Errorf("failed to list backups: %w", err)
	}
	if len(available) == 0 {
		return "", fmt.Errorf("no backups found")
	}

	// Order newest-first so the head of the slice is the latest backup.
	newerFirst := func(a, b int) bool {
		return available[b].Created < available[a].Created
	}
	sort.Slice(available, newerFirst)

	return available[0].Name, nil
}