diff --git a/pkg/backup/download.go b/pkg/backup/download.go index c469e488a..9bc5cc428 100644 --- a/pkg/backup/download.go +++ b/pkg/backup/download.go @@ -38,6 +38,7 @@ import ( var ( ErrBackupIsAlreadyExists = errors.New("backup is already exists") + errIncompleteLocalPart = errors.New("incomplete local part") ) func (b *Backuper) Download(backupName string, tablePattern string, partitions []string, schemaOnly, rbacOnly, configsOnly, namedCollectionsOnly, resume bool, hardlinkExistsFiles bool, backupVersion string, commandId int) error { @@ -798,10 +799,20 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata. dataGroup.Go(func() error { log.Debug().Msgf("start %s -> %s", partRemotePath, partLocalPath) if b.resume { - isProcesses, pathSize := b.resumableState.IsAlreadyProcessed(partRemotePath) - atomic.AddUint64(&downloadedSize, uint64(pathSize)) - if isProcesses { - return nil + isProcessed, pathSize := b.resumableState.IsAlreadyProcessed(partRemotePath) + if isProcessed { + localComplete, localSize, completeErr := b.isLocalPartComplete(dataCtx, partRemotePath, partLocalPath) + if completeErr != nil { + return errors.WithMessage(completeErr, "isLocalPartComplete") + } + if localComplete { + if localSize > 0 { + pathSize = localSize + } + atomic.AddUint64(&downloadedSize, uint64(pathSize)) + return nil + } + log.Warn().Msgf("resume state marked %s as processed but local part %s is incomplete, downloading again", partRemotePath, partLocalPath) } } if hardlinkExistsFiles { @@ -855,6 +866,40 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata. return downloadedSize, nil } +func (b *Backuper) isLocalPartComplete(ctx context.Context, remotePartPath, localPartPath string) (bool, int64, error) { + filesChecked := 0 + var localSize int64 + walkErr := b.dst.Walk(ctx, remotePartPath, true, func(ctx context.Context, f storage.RemoteFile) error { + if b.dst.Kind() == "SFTP" && (f.Name() == "." || f.Name() == "..") { + return nil + } + filesChecked++ + localFilePath := path.Join(localPartPath, f.Name()) + info, err := os.Stat(localFilePath) + if err != nil { + if os.IsNotExist(err) { + return errIncompleteLocalPart + } + return errors.Wrapf(err, "stat local part file %s", localFilePath) + } + if !info.Mode().IsRegular() || info.Size() != f.Size() { + return errIncompleteLocalPart + } + localSize += info.Size() + return nil + }) + if walkErr != nil { + if errors.Cause(walkErr) == errIncompleteLocalPart { + return false, 0, nil + } + return false, 0, walkErr + } + if filesChecked == 0 { + return false, 0, nil + } + return true, localSize, nil +} + func (b *Backuper) hardlinkIfLocalPartExistsAndChecksumEqual(backupName string, table metadata.TableMetadata, part *metadata.Part, disks []clickhouse.Disk, diskName, dbAndTableDir string) (bool, int64, error) { diskType := "" for _, d := range disks { diff --git a/pkg/backup/download_test.go b/pkg/backup/download_test.go index 27ea7434e..c05b002c7 100644 --- a/pkg/backup/download_test.go +++ b/pkg/backup/download_test.go @@ -1,6 +1,9 @@ package backup import ( + "context" + "io" + "os" "regexp" "testing" "time" @@ -282,3 +285,123 @@ func TestReBalanceTablesMetadataIfDiskNotExists_CheckErrors(t *testing.T) { assert.Contains(t, err.Error(), "250B free space, not found in system.disks with `local` type") } + +type fakeResumeRemoteFile struct { + name string + size int64 +} + +func (f fakeResumeRemoteFile) Size() int64 { return f.size } +func (f fakeResumeRemoteFile) Name() string { return f.name } +func (f fakeResumeRemoteFile) LastModified() time.Time { return time.Time{} } + +type fakeResumeRemoteStorage struct { + kind string + files []storage.RemoteFile +} + +func (s fakeResumeRemoteStorage) Kind() string { return s.kind } +func (s fakeResumeRemoteStorage) Connect(context.Context) error { return nil } +func (s fakeResumeRemoteStorage) Close(context.Context) error { return nil } +func (s fakeResumeRemoteStorage) StatFile(context.Context, string) (storage.RemoteFile, error) { + return nil, os.ErrInvalid +} +func (s fakeResumeRemoteStorage) StatFileAbsolute(context.Context, string) (storage.RemoteFile, error) { + return nil, os.ErrInvalid +} +func (s fakeResumeRemoteStorage) DeleteFile(context.Context, string) error { return nil } +func (s fakeResumeRemoteStorage) DeleteFileFromObjectDiskBackup(context.Context, string) error { + return nil +} +func (s fakeResumeRemoteStorage) Walk(ctx context.Context, _ string, _ bool, fn func(context.Context, storage.RemoteFile) error) error { + for _, f := range s.files { + if err := fn(ctx, f); err != nil { + return err + } + } + return nil +} +func (s fakeResumeRemoteStorage) WalkAbsolute(context.Context, string, bool, func(context.Context, storage.RemoteFile) error) error { + return nil +} +func (s fakeResumeRemoteStorage) GetFileReader(context.Context, string) (io.ReadCloser, error) { + return nil, os.ErrInvalid +} +func (s fakeResumeRemoteStorage) GetFileReaderAbsolute(context.Context, string) (io.ReadCloser, error) { + return nil, os.ErrInvalid +} +func (s fakeResumeRemoteStorage) GetFileReaderWithLocalPath(context.Context, string, string, int64) (io.ReadCloser, error) { + return nil, os.ErrInvalid +} +func (s fakeResumeRemoteStorage) PutFile(context.Context, string, io.ReadCloser, int64) error { + return nil +} +func (s fakeResumeRemoteStorage) PutFileAbsolute(context.Context, string, io.ReadCloser, int64) error { + return nil +} +func (s fakeResumeRemoteStorage) CopyObject(context.Context, int64, string, string, string) (int64, error) { + return 0, nil +} + +func TestIsLocalPartCompleteRequiresEveryRemoteFile(t *testing.T) { + ctx := context.Background() + partPath := t.TempDir() + assert.NoError(t, os.WriteFile(partPath+"/checksums.txt", []byte("sum"), 0640)) + assert.NoError(t, os.WriteFile(partPath+"/data.bin", []byte("data"), 0640)) + + backuper := Backuper{dst: &storage.BackupDestination{RemoteStorage: fakeResumeRemoteStorage{ + kind: "s3", + files: []storage.RemoteFile{ + fakeResumeRemoteFile{name: "checksums.txt", size: 3}, + fakeResumeRemoteFile{name: "data.bin", size: 4}, + fakeResumeRemoteFile{name: "data.cmrk3", size: 5}, + }, + }}} + + complete, size, err := backuper.isLocalPartComplete(ctx, "backup/shadow/db/table/default/all_1_1_0", partPath) + assert.NoError(t, err) + assert.False(t, complete) + assert.EqualValues(t, 0, size) + + assert.NoError(t, os.WriteFile(partPath+"/data.cmrk3", []byte("marks"), 0640)) + complete, size, err = backuper.isLocalPartComplete(ctx, "backup/shadow/db/table/default/all_1_1_0", partPath) + assert.NoError(t, err) + assert.True(t, complete) + assert.EqualValues(t, 12, size) +} + +func TestIsRemotePartCompleteRequiresEveryExpectedLocalFile(t *testing.T) { + ctx := context.Background() + backupPath := t.TempDir() + assert.NoError(t, os.MkdirAll(backupPath+"/all_1_1_0", 0750)) + assert.NoError(t, os.WriteFile(backupPath+"/all_1_1_0/checksums.txt", []byte("sum"), 0640)) + assert.NoError(t, os.WriteFile(backupPath+"/all_1_1_0/data.bin", []byte("data"), 0640)) + assert.NoError(t, os.WriteFile(backupPath+"/all_1_1_0/data.cmrk3", []byte("marks"), 0640)) + + backuper := Backuper{dst: &storage.BackupDestination{RemoteStorage: fakeResumeRemoteStorage{ + kind: "s3", + files: []storage.RemoteFile{ + fakeResumeRemoteFile{name: "/all_1_1_0/checksums.txt", size: 3}, + fakeResumeRemoteFile{name: "/all_1_1_0/data.bin", size: 4}, + }, + }}} + partFiles := []string{"/all_1_1_0/checksums.txt", "/all_1_1_0/data.bin", "/all_1_1_0/data.cmrk3"} + + complete, size, err := backuper.isRemotePartComplete(ctx, "backup/shadow/db/table/default", backupPath, partFiles) + assert.NoError(t, err) + assert.False(t, complete) + assert.EqualValues(t, 0, size) + + backuper.dst.RemoteStorage = fakeResumeRemoteStorage{ + kind: "s3", + files: []storage.RemoteFile{ + fakeResumeRemoteFile{name: "/all_1_1_0/checksums.txt", size: 3}, + fakeResumeRemoteFile{name: "/all_1_1_0/data.bin", size: 4}, + fakeResumeRemoteFile{name: "/all_1_1_0/data.cmrk3", size: 5}, + }, + } + complete, size, err = backuper.isRemotePartComplete(ctx, "backup/shadow/db/table/default", backupPath, partFiles) + assert.NoError(t, err) + assert.True(t, complete) + assert.EqualValues(t, 12, size) +} diff --git a/pkg/backup/upload.go b/pkg/backup/upload.go index d31afda59..2d8afb0ec 100644 --- a/pkg/backup/upload.go +++ b/pkg/backup/upload.go @@ -586,8 +586,18 @@ func (b *Backuper) uploadTableData(ctx context.Context, backupName string, delet dataGroup.Go(func() error { if b.resume { if isProcessed, processedSize := b.resumableState.IsAlreadyProcessed(remotePathFull); isProcessed { - atomic.AddInt64(&uploadedBytes, processedSize) - return nil + remoteComplete, remoteSize, validationErr := b.isRemotePartComplete(ctx, remotePath, backupPath, partFiles) + if validationErr != nil { + return errors.WithMessage(validationErr, "isRemotePartComplete") + } + if remoteComplete { + if remoteSize > 0 { + processedSize = remoteSize + } + atomic.AddInt64(&uploadedBytes, processedSize) + return nil + } + log.Warn().Msgf("resume state marked %s as processed but remote part %s is incomplete, uploading again", remotePathFull, remotePath) } } log.Debug().Msgf("start upload %d files to %s", len(partFiles), remotePath) @@ -620,8 +630,14 @@ func (b *Backuper) uploadTableData(ctx context.Context, backupName string, delet dataGroup.Go(func() error { if b.resume { if isProcessed, processedSize := b.resumableState.IsAlreadyProcessed(remoteDataFile); isProcessed { - atomic.AddInt64(&uploadedBytes, processedSize) - return nil + if remoteFile, statErr := b.dst.StatFile(ctx, remoteDataFile); statErr == nil { + if remoteFile.Size() > 0 { + processedSize = remoteFile.Size() + } + atomic.AddInt64(&uploadedBytes, processedSize) + return nil + } + log.Warn().Msgf("resume state marked %s as processed but remote archive is missing, uploading again", remoteDataFile) } } log.Debug().Msgf("start upload %d files to %s", len(localFiles), remoteDataFile) @@ -668,6 +684,46 @@ func (b *Backuper) uploadTableData(ctx context.Context, backupName string, delet return uploadedFiles, uploadedParts, uploadedBytes, nil } +func (b *Backuper) isRemotePartComplete(ctx context.Context, remoteBasePath, localBasePath string, expectedFiles []string) (bool, int64, error) { + if b.dst == nil || len(expectedFiles) == 0 { + return false, 0, nil + } + remoteFiles := make(map[string]int64) + walkErr := b.dst.Walk(ctx, remoteBasePath, true, func(ctx context.Context, f storage.RemoteFile) error { + if b.dst.Kind() == "SFTP" && (f.Name() == "." || f.Name() == "..") { + return nil + } + remoteFiles[strings.TrimPrefix(f.Name(), "/")] = f.Size() + return nil + }) + if walkErr != nil { + log.Debug().Msgf("resume validation: can't walk remote part %s: %v", remoteBasePath, walkErr) + return false, 0, nil + } + + var remoteSize int64 + for _, expectedFile := range expectedFiles { + localFilePath := path.Join(localBasePath, expectedFile) + info, err := os.Stat(localFilePath) + if err != nil { + if os.IsNotExist(err) { + return false, 0, nil + } + return false, 0, errors.Wrapf(err, "stat local upload part file %s", localFilePath) + } + if !info.Mode().IsRegular() { + return false, 0, nil + } + remoteFileName := strings.TrimPrefix(expectedFile, "/") + remoteFileSize, ok := remoteFiles[remoteFileName] + if !ok || remoteFileSize != info.Size() { + return false, 0, nil + } + remoteSize += remoteFileSize + } + return true, remoteSize, nil +} + func (b *Backuper) uploadTableMetadata(ctx context.Context, backupName string, requiredBackupName string, tableMetadata *metadata.TableMetadata) (int64, error) { if b.isEmbedded { var sqlSize, jsonSize int64