diff --git a/CHANGES/7559.feature b/CHANGES/7559.feature new file mode 100644 index 00000000000..5ddc9b47b2e --- /dev/null +++ b/CHANGES/7559.feature @@ -0,0 +1 @@ +Add a configurable resource budget to properly prevent over-subscription of the disk during sync. A backpressure and flushing mechanism ensures that batches get fully processed even if minsize hasn't yet been reached, allowing previous performance-reducing mitigations to be removed. diff --git a/docs/admin/reference/settings.md b/docs/admin/reference/settings.md index 20cbf5b03fe..2f57fe5ac07 100644 --- a/docs/admin/reference/settings.md +++ b/docs/admin/reference/settings.md @@ -474,10 +474,38 @@ Defaults to `/var/lib/pulp/tmp/`. ### MAX\_CONCURRENT\_CONTENT -The size of the batch of content processed in one go when syncing content from -a remote. +The maximum number of concurrent downloads during sync. Controls how many HTTP +download tasks can run in parallel within the `ArtifactDownloader` pipeline stage. -Defaults to 25. +Defaults to 200. + +!!! warning "Deprecated" + This setting is deprecated and may be removed in a future release. + Use `SYNC_MAX_IN_FLIGHT_ITEMS` instead, which provides similar + functionality. If `MAX_CONCURRENT_CONTENT` is set by the user and + `SYNC_MAX_IN_FLIGHT_ITEMS` is not, its value will be used as + `SYNC_MAX_IN_FLIGHT_ITEMS` automatically. + +### SYNC\_MAX\_IN\_FLIGHT\_MB + +The maximum total size (in megabytes) of downloaded artifacts that are waiting to be +saved. This limits the temporary disk space consumed by artifacts that have been +downloaded by `ArtifactDownloader` but not yet persisted by `ArtifactSaver`. + +When set, small artifacts download with high concurrency, while downloads of large +artifacts are automatically throttled to avoid exhausting disk space. + +Defaults to `5120` (5 GB). + +### SYNC\_MAX\_IN\_FLIGHT\_ITEMS + +The maximum number of downloaded artifacts that are waiting to be saved. Like +`SYNC_MAX_IN_FLIGHT_MB`, this limits the buffer between `ArtifactDownloader` and +`ArtifactSaver`, but counts items rather than bytes. + +This is useful as a fallback when artifact sizes are not known ahead of time. + +Defaults to `None` (no limit). ## Redis Settings diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py index a4411f50eac..48d334063a0 100644 --- a/pulpcore/app/settings.py +++ b/pulpcore/app/settings.py @@ -403,7 +403,14 @@ DOMAIN_ENABLED = False -MAX_CONCURRENT_CONTENT = 25 +MAX_CONCURRENT_CONTENT = 200 + +# Resource budget for sync pipeline: limits total in-flight artifact data between +# the ArtifactDownloader and ArtifactSaver stages. When set, these allow higher download +# concurrency for small artifacts while preventing disk exhaustion for large ones. +# None means no limit for that dimension.
+SYNC_MAX_IN_FLIGHT_MB = None # Maximum megabytes of in-flight downloaded artifacts +SYNC_MAX_IN_FLIGHT_ITEMS = None SHELL_PLUS_IMPORTS = [ "from pulpcore.app.util import get_domain, get_domain_pk, set_domain, get_url, extract_pk", diff --git a/pulpcore/plugin/stages/__init__.py b/pulpcore/plugin/stages/__init__.py index e526b52d8f8..bba5fc41b51 100644 --- a/pulpcore/plugin/stages/__init__.py +++ b/pulpcore/plugin/stages/__init__.py @@ -4,6 +4,7 @@ from .artifact_stages import ( ACSArtifactHandler, ArtifactDownloader, + ArtifactResourceBudget, ArtifactSaver, GenericDownloader, QueryExistingArtifacts, diff --git a/pulpcore/plugin/stages/api.py b/pulpcore/plugin/stages/api.py index 1d8939fd66f..87694ad850c 100644 --- a/pulpcore/plugin/stages/api.py +++ b/pulpcore/plugin/stages/api.py @@ -77,7 +77,7 @@ async def run(self): break yield content - async def batches(self, minsize=500): + async def batches(self, minsize=500, flush_event=None): """ Asynchronous iterator yielding batches of [DeclarativeContent][] from `self._in_q`. @@ -87,6 +87,9 @@ async def batches(self, minsize=500): Args: minsize (int): The minimum batch size to yield (unless it is the final batch) + flush_event (asyncio.Event): Optional event that, when set, causes the current + batch to be yielded immediately regardless of `minsize`. This is used by + `ArtifactSaver` to flush when the resource budget is under pressure. Yields: A list of [DeclarativeContent][] instances @@ -124,13 +127,20 @@ def add_to_batch(content): get_listener = asyncio.ensure_future(self._in_q.get()) thaw_event_listener = asyncio.ensure_future(thaw_queue_event.wait()) + flush_event_listener = asyncio.ensure_future(flush_event.wait()) if flush_event else None while not shutdown: - done, pending = await asyncio.wait( - [thaw_event_listener, get_listener], return_when=asyncio.FIRST_COMPLETED - ) + waitables = [thaw_event_listener, get_listener] + if flush_event_listener: + waitables.append(flush_event_listener) + done, pending = await asyncio.wait(waitables, return_when=asyncio.FIRST_COMPLETED) if thaw_event_listener in done: thaw_event_listener = asyncio.ensure_future(thaw_queue_event.wait()) no_block = True + if flush_event_listener and flush_event_listener in done: + # Don't re-arm until after we yield a batch, to avoid a spin loop + # when the event stays set but the batch is empty. + flush_event_listener = None + no_block = True if get_listener in done: content = await get_listener add_to_batch(content) @@ -153,8 +163,13 @@ def add_to_batch(content): yield batch batch = [] no_block = False + # Re-arm the flush listener after yielding + if flush_event and flush_event_listener is None: + flush_event_listener = asyncio.ensure_future(flush_event.wait()) thaw_event_listener.cancel() get_listener.cancel() + if flush_event_listener: + flush_event_listener.cancel() async def put(self, item): """ diff --git a/pulpcore/plugin/stages/artifact_stages.py b/pulpcore/plugin/stages/artifact_stages.py index eeff04c2416..06756c1de16 100644 --- a/pulpcore/plugin/stages/artifact_stages.py +++ b/pulpcore/plugin/stages/artifact_stages.py @@ -23,6 +23,103 @@ log = logging.getLogger(__name__) +class ArtifactResourceBudget: + """Tracks aggregate resource consumption of in-flight artifacts. + + Coordinates between `ArtifactDownloader` (acquires budget) and + `ArtifactSaver` (releases budget) to limit total temporary disk usage + from downloaded-but-not-yet-saved artifacts. 
+ + This allows higher download concurrency for small artifacts while still + protecting against disk exhaustion when syncing large artifacts. + + Args: + max_bytes (int): Maximum total bytes of in-flight downloaded artifacts. + `None` means no byte limit (only item limit applies). + max_items (int): Maximum number of in-flight downloaded artifacts. + `None` means no item limit (only byte limit applies). + """ + + def __init__(self, max_bytes=None, max_items=None): + self.max_bytes = max_bytes + self.max_items = max_items + self._current_bytes = 0 + self._current_items = 0 + self._available = asyncio.Event() + self._available.set() + self._lock = asyncio.Lock() + self.pressure = asyncio.Event() + + @classmethod + def from_settings(cls): + """Create an `ArtifactResourceBudget` from Django settings, or return `None`. + + Reads `SYNC_MAX_IN_FLIGHT_MB` and `SYNC_MAX_IN_FLIGHT_ITEMS` from settings. + Falls back to the deprecated `MAX_CONCURRENT_CONTENT` for `max_items` if the + user set it and `SYNC_MAX_IN_FLIGHT_ITEMS` is not configured. + Returns `None` if no settings are configured. + """ + max_mb = settings.SYNC_MAX_IN_FLIGHT_MB + max_items = settings.SYNC_MAX_IN_FLIGHT_ITEMS + + # Backward compatibility: honor deprecated MAX_CONCURRENT_CONTENT + if max_items is None: + max_items = settings.MAX_CONCURRENT_CONTENT + + if max_mb is None and max_items is None: + return None + return cls( + max_bytes=max_mb * 1024 * 1024 if max_mb is not None else None, + max_items=max_items, + ) + + async def acquire(self, estimated_bytes=0): + """Block until resource budget is available. + + Always allows at least one item through (even if over budget) when nothing + is currently in flight, to prevent deadlock. + + When the budget is exhausted and `acquire` must wait, the `pressure` event + is set to signal downstream stages (e.g. `ArtifactSaver`) to flush their + batches early and free up budget. + + Args: + estimated_bytes (int): Estimated size of the artifact(s) to be downloaded. + """ + while True: + async with self._lock: + # Always allow if nothing is in flight (prevents deadlock) + if self._current_items == 0: + self._current_bytes += estimated_bytes + self._current_items += 1 + return + + bytes_ok = self.max_bytes is None or ( + self._current_bytes + estimated_bytes <= self.max_bytes + ) + items_ok = self.max_items is None or self._current_items < self.max_items + + if bytes_ok and items_ok: + self._current_bytes += estimated_bytes + self._current_items += 1 + return + + self._available.clear() + self.pressure.set() + await self._available.wait() + + def release(self, actual_bytes=0): + """Release resources after an artifact is saved and its temp file deleted. + + Args: + actual_bytes (int): Actual size of the artifact that was saved. + """ + self._current_bytes = max(0, self._current_bytes - actual_bytes) + self._current_items = max(0, self._current_items - 1) + self.pressure.clear() + self._available.set() + + def _check_for_forbidden_checksum_type(artifact): """Check if content doesn't have forbidden checksum type. @@ -220,28 +317,57 @@ class ArtifactDownloader(GenericDownloader): Each [pulpcore.plugin.stages.DeclarativeContent][] is sent to `self._out_q` after all of its [pulpcore.plugin.stages.DeclarativeArtifact][] objects have been handled. + + Args: + resource_budget (ArtifactResourceBudget): Optional shared resource budget that + limits total in-flight artifact bytes/items between download and save. 
+ max_concurrent_content (int): The maximum number of content units to handle + simultaneously. Default is from settings.MAX_CONCURRENT_CONTENT. + args: unused positional arguments passed along to + [pulpcore.plugin.stages.GenericDownloader][]. + kwargs: unused keyword arguments passed along to + [pulpcore.plugin.stages.GenericDownloader][]. + """ PROGRESS_REPORTING_MESSAGE = "Downloading Artifacts" PROGRESS_REPORTING_CODE = "sync.downloading.artifacts" + def __init__(self, resource_budget=None, *args, **kwargs): + super().__init__(*args, **kwargs) + self.resource_budget = resource_budget + async def _handle_content_unit(self, d_content): """Handle one content unit. Returns: The number of downloads """ - downloaders_for_content = [ - d_artifact.download() + d_artifacts_to_download = [ + d_artifact for d_artifact in d_content.d_artifacts if d_artifact.artifact._state.adding and not d_artifact.deferred_download and not d_artifact.artifact.file ] - if downloaders_for_content: - await asyncio.gather(*downloaders_for_content) - await self.put(d_content) - return len(downloaders_for_content) + + budget_acquired = 0 + if d_artifacts_to_download and self.resource_budget: + budget_acquired = sum( + d_artifact.artifact.size or 0 for d_artifact in d_artifacts_to_download + ) + await self.resource_budget.acquire(budget_acquired) + + try: + if d_artifacts_to_download: + await asyncio.gather(*(da.download() for da in d_artifacts_to_download)) + + await self.put(d_content) + except BaseException: + # Mirror the acquire condition above: budget_acquired may be 0 when sizes are unknown. + if d_artifacts_to_download and self.resource_budget: + self.resource_budget.release(budget_acquired) + raise + + return len(d_artifacts_to_download) class ArtifactSaver(Stage): @@ -259,8 +385,18 @@ class ArtifactSaver(Stage): This stage drains all available items from `self._in_q` and batches everything into one large call to the db for efficiency. + + Args: + resource_budget (ArtifactResourceBudget): Optional shared resource budget. + When provided, budget is released after artifacts are saved and temp files deleted. + args: unused positional arguments passed along to [pulpcore.plugin.stages.Stage][]. + kwargs: unused keyword arguments passed along to [pulpcore.plugin.stages.Stage][]. """ + def __init__(self, resource_budget=None, *args, **kwargs): + super().__init__(*args, **kwargs) + self.resource_budget = resource_budget + async def run(self): """ The coroutine for this stage. @@ -268,7 +404,10 @@ async def run(self): Returns: The coroutine for this stage.
""" - async for batch in self.batches(minsize=settings.MAX_CONCURRENT_CONTENT): + flush_event = self.resource_budget.pressure if self.resource_budget else None + async for batch in self.batches( + minsize=settings.MAX_CONCURRENT_CONTENT, flush_event=flush_event + ): da_to_save = [] for d_content in batch: for d_artifact in d_content.d_artifacts: @@ -291,6 +430,16 @@ async def run(self): if await aos.path.exists(tmp_file_path): await aos.remove(tmp_file_path) + # Release budget after temp files are cleaned up so the downloader can proceed + if self.resource_budget: + for d_content in batch: + budget_bytes = sum( + d_artifact.artifact.size or 0 + for d_artifact in d_content.d_artifacts + if not d_artifact.deferred_download + ) + self.resource_budget.release(budget_bytes) + for d_content in batch: await self.put(d_content) diff --git a/pulpcore/plugin/stages/declarative_version.py b/pulpcore/plugin/stages/declarative_version.py index 18022e9ab38..22cd54173ca 100644 --- a/pulpcore/plugin/stages/declarative_version.py +++ b/pulpcore/plugin/stages/declarative_version.py @@ -6,6 +6,7 @@ from pulpcore.plugin.stages.artifact_stages import ( ACSArtifactHandler, ArtifactDownloader, + ArtifactResourceBudget, ArtifactSaver, QueryExistingArtifacts, RemoteArtifactSaver, @@ -129,6 +130,8 @@ def pipeline_stages(self, new_version): list: List of [pulpcore.plugin.stages.Stage][] instances """ + resource_budget = ArtifactResourceBudget.from_settings() + pipeline = [ self.first_stage, QueryExistingArtifacts(), @@ -137,8 +140,8 @@ def pipeline_stages(self, new_version): pipeline.append(ACSArtifactHandler()) pipeline.extend( [ - ArtifactDownloader(), - ArtifactSaver(), + ArtifactDownloader(resource_budget=resource_budget), + ArtifactSaver(resource_budget=resource_budget), QueryExistingContents(), ContentSaver(), RemoteArtifactSaver(), diff --git a/pulpcore/tests/unit/stages/test_resource_budget.py b/pulpcore/tests/unit/stages/test_resource_budget.py new file mode 100644 index 00000000000..f53caf12641 --- /dev/null +++ b/pulpcore/tests/unit/stages/test_resource_budget.py @@ -0,0 +1,216 @@ +import asyncio + +import pytest + +from pulpcore.plugin.stages import ArtifactResourceBudget + + +class TestAcquireRelease: + """Basic acquire/release semantics.""" + + @pytest.mark.asyncio + async def test_acquire_and_release_items(self): + budget = ArtifactResourceBudget(max_items=3) + await budget.acquire(0) + await budget.acquire(0) + await budget.acquire(0) + assert budget._current_items == 3 + budget.release(0) + budget.release(0) + budget.release(0) + assert budget._current_items == 0 + + @pytest.mark.asyncio + async def test_acquire_and_release_bytes(self): + budget = ArtifactResourceBudget(max_bytes=1000) + await budget.acquire(400) + await budget.acquire(400) + assert budget._current_bytes == 800 + budget.release(400) + assert budget._current_bytes == 400 + budget.release(400) + assert budget._current_bytes == 0 + + @pytest.mark.asyncio + async def test_release_does_not_go_negative(self): + budget = ArtifactResourceBudget(max_bytes=100, max_items=2) + budget.release(500) + assert budget._current_bytes == 0 + assert budget._current_items == 0 + + +class TestBlocking: + """Acquire blocks when budget is exhausted.""" + + @pytest.mark.asyncio + async def test_blocks_on_item_limit(self): + budget = ArtifactResourceBudget(max_items=1) + await budget.acquire(0) + + acquired = asyncio.Event() + + async def delayed_acquire(): + await budget.acquire(0) + acquired.set() + + task = asyncio.ensure_future(delayed_acquire()) + await 
asyncio.sleep(0.05) + assert not acquired.is_set(), "acquire should block when at item limit" + + budget.release(0) + await asyncio.sleep(0.05) + assert acquired.is_set(), "acquire should unblock after release" + task.cancel() + + @pytest.mark.asyncio + async def test_blocks_on_byte_limit(self): + budget = ArtifactResourceBudget(max_bytes=100) + await budget.acquire(80) + + acquired = asyncio.Event() + + async def delayed_acquire(): + await budget.acquire(50) + acquired.set() + + task = asyncio.ensure_future(delayed_acquire()) + await asyncio.sleep(0.05) + assert not acquired.is_set(), "acquire should block when bytes would exceed limit" + + budget.release(80) + await asyncio.sleep(0.05) + assert acquired.is_set(), "acquire should unblock after release" + task.cancel() + + @pytest.mark.asyncio + async def test_blocks_on_both_limits(self): + """When both limits are set, both must be satisfied.""" + budget = ArtifactResourceBudget(max_bytes=1000, max_items=1) + await budget.acquire(100) + + acquired = asyncio.Event() + + async def delayed_acquire(): + await budget.acquire(100) + acquired.set() + + task = asyncio.ensure_future(delayed_acquire()) + await asyncio.sleep(0.05) + assert not acquired.is_set() + + budget.release(100) + await asyncio.sleep(0.05) + assert acquired.is_set() + task.cancel() + + +class TestDeadlockPrevention: + """The _current_items == 0 guard prevents deadlock.""" + + @pytest.mark.asyncio + async def test_allows_oversized_item_when_empty(self): + """A single item exceeding max_bytes is allowed when nothing is in flight.""" + budget = ArtifactResourceBudget(max_bytes=100) + await budget.acquire(500) # Should not block + assert budget._current_bytes == 500 + assert budget._current_items == 1 + + @pytest.mark.asyncio + async def test_allows_item_over_item_limit_when_empty(self): + """Even max_items=0 (if someone set it) doesn't block when nothing is in flight.""" + budget = ArtifactResourceBudget(max_items=0) + # This would deadlock without the guard -- it should return immediately + await budget.acquire(0) + assert budget._current_items == 1 + + @pytest.mark.asyncio + async def test_second_oversized_item_blocks(self): + """After allowing one oversized item through, the next must wait.""" + budget = ArtifactResourceBudget(max_bytes=100) + await budget.acquire(500) + + acquired = asyncio.Event() + + async def delayed_acquire(): + await budget.acquire(50) + acquired.set() + + task = asyncio.ensure_future(delayed_acquire()) + await asyncio.sleep(0.05) + assert not acquired.is_set(), "second item should block while oversized item is in flight" + + budget.release(500) + await asyncio.sleep(0.05) + assert acquired.is_set() + task.cancel() + + +class TestPressureEvent: + """The pressure event signals downstream stages to flush.""" + + @pytest.mark.asyncio + async def test_pressure_set_when_blocked(self): + budget = ArtifactResourceBudget(max_items=1) + assert not budget.pressure.is_set() + await budget.acquire(0) + + async def try_acquire(): + await budget.acquire(0) + + task = asyncio.ensure_future(try_acquire()) + await asyncio.sleep(0.05) + assert budget.pressure.is_set(), "pressure should be set when acquire blocks" + + budget.release(0) + await asyncio.sleep(0.05) + assert not budget.pressure.is_set(), "pressure should clear after release" + task.cancel() + + @pytest.mark.asyncio + async def test_pressure_not_set_when_budget_available(self): + budget = ArtifactResourceBudget(max_items=5) + await budget.acquire(0) + assert not budget.pressure.is_set() + await 
budget.acquire(0) + assert not budget.pressure.is_set() + + +class TestNoLimits: + """When max_bytes and max_items are both None, acquire never blocks.""" + + @pytest.mark.asyncio + async def test_unlimited_acquires(self): + budget = ArtifactResourceBudget(max_bytes=None, max_items=None) + for i in range(100): + await budget.acquire(1_000_000) + assert budget._current_items == 100 + assert budget._current_bytes == 100_000_000 + + +class TestConcurrentAcquireRelease: + """Multiple concurrent acquires and releases behave correctly.""" + + @pytest.mark.asyncio + async def test_concurrent_producers_and_consumer(self): + """Simulate multiple downloaders acquiring and a saver releasing.""" + budget = ArtifactResourceBudget(max_bytes=500, max_items=5) + completed = [] + + async def producer(item_id, size): + await budget.acquire(size) + await asyncio.sleep(0.01) # simulate download + completed.append(item_id) + return size + + async def consumer(): + """Release budget periodically, simulating ArtifactSaver.""" + while len(completed) < 10: + await asyncio.sleep(0.02) + if budget._current_items > 0: + budget.release(100) + + consumer_task = asyncio.ensure_future(consumer()) + producer_tasks = [asyncio.ensure_future(producer(i, 100)) for i in range(10)] + + await asyncio.gather(*producer_tasks, consumer_task) + assert len(completed) == 10 diff --git a/pulpcore/tests/unit/stages/test_stages.py b/pulpcore/tests/unit/stages/test_stages.py index e96fbbbfb54..230972eee8a 100644 --- a/pulpcore/tests/unit/stages/test_stages.py +++ b/pulpcore/tests/unit/stages/test_stages.py @@ -154,3 +154,93 @@ async def test_batch_queue_and_min_sizes(): last_stage._connect(queues[1], queues[2]) end_stage._connect(queues[2], None) await asyncio.gather(last_stage(), middle_stage(), first_stage(), end_stage()) + + +@pytest.mark.asyncio +async def test_flush_event_yields_early(stage, in_q): + """A flush_event causes batches() to yield before minsize is reached.""" + flush = asyncio.Event() + c1 = mock.Mock() + in_q.put_nowait(c1) + + batch_it = stage.batches(minsize=100, flush_event=flush) + + # The single item is not enough to reach minsize=100, so the batch won't yield yet. + # Set the flush event to force an early yield. + flush.set() + + batch = await batch_it.__anext__() + assert batch == [c1] + + in_q.put_nowait(None) + with pytest.raises(StopAsyncIteration): + await batch_it.__anext__() + + +@pytest.mark.asyncio +async def test_flush_event_no_spin_on_empty_batch(stage, in_q): + """When flush_event is set but the batch is empty, batches() must not spin.""" + flush = asyncio.Event() + flush.set() # Set before any items arrive + + batch_it = stage.batches(minsize=100, flush_event=flush) + + # Put an item after a short delay -- if there were a spin loop, + # the test would hang or burn CPU before we get here. 
+ async def put_later(): + await asyncio.sleep(0.05) + in_q.put_nowait(mock.Mock()) + in_q.put_nowait(None) + + asyncio.ensure_future(put_later()) + + batch = await batch_it.__anext__() + assert len(batch) == 1 + + with pytest.raises(StopAsyncIteration): + await batch_it.__anext__() + + +@pytest.mark.asyncio +async def test_flush_event_rearms_after_yield(stage, in_q): + """After yielding a flush-triggered batch, the flush_event is re-armed.""" + flush = asyncio.Event() + c1 = mock.Mock() + c2 = mock.Mock() + + in_q.put_nowait(c1) + + batch_it = stage.batches(minsize=100, flush_event=flush) + + # First flush-triggered yield + flush.set() + batch = await batch_it.__anext__() + assert batch == [c1] + + # Clear and re-set to trigger another early yield + flush.clear() + in_q.put_nowait(c2) + flush.set() + batch = await batch_it.__anext__() + assert batch == [c2] + + in_q.put_nowait(None) + with pytest.raises(StopAsyncIteration): + await batch_it.__anext__() + + +@pytest.mark.asyncio +async def test_flush_event_not_needed_when_minsize_met(stage, in_q): + """Batches still yield normally when minsize is met, even without flush.""" + flush = asyncio.Event() # Never set + + for _ in range(5): + in_q.put_nowait(mock.Mock()) + in_q.put_nowait(None) + + batch_it = stage.batches(minsize=3, flush_event=flush) + batch = await batch_it.__anext__() + assert len(batch) >= 3 + + with pytest.raises(StopAsyncIteration): + await batch_it.__anext__()
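
A minimal configuration sketch showing how the two new limits might be combined, assuming the typical `/etc/pulp/settings.py` settings file; the values below are illustrative only, not recommendations:

# /etc/pulp/settings.py (assumed location; any settings file Pulp loads works)

# Cap the total size of downloaded-but-not-yet-saved artifacts at 2 GiB.
# Small artifacts still download with high concurrency; large downloads throttle.
SYNC_MAX_IN_FLIGHT_MB = 2048

# Also cap the number of in-flight items, as a fallback for artifacts whose
# size is not known ahead of time.
SYNC_MAX_IN_FLIGHT_ITEMS = 400

With either setting present, `ArtifactResourceBudget.from_settings()` returns a budget that `ArtifactDownloader` acquires against and `ArtifactSaver` releases once artifacts are saved and their temporary files removed.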