1 change: 1 addition & 0 deletions CHANGES/7559.feature
@@ -0,0 +1 @@
Added a configurable `ResourceBudget` to properly prevent over-subscription of the disk during sync. Adds a backpressure mechanism plus a flushing mechanism to ensure that batches get fully processed even if `minsize` has not yet been reached, which allows previous performance-reducing mitigations to be removed.
34 changes: 31 additions & 3 deletions docs/admin/reference/settings.md
Member:
As this is user facing documentation I would prefer dropping specific references to ArtifactDownloader and ArtifactSaver.

@@ -474,10 +474,38 @@ Defaults to `/var/lib/pulp/tmp/`.

### MAX\_CONCURRENT\_CONTENT

The size of the batch of content processed in one go when syncing content from
a remote.
The maximum number of concurrent downloads during sync. Controls how many HTTP
download tasks can run in parallel within the `ArtifactDownloader` pipeline stage.

Defaults to 25.
Defaults to 200.

!!! warning "Deprecated"
This setting is deprecated and may be removed in a future release.
Use `SYNC_MAX_IN_FLIGHT_ITEMS` instead, which provides similar
functionality. If `MAX_CONCURRENT_CONTENT` is set by the user and
`SYNC_MAX_IN_FLIGHT_ITEMS` is not, its value will be used as
`SYNC_MAX_IN_FLIGHT_ITEMS` automatically.
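
The fallback can be sketched as a plain function (a hypothetical helper for illustration; the real logic lives in `ArtifactResourceBudget.from_settings`):

```python
def resolve_max_items(sync_max_in_flight_items, max_concurrent_content):
    # Prefer the new setting; fall back to the deprecated one when unset.
    if sync_max_in_flight_items is not None:
        return sync_max_in_flight_items
    return max_concurrent_content


print(resolve_max_items(None, 200))  # -> 200
print(resolve_max_items(50, 200))    # -> 50
```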

### SYNC\_MAX\_IN\_FLIGHT\_MB

The maximum total size (in megabytes) of downloaded artifacts that are waiting to be
saved. This limits the temporary disk space consumed by artifacts that have been
downloaded by `ArtifactDownloader` but not yet persisted by `ArtifactSaver`.

When set, small artifacts download with high concurrency while large artifacts
automatically throttle to avoid exhausting disk space. This is not a hard limit:
a single artifact larger than the whole budget is still admitted (one at a time),
so syncs of oversized files cannot deadlock.

Defaults to `5120` (5 GiB).
Member:
Is this a hard limit? Will this option prevent syncs from succeeding with a single 6 GB file?
(Not that this would be a bad thing, just maybe worth mentioning. To be fair, the available disk space is already a hard limit...)

Contributor Author:

It should always let one file through at a time, no matter how large. In theory, anyway.

Member:

Nice, then we don't need to mention it.

Member:

IMO it's worth adding a note that this does not prevent bigger files from being synced.


### SYNC\_MAX\_IN\_FLIGHT\_ITEMS

The maximum number of downloaded artifacts that are waiting to be saved. Like
`SYNC_MAX_IN_FLIGHT_MB`, this limits the buffer between `ArtifactDownloader` and
`ArtifactSaver`, but counts items rather than bytes.

This is useful as a fallback when artifact sizes are not known ahead of time.

Defaults to `None` (no limit).
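
Taken together, a sync-tuning block in `settings.py` might look like this (illustrative values, not recommendations):

```python
# Illustrative values: budget roughly 2 GiB of in-flight artifact data and
# at most 500 downloaded-but-not-yet-saved items at any moment.
SYNC_MAX_IN_FLIGHT_MB = 2048
SYNC_MAX_IN_FLIGHT_ITEMS = 500

# Internally the MB value is converted to bytes (max_bytes = MB * 1024 * 1024):
print(SYNC_MAX_IN_FLIGHT_MB * 1024 * 1024)  # -> 2147483648
```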

## Redis Settings

9 changes: 8 additions & 1 deletion pulpcore/app/settings.py
@@ -403,7 +403,14 @@

DOMAIN_ENABLED = False

MAX_CONCURRENT_CONTENT = 25
MAX_CONCURRENT_CONTENT = 200

# Resource budget for sync pipeline: limits total in-flight artifact data between
# the ArtifactDownloader and ArtifactSaver stages. When set, these allow higher download
# concurrency for small artifacts while preventing disk exhaustion for large ones.
# None means no limit for that dimension.
SYNC_MAX_IN_FLIGHT_MB = None # Maximum megabytes of in-flight downloaded artifacts
SYNC_MAX_IN_FLIGHT_ITEMS = None  # Maximum number of in-flight downloaded artifacts

SHELL_PLUS_IMPORTS = [
"from pulpcore.app.util import get_domain, get_domain_pk, set_domain, get_url, extract_pk",
1 change: 1 addition & 0 deletions pulpcore/plugin/stages/__init__.py
@@ -4,6 +4,7 @@
from .artifact_stages import (
ACSArtifactHandler,
ArtifactDownloader,
ArtifactResourceBudget,
ArtifactSaver,
GenericDownloader,
QueryExistingArtifacts,
23 changes: 19 additions & 4 deletions pulpcore/plugin/stages/api.py
@@ -77,7 +77,7 @@ async def run(self):
break
yield content

async def batches(self, minsize=500):
async def batches(self, minsize=500, flush_event=None):
"""
Asynchronous iterator yielding batches of [DeclarativeContent][] from `self._in_q`.

@@ -87,6 +87,9 @@ async def batches(self, minsize=500):

Args:
minsize (int): The minimum batch size to yield (unless it is the final batch)
flush_event (asyncio.Event): Optional event that, when set, causes the current
batch to be yielded immediately regardless of `minsize`. This is used by
`ArtifactSaver` to flush when the resource budget is under pressure.

Yields:
A list of [DeclarativeContent][] instances
@@ -124,13 +127,20 @@ def add_to_batch(content):

get_listener = asyncio.ensure_future(self._in_q.get())
thaw_event_listener = asyncio.ensure_future(thaw_queue_event.wait())
flush_event_listener = asyncio.ensure_future(flush_event.wait()) if flush_event else None
while not shutdown:
done, pending = await asyncio.wait(
[thaw_event_listener, get_listener], return_when=asyncio.FIRST_COMPLETED
)
waitables = [thaw_event_listener, get_listener]
if flush_event_listener:
waitables.append(flush_event_listener)
done, pending = await asyncio.wait(waitables, return_when=asyncio.FIRST_COMPLETED)
if thaw_event_listener in done:
thaw_event_listener = asyncio.ensure_future(thaw_queue_event.wait())
no_block = True
if flush_event_listener and flush_event_listener in done:
# Don't re-arm until after we yield a batch, to avoid a spin loop
# when the event stays set but the batch is empty.
flush_event_listener = None
no_block = True
if get_listener in done:
content = await get_listener
add_to_batch(content)
@@ -153,8 +163,13 @@ def add_to_batch(content):
yield batch
batch = []
no_block = False
# Re-arm the flush listener after yielding
if flush_event and flush_event_listener is None:
flush_event_listener = asyncio.ensure_future(flush_event.wait())
thaw_event_listener.cancel()
get_listener.cancel()
if flush_event_listener:
flush_event_listener.cancel()
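
The loop above multiplexes several listeners with `asyncio.wait`; the flush behavior alone can be sketched in miniature (a toy, without the thaw/frozen-queue logic, using a `None` sentinel to end the stream):

```python
import asyncio


async def batches(queue, minsize, flush_event):
    """Toy flush-aware batcher: yields when minsize is reached, when the
    flush event fires, or when a terminating None sentinel arrives."""
    batch = []
    while True:
        get_task = asyncio.ensure_future(queue.get())
        flush_task = asyncio.ensure_future(flush_event.wait())
        done, pending = await asyncio.wait(
            [get_task, flush_task], return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()
        if get_task in done:
            item = get_task.result()
            if item is None:                 # sentinel: emit the final short batch
                if batch:
                    yield batch
                return
            batch.append(item)
        flushed = flush_task in done
        if flushed:
            flush_event.clear()              # re-armed on the next iteration
        if batch and (flushed or len(batch) >= minsize):
            yield batch
            batch = []


async def demo():
    queue, flush = asyncio.Queue(), asyncio.Event()
    for i in range(3):
        queue.put_nowait(i)
    flush.set()                              # budget pressure: flush early
    queue.put_nowait(None)
    return [b async for b in batches(queue, minsize=500, flush_event=flush)]


print(asyncio.run(demo()))
```

With the flush event set, the first item is emitted immediately as a short batch even though `minsize` is 500; the rest arrive in the final sentinel-triggered batch.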

async def put(self, item):
"""
163 changes: 156 additions & 7 deletions pulpcore/plugin/stages/artifact_stages.py
@@ -23,6 +23,103 @@
log = logging.getLogger(__name__)


class ArtifactResourceBudget:
"""Tracks aggregate resource consumption of in-flight artifacts.

Coordinates between `ArtifactDownloader` (acquires budget) and
`ArtifactSaver` (releases budget) to limit total temporary disk usage
from downloaded-but-not-yet-saved artifacts.

This allows higher download concurrency for small artifacts while still
protecting against disk exhaustion when syncing large artifacts.

Args:
max_bytes (int): Maximum total bytes of in-flight downloaded artifacts.
`None` means no byte limit (only item limit applies).
max_items (int): Maximum number of in-flight downloaded artifacts.
`None` means no item limit (only byte limit applies).
"""

def __init__(self, max_bytes=None, max_items=None):
self.max_bytes = max_bytes
self.max_items = max_items
self._current_bytes = 0
self._current_items = 0
self._available = asyncio.Event()
self._available.set()
self._lock = asyncio.Lock()
self.pressure = asyncio.Event()

@classmethod
def from_settings(cls):
"""Create an `ArtifactResourceBudget` from Django settings, or return `None`.

Reads `SYNC_MAX_IN_FLIGHT_MB` and `SYNC_MAX_IN_FLIGHT_ITEMS` from settings.
Falls back to the deprecated `MAX_CONCURRENT_CONTENT` for `max_items` if the
user set it and `SYNC_MAX_IN_FLIGHT_ITEMS` is not configured.
Returns `None` if no settings are configured.
"""
max_mb = settings.SYNC_MAX_IN_FLIGHT_MB
max_items = settings.SYNC_MAX_IN_FLIGHT_ITEMS

# Backward compatibility: honor deprecated MAX_CONCURRENT_CONTENT
if max_items is None:
max_items = settings.MAX_CONCURRENT_CONTENT

if max_mb is None and max_items is None:
return None
return cls(
max_bytes=max_mb * 1024 * 1024 if max_mb is not None else None,
max_items=max_items,
)

async def acquire(self, estimated_bytes=0):
"""Block until resource budget is available.

Always allows at least one item through (even if over budget) when nothing
is currently in flight, to prevent deadlock.

When the budget is exhausted and `acquire` must wait, the `pressure` event
is set to signal downstream stages (e.g. `ArtifactSaver`) to flush their
batches early and free up budget.

Args:
estimated_bytes (int): Estimated size of the artifact(s) to be downloaded.
"""
while True:
async with self._lock:
# Always allow if nothing is in flight (prevents deadlock)
if self._current_items == 0:
self._current_bytes += estimated_bytes
self._current_items += 1
return

bytes_ok = self.max_bytes is None or (
self._current_bytes + estimated_bytes <= self.max_bytes
)
items_ok = self.max_items is None or self._current_items < self.max_items

if bytes_ok and items_ok:
self._current_bytes += estimated_bytes
self._current_items += 1
return

self._available.clear()
self.pressure.set()
await self._available.wait()

def release(self, actual_bytes=0):
"""Release resources after an artifact is saved and its temp file deleted.

Args:
actual_bytes (int): Actual size of the artifact that was saved.
"""
self._current_bytes = max(0, self._current_bytes - actual_bytes)
self._current_items = max(0, self._current_items - 1)
self.pressure.clear()
self._available.set()
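
The acquire/release contract described above (always admit when nothing is in flight, block when the budget is exhausted, wake on release) can be exercised with a stripped-down sketch. This is a toy that mirrors the semantics for illustration, not the real class; it tracks only bytes and omits the lock and pressure event:

```python
import asyncio


class MiniBudget:
    """Toy byte-only version of the budget; illustrates the semantics."""

    def __init__(self, max_bytes):
        self.max_bytes = max_bytes
        self.bytes = 0
        self.items = 0
        self._available = asyncio.Event()
        self._available.set()

    async def acquire(self, n):
        while True:
            # Always admit when nothing is in flight, even if oversized:
            # this is what lets a single 6 GB file through a 5 GB budget.
            if self.items == 0 or self.bytes + n <= self.max_bytes:
                self.bytes += n
                self.items += 1
                return
            self._available.clear()
            await self._available.wait()

    def release(self, n):
        self.bytes = max(0, self.bytes - n)
        self.items = max(0, self.items - 1)
        self._available.set()


async def demo():
    budget = MiniBudget(max_bytes=100)
    await budget.acquire(600)              # oversized but admitted: nothing in flight
    waiter = asyncio.ensure_future(budget.acquire(10))
    await asyncio.sleep(0)
    assert not waiter.done()               # blocked: budget exhausted
    budget.release(600)                    # the "saver" frees the budget
    await waiter                           # the blocked acquire now succeeds
    return budget.items


print(asyncio.run(demo()))  # -> 1
```

In the real pipeline, the downloader acquires before a content unit's downloads start and the saver releases only after the temporary files are deleted, so the budget models actual temporary disk usage.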


def _check_for_forbidden_checksum_type(artifact):
"""Check if content doesn't have forbidden checksum type.

@@ -220,28 +317,57 @@ class ArtifactDownloader(GenericDownloader):

Each [pulpcore.plugin.stages.DeclarativeContent][] is sent to `self._out_q` after all of
its [pulpcore.plugin.stages.DeclarativeArtifact][] objects have been handled.

Args:
resource_budget (ArtifactResourceBudget): Optional shared resource budget that
limits total in-flight artifact bytes/items between download and save.
max_concurrent_content (int): The maximum number of content units to handle
simultaneously. Default is from settings.MAX_CONCURRENT_CONTENT.
args: unused positional arguments passed along to
[pulpcore.plugin.stages.GenericDownloader][].
kwargs: unused keyword arguments passed along to
[pulpcore.plugin.stages.GenericDownloader][].
"""

PROGRESS_REPORTING_MESSAGE = "Downloading Artifacts"
PROGRESS_REPORTING_CODE = "sync.downloading.artifacts"

def __init__(self, resource_budget=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.resource_budget = resource_budget

async def _handle_content_unit(self, d_content):
"""Handle one content unit.

Returns:
The number of downloads
"""
downloaders_for_content = [
d_artifact.download()
d_artifacts_to_download = [
d_artifact
for d_artifact in d_content.d_artifacts
if d_artifact.artifact._state.adding
and not d_artifact.deferred_download
and not d_artifact.artifact.file
]
if downloaders_for_content:
await asyncio.gather(*downloaders_for_content)
await self.put(d_content)
return len(downloaders_for_content)

budget_acquired = False
estimated_bytes = 0
if d_artifacts_to_download and self.resource_budget:
estimated_bytes = sum(
d_artifact.artifact.size or 0 for d_artifact in d_artifacts_to_download
)
await self.resource_budget.acquire(estimated_bytes)
budget_acquired = True

try:
if d_artifacts_to_download:
await asyncio.gather(*(da.download() for da in d_artifacts_to_download))

await self.put(d_content)
except BaseException:
# Release even when the byte estimate was 0; acquire() still counted an item.
if budget_acquired:
self.resource_budget.release(estimated_bytes)
raise

return len(d_artifacts_to_download)


class ArtifactSaver(Stage):
@@ -259,16 +385,29 @@ class ArtifactSaver(Stage):

This stage drains all available items from `self._in_q` and batches everything into one large
call to the db for efficiency.

Args:
resource_budget (ArtifactResourceBudget): Optional shared resource budget.
When provided, budget is released after artifacts are saved and temp files deleted.
args: unused positional arguments passed along to [pulpcore.plugin.stages.Stage][].
kwargs: unused keyword arguments passed along to [pulpcore.plugin.stages.Stage][].
"""

def __init__(self, resource_budget=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.resource_budget = resource_budget

async def run(self):
"""
The coroutine for this stage.

Returns:
The coroutine for this stage.
"""
async for batch in self.batches(minsize=settings.MAX_CONCURRENT_CONTENT):
flush_event = self.resource_budget.pressure if self.resource_budget else None
async for batch in self.batches(
minsize=settings.MAX_CONCURRENT_CONTENT, flush_event=flush_event
):
da_to_save = []
for d_content in batch:
for d_artifact in d_content.d_artifacts:
@@ -291,6 +430,16 @@ async def run(self):
if await aos.path.exists(tmp_file_path):
await aos.remove(tmp_file_path)

# Release budget after temp files are cleaned up so the downloader can proceed
if self.resource_budget:
for d_content in batch:
budget_bytes = sum(
d_artifact.artifact.size or 0
for d_artifact in d_content.d_artifacts
if not d_artifact.deferred_download
)
self.resource_budget.release(budget_bytes)

for d_content in batch:
await self.put(d_content)

7 changes: 5 additions & 2 deletions pulpcore/plugin/stages/declarative_version.py
@@ -6,6 +6,7 @@
from pulpcore.plugin.stages.artifact_stages import (
ACSArtifactHandler,
ArtifactDownloader,
ArtifactResourceBudget,
ArtifactSaver,
QueryExistingArtifacts,
RemoteArtifactSaver,
@@ -129,6 +130,8 @@ def pipeline_stages(self, new_version):
list: List of [pulpcore.plugin.stages.Stage][] instances
"""
resource_budget = ArtifactResourceBudget.from_settings()

pipeline = [
self.first_stage,
QueryExistingArtifacts(),
@@ -137,8 +140,8 @@
pipeline.append(ACSArtifactHandler())
pipeline.extend(
[
ArtifactDownloader(),
ArtifactSaver(),
ArtifactDownloader(resource_budget=resource_budget),
ArtifactSaver(resource_budget=resource_budget),
QueryExistingContents(),
ContentSaver(),
RemoteArtifactSaver(),