diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 69fc6380..bf17a5ff 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -71,7 +71,7 @@ jobs: with: VAULT_PASSWORD: ${{ secrets.VAULT_PASSWORD }} - - uses: actions/upload-artifact@v6 + - uses: actions/upload-artifact@v7 with: name: build-artifacts path: | @@ -82,7 +82,7 @@ jobs: if-no-files-found: error retention-days: 1 - - uses: actions/upload-artifact@v6 + - uses: actions/upload-artifact@v7 with: name: build-test-artifacts path: | @@ -180,7 +180,7 @@ jobs: flag-name: testflows-${{ matrix.clickhouse }} # todo possible failures https://github.com/actions/upload-artifact/issues/270 - name: Upload testflows logs - uses: actions/upload-artifact@v6 + uses: actions/upload-artifact@v7 with: name: testflows-logs-and-reports-${{ matrix.clickhouse }}-${{ github.run_id }} path: | @@ -354,9 +354,9 @@ jobs: echo "docker_tag=${DOCKER_TAG:-dev}" >> $GITHUB_OUTPUT - name: Set up QEMU - uses: docker/setup-qemu-action@v3 + uses: docker/setup-qemu-action@v4 - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 + uses: docker/setup-buildx-action@v4 - name: Building docker image env: diff --git a/ChangeLog.md b/ChangeLog.md index 315aa70f..3e742e74 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -1,3 +1,8 @@ +# unreleased + +NEW FEATURES +- add `clean_broken_retention` CLI command — walks top-level of remote `path` and `object_disks_path` and batch-deletes (with retry) every entry that is not present in the live backup list and not matched by any `--keep=`. Dry-run by default; pass `--commit` to actually delete. Useful for cleaning up orphans left by failed retention runs + # v2.6.43 NEW FEATURES diff --git a/Manual.md b/Manual.md index b861a58b..6ca8f26f 100644 --- a/Manual.md +++ b/Manual.md @@ -310,6 +310,24 @@ OPTIONS: --config value, -c value Config 'FILE' name. (default: "/etc/clickhouse-backup/config.yml") [$CLICKHOUSE_BACKUP_CONFIG] --environment-override value, --env value override any environment variable via CLI parameter +``` +### CLI command - clean_broken_retention +``` +NAME: + clickhouse-backup clean_broken_retention - Remove orphan entries under remote `path` and `object_disks_path` that are not in the live backup list + +USAGE: + clickhouse-backup clean_broken_retention [--commit] [--keep=glob ...] + +DESCRIPTION: + Walks top-level of remote `path` and `object_disks_path`, batch-deletes (with retry) every entry that is not a live backup and does not match any --keep glob. Runs in dry-run mode unless --commit is set. + +OPTIONS: + --config value, -c value Config 'FILE' name. (default: "/etc/clickhouse-backup/config.yml") [$CLICKHOUSE_BACKUP_CONFIG] + --environment-override value, --env value override any environment variable via CLI parameter + --keep value Glob (path.Match syntax) of backup names to preserve in addition to live backups; can be passed multiple times + --commit Actually delete orphans; without this flag the command only logs what would be deleted + ``` ### CLI command - watch ``` @@ -345,6 +363,14 @@ Look at the system.parts partition and partition_id fields for details https://c --delete, --delete-source, --delete-local explicitly delete local backup during upload ``` +### CLI command - acvp +``` +NAME: + clickhouse-backup acvp - Run ACVP wrapper protocol over stdin/stdout + +USAGE: + clickhouse-backup acvp +``` ### CLI command - server ``` NAME: diff --git a/ReadMe.md b/ReadMe.md index bbd64e2e..be1d415d 100644 --- a/ReadMe.md +++ b/ReadMe.md @@ -972,6 +972,24 @@ OPTIONS: --config value, -c value Config 'FILE' name. (default: "/etc/clickhouse-backup/config.yml") [$CLICKHOUSE_BACKUP_CONFIG] --environment-override value, --env value override any environment variable via CLI parameter +``` +### CLI command - clean_broken_retention +``` +NAME: + clickhouse-backup clean_broken_retention - Remove orphan entries under remote `path` and `object_disks_path` that are not in the live backup list + +USAGE: + clickhouse-backup clean_broken_retention [--commit] [--keep=glob ...] + +DESCRIPTION: + Walks top-level of remote `path` and `object_disks_path`, batch-deletes (with retry) every entry that is not a live backup and does not match any --keep glob. Runs in dry-run mode unless --commit is set. + +OPTIONS: + --config value, -c value Config 'FILE' name. (default: "/etc/clickhouse-backup/config.yml") [$CLICKHOUSE_BACKUP_CONFIG] + --environment-override value, --env value override any environment variable via CLI parameter + --keep value Glob (path.Match syntax) of backup names to preserve in addition to live backups; can be passed multiple times + --commit Actually delete orphans; without this flag the command only logs what would be deleted + ``` ### CLI command - watch ``` @@ -1007,6 +1025,14 @@ Look at the system.parts partition and partition_id fields for details https://c --delete, --delete-source, --delete-local explicitly delete local backup during upload ``` +### CLI command - acvp +``` +NAME: + clickhouse-backup acvp - Run ACVP wrapper protocol over stdin/stdout + +USAGE: + clickhouse-backup acvp +``` ### CLI command - server ``` NAME: diff --git a/cmd/clickhouse-backup/main.go b/cmd/clickhouse-backup/main.go index 1302abd1..27f6798e 100644 --- a/cmd/clickhouse-backup/main.go +++ b/cmd/clickhouse-backup/main.go @@ -700,13 +700,19 @@ func main() { Flags: cliapp.Flags, }, { - Name: "clean_remote_broken", - Usage: "Remove all broken remote backups", + Name: "clean_remote_broken", + Usage: "Remove all broken remote backups", + UsageText: "clickhouse-backup clean_remote_broken [--include=glob ...]", Action: func(c *cli.Context) error { b := backup.NewBackuper(config.GetConfigFromCli(c)) - return b.CleanRemoteBroken(status.NotFromAPI) + return b.CleanRemoteBroken(status.NotFromAPI, c.StringSlice("include")) }, - Flags: cliapp.Flags, + Flags: append(cliapp.Flags, + cli.StringSliceFlag{ + Name: "include", + Usage: "Glob (path.Match syntax) to scope cleanup only to broken backup names matching these patterns; can be passed multiple times; if omitted, all broken backups are deleted", + }, + ), }, { Name: "clean_local_broken", @@ -717,6 +723,30 @@ func main() { }, Flags: cliapp.Flags, }, + { + Name: "clean_broken_retention", + Usage: "Remove orphan entries under remote `path` and `object_disks_path` that are not in the live backup list", + UsageText: "clickhouse-backup clean_broken_retention [--commit] [--include=glob ...] [--exclude=glob ...]", + Description: "Walks top-level of remote `path` and `object_disks_path`, batch-deletes (with retry) every entry that is not a live backup and is not excluded by --exclude globs and is matched by --include globs (if provided). Object disk orphans are deleted in parallel with progress tracking. Pass --commit to actually delete; without it the command only logs what would be deleted.", + Action: func(c *cli.Context) error { + b := backup.NewBackuper(config.GetConfigFromCli(c)) + return b.CleanBrokenRetention(status.NotFromAPI, c.StringSlice("include"), c.StringSlice("exclude"), c.Bool("commit")) + }, + Flags: append(cliapp.Flags, + cli.StringSliceFlag{ + Name: "include", + Usage: "Glob (path.Match syntax) to scope cleanup only to backup names matching these patterns; can be passed multiple times; if omitted, all orphans are candidates", + }, + cli.StringSliceFlag{ + Name: "exclude", + Usage: "Glob (path.Match syntax) of backup names to preserve even if they appear as orphans; can be passed multiple times", + }, + cli.BoolFlag{ + Name: "commit", + Usage: "Actually delete orphans; without this flag the command only logs what would be deleted", + }, + ), + }, { Name: "watch", diff --git a/generate_manual.sh b/generate_manual.sh index 4ca7f62f..f515c9f0 100644 --- a/generate_manual.sh +++ b/generate_manual.sh @@ -16,7 +16,9 @@ cmds=( clean clean_remote_broken clean_local_broken + clean_broken_retention watch + acvp server ) for cmd in ${cmds[@]}; do diff --git a/pkg/backup/backuper.go b/pkg/backup/backuper.go index c627c77a..61a13285 100644 --- a/pkg/backup/backuper.go +++ b/pkg/backup/backuper.go @@ -392,6 +392,24 @@ func (b *Backuper) buildEmbeddedLocationAZBLOB() string { return fmt.Sprintf("DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s;BlobEndpoint=%s;", b.cfg.AzureBlob.EndpointSchema, b.cfg.AzureBlob.AccountName, b.cfg.AzureBlob.AccountKey, azblobBackupURL.String()) } +func (b *Backuper) getBackupPath() (string, error) { + switch b.cfg.General.RemoteStorage { + case "s3": + return b.cfg.S3.Path, nil + case "azblob": + return b.cfg.AzureBlob.Path, nil + case "gcs": + return b.cfg.GCS.Path, nil + case "cos": + return b.cfg.COS.Path, nil + case "ftp": + return b.cfg.FTP.Path, nil + case "sftp": + return b.cfg.SFTP.Path, nil + } + return "", errors.Errorf("getBackupPath: unsupported remote_storage: %s", b.cfg.General.RemoteStorage) +} + func (b *Backuper) getObjectDiskPath() (string, error) { if b.cfg.General.RemoteStorage == "s3" { return b.cfg.S3.ObjectDiskPath, nil diff --git a/pkg/backup/delete.go b/pkg/backup/delete.go index 5009a744..228f7265 100644 --- a/pkg/backup/delete.go +++ b/pkg/backup/delete.go @@ -2,11 +2,13 @@ package backup import ( "context" + "fmt" "io/fs" "os" "path" "path/filepath" "strings" + "sync/atomic" "time" "github.com/Altinity/clickhouse-backup/v2/pkg/common" @@ -14,6 +16,7 @@ import ( "github.com/Altinity/clickhouse-backup/v2/pkg/clickhouse" "github.com/Altinity/clickhouse-backup/v2/pkg/custom" + "github.com/Altinity/clickhouse-backup/v2/pkg/metadata" "github.com/Altinity/clickhouse-backup/v2/pkg/status" "github.com/Altinity/clickhouse-backup/v2/pkg/storage" "github.com/Altinity/clickhouse-backup/v2/pkg/storage/object_disk" @@ -22,6 +25,7 @@ import ( "github.com/eapache/go-resiliency/retrier" "github.com/pkg/errors" "github.com/rs/zerolog/log" + "golang.org/x/sync/errgroup" ) // Clean - removed all data in shadow folder @@ -482,22 +486,35 @@ func (b *Backuper) cleanBackupObjectDisks(ctx context.Context, backupName string return totalDeleted, nil } - // Fallback: one-by-one deletion (should not happen if all storage types implement BatchDeleter) - log.Warn().Msgf("cleanBackupObjectDisks: %s does not implement BatchDeleter, falling back to one-by-one deletion", b.dst.Kind()) + // Fallback: one-by-one deletion (streaming — no in-memory accumulation). + // FTP/SFTP do not implement BatchDeleter (they have DeleteKeysFromObjectDiskBackup + // without the "Batch" suffix required by the interface), so they always hit this path. + log.Warn().Msgf("cleanBackupObjectDisks: %s does not implement BatchDeleter, falling back to streaming one-by-one deletion", b.dst.Kind()) deletedKeys := uint(0) - walkErr := b.dst.WalkAbsolute(ctx, path.Join(objectDiskPath, backupName), true, func(ctx context.Context, f storage.RemoteFile) error { - if b.dst.Kind() == "azblob" { - if f.Size() > 0 || !f.LastModified().IsZero() { - deletedKeys += 1 - return b.dst.DeleteFileFromObjectDiskBackup(ctx, path.Join(backupName, f.Name())) - } - + _ = b.dst.WalkAbsolute(ctx, path.Join(objectDiskPath, backupName), true, func(ctx context.Context, f storage.RemoteFile) error { + if f.Name() == "" || f.Name() == "." { return nil } - deletedKeys += 1 - return b.dst.DeleteFileFromObjectDiskBackup(ctx, path.Join(backupName, f.Name())) + key := path.Join(backupName, f.Name()) + // Use DeleteFileFromObjectDiskBackup which constructs the correct absolute + // path via Config.ObjectDiskPath. For directories this triggers recursive + // deletion (SFTP DeleteDirectory / FTP RemoveDirRecur) which may remove + // entries that the in-progress walker hasn't visited yet, causing the + // walker to later fail with "file does not exist". That's harmless — the + // data is already deleted — so we discard the walk error below. + if err := b.dst.DeleteFileFromObjectDiskBackup(ctx, key); err != nil { + log.Debug().Err(err).Str("key", key).Msg("cleanBackupObjectDisks: delete failed") + } else { + deletedKeys++ + } + return nil }) - return deletedKeys, walkErr + // Walk error is expected when RemoveDirRecur/DeleteDirectory removed children + // that the walker hadn't stepped to yet. Data is already gone — no need to fail. + if deletedKeys > 0 { + _ = b.dst.DeleteFileFromObjectDiskBackup(ctx, backupName) + } + return deletedKeys, nil } func (b *Backuper) skipIfSameLocalBackupPresent(ctx context.Context, backupName, tags string) (bool, error) { @@ -535,7 +552,7 @@ func (b *Backuper) CleanLocalBroken(commandId int) error { return nil } -func (b *Backuper) CleanRemoteBroken(commandId int) error { +func (b *Backuper) CleanRemoteBroken(commandId int, includeGlobs []string) error { ctx, cancel, err := status.Current.GetContextWithCancel(commandId) if err != nil { return errors.WithMessage(err, "status.Current.GetContextWithCancel") @@ -543,20 +560,309 @@ func (b *Backuper) CleanRemoteBroken(commandId int) error { ctx, cancel = context.WithCancel(ctx) defer cancel() + for _, g := range includeGlobs { + if _, err := path.Match(g, ""); err != nil { + return errors.Wrapf(err, "invalid include-glob %q", g) + } + } + remoteBackups, err := b.GetRemoteBackups(ctx, true) if err != nil { return errors.WithMessage(err, "b.GetRemoteBackups") } for _, backup := range remoteBackups { - if backup.Broken != "" { - if err = b.RemoveBackupRemote(ctx, backup.BackupName); err != nil { - return errors.WithMessage(err, "b.RemoveBackupRemote") + if backup.Broken == "" { + continue + } + if len(includeGlobs) > 0 { + matched := false + for _, g := range includeGlobs { + if ok, _ := path.Match(g, backup.BackupName); ok { + matched = true + break + } + } + if !matched { + continue + } + } + if err = b.RemoveBackupRemote(ctx, backup.BackupName); err != nil { + return errors.WithMessage(err, "b.RemoveBackupRemote") + } + } + return nil +} + +// CleanBrokenRetention walks remote `path` and `object_disks_path` top-level entries +// and removes everything that is NOT present in the live BackupList and NOT matched by excludeGlobs. +// Uses BatchDeleter with retry and parallel batch deletion for object_disks_path orphans. +// When commit=false, only logs orphans without deleting (dry-run mode). +// When includeGlobs is non-empty, only orphans matching at least one includeGlob are considered. +// excludeGlobs and includeGlobs follow path.Match syntax (e.g. "prod-*", "snapshot-2026-??-*"). +func (b *Backuper) CleanBrokenRetention(commandId int, includeGlobs, excludeGlobs []string, commit bool) error { + ctx, cancel, err := status.Current.GetContextWithCancel(commandId) + if err != nil { + return errors.WithMessage(err, "status.Current.GetContextWithCancel") + } + ctx, cancel = context.WithCancel(ctx) + defer cancel() + + if b.cfg.General.RemoteStorage == "none" { + return errors.New("aborted: RemoteStorage set to \"none\"") + } + if b.cfg.General.RemoteStorage == "custom" { + return errors.New("aborted: clean_broken_retention does not support custom remote storage") + } + for _, g := range excludeGlobs { + if _, err := path.Match(g, ""); err != nil { + return errors.Wrapf(err, "invalid exclude-glob %q", g) + } + } + for _, g := range includeGlobs { + if _, err := path.Match(g, ""); err != nil { + return errors.Wrapf(err, "invalid include-glob %q", g) + } + } + if err := b.ch.Connect(); err != nil { + return errors.Wrap(err, "can't connect to clickhouse") + } + defer b.ch.Close() + + bd, err := storage.NewBackupDestination(ctx, b.cfg, b.ch, "") + if err != nil { + return errors.WithMessage(err, "storage.NewBackupDestination") + } + if err = bd.Connect(ctx); err != nil { + return errors.Wrap(err, "can't connect to remote storage") + } + defer func() { + if closeErr := bd.Close(ctx); closeErr != nil { + log.Warn().Msgf("can't close BackupDestination error: %v", closeErr) + } + }() + b.dst = bd + + // parseMetadata=true forces a metadata.json stat for every top-level entry. + // Broken backups (e.g. upload still in progress) are still kept — they are + // known backups and not orphans. + backupList, err := bd.BackupList(ctx, true, "") + if err != nil { + return errors.WithMessage(err, "bd.BackupList") + } + keepNames := make(map[string]struct{}, len(backupList)) + liveCount := 0 + for _, backup := range backupList { + keepNames[backup.BackupName] = struct{}{} + if backup.Broken == "" { + liveCount++ + } + } + isKept := func(name string) bool { + // Live backups are always preserved. + if _, ok := keepNames[name]; ok { + return true + } + // If --include is specified, only consider names matching at least one includeGlob. + if len(includeGlobs) > 0 { + matched := false + for _, g := range includeGlobs { + if ok, _ := path.Match(g, name); ok { + matched = true + break + } + } + if !matched { + return true + } + } + // --exclude globs preserve matched entries from deletion. + for _, g := range excludeGlobs { + if ok, _ := path.Match(g, name); ok { + return true } } + return false + } + + mode := "dry-run" + if commit { + mode = "commit" + } + log.Info().Msgf("clean_broken_retention: mode=%s, %d live backups (of %d in remote list), %d include-globs, %d exclude-globs", mode, liveCount, len(backupList), len(includeGlobs), len(excludeGlobs)) + + objectDiskPath, err := b.getObjectDiskPath() + if err != nil { + return errors.WithMessage(err, "b.getObjectDiskPath") + } + + topObjName := "" + if objectDiskPath != "" { + topObjName = path.Base(objectDiskPath) + } + + backupPath, err := b.getBackupPath() + if err != nil { + return errors.WithMessage(err, "b.getBackupPath") + } + + orphansInPath, err := b.findOrphanTopLevelNames(ctx, bd, backupPath, isKept) + if err != nil { + return errors.WithMessage(err, "scan path orphans") + } + + if topObjName != "" { + filtered := make([]string, 0, len(orphansInPath)) + for _, name := range orphansInPath { + if name != topObjName { + filtered = append(filtered, name) + } + } + orphansInPath = filtered + } + + for _, name := range orphansInPath { + if !commit { + log.Info().Str("orphan", name).Str("location", "path").Msg("clean_broken_retention: would delete") + continue + } + log.Info().Str("orphan", name).Str("location", "path").Msg("clean_broken_retention: deleting") + if err := bd.RemoveBackupRemote(ctx, storage.Backup{BackupMetadata: metadata.BackupMetadata{BackupName: name}}, b.cfg, b); err != nil { + return errors.Wrapf(err, "bd.RemoveBackupRemote orphan %s", name) + } + } + + if objectDiskPath == "" { + log.Info().Msgf("clean_broken_retention: done, mode=%s, path orphans=%d, object_disks_path: not configured", mode, len(orphansInPath)) + return nil + } + + orphansInObj, err := b.findOrphanTopLevelNames(ctx, bd, objectDiskPath, isKept) + if err != nil { + return errors.WithMessage(err, "scan object_disks_path orphans") + } + totalObj := len(orphansInObj) + if totalObj == 0 { + log.Info().Msgf("clean_broken_retention: done, mode=%s, path orphans=%d, object_disks_path orphans=0", mode, len(orphansInPath)) + return nil + } + + if !commit { + for _, name := range orphansInObj { + log.Info().Str("orphan", name).Str("location", "object_disks_path").Msg("clean_broken_retention: would delete") + } + log.Info().Msgf("clean_broken_retention: done, mode=%s, path orphans=%d, object_disks_path orphans=%d", mode, len(orphansInPath), totalObj) + return nil + } + + parallel := int(b.cfg.General.UploadConcurrency) + if parallel < 1 { + parallel = 4 + } + + log.Info().Msgf("clean_broken_retention: deleting %d object_disks_path orphans, concurrency=%d", totalObj, parallel) + eg, egCtx := errgroup.WithContext(ctx) + eg.SetLimit(parallel) + + var doneCount atomic.Int64 + var failCount atomic.Int64 + startTime := time.Now() + logTicker := time.NewTicker(30 * time.Second) + defer logTicker.Stop() + + done := make(chan struct{}) + defer func() { <-done }() + go func() { + defer close(done) + for { + select { + case <-egCtx.Done(): + return + case <-logTicker.C: + d := doneCount.Load() + f := failCount.Load() + log.Info().Msgf("clean_broken_retention: progress [%d/%d] done=%d fail=%d elapsed=%s", + d+f, totalObj, d, f, utils.HumanizeDuration(time.Since(startTime))) + } + } + }() + + for _, name := range orphansInObj { + orphan := name + eg.Go(func() error { + select { + case <-egCtx.Done(): + return egCtx.Err() + default: + } + start := time.Now() + deletedKeys, deleteErr := b.cleanBackupObjectDisks(egCtx, orphan) + if deleteErr != nil { + failCount.Add(1) + log.Warn().Err(deleteErr).Str("orphan", orphan).Msg("clean_broken_retention: deletion failed") + // Don't abort the whole group — log and continue with other orphans + return nil + } + doneCount.Add(1) + d := doneCount.Load() + f := failCount.Load() + log.Info(). + Str("orphan", orphan). + Uint("deleted_keys", deletedKeys). + Str("duration", utils.HumanizeDuration(time.Since(start))). + Msgf("clean_broken_retention: [%d/%d] done fail=%d", d+f, totalObj, f) + return nil + }) + } + _ = eg.Wait() // errors are handled inside the goroutines (non-fatal) + + elapsed := time.Since(startTime) + d := doneCount.Load() + f := failCount.Load() + log.Info().Msgf("clean_broken_retention: done, mode=%s, path orphans=%d, object_disks_path orphans=%d/%d (done=%d fail=%d) elapsed=%s", + mode, len(orphansInPath), d, totalObj, d, f, utils.HumanizeDuration(elapsed)) + if f > 0 { + return fmt.Errorf("clean_broken_retention: %d of %d object_disks_path orphans failed to delete", f, totalObj) } return nil } +// findOrphanTopLevelNames lists top-level entries under rootPath (absolute when rootPath != "/") +// and returns names that are not kept by isKept. Top-level only: any names containing "/" are skipped. +func (b *Backuper) findOrphanTopLevelNames(ctx context.Context, bd *storage.BackupDestination, rootPath string, isKept func(string) bool) ([]string, error) { + seen := make(map[string]struct{}) + walkFn := func(_ context.Context, f storage.RemoteFile) error { + // Walk("/", false) emits names that may have a leading "/" (S3) and/or trailing "/" (CommonPrefix). + name := strings.Trim(f.Name(), "/") + if name == "" || strings.Contains(name, "/") { + return nil + } + // Skip hidden/dotfile entries — clickhouse-backup never produces names starting with ".". + // Protects system dirs like /root/.ssh on filesystem-backed remotes (SFTP/FTP). + if strings.HasPrefix(name, ".") { + return nil + } + if isKept(name) { + return nil + } + seen[name] = struct{}{} + return nil + } + var err error + if rootPath == "/" || rootPath == "" { + err = bd.Walk(ctx, "/", false, walkFn) + } else { + err = bd.WalkAbsolute(ctx, rootPath, false, walkFn) + } + if err != nil { + return nil, errors.Wrapf(err, "walk %q", rootPath) + } + out := make([]string, 0, len(seen)) + for n := range seen { + out = append(out, n) + } + return out, nil +} + func (b *Backuper) cleanPartialRequiredBackup(ctx context.Context, disks []clickhouse.Disk, currentBackupName string) error { if localBackups, _, err := b.GetLocalBackups(ctx, disks); err == nil { for _, localBackup := range localBackups { diff --git a/pkg/filesystemhelper/filesystemhelper.go b/pkg/filesystemhelper/filesystemhelper.go index 069dd26f..53431230 100644 --- a/pkg/filesystemhelper/filesystemhelper.go +++ b/pkg/filesystemhelper/filesystemhelper.go @@ -481,4 +481,3 @@ func IsDuplicatedParts(part1, part2 string) error { } return nil } - diff --git a/pkg/server/server.go b/pkg/server/server.go index acb3e6f9..86865598 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -551,7 +551,7 @@ func (api *APIServer) actionsCleanRemoteBrokenHandler(w http.ResponseWriter, row return actionsResults, err } b := backup.NewBackuper(cfg) - err = b.CleanRemoteBroken(commandId) + err = b.CleanRemoteBroken(commandId, nil) if err != nil { log.Error().Msgf("Clean remote broken error: %v", err) status.Current.Stop(commandId, err) @@ -1378,7 +1378,7 @@ func (api *APIServer) httpCleanRemoteBrokenHandler(w http.ResponseWriter, _ *htt defer status.Current.Stop(commandId, err) b := backup.NewBackuper(cfg) - err = b.CleanRemoteBroken(commandId) + err = b.CleanRemoteBroken(commandId, nil) if err != nil { log.Error().Msgf("Clean remote broken error: %v", err) api.writeError(w, http.StatusInternalServerError, "clean_remote_broken", err) diff --git a/pkg/storage/azblob.go b/pkg/storage/azblob.go index fb6b6407..0bcf565a 100644 --- a/pkg/storage/azblob.go +++ b/pkg/storage/azblob.go @@ -322,7 +322,7 @@ func (a *AzureBlob) StatFileAbsolute(ctx context.Context, key string) (RemoteFil if !errors.As(err, &se) || se.ServiceCode() != azblob.ServiceCodeBlobNotFound { return nil, errors.WithMessage(err, "AzureBlob StatFileAbsolute GetProperties") } - return nil, ErrNotFound + return nil, NewErrNotFound(key) } return &azureBlobFile{ name: key, diff --git a/pkg/storage/cos.go b/pkg/storage/cos.go index 1a4d807d..6550defe 100644 --- a/pkg/storage/cos.go +++ b/pkg/storage/cos.go @@ -75,15 +75,18 @@ func (c *COS) StatFile(ctx context.Context, key string) (RemoteFile, error) { func (c *COS) StatFileAbsolute(ctx context.Context, key string) (RemoteFile, error) { // @todo - COS Stat file max size is 5Gb resp, err := c.client.Object.Get(ctx, key, nil) - if err != nil { + if err != nil || resp == nil { var cosErr *cos.ErrorResponse ok := errors.As(err, &cosErr) if ok && cosErr.Code == "NoSuchKey" { - return nil, ErrNotFound + return nil, NewErrNotFound(key) } return nil, errors.WithMessage(err, "COS StatFileAbsolute Get") } - modifiedTime, _ := parseTime(resp.Response.Header.Get("Date")) + modifiedTime, parseErr := parseTime(resp.Response.Header.Get("Date")) + if parseErr != nil { + log.Warn().Err(parseErr).Stack().Msg("parseTime(COS.Response.Header.Get(\"Date\")) return error") + } return &cosFile{ size: resp.Response.ContentLength, name: resp.Request.URL.Path, @@ -105,6 +108,10 @@ func (c *COS) Walk(ctx context.Context, cosPath string, recursive bool, process } func (c *COS) WalkAbsolute(ctx context.Context, prefix string, recursive bool, process func(context.Context, RemoteFile) error) error { + // COS API needs prefix to end with "/" for proper directory listing. + if !strings.HasSuffix(prefix, "/") { + prefix += "/" + } delimiter := "" if !recursive { @@ -156,7 +163,7 @@ func (c *COS) GetFileReader(ctx context.Context, key string) (io.ReadCloser, err func (c *COS) GetFileReaderAbsolute(ctx context.Context, key string) (io.ReadCloser, error) { resp, err := c.client.Object.Get(ctx, key, nil) - if err != nil { + if err != nil || resp == nil { return nil, errors.WithMessage(err, "COS GetFileReaderAbsolute Get") } return resp.Body, nil diff --git a/pkg/storage/ftp.go b/pkg/storage/ftp.go index 0f01c5e0..5f52231c 100644 --- a/pkg/storage/ftp.go +++ b/pkg/storage/ftp.go @@ -102,7 +102,7 @@ func (f *FTP) StatFileAbsolute(ctx context.Context, key string) (RemoteFile, err if err != nil { // proftpd return 550 error if `dir` not exists if strings.HasPrefix(err.Error(), "550") { - return nil, ErrNotFound + return nil, NewErrNotFound(key) } return nil, errors.WithMessage(err, "FTP StatFileAbsolute List") } @@ -118,7 +118,7 @@ func (f *FTP) StatFileAbsolute(ctx context.Context, key string) (RemoteFile, err } } - return nil, ErrNotFound + return nil, NewErrNotFound(key) } func (f *FTP) DeleteFile(ctx context.Context, key string) error { @@ -128,7 +128,13 @@ func (f *FTP) DeleteFile(ctx context.Context, key string) error { if err != nil { return errors.WithMessage(err, "FTP DeleteFile getConnection") } - if err := client.RemoveDirRecur(path.Join(f.Config.Path, key)); err != nil { + filePath := path.Join(f.Config.Path, key) + // goftp RemoveDirRecur calls ChangeDir which fails on files. + // Try DELE first for files, fall back to RemoveDirRecur for directories. + if err := client.Delete(filePath); err == nil { + return nil + } + if err := client.RemoveDirRecur(filePath); err != nil { return errors.WithMessage(err, "FTP DeleteFile RemoveDirRecur") } return nil @@ -243,7 +249,13 @@ func (f *FTP) DeleteFileFromObjectDiskBackup(ctx context.Context, key string) er if err != nil { return errors.WithMessage(err, "FTP DeleteFileFromObjectDiskBackup getConnection") } - if err := client.RemoveDirRecur(path.Join(f.Config.ObjectDiskPath, key)); err != nil { + filePath := path.Join(f.Config.ObjectDiskPath, key) + // goftp RemoveDirRecur calls ChangeDir which fails on files (not directories). + // Try DELE first for files, fall back to RemoveDirRecur for directories. + if err := client.Delete(filePath); err == nil { + return nil + } + if err := client.RemoveDirRecur(filePath); err != nil { return errors.WithMessage(err, "FTP DeleteFileFromObjectDiskBackup RemoveDirRecur") } return nil diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index 056fd8e6..015739dc 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -401,7 +401,7 @@ func (gcs *GCS) StatFileAbsolute(ctx context.Context, key string) (RemoteFile, e } if err != nil { if errors.Is(err, storage.ErrObjectNotExist) { - return nil, ErrNotFound + return nil, NewErrNotFound(key) } return nil, errors.WithMessage(err, "GCS StatFileAbsolute Attrs") } diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index efe0f992..1c0ad7fa 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -742,7 +742,7 @@ func (s *S3) StatFileAbsolute(ctx context.Context, key string) (RemoteFile, erro var httpErr *smithyhttp.ResponseError if errors.As(opError.Err, &httpErr) { if httpErr.Response.StatusCode == http.StatusNotFound { - return nil, ErrNotFound + return nil, NewErrNotFound(key) } } } diff --git a/pkg/storage/sftp.go b/pkg/storage/sftp.go index 4a42cece..985eeb2d 100644 --- a/pkg/storage/sftp.go +++ b/pkg/storage/sftp.go @@ -108,7 +108,7 @@ func (sftp *SFTP) StatFileAbsolute(ctx context.Context, key string) (RemoteFile, if err != nil { sftp.Debug("[SFTP_DEBUG] StatFile::STAT %s return error %v", key, err) if strings.Contains(err.Error(), "not exist") { - return nil, ErrNotFound + return nil, NewErrNotFound(key) } return nil, errors.WithMessage(err, "SFTP StatFileAbsolute Stat") } diff --git a/pkg/storage/structs.go b/pkg/storage/structs.go index eb3d0176..5cd3840a 100644 --- a/pkg/storage/structs.go +++ b/pkg/storage/structs.go @@ -14,6 +14,11 @@ var ( ErrNotFound = errors.New("key not found") ) +// NewErrNotFound wraps ErrNotFound with key; errors.Is(err, ErrNotFound) still matches. +func NewErrNotFound(key string) error { + return fmt.Errorf("%w: %s", ErrNotFound, key) +} + // KeyError represents an error for a specific key during batch deletion type KeyError struct { Key string diff --git a/test/integration/cleanBrokenRetention_test.go b/test/integration/cleanBrokenRetention_test.go new file mode 100644 index 00000000..074a714c --- /dev/null +++ b/test/integration/cleanBrokenRetention_test.go @@ -0,0 +1,472 @@ +//go:build integration + +package main + +import ( + "context" + "fmt" + "os" + "strings" + "testing" + "time" + + "github.com/Altinity/clickhouse-backup/v2/pkg/utils" + + "github.com/rs/zerolog/log" + "github.com/stretchr/testify/require" +) + +// Each TestCleanBrokenRetention* function verifies that `clean_broken_retention`: +// - lists orphans in object_disks_path (dry-run) without deleting, +// - preserves entries under backup `path` that BackupList discovers as broken +// (in-progress uploads with no metadata.json), +// - by default removes orphans from `object_disks_path` only, +// - preserves the live backup and entries matched by --exclude globs, +// - uses --include=cbr_* to isolate the test from other tests sharing the same bucket. +// +// Each backend is its own top-level test so they can be run independently +// (e.g. `RUN_TESTS=TestCleanBrokenRetentionS3 ./test/integration/run.sh`). +// Backends that need cloud credentials skip themselves when the corresponding env +// var (GCS_TESTS, AZURE_TESTS, QA_TENCENT_SECRET_KEY/QA_TENCENT_SECRET_ID) is unset. + +// cleanBrokenRetentionCase wires one remote-storage backend to the shared scenario. +type cleanBrokenRetentionCase struct { + name string + configFile string + pathRoot string + objRoot string + skip func() bool + skipReason string + setup func(env *TestEnvironment, r *require.Assertions) + plant orphanAction + assertExists orphanAction + assertGone orphanAction + finalEmptyType string +} + +type orphanAction func(env *TestEnvironment, r *require.Assertions, root, name string) + +func TestCleanBrokenRetentionS3(t *testing.T) { + runCleanBrokenRetentionCase(t, s3CleanBrokenRetentionCase()) +} +func TestCleanBrokenRetentionSFTP(t *testing.T) { + runCleanBrokenRetentionCase(t, sftpCleanBrokenRetentionCase()) +} +func TestCleanBrokenRetentionFTP(t *testing.T) { + runCleanBrokenRetentionCase(t, ftpCleanBrokenRetentionCase()) +} +func TestCleanBrokenRetentionGCSEmulator(t *testing.T) { + runCleanBrokenRetentionCase(t, gcsEmulatorCleanBrokenRetentionCase()) +} +func TestCleanBrokenRetentionAZBLOB(t *testing.T) { + runCleanBrokenRetentionCase(t, azblobCleanBrokenRetentionCase()) +} +func TestCleanBrokenRetentionGCS(t *testing.T) { + runCleanBrokenRetentionCase(t, gcsRealCleanBrokenRetentionCase()) +} +func TestCleanBrokenRetentionCOS(t *testing.T) { + runCleanBrokenRetentionCase(t, cosCleanBrokenRetentionCase()) +} + +func runCleanBrokenRetentionCase(t *testing.T, tc cleanBrokenRetentionCase) { + if tc.skip != nil && tc.skip() { + t.Skip(tc.skipReason) + return + } + runCleanBrokenRetentionScenario(t, tc) +} + +func runCleanBrokenRetentionScenario(t *testing.T, tc cleanBrokenRetentionCase) { + chVer := strings.ReplaceAll(os.Getenv("CLICKHOUSE_VERSION"), ".", "_") + cleanBrokenRetentionExcludeGlob := "cbr_orphan_keep_" + chVer + "_*" + cleanBrokenRetentionIncludeGlob := "cbr_*_" + chVer + "_*" + + env, r := NewTestEnvironment(t) + env.connectWithWait(t, r, 0*time.Second, 1*time.Second, 1*time.Minute) + defer env.Cleanup(t, r) + + r.NoError(env.DockerCP("configs/"+tc.configFile, "clickhouse-backup:/etc/clickhouse-backup/config.yml")) + if tc.setup != nil { + tc.setup(env, r) + } + + tableName := fmt.Sprintf("default.clean_broken_retention_%s", strings.ToLower(tc.name)) + env.queryWithNoError(r, fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s(id UInt64) ENGINE=MergeTree() ORDER BY id", tableName)) + t.Cleanup(func() { + dropQ := "DROP TABLE IF EXISTS " + tableName + if compareVersion(os.Getenv("CLICKHOUSE_VERSION"), "20.3") > 0 { + dropQ += " NO DELAY" + } + if _, err := env.DockerExecOut("clickhouse", "clickhouse", "client", "-q", dropQ); err != nil { + log.Warn().Err(err).Str("table", tableName).Msg("t.Cleanup: failed to drop table") + } + }) + env.queryWithNoError(r, fmt.Sprintf("INSERT INTO %s SELECT number FROM numbers(50)", tableName)) + + suffix := time.Now().UnixNano() + keepBackup := fmt.Sprintf("cbr_keep_%s_%d", chVer, suffix) + brokenPath := fmt.Sprintf("cbr_broken_%s_%d", chVer, suffix) + orphanObj := fmt.Sprintf("cbr_orphan_obj_%s_%d", chVer, suffix) + orphanKept := fmt.Sprintf("cbr_orphan_keep_%s_%d", chVer, suffix) + + log.Debug().Str("backend", tc.name).Msg("Create a live backup that must survive the cleanup") + env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "create_remote", "--tables", tableName, keepBackup) + + log.Debug().Str("backend", tc.name).Msg("Plant broken entry under backup path and orphans under object_disks_path") + // brokenPath: no metadata.json → BackupList treats it as a broken (in-progress) backup → must be KEPT. + tc.plant(env, r, tc.pathRoot, brokenPath) + tc.assertExists(env, r, tc.pathRoot, brokenPath) + // Genuine orphans live only under object_disks_path, which BackupList does not scan. + for _, name := range []string{orphanObj, orphanKept} { + tc.plant(env, r, tc.objRoot, name) + tc.assertExists(env, r, tc.objRoot, name) + } + + log.Debug().Str("backend", tc.name).Msg("Dry-run lists object disk orphans but preserves broken backup path entries") + dryRunOut, err := env.DockerExecOut("clickhouse-backup", "clickhouse-backup", "clean_broken_retention", "--include="+cleanBrokenRetentionIncludeGlob) + r.NoError(err, "dry-run failed: %s", dryRunOut) + r.NotContains(dryRunOut, fmt.Sprintf("orphan=%s", brokenPath), "broken backup path entry must not appear as orphan") + r.Contains(dryRunOut, orphanObj, "dry-run must mention object disk orphan") + r.Contains(dryRunOut, "would delete", "dry-run must announce planned deletions") + r.NotContains(dryRunOut, "clean_broken_retention: deleting", "dry-run must not delete") + r.NotContains(dryRunOut, fmt.Sprintf("\"orphan\":\"%s\"", keepBackup), "live backup must not appear as orphan") + tc.assertExists(env, r, tc.pathRoot, brokenPath) + tc.assertExists(env, r, tc.objRoot, orphanObj) + + log.Debug().Str("backend", tc.name).Msg("--exclude glob preserves matched object disk orphans") + commitOut, err := env.DockerExecOut("clickhouse-backup", "clickhouse-backup", "clean_broken_retention", "--commit", "--include="+cleanBrokenRetentionIncludeGlob, "--exclude="+cleanBrokenRetentionExcludeGlob) + r.NoError(err, "commit failed: %s", commitOut) + r.Contains(commitOut, "clean_broken_retention: deleting") + tc.assertExists(env, r, tc.pathRoot, brokenPath) + tc.assertGone(env, r, tc.objRoot, orphanObj) + tc.assertExists(env, r, tc.objRoot, orphanKept) + + log.Debug().Str("backend", tc.name).Msg("Second run without --exclude clears the remaining object disk orphan") + env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "clean_broken_retention", "--commit", "--include="+cleanBrokenRetentionIncludeGlob) + tc.assertExists(env, r, tc.pathRoot, brokenPath) + tc.assertGone(env, r, tc.objRoot, orphanKept) + + log.Debug().Str("backend", tc.name).Msg("Cleanup live backup and broken entry") + env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "delete", "remote", keepBackup) + env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "delete", "local", keepBackup) + env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "clean_remote_broken", "--include="+cleanBrokenRetentionIncludeGlob) + if tc.finalEmptyType != "" { + env.checkObjectStorageIsEmpty(t, r, tc.finalEmptyType) + } +} + +// containerFSCase builds a case for a backend that maps its remote storage to a +// path on the given docker container's filesystem. +func containerFSCase(name, configFile, container, pathRoot, objRoot, finalEmptyType string) cleanBrokenRetentionCase { + plant := func(env *TestEnvironment, r *require.Assertions, root, name string) { + env.DockerExecNoError(r, container, "sh", "-c", fmt.Sprintf( + "mkdir -p %s/%s/sub && echo garbage > %s/%s/data.bin && echo garbage > %s/%s/sub/nested.bin && chmod -R 777 %s/%s", + root, name, root, name, root, name, root, name)) + } + exists := func(env *TestEnvironment, r *require.Assertions, root, name string) { + out, err := env.DockerExecOut(container, "ls", root+"/"+name) + r.NoError(err, "expected %s/%s to exist on %s, output: %s", root, name, container, out) + } + gone := func(env *TestEnvironment, r *require.Assertions, root, name string) { + out, _ := env.DockerExecOut(container, "sh", "-c", fmt.Sprintf("ls %s/%s 2>/dev/null || true", root, name)) + r.Empty(strings.TrimSpace(out), "expected %s/%s on %s to be removed, ls returned: %s", root, name, container, out) + } + return cleanBrokenRetentionCase{ + name: name, + configFile: configFile, + pathRoot: pathRoot, + objRoot: objRoot, + plant: plant, + assertExists: exists, + assertGone: gone, + finalEmptyType: finalEmptyType, + } +} + +func s3CleanBrokenRetentionCase() cleanBrokenRetentionCase { + // Plant via `mc cp` instead of direct FS writes — MinIO ignores raw files on disk and + // only sees objects that went through its S3 API. + const mcAliasCmd = "mc alias set local https://localhost:9000 access_key it_is_my_super_secret_key >/dev/null 2>&1" + const bucketPath = "local/clickhouse/backup/cluster/0" + const objBucketPath = "local/clickhouse/object_disk/cluster/0" + plant := func(env *TestEnvironment, r *require.Assertions, root, name string) { + env.DockerExecNoError(r, "minio", "bash", "-c", fmt.Sprintf( + "%s && echo garbage > /tmp/data.bin && mc cp /tmp/data.bin %s/%s/data.bin >/dev/null && mc cp /tmp/data.bin %s/%s/sub/nested.bin >/dev/null", + mcAliasCmd, root, name, root, name, + )) + } + exists := func(env *TestEnvironment, r *require.Assertions, root, name string) { + out, err := env.DockerExecOut("minio", "bash", "-c", fmt.Sprintf("%s && mc ls %s/%s/", mcAliasCmd, root, name)) + r.NoError(err, "mc ls failed: %s", out) + r.Contains(out, "data.bin", "expected data.bin under %s/%s, got: %s", root, name, out) + } + gone := func(env *TestEnvironment, r *require.Assertions, root, name string) { + out, _ := env.DockerExecOut("minio", "bash", "-c", fmt.Sprintf("%s && mc ls -r %s/%s/ 2>&1 || true", mcAliasCmd, root, name)) + r.NotContains(out, "data.bin", "expected no objects under %s/%s, got: %s", root, name, out) + r.NotContains(out, "nested.bin", "expected no objects under %s/%s, got: %s", root, name, out) + } + return cleanBrokenRetentionCase{ + name: "S3", + configFile: "config-s3.yml", + pathRoot: bucketPath, + objRoot: objBucketPath, + plant: plant, + assertExists: exists, + assertGone: gone, + finalEmptyType: "S3", + } +} + +func sftpCleanBrokenRetentionCase() cleanBrokenRetentionCase { + tc := containerFSCase("SFTP", "config-sftp-auth-key.yaml", "sshd", "/root", "/object_disk", "") + tc.setup = func(env *TestEnvironment, r *require.Assertions) { + env.uploadSSHKeys(r, "clickhouse-backup") + env.DockerExecNoError(r, "sshd", "mkdir", "-p", "/object_disk") + } + return tc +} + +func ftpCleanBrokenRetentionCase() cleanBrokenRetentionCase { + home := "/home/test_backup" + if isAdvancedMode() { + home = "/home/ftpusers/test_backup" + } + tc := containerFSCase("FTP", "config-ftp.yaml", "ftp", home+"/backup", home+"/object_disk", "") + tc.skip = func() bool { return compareVersion(os.Getenv("CLICKHOUSE_VERSION"), "21.8") <= 0 } + tc.skipReason = "FTP scenario only validated on ClickHouse > 21.8" + tc.setup = func(env *TestEnvironment, r *require.Assertions) { + // proftpd/vsftpd containers don't create `test_backup` as a system user; uid 1000 owns the home dir. + env.DockerExecNoError(r, "ftp", "sh", "-c", fmt.Sprintf("mkdir -p %s/backup %s/object_disk && chown -R 1000:1000 %s && chmod -R 0777 %s", home, home, home, home)) + } + return tc +} + +func gcsEmulatorCleanBrokenRetentionCase() cleanBrokenRetentionCase { + const bucket = "altinity-qa-test" + const baseURL = "http://localhost:8080" + setup := func(env *TestEnvironment, r *require.Assertions) { + env.DockerExecNoError(r, "gcs", "apk", "add", "-q", "curl") + } + plant := func(env *TestEnvironment, r *require.Assertions, root, name string) { + obj := root + "/" + name + env.DockerExecNoError(r, "gcs", "sh", "-c", fmt.Sprintf( + `echo garbage > /tmp/data.bin && `+ + `curl -s -o /dev/null -X POST "%s/upload/storage/v1/b/%s/o?name=%s/data.bin&uploadType=media" -H "Content-Type: application/octet-stream" --data-binary @/tmp/data.bin && `+ + `curl -s -o /dev/null -X POST "%s/upload/storage/v1/b/%s/o?name=%s/sub/nested.bin&uploadType=media" -H "Content-Type: application/octet-stream" --data-binary @/tmp/data.bin`, + baseURL, bucket, obj, baseURL, bucket, obj)) + } + assertExists := func(env *TestEnvironment, r *require.Assertions, root, name string) { + out, err := env.DockerExecOut("gcs", "sh", "-c", fmt.Sprintf( + `curl -s "%s/storage/v1/b/%s/o?prefix=%s/"`, baseURL, bucket, root+"/"+name)) + r.NoError(err, "assertExists list failed: %s", out) + r.Contains(out, "data.bin", "expected data.bin under %s/%s", root, name) + } + assertGone := func(env *TestEnvironment, r *require.Assertions, root, name string) { + out, _ := env.DockerExecOut("gcs", "sh", "-c", fmt.Sprintf( + `curl -s "%s/storage/v1/b/%s/o?prefix=%s/"`, baseURL, bucket, root+"/"+name)) + r.NotContains(out, "data.bin", "expected no blobs under %s/%s", root, name) + } + return cleanBrokenRetentionCase{ + name: "GCS_EMULATOR", + configFile: "config-gcs-custom-endpoint.yml", + pathRoot: "backup/cluster/0", + objRoot: "object_disks/cluster/0", + setup: setup, + plant: plant, + assertExists: assertExists, + assertGone: assertGone, + finalEmptyType: "GCS_EMULATOR", + } +} + +func gcsRealCleanBrokenRetentionCase() cleanBrokenRetentionCase { + const bucket = "altinity-qa-test" + const image = "google/cloud-sdk:slim" + // All gsutil invocations need the service account activated first. + const authPrefix = "gcloud auth activate-service-account --key-file=$GOOGLE_APPLICATION_CREDENTIALS >/dev/null 2>&1 && " + gsutil := func(env *TestEnvironment, r *require.Assertions, sh string) string { + // --volumes-from gives us /etc/clickhouse-backup/credentials.json from the backup container. + args := []string{ + "run", "--rm", "--network", env.tc.networkName, + "--volumes-from", env.tc.GetContainerID("clickhouse-backup"), + "-e", "GOOGLE_APPLICATION_CREDENTIALS=/etc/clickhouse-backup/credentials.json", + image, "bash", "-c", authPrefix + sh, + } + out, err := utils.ExecCmdOut(context.Background(), dockerExecTimeout, "docker", args...) + r.NoError(err, "gsutil command `%s` failed: %s", sh, out) + return out + } + return cleanBrokenRetentionCase{ + name: "GCS", + configFile: "config-gcs.yml", + pathRoot: "backup/cluster/0", + objRoot: "object_disks/cluster/0", + skip: func() bool { return isTestShouldSkip("GCS_TESTS") }, + skipReason: "Skipping GCS integration tests (GCS_TESTS not set)", + setup: func(env *TestEnvironment, r *require.Assertions) { + env.tc.pullImageIfNeeded(context.Background(), image) + }, + plant: func(env *TestEnvironment, r *require.Assertions, root, name string) { + obj := root + "/" + name + gsutil(env, r, fmt.Sprintf( + "echo garbage > /tmp/data.bin && gsutil -q cp /tmp/data.bin gs://%s/%s/data.bin && gsutil -q cp /tmp/data.bin gs://%s/%s/sub/nested.bin", + bucket, obj, bucket, obj)) + }, + assertExists: func(env *TestEnvironment, r *require.Assertions, root, name string) { + obj := root + "/" + name + out := gsutil(env, r, fmt.Sprintf("gsutil ls gs://%s/%s/", bucket, obj)) + r.Contains(out, "gs://"+bucket+"/"+obj+"/", "expected listing to contain %s", obj) + }, + assertGone: func(env *TestEnvironment, r *require.Assertions, root, name string) { + obj := root + "/" + name + out := gsutil(env, r, fmt.Sprintf("gsutil ls gs://%s/%s/** 2>&1 || true", bucket, obj)) + r.NotContains(out, "gs://"+bucket+"/"+obj, "expected no blobs under gs://%s/%s, got: %s", bucket, obj, out) + }, + } +} + +func cosCleanBrokenRetentionCase() cleanBrokenRetentionCase { + // COS exposes an S3-compatible API on its regional endpoint. + const bucket = "clickhouse-backup-1336113806" + const endpoint = "https://cos.na-ashburn.myqcloud.com" + const image = "amazon/aws-cli:latest" + // Tencent COS rejects path-style addressing (PathStyleDomainForbidden); force virtual-hosted style. + const awsPrefix = "aws configure set default.s3.addressing_style virtual >/dev/null && " + awsRun := func(env *TestEnvironment, r *require.Assertions, sh string) string { + // --entrypoint sh overrides aws-cli's default `aws` entrypoint. + out, err := utils.ExecCmdOut(context.Background(), dockerExecTimeout, "docker", + "run", "--rm", "--network", env.tc.networkName, + "-e", "AWS_ACCESS_KEY_ID="+os.Getenv("QA_TENCENT_SECRET_ID"), + "-e", "AWS_SECRET_ACCESS_KEY="+os.Getenv("QA_TENCENT_SECRET_KEY"), + "-e", "AWS_DEFAULT_REGION=na-ashburn", + "--entrypoint", "sh", image, "-c", awsPrefix+sh) + r.NoError(err, "aws-cli failed: %s", out) + return out + } + return cleanBrokenRetentionCase{ + name: "COS", + configFile: "config-cos.yml", + pathRoot: "backup/cluster/0", + objRoot: "object_disk/cluster/0", + skip: func() bool { + return os.Getenv("QA_TENCENT_SECRET_KEY") == "" || os.Getenv("QA_TENCENT_SECRET_ID") == "" + }, + skipReason: "Skipping COS integration tests (QA_TENCENT_SECRET_ID / QA_TENCENT_SECRET_KEY not set)", + setup: func(env *TestEnvironment, r *require.Assertions) { + env.tc.pullImageIfNeeded(context.Background(), image) + env.InstallDebIfNotExists(r, "clickhouse-backup", "gettext-base") + // config.yml was copied raw and still has ${QA_TENCENT_SECRET_*} placeholders. + env.DockerExecNoError(r, "clickhouse-backup", "bash", "-xec", + "envsubst < /etc/clickhouse-backup/config.yml > /tmp/c.yml && mv /tmp/c.yml /etc/clickhouse-backup/config.yml") + }, + plant: func(env *TestEnvironment, r *require.Assertions, root, name string) { + obj := root + "/" + name + awsRun(env, r, fmt.Sprintf( + "echo garbage > /tmp/data.bin && aws --endpoint-url=%s s3 cp /tmp/data.bin s3://%s/%s/data.bin >/dev/null && aws --endpoint-url=%s s3 cp /tmp/data.bin s3://%s/%s/sub/nested.bin >/dev/null", + endpoint, bucket, obj, endpoint, bucket, obj)) + }, + assertExists: func(env *TestEnvironment, r *require.Assertions, root, name string) { + obj := root + "/" + name + out := awsRun(env, r, fmt.Sprintf("aws --endpoint-url=%s s3 ls s3://%s/%s/", endpoint, bucket, obj)) + r.Contains(out, "data.bin", "expected data.bin under %s, got: %s", obj, out) + }, + assertGone: func(env *TestEnvironment, r *require.Assertions, root, name string) { + obj := root + "/" + name + out := awsRun(env, r, fmt.Sprintf("aws --endpoint-url=%s s3 ls s3://%s/%s/ 2>&1 || true", endpoint, bucket, obj)) + r.NotContains(out, "data.bin", "expected no blobs under s3://%s/%s, got: %s", bucket, obj, out) + r.NotContains(out, "nested.bin", "expected no blobs under s3://%s/%s, got: %s", bucket, obj, out) + }, + } +} + +func azblobCleanBrokenRetentionCase() cleanBrokenRetentionCase { + const container = "container1" + const accountName = "devstoreaccount1" + const accountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" + const azureCliImage = "mcr.microsoft.com/azure-cli:latest" + blobPath := func(root, name string) string { return strings.TrimPrefix(root+"/"+name, "/") } + uploadTimeout := 2 * time.Minute + + azConnString := fmt.Sprintf( + "DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s;BlobEndpoint=http://azure:10000/%s;", + accountName, accountKey, accountName, + ) + + azRun := func(env *TestEnvironment, args ...string) (string, error) { + dockerArgs := append([]string{ + "run", "--rm", "--network", env.tc.networkName, + "-e", "AZURE_STORAGE_CONNECTION_STRING=" + azConnString, + azureCliImage, "az", + }, args...) + return utils.ExecCmdOut(context.Background(), uploadTimeout, "docker", dockerArgs...) + } + + setup := func(env *TestEnvironment, r *require.Assertions) { + env.tc.pullImageIfNeeded(context.Background(), azureCliImage) + } + + plant := func(env *TestEnvironment, r *require.Assertions, root, name string) { + p := blobPath(root, name) + tmpDir, err := os.MkdirTemp("", "azblob-plant-*") + r.NoError(err) + defer func() { + if deleteErr := os.RemoveAll(tmpDir); deleteErr != nil { + log.Error().Err(deleteErr).Stack().Msgf("can't remove tmpDir=%s", tmpDir) + } + }() + + r.NoError(os.WriteFile(tmpDir+"/data.bin", []byte("garbage"), 0644)) + out, err := utils.ExecCmdOut(context.Background(), uploadTimeout, "docker", + "run", "--rm", "--network", env.tc.networkName, + "-e", "AZURE_STORAGE_CONNECTION_STRING="+azConnString, + "-v", tmpDir+"/data.bin:/data.bin:ro", + azureCliImage, "az", + "storage", "blob", "upload", + "--container-name", container, + "--name", p+"/data.bin", + "--file", "/data.bin", + ) + r.NoError(err, "azblob plant data.bin: %s", out) + + r.NoError(os.MkdirAll(tmpDir+"/sub", 0755)) + r.NoError(os.WriteFile(tmpDir+"/sub/nested.bin", []byte("garbage"), 0644)) + out, err = utils.ExecCmdOut(context.Background(), uploadTimeout, "docker", + "run", "--rm", "--network", env.tc.networkName, + "-e", "AZURE_STORAGE_CONNECTION_STRING="+azConnString, + "-v", tmpDir+"/sub/nested.bin:/nested.bin:ro", + azureCliImage, "az", + "storage", "blob", "upload", + "--container-name", container, + "--name", p+"/sub/nested.bin", + "--file", "/nested.bin", + ) + r.NoError(err, "azblob plant nested.bin: %s", out) + } + + blobShow := func(env *TestEnvironment, r *require.Assertions, root, name string) (string, error) { + return azRun(env, + "storage", "blob", "show", + "--container-name", container, + "--name", blobPath(root, name)+"/data.bin", + ) + } + + return cleanBrokenRetentionCase{ + name: "AZBLOB", + configFile: "config-azblob.yml", + pathRoot: "backup", + objRoot: "object_disks", + skip: func() bool { return isTestShouldSkip("AZURE_TESTS") }, + skipReason: "Skipping AZBLOB integration tests (AZURE_TESTS not set)", + setup: setup, + plant: plant, + assertExists: func(env *TestEnvironment, r *require.Assertions, root, name string) { + out, err := blobShow(env, r, root, name) + r.NoError(err, "azblob assertExists failed: %s", out) + }, + assertGone: func(env *TestEnvironment, r *require.Assertions, root, name string) { + out, err := blobShow(env, r, root, name) + r.Error(err, "expected %s to be gone, got: %s", blobPath(root, name), out) + }, + } +} diff --git a/test/integration/containers.go b/test/integration/containers.go index 9ba7be86..ab407101 100644 --- a/test/integration/containers.go +++ b/test/integration/containers.go @@ -589,7 +589,10 @@ func (tc *TestContainers) startAzure(ctx context.Context) error { return tc.startContainer(ctx, "azure", &container.Config{ Image: "mcr.microsoft.com/azure-storage/azurite:latest", - Cmd: []string{"azurite", "--debug", "/dev/stderr", "-l", "/data", "--blobHost", "0.0.0.0", "--blobKeepAliveTimeout", "600", "--disableTelemetry"}, + // --skipApiVersionCheck: azure-cli 2.84+ (w/ Azure SDK v12.27+) sends x-ms-version 2026-02-06 + // which Azurite 3.35.0 does not recognise. Tracked upstream: + // https://github.com/Azure/Azurite/issues/2623 + Cmd: []string{"azurite", "--debug", "/dev/stderr", "-l", "/data", "--blobHost", "0.0.0.0", "--blobKeepAliveTimeout", "600", "--disableTelemetry", "--skipApiVersionCheck"}, Healthcheck: &container.HealthConfig{ Test: []string{"CMD-SHELL", "nc 127.0.0.1 10000 -z"}, Interval: 1 * time.Second, diff --git a/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot b/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot index e7762e84..518413f1 100644 --- a/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot +++ b/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot @@ -1,6 +1,6 @@ default_config = r"""'[\'general:\', \' remote_storage: none\', \' backups_to_keep_local: 0\', \' backups_to_keep_remote: 0\', \' log_level: info\', \' allow_empty_backups: false\', \' allow_object_disk_streaming: false\', \' use_resumable_state: true\', \' restore_schema_on_cluster: ""\', \' upload_by_part: true\', \' download_by_part: true\', \' restore_database_mapping: {}\', \' restore_table_mapping: {}\', \' retries_on_failure: 3\', \' retries_pause: 5s\', \' retries_jitter: 0\', \' watch_interval: 1h\', \' full_interval: 24h\', \' watch_backup_name_template: shard{shard}-{type}-{time:20060102150405}\', \' sharded_operation_mode: ""\', \' cpu_nice_priority: 15\', \' io_nice_priority: idle\', \' rbac_backup_always: true\', \' rbac_conflict_resolution: recreate\', \' config_backup_always: false\', \' named_collections_backup_always: false\', \' delete_batch_size: 1000\', \' retriesduration: 5s\', \' watchduration: 1h0m0s\', \' fullduration: 24h0m0s\', \'clickhouse:\', \' username: default\', \' password: ""\', \' host: localhost\', \' port: 9000\', \' disk_mapping: {}\', \' skip_tables:\', \' - system.*\', \' - INFORMATION_SCHEMA.*\', \' - information_schema.*\', \' - _temporary_and_external_tables.*\', \' skip_table_engines: []\', \' skip_disks: []\', \' skip_disk_types: []\', \' timeout: 30m\', \' freeze_by_part: false\', \' freeze_by_part_where: ""\', \' use_embedded_backup_restore: false\', \' use_embedded_backup_restore_cluster: ""\', \' embedded_backup_disk: ""\', \' backup_mutations: true\', \' restore_as_attach: false\', \' restore_distributed_cluster: ""\', \' check_parts_columns: true\', \' secure: false\', \' skip_verify: false\', \' sync_replicated_tables: false\', \' log_sql_queries: true\', \' config_dir: /etc/clickhouse-server/\', \' restart_command: exec:systemctl restart clickhouse-server\', \' ignore_not_exists_error_during_freeze: true\', \' check_replicas_before_attach: true\', \' default_replica_path: /clickhouse/tables/{cluster}/{shard}/{database}/{table}\', " default_replica_name: \'{replica}\'", \' tls_key: ""\', \' tls_cert: ""\', \' tls_ca: ""\', \' debug: false\', \' force_rebalance: false\', \'s3:\', \' access_key: ""\', \' secret_key: ""\', \' bucket: ""\', \' endpoint: ""\', \' region: us-east-1\', \' acl: private\', \' assume_role_arn: ""\', \' force_path_style: false\', \' path: ""\', \' object_disk_path: ""\', \' disable_ssl: false\', \' compression_level: 1\', \' compression_format: tar\', \' sse: ""\', \' sse_kms_key_id: ""\', \' sse_customer_algorithm: ""\', \' sse_customer_key: ""\', \' sse_customer_key_md5: ""\', \' sse_kms_encryption_context: ""\', \' disable_cert_verification: false\', \' use_custom_storage_class: false\', \' storage_class: STANDARD\', \' custom_storage_class_map: {}\', \' allow_multipart_download: false\', \' object_labels: {}\', \' request_payer: ""\', \' check_sum_algorithm: ""\', \' request_content_md5: false\', \' retry_mode: standard\', \' chunk_size: 5242880\', \' debug: false\', \'gcs:\', \' credentials_file: ""\', \' credentials_json: ""\', \' credentials_json_encoded: ""\', \' sa_email: ""\', \' embedded_access_key: ""\', \' embedded_secret_key: ""\', \' skip_credentials: false\', \' bucket: ""\', \' path: ""\', \' object_disk_path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' debug: false\', \' force_http: false\', \' endpoint: ""\', \' storage_class: STANDARD\', \' object_labels: {}\', \' custom_storage_class_map: {}\', \' chunk_size: 16777216\', \' encryption_key: ""\', \'cos:\', \' url: ""\', \' timeout: 2m\', \' secret_id: ""\', \' secret_key: ""\', \' path: ""\', \' object_disk_path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' allow_multipart_download: false\', \' debug: false\', \'api:\', \' listen: localhost:7171\', \' enable_metrics: true\', \' enable_pprof: false\', \' username: ""\', \' password: ""\', \' secure: false\', \' certificate_file: ""\', \' private_key_file: ""\', \' ca_cert_file: ""\', \' ca_key_file: ""\', \' create_integration_tables: false\', \' integration_tables_host: ""\', \' allow_parallel: false\', \' complete_resumable_after_restart: true\', \' watch_is_main_process: false\', \'ftp:\', \' address: ""\', \' timeout: 2m\', \' username: ""\', \' password: ""\', \' tls: false\', \' skip_tls_verify: false\', \' path: ""\', \' object_disk_path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'sftp:\', \' address: ""\', \' port: 22\', \' username: ""\', \' password: ""\', \' key: ""\', \' path: ""\', \' object_disk_path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'azblob:\', \' endpoint_schema: https\', \' endpoint_suffix: core.windows.net\', \' account_name: ""\', \' account_key: ""\', \' sas: ""\', \' use_managed_identity: false\', \' container: ""\', \' assume_container_exists: false\', \' path: ""\', \' object_disk_path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' sse_key: ""\', \' buffer_count: 3\', \' timeout: 4h\', \' debug: false\', \'custom:\', \' upload_command: ""\', \' download_command: ""\', \' list_command: ""\', \' delete_command: ""\', \' command_timeout: 4h\', \' commandtimeoutduration: 4h0m0s\']'""" -help_flag = r"""'NAME:\n clickhouse-backup - Tool for easy backup of ClickHouse with cloud supportUSAGE:\n clickhouse-backup [-t, --tables=.] DESCRIPTION:\n Run as \'root\' or \'clickhouse\' userCOMMANDS:\n tables List of tables, exclude skip_tables\n create Create new backup\n create_remote Create and upload new backup\n upload Upload backup to remote storage\n list List of backups\n download Download backup from remote storage\n restore Create schema and restore data from backup\n restore_remote Download and restore\n delete Delete specific backup\n default-config Print default config\n print-config Print current config merged with environment variables\n clean Remove data in \'shadow\' folder from all \'path\' folders available from \'system.disks\'\n clean_remote_broken Remove all broken remote backups\n clean_local_broken Remove all broken local backups\n watch Run infinite loop which create full + incremental backup sequence to allow efficient backup sequences\n acvp Run ACVP wrapper protocol over stdin/stdout\n server Run API server\n help, h Shows a list of commands or help for one commandGLOBAL OPTIONS:\n --config value, -c value Config \'FILE\' name. (default: "/etc/clickhouse-backup/config.yml") [$CLICKHOUSE_BACKUP_CONFIG]\n --environment-override value, --env value override any environment variable via CLI parameter\n --help, -h show help\n --version, -v print the version'""" +help_flag = r"""'NAME:\n clickhouse-backup - Tool for easy backup of ClickHouse with cloud supportUSAGE:\n clickhouse-backup [-t, --tables=.
] DESCRIPTION:\n Run as \'root\' or \'clickhouse\' userCOMMANDS:\n tables List of tables, exclude skip_tables\n create Create new backup\n create_remote Create and upload new backup\n upload Upload backup to remote storage\n list List of backups\n download Download backup from remote storage\n restore Create schema and restore data from backup\n restore_remote Download and restore\n delete Delete specific backup\n default-config Print default config\n print-config Print current config merged with environment variables\n clean Remove data in \'shadow\' folder from all \'path\' folders available from \'system.disks\'\n clean_remote_broken Remove all broken remote backups\n clean_local_broken Remove all broken local backups\n clean_broken_retention Remove orphan entries under remote `path` and `object_disks_path` that are not in the live backup list\n watch Run infinite loop which create full + incremental backup sequence to allow efficient backup sequences\n acvp Run ACVP wrapper protocol over stdin/stdout\n server Run API server\n help, h Shows a list of commands or help for one commandGLOBAL OPTIONS:\n --config value, -c value Config \'FILE\' name. (default: "/etc/clickhouse-backup/config.yml") [$CLICKHOUSE_BACKUP_CONFIG]\n --environment-override value, --env value override any environment variable via CLI parameter\n --help, -h show help\n --version, -v print the version'""" -cli_usage = r"""'NAME:\n clickhouse-backup - Tool for easy backup of ClickHouse with cloud supportUSAGE:\n clickhouse-backup [-t, --tables=.
] DESCRIPTION:\n Run as \'root\' or \'clickhouse\' userCOMMANDS:\n tables List of tables, exclude skip_tables\n create Create new backup\n create_remote Create and upload new backup\n upload Upload backup to remote storage\n list List of backups\n download Download backup from remote storage\n restore Create schema and restore data from backup\n restore_remote Download and restore\n delete Delete specific backup\n default-config Print default config\n print-config Print current config merged with environment variables\n clean Remove data in \'shadow\' folder from all \'path\' folders available from \'system.disks\'\n clean_remote_broken Remove all broken remote backups\n clean_local_broken Remove all broken local backups\n watch Run infinite loop which create full + incremental backup sequence to allow efficient backup sequences\n acvp Run ACVP wrapper protocol over stdin/stdout\n server Run API server\n help, h Shows a list of commands or help for one commandGLOBAL OPTIONS:\n --config value, -c value Config \'FILE\' name. (default: "/etc/clickhouse-backup/config.yml") [$CLICKHOUSE_BACKUP_CONFIG]\n --environment-override value, --env value override any environment variable via CLI parameter\n --help, -h show help\n --version, -v print the version'""" +cli_usage = r"""'NAME:\n clickhouse-backup - Tool for easy backup of ClickHouse with cloud supportUSAGE:\n clickhouse-backup [-t, --tables=.
] DESCRIPTION:\n Run as \'root\' or \'clickhouse\' userCOMMANDS:\n tables List of tables, exclude skip_tables\n create Create new backup\n create_remote Create and upload new backup\n upload Upload backup to remote storage\n list List of backups\n download Download backup from remote storage\n restore Create schema and restore data from backup\n restore_remote Download and restore\n delete Delete specific backup\n default-config Print default config\n print-config Print current config merged with environment variables\n clean Remove data in \'shadow\' folder from all \'path\' folders available from \'system.disks\'\n clean_remote_broken Remove all broken remote backups\n clean_local_broken Remove all broken local backups\n clean_broken_retention Remove orphan entries under remote `path` and `object_disks_path` that are not in the live backup list\n watch Run infinite loop which create full + incremental backup sequence to allow efficient backup sequences\n acvp Run ACVP wrapper protocol over stdin/stdout\n server Run API server\n help, h Shows a list of commands or help for one commandGLOBAL OPTIONS:\n --config value, -c value Config \'FILE\' name. (default: "/etc/clickhouse-backup/config.yml") [$CLICKHOUSE_BACKUP_CONFIG]\n --environment-override value, --env value override any environment variable via CLI parameter\n --help, -h show help\n --version, -v print the version'"""