diff --git a/pulpcore/app/tasks/migrate.py b/pulpcore/app/tasks/migrate.py
index dd15adc1ed..02286b3a80 100644
--- a/pulpcore/app/tasks/migrate.py
+++ b/pulpcore/app/tasks/migrate.py
@@ -1,6 +1,8 @@
 import logging
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from gettext import gettext as _
 
+from django.conf import settings
 from django.utils.timezone import now
 from rest_framework.serializers import ValidationError
 
@@ -10,6 +12,65 @@
 
 _logger = logging.getLogger(__name__)
 
+MIGRATION_WORKERS = getattr(settings, "MIGRATION_WORKERS", 8)
+MIGRATION_BATCH_SIZE = getattr(settings, "MIGRATION_BATCH_SIZE", 500)
+
+
+def _copy_artifact(old_storage, new_storage, filename):
+    """
+    Copy a single artifact file from one storage backend to another.
+
+    Args:
+        old_storage: storage backend the file is read from
+        new_storage: storage backend the file is written to
+        filename (str): path of the artifact file; the same path is used on both backends
+
+    Returns:
+        tuple: (filename, error). error is None when the file already exists in new_storage
+            or was copied, and a FileNotFoundError when old_storage.open() raises one.
+    """
+    if new_storage.exists(filename):
+        return filename, None
+    try:
+        file = old_storage.open(filename)
+    except FileNotFoundError:
+        return filename, FileNotFoundError(filename)
+    try:
+        new_storage.save(filename, file)
+    finally:
+        file.close()
+    return filename, None
+
+
+def _process_batch(batch, pb):
+    """
+    Wait for a batch of copy futures, increment progress, and collect the failed filenames.
+
+    Args:
+        batch (list): futures whose results are (filename, error) tuples
+        pb: progress report, incremented once per finished future
+
+    Returns:
+        list: filenames whose future reported an error
+
+    Raises:
+        Exception: any error raised by a copy; queued futures of the batch are cancelled first
+    """
+    errors = []
+    try:
+        for future in as_completed(batch):
+            filename, error = future.result()
+            if error is not None:
+                errors.append(filename)
+            pb.increment()
+    except BaseException:
+        # Executor shutdown waits for every queued future; cancel those that have not
+        # started so a failing task does not first copy the rest of the batch.
+        for future in batch:
+            future.cancel()
+        raise
+    return errors
+
 
 def migrate_backend(data):
     """
@@ -25,25 +86,28 @@ def migrate_backend(data):
     artifacts = Artifact.objects.filter(pulp_domain=domain)
     date = now()
 
+    missing = []
     with ProgressReport(
         message=_("Migrating Artifacts"), code="migrate", total=artifacts.count()
     ) as pb:
         while True:
-            for digest in pb.iter(artifacts.values_list("sha256", flat=True)):
-                filename = storage.get_artifact_path(digest)
-                if not new_storage.exists(filename):
-                    try:
-                        file = old_storage.open(filename)
-                    except FileNotFoundError:
-                        raise ValidationError(
-                            _(
-                                "Found missing file for artifact(sha256={}). Please run the repair "
-                                "task or delete the offending artifact."
-                            ).format(digest)
-                        )
-                    new_storage.save(filename, file)
-                    file.close()
-            # Handle new artifacts saved by the content app
+            batch = []
+            with ThreadPoolExecutor(max_workers=MIGRATION_WORKERS) as executor:
+                for digest in artifacts.values_list("sha256", flat=True):
+                    filename = storage.get_artifact_path(digest)
+                    future = executor.submit(
+                        _copy_artifact, old_storage, new_storage, filename,
+                    )
+                    batch.append(future)
+
+                    if len(batch) >= MIGRATION_BATCH_SIZE:
+                        missing.extend(_process_batch(batch, pb))
+                        batch = []
+
+                if batch:
+                    missing.extend(_process_batch(batch, pb))
+
+            # Handle new artifacts saved by the content app during migration
             artifacts = Artifact.objects.filter(pulp_domain=domain, pulp_created__gte=date)
             if count := artifacts.count():
                 pb.total += count
@@ -52,6 +116,14 @@ def migrate_backend(data):
                 continue
             break
 
+    if missing:
+        raise ValidationError(
+            _(
+                "Found missing file(s) for {} artifact(s). Please run the repair "
+                "task or delete the offending artifacts. First missing: {}"
+            ).format(len(missing), missing[0])
+        )
+
     # Update the current domain to the new storage backend settings
     msg = _("Update Domain({domain})'s Backend Settings").format(domain=domain.name)
     with ProgressReport(message=msg, code="update", total=1) as pb: