Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 49 additions & 4 deletions pkg/backup/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

var (
// ErrBackupIsAlreadyExists is a sentinel error carrying the message
// "backup is already exists". The places that return or compare it are
// outside this excerpt.
ErrBackupIsAlreadyExists = errors.New("backup is already exists")
// errIncompleteLocalPart is an internal sentinel returned by the walk
// callback in isLocalPartComplete to stop the remote walk as soon as a
// local file is missing, is not a regular file, or differs in size from
// its remote counterpart. isLocalPartComplete converts it into a
// (false, 0, nil) result instead of reporting it as an error.
errIncompleteLocalPart = errors.New("incomplete local part")
)

func (b *Backuper) Download(backupName string, tablePattern string, partitions []string, schemaOnly, rbacOnly, configsOnly, namedCollectionsOnly, resume bool, hardlinkExistsFiles bool, backupVersion string, commandId int) error {
Expand Down Expand Up @@ -798,10 +799,20 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
dataGroup.Go(func() error {
log.Debug().Msgf("start %s -> %s", partRemotePath, partLocalPath)
if b.resume {
isProcesses, pathSize := b.resumableState.IsAlreadyProcessed(partRemotePath)
atomic.AddUint64(&downloadedSize, uint64(pathSize))
if isProcesses {
return nil
isProcessed, pathSize := b.resumableState.IsAlreadyProcessed(partRemotePath)
if isProcessed {
localComplete, localSize, completeErr := b.isLocalPartComplete(dataCtx, partRemotePath, partLocalPath)
if completeErr != nil {
return errors.WithMessage(completeErr, "isLocalPartComplete")
}
if localComplete {
if localSize > 0 {
pathSize = localSize
}
atomic.AddUint64(&downloadedSize, uint64(pathSize))
return nil
}
log.Warn().Msgf("resume state marked %s as processed but local part %s is incomplete, downloading again", partRemotePath, partLocalPath)
}
}
if hardlinkExistsFiles {
Expand Down Expand Up @@ -855,6 +866,40 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
return downloadedSize, nil
}

// isLocalPartComplete reports whether every file that the remote storage lists
// under remotePartPath is present under localPartPath as a regular file of the
// same size.
//
// Results:
//   - (true, n, nil): every listed remote file has a matching local file; n is
//     the sum of the local file sizes.
//   - (false, 0, nil): a local file is missing, is not a regular file, has a
//     different size, or the remote listing produced no files at all.
//   - (false, 0, err): the remote walk failed, or os.Stat failed for a reason
//     other than the file not existing.
func (b *Backuper) isLocalPartComplete(ctx context.Context, remotePartPath, localPartPath string) (bool, int64, error) {
	var (
		filesChecked int
		localSize    int64
		// incomplete records the verdict inside the callback itself, so the
		// result does not depend on how a storage backend wraps, forwards or
		// drops the sentinel error the callback returns to stop the walk.
		incomplete bool
	)
	walkErr := b.dst.Walk(ctx, remotePartPath, true, func(_ context.Context, f storage.RemoteFile) error {
		// For SFTP storage the "." and ".." entries are not part files.
		if b.dst.Kind() == "SFTP" && (f.Name() == "." || f.Name() == "..") {
			return nil
		}
		filesChecked++
		localFilePath := path.Join(localPartPath, f.Name())
		info, err := os.Stat(localFilePath)
		if err != nil {
			if os.IsNotExist(err) {
				incomplete = true
				return errIncompleteLocalPart
			}
			return errors.Wrapf(err, "stat local part file %s", localFilePath)
		}
		if !info.Mode().IsRegular() || info.Size() != f.Size() {
			incomplete = true
			return errIncompleteLocalPart
		}
		localSize += info.Size()
		return nil
	})
	// Check the flag before walkErr: an incomplete part is a normal outcome,
	// not a failure, regardless of what Walk did with the sentinel.
	if incomplete {
		return false, 0, nil
	}
	if walkErr != nil {
		return false, 0, walkErr
	}
	// An empty remote listing proves nothing about the local part.
	if filesChecked == 0 {
		return false, 0, nil
	}
	return true, localSize, nil
}

func (b *Backuper) hardlinkIfLocalPartExistsAndChecksumEqual(backupName string, table metadata.TableMetadata, part *metadata.Part, disks []clickhouse.Disk, diskName, dbAndTableDir string) (bool, int64, error) {
diskType := ""
for _, d := range disks {
Expand Down
123 changes: 123 additions & 0 deletions pkg/backup/download_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package backup

import (
"context"
"io"
"os"
"regexp"
"testing"
"time"
Expand Down Expand Up @@ -282,3 +285,123 @@ func TestReBalanceTablesMetadataIfDiskNotExists_CheckErrors(t *testing.T) {
assert.Contains(t, err.Error(), "250B free space, not found in system.disks with `local` type")

}

// fakeResumeRemoteFile is a test double for a remote file entry; it carries
// only a name and a size.
type fakeResumeRemoteFile struct {
	name string // value reported by Name
	size int64  // value reported by Size
}

// Size returns the configured size.
func (rf fakeResumeRemoteFile) Size() int64 {
	return rf.size
}

// Name returns the configured name.
func (rf fakeResumeRemoteFile) Name() string {
	return rf.name
}

// LastModified always returns the zero time.
func (fakeResumeRemoteFile) LastModified() time.Time {
	var zero time.Time
	return zero
}

// fakeResumeRemoteStorage is a test double for remote storage. Only Kind and
// Walk return configured data; every other method is a stub that returns
// either a nil/zero result or os.ErrInvalid.
type fakeResumeRemoteStorage struct {
	kind  string               // value reported by Kind
	files []storage.RemoteFile // entries handed to the Walk callback, in order
}

// Kind returns the configured storage kind.
func (s fakeResumeRemoteStorage) Kind() string {
	return s.kind
}

// Connect is a no-op.
func (fakeResumeRemoteStorage) Connect(context.Context) error {
	return nil
}

// Close is a no-op.
func (fakeResumeRemoteStorage) Close(context.Context) error {
	return nil
}

// StatFile is not supported by the fake and always returns os.ErrInvalid.
func (fakeResumeRemoteStorage) StatFile(context.Context, string) (storage.RemoteFile, error) {
	return nil, os.ErrInvalid
}

// StatFileAbsolute is not supported by the fake and always returns os.ErrInvalid.
func (fakeResumeRemoteStorage) StatFileAbsolute(context.Context, string) (storage.RemoteFile, error) {
	return nil, os.ErrInvalid
}

// DeleteFile is a no-op.
func (fakeResumeRemoteStorage) DeleteFile(context.Context, string) error {
	return nil
}

// DeleteFileFromObjectDiskBackup is a no-op.
func (fakeResumeRemoteStorage) DeleteFileFromObjectDiskBackup(context.Context, string) error {
	return nil
}

// Walk passes every configured file to fn in slice order, ignoring the path
// and the boolean argument, and stops at the first error fn returns.
func (s fakeResumeRemoteStorage) Walk(ctx context.Context, _ string, _ bool, fn func(context.Context, storage.RemoteFile) error) error {
	for i := range s.files {
		if err := fn(ctx, s.files[i]); err != nil {
			return err
		}
	}
	return nil
}

// WalkAbsolute visits nothing and returns nil.
func (fakeResumeRemoteStorage) WalkAbsolute(context.Context, string, bool, func(context.Context, storage.RemoteFile) error) error {
	return nil
}

// GetFileReader is not supported by the fake and always returns os.ErrInvalid.
func (fakeResumeRemoteStorage) GetFileReader(context.Context, string) (io.ReadCloser, error) {
	return nil, os.ErrInvalid
}

// GetFileReaderAbsolute is not supported by the fake and always returns os.ErrInvalid.
func (fakeResumeRemoteStorage) GetFileReaderAbsolute(context.Context, string) (io.ReadCloser, error) {
	return nil, os.ErrInvalid
}

// GetFileReaderWithLocalPath is not supported by the fake and always returns os.ErrInvalid.
func (fakeResumeRemoteStorage) GetFileReaderWithLocalPath(context.Context, string, string, int64) (io.ReadCloser, error) {
	return nil, os.ErrInvalid
}

// PutFile discards its input and reports success.
func (fakeResumeRemoteStorage) PutFile(context.Context, string, io.ReadCloser, int64) error {
	return nil
}

// PutFileAbsolute discards its input and reports success.
func (fakeResumeRemoteStorage) PutFileAbsolute(context.Context, string, io.ReadCloser, int64) error {
	return nil
}

// CopyObject copies nothing and reports zero bytes with no error.
func (fakeResumeRemoteStorage) CopyObject(context.Context, int64, string, string, string) (int64, error) {
	return 0, nil
}

// TestIsLocalPartCompleteRequiresEveryRemoteFile verifies that
// isLocalPartComplete reports a part as incomplete while one remotely listed
// file is absent locally, and as complete (with the summed size) once every
// listed file exists locally with a matching size.
func TestIsLocalPartCompleteRequiresEveryRemoteFile(t *testing.T) {
	const remotePart = "backup/shadow/db/table/default/all_1_1_0"

	ctx := context.Background()
	partPath := t.TempDir()
	writeLocal := func(name, content string) {
		assert.NoError(t, os.WriteFile(partPath+"/"+name, []byte(content), 0640))
	}
	writeLocal("checksums.txt", "sum")
	writeLocal("data.bin", "data")

	remote := fakeResumeRemoteStorage{
		kind: "s3",
		files: []storage.RemoteFile{
			fakeResumeRemoteFile{name: "checksums.txt", size: 3},
			fakeResumeRemoteFile{name: "data.bin", size: 4},
			fakeResumeRemoteFile{name: "data.cmrk3", size: 5},
		},
	}
	backuper := Backuper{dst: &storage.BackupDestination{RemoteStorage: remote}}

	// data.cmrk3 is listed remotely but not yet present locally.
	complete, size, err := backuper.isLocalPartComplete(ctx, remotePart, partPath)
	assert.NoError(t, err)
	assert.False(t, complete)
	assert.EqualValues(t, 0, size)

	// With the last file in place the sizes add up to 3 + 4 + 5.
	writeLocal("data.cmrk3", "marks")
	complete, size, err = backuper.isLocalPartComplete(ctx, remotePart, partPath)
	assert.NoError(t, err)
	assert.True(t, complete)
	assert.EqualValues(t, 12, size)
}

// TestIsRemotePartCompleteRequiresEveryExpectedLocalFile verifies that
// isRemotePartComplete reports a part as incomplete while one expected file is
// missing from the remote listing, and as complete (with the summed size) once
// the listing contains every expected file with a matching size.
func TestIsRemotePartCompleteRequiresEveryExpectedLocalFile(t *testing.T) {
	const remoteBase = "backup/shadow/db/table/default"

	ctx := context.Background()
	backupPath := t.TempDir()
	assert.NoError(t, os.MkdirAll(backupPath+"/all_1_1_0", 0750))
	localContents := []struct{ name, content string }{
		{"/all_1_1_0/checksums.txt", "sum"},
		{"/all_1_1_0/data.bin", "data"},
		{"/all_1_1_0/data.cmrk3", "marks"},
	}
	for _, lc := range localContents {
		assert.NoError(t, os.WriteFile(backupPath+lc.name, []byte(lc.content), 0640))
	}

	checksums := fakeResumeRemoteFile{name: "/all_1_1_0/checksums.txt", size: 3}
	dataBin := fakeResumeRemoteFile{name: "/all_1_1_0/data.bin", size: 4}
	marks := fakeResumeRemoteFile{name: "/all_1_1_0/data.cmrk3", size: 5}

	backuper := Backuper{dst: &storage.BackupDestination{RemoteStorage: fakeResumeRemoteStorage{
		kind:  "s3",
		files: []storage.RemoteFile{checksums, dataBin},
	}}}
	partFiles := []string{"/all_1_1_0/checksums.txt", "/all_1_1_0/data.bin", "/all_1_1_0/data.cmrk3"}

	// data.cmrk3 exists locally but is absent from the remote listing.
	complete, size, err := backuper.isRemotePartComplete(ctx, remoteBase, backupPath, partFiles)
	assert.NoError(t, err)
	assert.False(t, complete)
	assert.EqualValues(t, 0, size)

	// Once the remote listing has all three files the sizes add up to 3 + 4 + 5.
	backuper.dst.RemoteStorage = fakeResumeRemoteStorage{
		kind:  "s3",
		files: []storage.RemoteFile{checksums, dataBin, marks},
	}
	complete, size, err = backuper.isRemotePartComplete(ctx, remoteBase, backupPath, partFiles)
	assert.NoError(t, err)
	assert.True(t, complete)
	assert.EqualValues(t, 12, size)
}
64 changes: 60 additions & 4 deletions pkg/backup/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,18 @@ func (b *Backuper) uploadTableData(ctx context.Context, backupName string, delet
dataGroup.Go(func() error {
if b.resume {
if isProcessed, processedSize := b.resumableState.IsAlreadyProcessed(remotePathFull); isProcessed {
atomic.AddInt64(&uploadedBytes, processedSize)
return nil
remoteComplete, remoteSize, validationErr := b.isRemotePartComplete(ctx, remotePath, backupPath, partFiles)
if validationErr != nil {
return errors.WithMessage(validationErr, "isRemotePartComplete")
}
if remoteComplete {
if remoteSize > 0 {
processedSize = remoteSize
}
atomic.AddInt64(&uploadedBytes, processedSize)
return nil
}
log.Warn().Msgf("resume state marked %s as processed but remote part %s is incomplete, uploading again", remotePathFull, remotePath)
}
}
log.Debug().Msgf("start upload %d files to %s", len(partFiles), remotePath)
Expand Down Expand Up @@ -620,8 +630,14 @@ func (b *Backuper) uploadTableData(ctx context.Context, backupName string, delet
dataGroup.Go(func() error {
if b.resume {
if isProcessed, processedSize := b.resumableState.IsAlreadyProcessed(remoteDataFile); isProcessed {
atomic.AddInt64(&uploadedBytes, processedSize)
return nil
if remoteFile, statErr := b.dst.StatFile(ctx, remoteDataFile); statErr == nil {
if remoteFile.Size() > 0 {
processedSize = remoteFile.Size()
}
atomic.AddInt64(&uploadedBytes, processedSize)
return nil
}
log.Warn().Msgf("resume state marked %s as processed but remote archive is missing, uploading again", remoteDataFile)
}
}
log.Debug().Msgf("start upload %d files to %s", len(localFiles), remoteDataFile)
Expand Down Expand Up @@ -668,6 +684,46 @@ func (b *Backuper) uploadTableData(ctx context.Context, backupName string, delet
return uploadedFiles, uploadedParts, uploadedBytes, nil
}

// isRemotePartComplete reports whether every entry of expectedFiles exists
// locally under localBasePath as a regular file and also appears in the remote
// listing of remoteBasePath with the same size. Names on both sides are
// compared after removing one leading "/".
//
// Results:
//   - (true, n, nil): every expected file matched; n is the sum of the matched
//     remote sizes.
//   - (false, 0, nil): b.dst is nil, expectedFiles is empty, the remote walk
//     failed (logged at debug level, not returned), a local file is missing or
//     not regular, or a remote entry is absent or differs in size.
//   - (false, 0, err): os.Stat on a local file failed for a reason other than
//     the file not existing.
func (b *Backuper) isRemotePartComplete(ctx context.Context, remoteBasePath, localBasePath string, expectedFiles []string) (bool, int64, error) {
	if b.dst == nil {
		return false, 0, nil
	}
	if len(expectedFiles) == 0 {
		return false, 0, nil
	}

	// Index the remote listing by name so each expected file is a single lookup.
	sizeByName := map[string]int64{}
	collect := func(_ context.Context, rf storage.RemoteFile) error {
		if b.dst.Kind() == "SFTP" {
			if name := rf.Name(); name == "." || name == ".." {
				return nil
			}
		}
		sizeByName[strings.TrimPrefix(rf.Name(), "/")] = rf.Size()
		return nil
	}
	if err := b.dst.Walk(ctx, remoteBasePath, true, collect); err != nil {
		// A failed listing is treated as "not complete" rather than as an error.
		log.Debug().Msgf("resume validation: can't walk remote part %s: %v", remoteBasePath, err)
		return false, 0, nil
	}

	var total int64
	for _, rel := range expectedFiles {
		localFile := path.Join(localBasePath, rel)
		st, statErr := os.Stat(localFile)
		switch {
		case statErr != nil && os.IsNotExist(statErr):
			return false, 0, nil
		case statErr != nil:
			return false, 0, errors.Wrapf(statErr, "stat local upload part file %s", localFile)
		case !st.Mode().IsRegular():
			return false, 0, nil
		}
		size, found := sizeByName[strings.TrimPrefix(rel, "/")]
		if !found || size != st.Size() {
			return false, 0, nil
		}
		total += size
	}
	return true, total, nil
}

func (b *Backuper) uploadTableMetadata(ctx context.Context, backupName string, requiredBackupName string, tableMetadata *metadata.TableMetadata) (int64, error) {
if b.isEmbedded {
var sqlSize, jsonSize int64
Expand Down