Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 128 additions & 62 deletions src/borg/archiver/compact_cmd.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections import defaultdict
from pathlib import Path

from ._common import with_repository
Expand All @@ -19,7 +20,7 @@


class ArchiveGarbageCollector:
def __init__(self, repository, manifest, *, stats, iec):
def __init__(self, repository, manifest, *, stats, iec, threshold, dry_run=False):
self.repository = repository
assert isinstance(repository, Repository)
self.manifest = manifest
Expand All @@ -28,7 +29,9 @@ def __init__(self, repository, manifest, *, stats, iec):
self.total_size = None # overall size of source file content data written to all archives
self.archives_count = None # number of archives
self.stats = stats # compute repo space usage before/after - lists all repo objects, can be slow.
self.threshold = threshold # rewrite a mixed pack only when its wasted-bytes fraction reaches this percent
self.iec = iec # formats statistics using IEC units (1KiB = 1024B)
self.dry_run = dry_run

@property
def repository_size(self):
Expand All @@ -43,20 +46,17 @@ def garbage_collect(self):
logger.info("Computing object IDs used by archives...")
(self.missing_chunks, self.total_files, self.total_size, self.archives_count) = self.analyze_archives()
self.report_and_delete()
self.save_chunk_index()
self.cleanup_files_cache()
if not self.dry_run:
self.save_chunk_index()
self.cleanup_files_cache()
logger.info("Finished compaction / garbage collection...")

def get_repository_chunks(self) -> ChunkIndex:
"""return a chunks index"""
if self.stats:
# slow but thorough: scan the pack headers for real sizes/locations and to catch objects
# missing from the cached index. Start unused (F_NONE); analyze_archives marks used ones.
logger.info("Getting object IDs present in the repository...")
chunks = build_chunkindex_from_repo(self.repository, disable_caches=True, init_flags=ChunkIndex.F_NONE)
else: # faster: rely on existing chunks index (with flags F_NONE and size 0).
logger.info("Getting object IDs from cached chunks index...")
chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True)
# The cached index already has each object's obj_size and starts entries as F_NONE, so it
# serves both GC and --stats; no need to force the slow pack-header scan just to get sizes.
logger.info("Getting object IDs from the cached chunks index...")
chunks = build_chunkindex_from_repo(self.repository, cache_immediately=not self.dry_run)
return chunks

def save_chunk_index(self):
Expand Down Expand Up @@ -127,7 +127,6 @@ def use_it(id):
)
total_size, total_files = 0, 0
for i, info in enumerate(archive_infos):
pi.show(i)
logger.info(
f"Analyzing archive {info.name} {info.ts.astimezone()} {bin_to_hex(info.id)} ({i + 1}/{num_archives})"
)
Expand All @@ -145,6 +144,7 @@ def use_it(id):
for id, size in item.chunks:
total_size += size # original, uncompressed file content size
use_it(id)
pi.show(i + 1) # report after each archive, so the last one lands on 100%
pi.finish()
return missing_chunks, total_files, total_size, num_archives

Expand All @@ -154,41 +154,18 @@ def report_and_delete(self):
for id in sorted(self.missing_chunks):
logger.debug(f"Missing object {bin_to_hex(id)}")
set_ec(EXIT_ERROR)

logger.info("Cleaning archives directory from soft-deleted archives...")
archive_infos = self.manifest.archives.list(sort_by=["ts"], deleted=True)
for archive_info in archive_infos:
name, id, hex_id = archive_info.name, archive_info.id, bin_to_hex(archive_info.id)
try:
self.manifest.archives.nuke_by_id(id)
except self.repository.ObjectNotFound:
logger.warning(f"Soft-deleted archive {name} {hex_id} not found.")
if not self.dry_run: # nuking removes the soft-deleted archives from the archives directory; skip on a dry run
logger.info("Cleaning archives directory from soft-deleted archives...")
archive_infos = self.manifest.archives.list(sort_by=["ts"], deleted=True)
for archive_info in archive_infos:
name, id, hex_id = archive_info.name, archive_info.id, bin_to_hex(archive_info.id)
try:
self.manifest.archives.nuke_by_id(id)
except self.repository.ObjectNotFound:
logger.warning(f"Soft-deleted archive {name} {hex_id} not found.")

repo_size_before = self.repository_size
logger.info("Determining unused objects...")
unused = set()
for id, entry in self.chunks.iteritems():
if not (entry.flags & ChunkIndex.F_USED):
unused.add(id)
logger.info(f"Deleting {len(unused)} unused objects...")
if unused:
# Before deleting any repository object, invalidate all centrally cached chunk indexes.
# Otherwise, if we get interrupted within the deletion loop, the still-existing index/*
# would claim that already-deleted objects are still present. A later "borg create" would then
# trust that stale index, not re-upload the affected chunks and silently create an archive with
# dangling object references (see issue #9748). By removing the cached indexes first, an
# interruption is conservative: clients must rebuild the index from actual repository contents
# and will re-upload any deleted data. save_chunk_index() writes a fresh, valid index afterwards.
delete_chunkindex_from_repo(self.repository)
pi = ProgressIndicatorPercent(
total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete"
)
for i, id in enumerate(unused):
pi.show(i)
# N=1: the chunk is alone in its pack, so dropping the pack frees just it; N>1 needs compaction.
self.repository.store_delete("packs/" + bin_to_hex(self.chunks[id].pack_id))
del self.chunks[id]
pi.finish()
self.compact_packs()
repo_size_after = self.repository_size

count = len(self.chunks)
Expand All @@ -202,20 +179,104 @@ def report_and_delete(self):
f"Repository size is {format_file_size(repo_size_after, precision=0, iec=self.iec)} "
f"in {count} objects."
)
logger.info(
f"Compaction saved "
f"{format_file_size(repo_size_before - repo_size_after, precision=0, iec=self.iec)}."
)
if not self.dry_run: # nothing was deleted on a dry run, so before == after
logger.info(
f"Compaction saved "
f"{format_file_size(repo_size_before - repo_size_after, precision=0, iec=self.iec)}."
)
else:
logger.info(f"Repository has data stored in {count} objects.")

def compact_packs(self):
"""Free space one pack at a time (the store can only delete whole packs).

analyze_archives() has flagged the used objects F_USED. Per pack:

- all objects unused -> delete the pack.
- all objects used -> keep it.
- mixed -> rewrite only if the unused bytes reach --threshold percent: copy the
used objects into a new pack via compact_pack and drop the old one.
Below the threshold keep the pack, so we don't rewrite a large pack
to reclaim little.

Two passes bound the memory use: the first keeps only per-pack byte counts to pick the packs
to change, the second collects object ids for just those packs, not the whole index.
"""
# Pass 1: one index scan, keep only per-pack byte tallies (two ints per pack, no id lists).
pack_total, pack_unused = defaultdict(int), defaultdict(int)
for id, entry in self.chunks.iteritems():
pid = entry.pack_id
pack_total[pid] += entry.obj_size
if not (entry.flags & ChunkIndex.F_USED):
pack_unused[pid] += entry.obj_size

# decide each pack's fate from the tallies
drop_packs, rewrite_packs = set(), set()
for pid, total in pack_total.items():
unused = pack_unused.get(pid, 0) # .get, not [pid]: don't insert all-used packs into the dict
if unused == 0:
continue # all used -> leave alone
if unused == total:
drop_packs.add(pid) # all unused -> drop the whole file
elif 100 * unused / total >= self.threshold:
rewrite_packs.add(pid) # mixed and wasteful enough -> copy survivors forward
# else: mixed but below threshold -> leave alone
if self.dry_run:
freed = sum(pack_unused[pid] for pid in drop_packs) + sum(pack_unused[pid] for pid in rewrite_packs)
logger.info(
f"Would free {format_file_size(freed, iec=self.iec)} "
f"by dropping {len(drop_packs)} packs and rewriting {len(rewrite_packs)} packs."
)
return # dry run: report only, change nothing
if not drop_packs and not rewrite_packs:
logger.info("Deleting 0 unused objects...")
return # nothing to reclaim; do not touch the cached chunk indexes

# crash-safety (#9748): invalidate cached chunk indexes before the first store change
delete_chunkindex_from_repo(self.repository)

# Pass 2: collect object ids only for the affected packs (a subset, not the whole index)
keep = {pid: set() for pid in rewrite_packs} # survivors to copy forward, per pack
drop = {pid: set() for pid in rewrite_packs} # unused objects in those same packs
forget = [] # ids living in fully-unused packs we delete outright
for id, entry in self.chunks.iteritems():
pid = entry.pack_id
if pid in rewrite_packs:
(keep if entry.flags & ChunkIndex.F_USED else drop)[pid].add(id)
elif pid in drop_packs:
forget.append(id)

# count what we remove: every object of a dropped pack, plus the unused objects cut from
# rewritten packs. unused objects in below-threshold packs stay, so they don't count.
deleted = len(forget) + sum(len(ids) for ids in drop.values())
logger.info(f"Deleting {deleted} unused objects...")
pi = ProgressIndicatorPercent(
total=len(drop_packs) + len(rewrite_packs),
msg="Compacting packs %3.1f%%",
step=0.1,
msgid="compact.compact_packs",
)
progress = 0
for pid in drop_packs:
self.repository.store_delete("packs/" + bin_to_hex(pid))
progress += 1
pi.show(progress) # report after the work, so the final pack lands on 100%
for id in forget:
del self.chunks[id] # their pack file is gone, so drop their index entries too
for pid in rewrite_packs:
self.repository.compact_pack(pid, keep_ids=keep[pid], drop_ids=drop[pid]) # helper owns index update
progress += 1
pi.show(progress)
pi.finish()


class CompactMixIn:
@with_repository(exclusive=True, compatibility=(Manifest.Operation.DELETE,))
def do_compact(self, args, repository, manifest):
"""Collects garbage in the repository."""
if not args.dry_run: # support --dry-run to simplify scripting
ArchiveGarbageCollector(repository, manifest, stats=args.stats, iec=args.iec).garbage_collect()
ArchiveGarbageCollector(
repository, manifest, stats=args.stats, iec=args.iec, threshold=args.threshold, dry_run=args.dry_run
).garbage_collect()

def build_parser_compact(self, subparsers, common_parser, mid_common_parser):
from ._common import process_epilog
Expand Down Expand Up @@ -251,22 +312,27 @@ def build_parser_compact(self, subparsers, common_parser, mid_common_parser):
seeing fatal errors when creating backups or when archives are missing in
``borg repo-list``).

When using the ``--stats`` option, borg will internally list all repository
objects to determine their existence and stored size. It will build a fresh
chunks index from that information and cache it in the repository. For some
types of repositories, this might be very slow. It will tell you the sum of
stored object sizes, before and after compaction.

Without ``--stats``, borg will rely on the cached chunks index to determine
existing object IDs (but there is no stored size information in the index,
thus it cannot compute before/after compaction size statistics).
With ``--stats``, borg additionally reports the sum of stored object sizes
before and after compaction.
"""
)
subparser = ArgumentParser(parents=[common_parser], description=self.do_compact.__doc__, epilog=compact_epilog)
subparsers.add_subcommand("compact", subparser, help="compact the repository")
subparser.add_argument(
"-n", "--dry-run", dest="dry_run", action="store_true", help="do not change the repository"
"-n",
"--dry-run",
dest="dry_run",
action="store_true",
help="do not change the repository, just show what compact would free",
)
subparser.add_argument(
"-s", "--stats", dest="stats", action="store_true", help="print repository size statistics"
)
subparser.add_argument(
"-s", "--stats", dest="stats", action="store_true", help="print statistics (might be much slower)"
"--threshold",
metavar="PERCENT",
dest="threshold",
type=int,
default=10,
help="rewrite a pack when at least PERCENT of its bytes are unused (default: 10)",
)
Comment thread
mr-raj12 marked this conversation as resolved.
64 changes: 62 additions & 2 deletions src/borg/testsuite/archiver/compact_cmd_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
import pytest

from ...constants import * # NOQA
from ...helpers import get_cache_dir
from ...helpers import get_cache_dir, bin_to_hex
from ...hashindex import ChunkIndex
from ...repository import Repository
from ...cache import files_cache_name, discover_files_cache_names, list_chunkindex_hashes
from ...manifest import Manifest
from . import cmd, create_regular_file, create_src_archive, generate_archiver_tests, open_repository, RK_ENCRYPTION
from . import changedir
from ..repository_test import H, fchunk, pdchunk

pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA

Expand Down Expand Up @@ -119,7 +122,7 @@ def test_compact_interrupted_does_not_poison_chunk_index(archivers, request, mon
repository = open_repository(archiver)
with repository:
manifest = Manifest.load(repository, (Manifest.Operation.DELETE,))
gc = ArchiveGarbageCollector(repository, manifest, stats=stats, iec=False)
gc = ArchiveGarbageCollector(repository, manifest, stats=stats, iec=False, threshold=40.0)

def interrupt():
raise KeyboardInterrupt("simulated interruption before save_chunk_index")
Expand All @@ -143,6 +146,63 @@ def interrupt():
assert fd.read() == content


def test_compact_packs_respects_threshold(tmp_path):
# Two multi-object packs in one repo, then pack-level compaction at a 40% threshold. The pack that
# wastes 2/3 of its bytes is rewritten down to its single survivor (and its old file deleted); the
# pack that wastes only 1/3 stays untouched, since copying its survivors to reclaim that little is
# not worth it. This covers the rewrite, leave-alone and keep/drop split that N=1 can't reach.
from ...archiver.compact_cmd import ArchiveGarbageCollector

location = os.fspath(tmp_path / "repo")
with Repository(location, exclusive=True, create=True) as repository:
repository._pack_writer.max_count = 4 # buffer several objects, so each flush() writes one pack
for i in range(3): # H0..H2 -> wasteful pack
repository.put(H(i), fchunk(f"DATA{i}".encode(), chunk_id=H(i)))
repository.flush()
for i in range(3, 6): # H3..H5 -> frugal pack
repository.put(H(i), fchunk(f"DATA{i}".encode(), chunk_id=H(i)))
repository.flush()

wasteful_pack = repository.chunks[H(0)].pack_id
frugal_pack = repository.chunks[H(3)].pack_id
# wasteful pack keeps only H0 (2/3 wasted), frugal pack keeps H3 and H4 (1/3 wasted)
used = {H(0), H(3), H(4)}
for i in range(6):
entry = repository.chunks[H(i)]
flags = ChunkIndex.F_USED if H(i) in used else ChunkIndex.F_NONE
repository.chunks[H(i)] = entry._replace(flags=flags)

gc = ArchiveGarbageCollector(repository, manifest=None, stats=False, iec=False, threshold=40)
gc.chunks = repository.chunks
gc.compact_packs()

# wasteful pack was rewritten: the survivor reads back, the dropped objects and the old file are gone
assert pdchunk(repository.get(H(0))) == b"DATA0"
assert repository.get(H(1), raise_missing=False) is None
assert repository.get(H(2), raise_missing=False) is None
# frugal pack stayed below threshold: untouched, every object (even the unused H5) still present
for i in range(3, 6):
assert pdchunk(repository.get(H(i))) == f"DATA{i}".encode()
pack_names = [info.name for info in repository.store_list("packs")]
assert bin_to_hex(wasteful_pack) not in pack_names
assert bin_to_hex(frugal_pack) in pack_names


def test_compact_dry_run_reports_and_changes_nothing(archivers, request):
# --dry-run prints the would-free estimate (issue #9379) and must not touch the repository.
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", RK_ENCRYPTION)
create_src_archive(archiver, "archive")
cmd(archiver, "delete", "-a", "archive")

out = cmd(archiver, "compact", "--dry-run", "-v", exit_code=0)
assert "Would free" in out # the estimate is reported

# proof nothing was removed: a real compact afterwards still finds the unused objects to delete
out2 = cmd(archiver, "compact", "-v", exit_code=0)
assert "Deleting 0 unused objects" not in out2


def test_compact_files_cache_cleanup(archivers, request):
"""Test that files cache files for deleted archives are removed during compact."""
archiver = request.getfixturevalue(archivers)
Expand Down
Loading