diff --git a/src/borg/archiver/compact_cmd.py b/src/borg/archiver/compact_cmd.py index 5fd19dc1a4..8689c3894e 100644 --- a/src/borg/archiver/compact_cmd.py +++ b/src/borg/archiver/compact_cmd.py @@ -1,3 +1,4 @@ +from collections import defaultdict from pathlib import Path from ._common import with_repository @@ -19,7 +20,7 @@ class ArchiveGarbageCollector: - def __init__(self, repository, manifest, *, stats, iec): + def __init__(self, repository, manifest, *, stats, iec, threshold, dry_run=False): self.repository = repository assert isinstance(repository, Repository) self.manifest = manifest @@ -28,7 +29,9 @@ def __init__(self, repository, manifest, *, stats, iec): self.total_size = None # overall size of source file content data written to all archives self.archives_count = None # number of archives self.stats = stats # compute repo space usage before/after - lists all repo objects, can be slow. + self.threshold = threshold # rewrite a mixed pack only when its wasted-bytes fraction reaches this percent self.iec = iec # formats statistics using IEC units (1KiB = 1024B) + self.dry_run = dry_run @property def repository_size(self): @@ -43,20 +46,17 @@ def garbage_collect(self): logger.info("Computing object IDs used by archives...") (self.missing_chunks, self.total_files, self.total_size, self.archives_count) = self.analyze_archives() self.report_and_delete() - self.save_chunk_index() - self.cleanup_files_cache() + if not self.dry_run: + self.save_chunk_index() + self.cleanup_files_cache() logger.info("Finished compaction / garbage collection...") def get_repository_chunks(self) -> ChunkIndex: """return a chunks index""" - if self.stats: - # slow but thorough: scan the pack headers for real sizes/locations and to catch objects - # missing from the cached index. Start unused (F_NONE); analyze_archives marks used ones. - logger.info("Getting object IDs present in the repository...") - chunks = build_chunkindex_from_repo(self.repository, disable_caches=True, init_flags=ChunkIndex.F_NONE) - else: # faster: rely on existing chunks index (with flags F_NONE and size 0). - logger.info("Getting object IDs from cached chunks index...") - chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True) + # The cached index already has each object's obj_size and starts entries as F_NONE, so it + # serves both GC and --stats; no need to force the slow pack-header scan just to get sizes. + logger.info("Getting object IDs from the cached chunks index...") + chunks = build_chunkindex_from_repo(self.repository, cache_immediately=not self.dry_run) return chunks def save_chunk_index(self): @@ -127,7 +127,6 @@ def use_it(id): ) total_size, total_files = 0, 0 for i, info in enumerate(archive_infos): - pi.show(i) logger.info( f"Analyzing archive {info.name} {info.ts.astimezone()} {bin_to_hex(info.id)} ({i + 1}/{num_archives})" ) @@ -145,6 +144,7 @@ def use_it(id): for id, size in item.chunks: total_size += size # original, uncompressed file content size use_it(id) + pi.show(i + 1) # report after each archive, so the last one lands on 100% pi.finish() return missing_chunks, total_files, total_size, num_archives @@ -154,41 +154,18 @@ def report_and_delete(self): for id in sorted(self.missing_chunks): logger.debug(f"Missing object {bin_to_hex(id)}") set_ec(EXIT_ERROR) - - logger.info("Cleaning archives directory from soft-deleted archives...") - archive_infos = self.manifest.archives.list(sort_by=["ts"], deleted=True) - for archive_info in archive_infos: - name, id, hex_id = archive_info.name, archive_info.id, bin_to_hex(archive_info.id) - try: - self.manifest.archives.nuke_by_id(id) - except self.repository.ObjectNotFound: - logger.warning(f"Soft-deleted archive {name} {hex_id} not found.") + if not self.dry_run: # nuking removes the soft-deleted archives from the archives directory; skip on a dry run + logger.info("Cleaning archives directory from soft-deleted archives...") + archive_infos = self.manifest.archives.list(sort_by=["ts"], deleted=True) + for archive_info in archive_infos: + name, id, hex_id = archive_info.name, archive_info.id, bin_to_hex(archive_info.id) + try: + self.manifest.archives.nuke_by_id(id) + except self.repository.ObjectNotFound: + logger.warning(f"Soft-deleted archive {name} {hex_id} not found.") repo_size_before = self.repository_size - logger.info("Determining unused objects...") - unused = set() - for id, entry in self.chunks.iteritems(): - if not (entry.flags & ChunkIndex.F_USED): - unused.add(id) - logger.info(f"Deleting {len(unused)} unused objects...") - if unused: - # Before deleting any repository object, invalidate all centrally cached chunk indexes. - # Otherwise, if we get interrupted within the deletion loop, the still-existing index/* - # would claim that already-deleted objects are still present. A later "borg create" would then - # trust that stale index, not re-upload the affected chunks and silently create an archive with - # dangling object references (see issue #9748). By removing the cached indexes first, an - # interruption is conservative: clients must rebuild the index from actual repository contents - # and will re-upload any deleted data. save_chunk_index() writes a fresh, valid index afterwards. - delete_chunkindex_from_repo(self.repository) - pi = ProgressIndicatorPercent( - total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete" - ) - for i, id in enumerate(unused): - pi.show(i) - # N=1: the chunk is alone in its pack, so dropping the pack frees just it; N>1 needs compaction. - self.repository.store_delete("packs/" + bin_to_hex(self.chunks[id].pack_id)) - del self.chunks[id] - pi.finish() + self.compact_packs() repo_size_after = self.repository_size count = len(self.chunks) @@ -202,20 +179,104 @@ def report_and_delete(self): f"Repository size is {format_file_size(repo_size_after, precision=0, iec=self.iec)} " f"in {count} objects." ) - logger.info( - f"Compaction saved " - f"{format_file_size(repo_size_before - repo_size_after, precision=0, iec=self.iec)}." - ) + if not self.dry_run: # nothing was deleted on a dry run, so before == after + logger.info( + f"Compaction saved " + f"{format_file_size(repo_size_before - repo_size_after, precision=0, iec=self.iec)}." + ) else: logger.info(f"Repository has data stored in {count} objects.") + def compact_packs(self): + """Free space one pack at a time (the store can only delete whole packs). + + analyze_archives() has flagged the used objects F_USED. Per pack: + + - all objects unused -> delete the pack. + - all objects used -> keep it. + - mixed -> rewrite only if the unused bytes reach --threshold percent: copy the + used objects into a new pack via compact_pack and drop the old one. + Below the threshold keep the pack, so we don't rewrite a large pack + to reclaim little. + + Two passes bound the memory use: the first keeps only per-pack byte counts to pick the packs + to change, the second collects object ids for just those packs, not the whole index. + """ + # Pass 1: one index scan, keep only per-pack byte tallies (two ints per pack, no id lists). + pack_total, pack_unused = defaultdict(int), defaultdict(int) + for id, entry in self.chunks.iteritems(): + pid = entry.pack_id + pack_total[pid] += entry.obj_size + if not (entry.flags & ChunkIndex.F_USED): + pack_unused[pid] += entry.obj_size + + # decide each pack's fate from the tallies + drop_packs, rewrite_packs = set(), set() + for pid, total in pack_total.items(): + unused = pack_unused.get(pid, 0) # .get, not [pid]: don't insert all-used packs into the dict + if unused == 0: + continue # all used -> leave alone + if unused == total: + drop_packs.add(pid) # all unused -> drop the whole file + elif 100 * unused / total >= self.threshold: + rewrite_packs.add(pid) # mixed and wasteful enough -> copy survivors forward + # else: mixed but below threshold -> leave alone + if self.dry_run: + freed = sum(pack_unused[pid] for pid in drop_packs) + sum(pack_unused[pid] for pid in rewrite_packs) + logger.info( + f"Would free {format_file_size(freed, iec=self.iec)} " + f"by dropping {len(drop_packs)} packs and rewriting {len(rewrite_packs)} packs." + ) + return # dry run: report only, change nothing + if not drop_packs and not rewrite_packs: + logger.info("Deleting 0 unused objects...") + return # nothing to reclaim; do not touch the cached chunk indexes + + # crash-safety (#9748): invalidate cached chunk indexes before the first store change + delete_chunkindex_from_repo(self.repository) + + # Pass 2: collect object ids only for the affected packs (a subset, not the whole index) + keep = {pid: set() for pid in rewrite_packs} # survivors to copy forward, per pack + drop = {pid: set() for pid in rewrite_packs} # unused objects in those same packs + forget = [] # ids living in fully-unused packs we delete outright + for id, entry in self.chunks.iteritems(): + pid = entry.pack_id + if pid in rewrite_packs: + (keep if entry.flags & ChunkIndex.F_USED else drop)[pid].add(id) + elif pid in drop_packs: + forget.append(id) + + # count what we remove: every object of a dropped pack, plus the unused objects cut from + # rewritten packs. unused objects in below-threshold packs stay, so they don't count. + deleted = len(forget) + sum(len(ids) for ids in drop.values()) + logger.info(f"Deleting {deleted} unused objects...") + pi = ProgressIndicatorPercent( + total=len(drop_packs) + len(rewrite_packs), + msg="Compacting packs %3.1f%%", + step=0.1, + msgid="compact.compact_packs", + ) + progress = 0 + for pid in drop_packs: + self.repository.store_delete("packs/" + bin_to_hex(pid)) + progress += 1 + pi.show(progress) # report after the work, so the final pack lands on 100% + for id in forget: + del self.chunks[id] # their pack file is gone, so drop their index entries too + for pid in rewrite_packs: + self.repository.compact_pack(pid, keep_ids=keep[pid], drop_ids=drop[pid]) # helper owns index update + progress += 1 + pi.show(progress) + pi.finish() + class CompactMixIn: @with_repository(exclusive=True, compatibility=(Manifest.Operation.DELETE,)) def do_compact(self, args, repository, manifest): """Collects garbage in the repository.""" - if not args.dry_run: # support --dry-run to simplify scripting - ArchiveGarbageCollector(repository, manifest, stats=args.stats, iec=args.iec).garbage_collect() + ArchiveGarbageCollector( + repository, manifest, stats=args.stats, iec=args.iec, threshold=args.threshold, dry_run=args.dry_run + ).garbage_collect() def build_parser_compact(self, subparsers, common_parser, mid_common_parser): from ._common import process_epilog @@ -251,22 +312,27 @@ def build_parser_compact(self, subparsers, common_parser, mid_common_parser): seeing fatal errors when creating backups or when archives are missing in ``borg repo-list``). - When using the ``--stats`` option, borg will internally list all repository - objects to determine their existence and stored size. It will build a fresh - chunks index from that information and cache it in the repository. For some - types of repositories, this might be very slow. It will tell you the sum of - stored object sizes, before and after compaction. - - Without ``--stats``, borg will rely on the cached chunks index to determine - existing object IDs (but there is no stored size information in the index, - thus it cannot compute before/after compaction size statistics). + With ``--stats``, borg additionally reports the sum of stored object sizes + before and after compaction. """ ) subparser = ArgumentParser(parents=[common_parser], description=self.do_compact.__doc__, epilog=compact_epilog) subparsers.add_subcommand("compact", subparser, help="compact the repository") subparser.add_argument( - "-n", "--dry-run", dest="dry_run", action="store_true", help="do not change the repository" + "-n", + "--dry-run", + dest="dry_run", + action="store_true", + help="do not change the repository, just show what compact would free", + ) + subparser.add_argument( + "-s", "--stats", dest="stats", action="store_true", help="print repository size statistics" ) subparser.add_argument( - "-s", "--stats", dest="stats", action="store_true", help="print statistics (might be much slower)" + "--threshold", + metavar="PERCENT", + dest="threshold", + type=int, + default=10, + help="rewrite a pack when at least PERCENT of its bytes are unused (default: 10)", ) diff --git a/src/borg/testsuite/archiver/compact_cmd_test.py b/src/borg/testsuite/archiver/compact_cmd_test.py index ffcc802c05..c0d3f94829 100644 --- a/src/borg/testsuite/archiver/compact_cmd_test.py +++ b/src/borg/testsuite/archiver/compact_cmd_test.py @@ -4,11 +4,14 @@ import pytest from ...constants import * # NOQA -from ...helpers import get_cache_dir +from ...helpers import get_cache_dir, bin_to_hex +from ...hashindex import ChunkIndex +from ...repository import Repository from ...cache import files_cache_name, discover_files_cache_names, list_chunkindex_hashes from ...manifest import Manifest from . import cmd, create_regular_file, create_src_archive, generate_archiver_tests, open_repository, RK_ENCRYPTION from . import changedir +from ..repository_test import H, fchunk, pdchunk pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA @@ -119,7 +122,7 @@ def test_compact_interrupted_does_not_poison_chunk_index(archivers, request, mon repository = open_repository(archiver) with repository: manifest = Manifest.load(repository, (Manifest.Operation.DELETE,)) - gc = ArchiveGarbageCollector(repository, manifest, stats=stats, iec=False) + gc = ArchiveGarbageCollector(repository, manifest, stats=stats, iec=False, threshold=40.0) def interrupt(): raise KeyboardInterrupt("simulated interruption before save_chunk_index") @@ -143,6 +146,63 @@ def interrupt(): assert fd.read() == content +def test_compact_packs_respects_threshold(tmp_path): + # Two multi-object packs in one repo, then pack-level compaction at a 40% threshold. The pack that + # wastes 2/3 of its bytes is rewritten down to its single survivor (and its old file deleted); the + # pack that wastes only 1/3 stays untouched, since copying its survivors to reclaim that little is + # not worth it. This covers the rewrite, leave-alone and keep/drop split that N=1 can't reach. + from ...archiver.compact_cmd import ArchiveGarbageCollector + + location = os.fspath(tmp_path / "repo") + with Repository(location, exclusive=True, create=True) as repository: + repository._pack_writer.max_count = 4 # buffer several objects, so each flush() writes one pack + for i in range(3): # H0..H2 -> wasteful pack + repository.put(H(i), fchunk(f"DATA{i}".encode(), chunk_id=H(i))) + repository.flush() + for i in range(3, 6): # H3..H5 -> frugal pack + repository.put(H(i), fchunk(f"DATA{i}".encode(), chunk_id=H(i))) + repository.flush() + + wasteful_pack = repository.chunks[H(0)].pack_id + frugal_pack = repository.chunks[H(3)].pack_id + # wasteful pack keeps only H0 (2/3 wasted), frugal pack keeps H3 and H4 (1/3 wasted) + used = {H(0), H(3), H(4)} + for i in range(6): + entry = repository.chunks[H(i)] + flags = ChunkIndex.F_USED if H(i) in used else ChunkIndex.F_NONE + repository.chunks[H(i)] = entry._replace(flags=flags) + + gc = ArchiveGarbageCollector(repository, manifest=None, stats=False, iec=False, threshold=40) + gc.chunks = repository.chunks + gc.compact_packs() + + # wasteful pack was rewritten: the survivor reads back, the dropped objects and the old file are gone + assert pdchunk(repository.get(H(0))) == b"DATA0" + assert repository.get(H(1), raise_missing=False) is None + assert repository.get(H(2), raise_missing=False) is None + # frugal pack stayed below threshold: untouched, every object (even the unused H5) still present + for i in range(3, 6): + assert pdchunk(repository.get(H(i))) == f"DATA{i}".encode() + pack_names = [info.name for info in repository.store_list("packs")] + assert bin_to_hex(wasteful_pack) not in pack_names + assert bin_to_hex(frugal_pack) in pack_names + + +def test_compact_dry_run_reports_and_changes_nothing(archivers, request): + # --dry-run prints the would-free estimate (issue #9379) and must not touch the repository. + archiver = request.getfixturevalue(archivers) + cmd(archiver, "repo-create", RK_ENCRYPTION) + create_src_archive(archiver, "archive") + cmd(archiver, "delete", "-a", "archive") + + out = cmd(archiver, "compact", "--dry-run", "-v", exit_code=0) + assert "Would free" in out # the estimate is reported + + # proof nothing was removed: a real compact afterwards still finds the unused objects to delete + out2 = cmd(archiver, "compact", "-v", exit_code=0) + assert "Deleting 0 unused objects" not in out2 + + def test_compact_files_cache_cleanup(archivers, request): """Test that files cache files for deleted archives are removed during compact.""" archiver = request.getfixturevalue(archivers)