Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pulp_file/app/tasks/synchronizing.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def synchronize(remote_pk, repository_pk, mirror, optimize=False, url=None, **kw
optimize = optimize or settings.FILE_SYNC_OPTIMIZATION

if not remote.url:
raise ValueError(_("A remote must have a url specified to synchronize."))
raise SyncError(_("A remote must have a url specified to synchronize."))

if isinstance(remote, FileGitRemote):
first_stage = GitFirstStage(remote)
Expand Down
55 changes: 30 additions & 25 deletions pulpcore/app/tasks/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
from django.db.models import Min
from pulp_glue.common import __version__ as pulp_glue_version
from pulp_glue.common.context import PluginRequirement
from requests.exceptions import SSLError

from pulpcore.app.apps import PulpAppConfig, pulp_plugin_configs
from pulpcore.app.models import Distribution, Repository, Task, TaskGroup, UpstreamPulp
from pulpcore.app.replica import ReplicaContext
from pulpcore.constants import TASK_STATES
from pulpcore.exceptions import SslConnectionError
from pulpcore.tasking.tasks import dispatch


Expand Down Expand Up @@ -79,32 +81,35 @@ def replicate_distributions(server_pk):

distro_repo_pairs = []
for replicator in supported_replicators:
distros = replicator.upstream_distributions(q=server.q_select)
distro_names = []
for distro in distros:
# Create remote
remote = replicator.create_or_update_remote(upstream_distribution=distro)
if not remote:
# The upstream distribution is not serving any content,
# let if fall through the cracks and be cleanup below.
continue
# Check if there is already a repository
repository = replicator.create_or_update_repository(remote=remote)
if not repository:
# No update occurred because server.policy==LABELED and there was
# an already existing local repository with the same name
continue

# Dispatch a sync task if needed
if replicator.requires_syncing(distro):
replicator.sync(repository, remote)

# Get or create a distribution
replicator.create_or_update_distribution(repository, distro)

# Add name to the list of known distribution names
distro_names.append(distro["name"])
distro_repo_pairs.append((distro["name"], str(repository.pk)))
try:
distros = replicator.upstream_distributions(q=server.q_select)
for distro in distros:
# Create remote
remote = replicator.create_or_update_remote(upstream_distribution=distro)
if not remote:
# The upstream distribution is not serving any content,
# let if fall through the cracks and be cleanup below.
continue
# Check if there is already a repository
repository = replicator.create_or_update_repository(remote=remote)
if not repository:
# No update occurred because server.policy==LABELED and there was
# an already existing local repository with the same name
continue

# Dispatch a sync task if needed
if replicator.requires_syncing(distro):
replicator.sync(repository, remote)

# Get or create a distribution
replicator.create_or_update_distribution(repository, distro)

# Add name to the list of known distribution names
distro_names.append(distro["name"])
distro_repo_pairs.append((distro["name"], str(repository.pk)))
except SSLError as e:
raise SslConnectionError(url=server.base_url, details=str(e))

replicator.remove_missing(distro_names)

Expand Down
9 changes: 2 additions & 7 deletions pulpcore/app/tasks/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@

from asgiref.sync import sync_to_async
from django.db import transaction
from rest_framework.serializers import ValidationError

from pulpcore.app import models
from pulpcore.app.models import ProgressReport
from pulpcore.app.util import get_domain
from pulpcore.exceptions.base import RepositoryVersionDeleteError

log = getLogger(__name__)

Expand Down Expand Up @@ -44,12 +44,7 @@ def delete_version(pk):
return

if version.repository.versions.complete().count() <= 1:
raise ValidationError(
_(
"Cannot delete repository version. Repositories must have at least one "
"repository version."
)
)
raise RepositoryVersionDeleteError()

log.info(
"Deleting and squashing version {num} of repository '{repo}'".format(
Expand Down
33 changes: 27 additions & 6 deletions pulpcore/download/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@

from pulpcore.exceptions import (
DigestValidationError,
DnsDomainNameException,
HttpResponseError,
ProxyAuthenticationError,
RemoteConnectionError,
SizeValidationError,
SslConnectionError,
TimeoutException,
)

Expand Down Expand Up @@ -236,6 +241,7 @@ async def run(self, extra_data=None):
aiohttp.ClientPayloadError,
aiohttp.ClientResponseError,
aiohttp.ServerDisconnectedError,
DnsDomainNameException,
TimeoutError,
TimeoutException,
DigestValidationError,
Expand Down Expand Up @@ -269,9 +275,21 @@ async def download_wrapper():
e.message,
)
)
raise e
raise ProxyAuthenticationError(self.proxy)

return await download_wrapper()
try:
return await download_wrapper()
except aiohttp.ClientResponseError as e:
raise HttpResponseError(url=self.url, status=e.status, message=e.message)
except aiohttp.ClientConnectorSSLError as e:
raise SslConnectionError(url=self.url, details=str(e))
except (
aiohttp.ClientConnectorError,
aiohttp.ClientOSError,
aiohttp.ClientPayloadError,
aiohttp.ServerDisconnectedError,
) as e:
raise RemoteConnectionError(url=self.url, details=str(e))

async def _run(self, extra_data=None):
"""
Expand All @@ -296,10 +314,13 @@ async def _run(self, extra_data=None):
}
if extra_data and extra_data.get("request_kwargs"):
request_kwargs.update(extra_data["request_kwargs"])
async with self.session.get(self.url, **request_kwargs) as response:
self.raise_for_status(response)
to_return = await self._handle_response(response)
await response.release()
try:
async with self.session.get(self.url, **request_kwargs) as response:
self.raise_for_status(response)
to_return = await self._handle_response(response)
await response.release()
except aiohttp.ClientConnectorDNSError:
raise DnsDomainNameException(self.url)
if self._close_session_on_finalize:
await self.session.close()
return to_return
Expand Down
5 changes: 5 additions & 0 deletions pulpcore/exceptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
ReplicateError,
SyncError,
PublishError,
TaskConfigurationError,
TaskTimeoutException,
HttpResponseError,
SslConnectionError,
RemoteConnectionError,
)
from .validation import (
DigestValidationError,
Expand Down
104 changes: 104 additions & 0 deletions pulpcore/exceptions/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,107 @@ class ReplicateError(PulpException):

def __str__(self):
return f"[{self.error_code}] " + _("Replication failed")


class TaskConfigurationError(PulpException):
"""
Raised when a task is incorrectly configured.
"""

error_code = "PLP0023"
Comment thread
aKlimau marked this conversation as resolved.

def __init__(self, task_name, message):
"""
:param task_name: the fully qualified name of the task function
:type task_name: str
:param message: description of the configuration error
:type message: str
"""
self.task_name = task_name
self.message = message

def __str__(self):
return f"[{self.error_code}] " + _(
"Task type '{task_name}' is misconfigured: {message}"
).format(task_name=self.task_name, message=self.message)


class TaskTimeoutException(PulpException):
"""
Raised when an immediate task exceeds its execution timeout.
"""

error_code = "PLP0024"

def __init__(self, task_name, task_pk, timeout_seconds):
"""
:param task_name: the fully qualified name of the task function
:type task_name: str
:param task_pk: the unique task identifier
:type task_pk: str
:param timeout_seconds: the timeout value that was exceeded
:type timeout_seconds: int
"""
self.task_name = task_name
self.task_pk = task_pk
self.timeout_seconds = timeout_seconds

def __str__(self):
return f"[{self.error_code}] " + _(
"Immediate task {task_pk} (type: {task_name}) timed out after {timeout} seconds."
).format(task_pk=self.task_pk, task_name=self.task_name, timeout=self.timeout_seconds)


class HttpResponseError(PulpException):
"""
Raised when a remote server returns an HTTP error response after retries are exhausted.
"""

error_code = "PLP0025"

def __init__(self, url, status, message):
super().__init__()
self.url = url
self.status = status
self.message = message

def __str__(self):
return f"[{self.error_code}] " + _(
"HTTP error {status} when downloading {url}: {message}"
).format(url=self.url, status=self.status, message=self.message)


class SslConnectionError(PulpException):
"""
Raised when an SSL/TLS connection fails after retries are exhausted.
"""

error_code = "PLP0026"

def __init__(self, url, details):
super().__init__()
self.url = url
self.details = details

def __str__(self):
return f"[{self.error_code}] " + _("SSL connection failed for {url}: {details}").format(
url=self.url, details=self.details
)


class RemoteConnectionError(PulpException):
"""
Raised when a connection to a remote server fails after retries are exhausted.
"""

error_code = "PLP0027"

def __init__(self, url, details):
super().__init__()
self.url = url
self.details = details

def __str__(self):
return f"[{self.error_code}] " + _("Connection failed for {url}: {details}").format(
url=self.url, details=self.details
)
10 changes: 4 additions & 6 deletions pulpcore/exceptions/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,12 @@ class UnsupportedDigestValidationError(ValidationError):

error_code = "PLP0020"

def __init__(self, digest_name=None):
self.digest_name = digest_name
def __init__(self, message=None):
self.message = message

def __str__(self):
if self.digest_name:
return f"[{self.error_code}] " + _(
"Checksum type '{digest}' is not supported or enabled."
).format(digest=self.digest_name)
if self.message:
return f"[{self.error_code}] {self.message}"
return f"[{self.error_code}] " + _("Unsupported checksum type.")


Expand Down
6 changes: 6 additions & 0 deletions pulpcore/plugin/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from pulpcore.exceptions import (
DigestValidationError,
ExternalServiceError,
HttpResponseError,
InvalidSignatureError,
MissingDigestValidationError,
PublishError,
PulpException,
RemoteConnectionError,
ReplicateError,
SizeValidationError,
SslConnectionError,
SyncError,
SystemStateError,
TimeoutException,
Expand All @@ -28,4 +31,7 @@
"ExternalServiceError",
"SystemStateError",
"ReplicateError",
"HttpResponseError",
"SslConnectionError",
"RemoteConnectionError",
]
Loading
Loading