diff --git a/cuda_core/cuda/core/_context.pxd b/cuda_core/cuda/core/_context.pxd index 92fa5700a06..b0edf5a0674 100644 --- a/cuda_core/cuda/core/_context.pxd +++ b/cuda_core/cuda/core/_context.pxd @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # SPDX-License-Identifier: Apache-2.0 diff --git a/cuda_core/cuda/core/_device.pyx b/cuda_core/cuda/core/_device.pyx index 233ce6de791..67255506a2d 100644 --- a/cuda_core/cuda/core/_device.pyx +++ b/cuda_core/cuda/core/_device.pyx @@ -1394,14 +1394,12 @@ class Device: cdef Context ctx = self._context return cyEvent._init(cyEvent, self._device_id, ctx._h_context, options, True) - def allocate(self, size, stream: Stream | GraphBuilder | None = None) -> Buffer: + def allocate(self, size, *, stream: Stream | GraphBuilder) -> Buffer: """Allocate device memory from a specified stream. Allocates device memory of `size` bytes on the specified `stream` using the memory resource currently associated with this Device. - Parameter `stream` is optional, using a default stream by default. - Note ---- Device must be initialized. @@ -1410,9 +1408,10 @@ class Device: ---------- size : int Number of bytes to allocate. - stream : :obj:`~_stream.Stream`, optional - The stream establishing the stream ordering semantic. - Default value of `None` uses default stream. + stream : :obj:`~_stream.Stream` | :obj:`~graph.GraphBuilder` + Keyword-only. The stream establishing the stream ordering semantic. + Must be passed explicitly; pass ``self.default_stream`` to use + the default stream. Returns ------- @@ -1421,7 +1420,7 @@ class Device: """ self._check_context_initialized() - return self.memory_resource.allocate(size, stream) + return self.memory_resource.allocate(size, stream=stream) def sync(self): """Synchronize the device. diff --git a/cuda_core/cuda/core/_graphics.pyx b/cuda_core/cuda/core/_graphics.pyx index fc053da8cb8..a377589dfc2 100644 --- a/cuda_core/cuda/core/_graphics.pyx +++ b/cuda_core/cuda/core/_graphics.pyx @@ -12,7 +12,7 @@ from cuda.core._resource_handles cimport ( as_intptr, ) from cuda.core._memory._buffer cimport Buffer, Buffer_from_deviceptr_handle -from cuda.core._stream cimport Stream, Stream_accept, default_stream +from cuda.core._stream cimport Stream, Stream_accept from cuda.core._utils.cuda_utils cimport HANDLE_RETURN __all__ = ['GraphicsResource'] @@ -206,7 +206,7 @@ cdef class GraphicsResource: return None return self._mapped_buffer - def map(self, *, stream: Stream | None = None) -> Buffer: + def map(self, *, stream: Stream) -> Buffer: """Map this graphics resource for CUDA access. After mapping, a CUDA device pointer into the underlying graphics @@ -220,9 +220,10 @@ cdef class GraphicsResource: Parameters ---------- - stream : :class:`~cuda.core.Stream`, optional - The CUDA stream on which to perform the mapping. If ``None``, - the current default stream is used. + stream : :class:`~cuda.core.Stream` + Keyword-only. The CUDA stream on which to perform the mapping. + Must be passed explicitly; pass ``device.default_stream`` to use + the default stream. Returns ------- @@ -248,7 +249,7 @@ cdef class GraphicsResource: if self._get_mapped_buffer() is not None: raise RuntimeError("GraphicsResource is already mapped") - s_obj = default_stream() if stream is None else Stream_accept(stream) + s_obj = Stream_accept(stream) raw = as_cu(self._handle) cy_stream = as_cu(s_obj._h_stream) with nogil: diff --git a/cuda_core/cuda/core/_layout.pyx b/cuda_core/cuda/core/_layout.pyx index 3c9392430b6..796a6243fd4 100644 --- a/cuda_core/cuda/core/_layout.pyx +++ b/cuda_core/cuda/core/_layout.pyx @@ -460,7 +460,7 @@ cdef class _StridedLayout: required_size = layout.required_size_in_bytes() # allocate the memory on the device device.set_current() - mem = device.allocate(required_size) + mem = device.allocate(required_size, stream=device.default_stream) # create a view on the newly allocated device memory b_view = StridedMemoryView.from_buffer(mem, layout, a_view.dtype) return b_view diff --git a/cuda_core/cuda/core/_memory/_buffer.pyx b/cuda_core/cuda/core/_memory/_buffer.pyx index dd12ae005d4..5d3bdbb873c 100644 --- a/cuda_core/cuda/core/_memory/_buffer.pyx +++ b/cuda_core/cuda/core/_memory/_buffer.pyx @@ -24,7 +24,7 @@ from cuda.core._resource_handles cimport ( ) from cuda.core.typing import DevicePointerType -from cuda.core._stream cimport Stream, Stream_accept +from cuda.core._stream cimport Stream, Stream_accept, default_stream from cuda.core._utils.cuda_utils cimport HANDLE_RETURN, _parse_fill_value import sys @@ -49,12 +49,24 @@ cdef void _mr_dealloc_callback( size_t size, const StreamHandle& h_stream, ) noexcept: - """Called by the C++ deleter to deallocate via MemoryResource.deallocate.""" + """Called by the C++ deleter to deallocate via MemoryResource.deallocate. + + This is the C++ teardown path: there is no Python caller frame from + which to obtain a stream. If the device-pointer handle was created + without ``set_deallocation_stream`` being called (e.g. buffers minted + via ``Buffer.from_handle(ptr, size, mr=mr)`` from DLPack import, + third-party adapters, or other foreign sources), ``h_stream`` is + empty here. Stream-ordered MR ``deallocate`` overrides reject + ``stream=None`` (issue #2001), so without a fallback the destructor + would print a warning and leak the allocation. Fall back to the + legacy/per-thread default stream so the free still happens; this is + the unique exception to the "no implicit default-stream fallback" + policy because the teardown has no other source of truth. + """ + cdef Stream stream try: - stream = None - if h_stream: - stream = Stream._from_handle(Stream, h_stream) - mr.deallocate(int(ptr), size, stream) + stream = Stream._from_handle(Stream, h_stream) if h_stream else default_stream() + mr.deallocate(int(ptr), size, stream=stream) except Exception as exc: print(f"Warning: mr.deallocate() failed during Buffer destruction: {exc}", file=sys.stderr) @@ -119,7 +131,11 @@ cdef class Buffer: @staticmethod def _reduce_helper(mr, ipc_descriptor): - return Buffer.from_ipc_descriptor(mr, ipc_descriptor) + # The parent process's stream is not portable across processes, so the + # pickle path cannot thread an explicit stream through. Seed the + # imported buffer's deallocation with the current context's default + # stream; the receiver can override via buffer.close(stream). + return Buffer.from_ipc_descriptor(mr, ipc_descriptor, stream=default_stream()) def __reduce__(self): # Must not serialize the parent's stream! @@ -158,9 +174,20 @@ cdef class Buffer: @classmethod def from_ipc_descriptor( cls, mr: DeviceMemoryResource | PinnedMemoryResource, ipc_descriptor: IPCBufferDescriptor, - stream: Stream = None + *, stream: Stream ) -> Buffer: - """Import a buffer that was exported from another process.""" + """Import a buffer that was exported from another process. + + Parameters + ---------- + mr : :obj:`~_memory.DeviceMemoryResource` | :obj:`~_memory.PinnedMemoryResource` + The IPC-enabled memory resource matching the exporting process. + ipc_descriptor : :obj:`~_memory.IPCBufferDescriptor` + The descriptor exported from another process. + stream : :obj:`~_stream.Stream` + Keyword-only. The stream used for asynchronous deallocation when + the buffer is closed or garbage collected. + """ return _ipc.Buffer_from_ipc_descriptor(cls, mr, ipc_descriptor, stream) @property @@ -215,7 +242,7 @@ cdef class Buffer: if self._memory_resource is None: raise ValueError("a destination buffer must be provided (this " "buffer does not have a memory_resource)") - dst = self._memory_resource.allocate(src_size, s) + dst = self._memory_resource.allocate(src_size, stream=s) cdef size_t dst_size = dst._size if dst_size != src_size: @@ -490,17 +517,17 @@ cdef class MemoryResource: resource's respective property.) """ - def allocate(self, size_t size, stream: Stream | GraphBuilder | None = None) -> Buffer: + def allocate(self, size_t size, *, stream: Stream | GraphBuilder) -> Buffer: """Allocate a buffer of the requested size. Parameters ---------- size : int The size of the buffer to allocate, in bytes. - stream : :obj:`~_stream.Stream` | :obj:`~graph.GraphBuilder`, optional - The stream on which to perform the allocation asynchronously. - If None, it is up to each memory resource implementation to decide - and document the behavior. + stream : :obj:`~_stream.Stream` | :obj:`~graph.GraphBuilder` + Keyword-only. The stream on which to perform the allocation + asynchronously. Must be passed explicitly; pass + ``device.default_stream`` to use the default stream. Returns ------- @@ -510,7 +537,7 @@ cdef class MemoryResource: """ raise TypeError("MemoryResource.allocate must be implemented by subclasses.") - def deallocate(self, ptr: DevicePointerType, size_t size, stream: Stream | GraphBuilder | None = None): + def deallocate(self, ptr: DevicePointerType, size_t size, *, stream: Stream | GraphBuilder): """Deallocate a buffer previously allocated by this resource. Parameters @@ -519,10 +546,10 @@ cdef class MemoryResource: The pointer or handle to the buffer to deallocate. size : int The size of the buffer to deallocate, in bytes. - stream : :obj:`~_stream.Stream` | :obj:`~graph.GraphBuilder`, optional - The stream on which to perform the deallocation asynchronously. - If None, it is up to each memory resource implementation to decide - and document the behavior. + stream : :obj:`~_stream.Stream` | :obj:`~graph.GraphBuilder` + Keyword-only. The stream on which to perform the deallocation + asynchronously. Must be passed explicitly; pass + ``device.default_stream`` to use the default stream. """ raise TypeError("MemoryResource.deallocate must be implemented by subclasses.") diff --git a/cuda_core/cuda/core/_memory/_graph_memory_resource.pyx b/cuda_core/cuda/core/_memory/_graph_memory_resource.pyx index 2180276ed87..8fdc324dc59 100644 --- a/cuda_core/cuda/core/_memory/_graph_memory_resource.pyx +++ b/cuda_core/cuda/core/_memory/_graph_memory_resource.pyx @@ -14,7 +14,7 @@ from cuda.core._resource_handles cimport ( as_cu, ) -from cuda.core._stream cimport default_stream, Stream_accept, Stream +from cuda.core._stream cimport Stream_accept, Stream from cuda.core._utils.cuda_utils cimport HANDLE_RETURN from functools import cache @@ -104,19 +104,19 @@ cdef class cyGraphMemoryResource(MemoryResource): def __cinit__(self, int device_id): self._device_id = device_id - def allocate(self, size_t size, stream: Stream | GraphBuilder | None = None) -> Buffer: + def allocate(self, size_t size, *, stream: Stream | GraphBuilder) -> Buffer: """ Allocate a buffer of the requested size. See documentation for :obj:`~_memory.MemoryResource`. """ - stream = Stream_accept(stream) if stream is not None else default_stream() - return GMR_allocate(self, size, stream) + cdef Stream s = Stream_accept(stream) + return GMR_allocate(self, size, s) - def deallocate(self, ptr: "DevicePointerType", size_t size, stream: Stream | GraphBuilder | None = None): + def deallocate(self, ptr: "DevicePointerType", size_t size, *, stream: Stream | GraphBuilder): """ Deallocate a buffer of the requested size. See documentation for :obj:`~_memory.MemoryResource`. """ - stream = Stream_accept(stream) if stream is not None else default_stream() - return GMR_deallocate(ptr, size, stream) + cdef Stream s = Stream_accept(stream) + return GMR_deallocate(ptr, size, s) def close(self): """No operation (provided for compatibility).""" diff --git a/cuda_core/cuda/core/_memory/_ipc.pyx b/cuda_core/cuda/core/_memory/_ipc.pyx index 88a1d9c1695..1c7b25c14fb 100644 --- a/cuda_core/cuda/core/_memory/_ipc.pyx +++ b/cuda_core/cuda/core/_memory/_ipc.pyx @@ -7,7 +7,7 @@ cimport cpython from cuda.bindings cimport cydriver from cuda.core._memory._buffer cimport Buffer, Buffer_from_deviceptr_handle from cuda.core._memory._memory_pool cimport _MemPool -from cuda.core._stream cimport Stream +from cuda.core._stream cimport Stream, Stream_accept from cuda.core._resource_handles cimport ( DevicePtrHandle, create_fd_handle, @@ -19,7 +19,6 @@ from cuda.core._resource_handles cimport ( as_py, ) -from cuda.core._stream cimport default_stream from cuda.core._utils.cuda_utils cimport HANDLE_RETURN from cuda.core._utils.cuda_utils import check_multiprocessing_start_method @@ -171,10 +170,7 @@ cdef Buffer Buffer_from_ipc_descriptor( """Import a buffer that was exported from another process.""" if not mr.is_ipc_enabled: raise RuntimeError("Memory resource is not IPC-enabled") - if stream is None: - # Note: match this behavior to _MemPool.allocate() - stream = default_stream() - cdef Stream s = stream + cdef Stream s = Stream_accept(stream) cdef DevicePtrHandle h_ptr = deviceptr_import_ipc( mr._h_pool, ipc_descriptor.payload_ptr(), diff --git a/cuda_core/cuda/core/_memory/_legacy.py b/cuda_core/cuda/core/_memory/_legacy.py index 036b89abdcc..510974364da 100644 --- a/cuda_core/cuda/core/_memory/_legacy.py +++ b/cuda_core/cuda/core/_memory/_legacy.py @@ -8,6 +8,7 @@ if TYPE_CHECKING: from cuda.core._memory._buffer import DevicePointerType + from cuda.core._stream import Stream from cuda.core._memory._buffer import Buffer, MemoryResource from cuda.core._utils.cuda_utils import ( @@ -27,25 +28,30 @@ class LegacyPinnedMemoryResource(MemoryResource): # TODO: support creating this MR with flags that are later passed to cuMemHostAlloc? - def allocate(self, size, stream=None) -> Buffer: + def allocate(self, size, *, stream: Stream | None = None) -> Buffer: """Allocate a buffer of the requested size. + ``cuMemAllocHost`` is synchronous, so this resource ignores any + supplied stream. The argument is accepted (and validated when + non-``None``) for interface conformance with stream-ordered + memory resources. + Parameters ---------- size : int The size of the buffer to allocate, in bytes. stream : Stream, optional - Currently ignored + Keyword-only. Validated when provided but otherwise unused. Returns ------- Buffer The allocated buffer object, which is accessible on both host and device. """ - if stream is None: - from cuda.core._stream import default_stream + from cuda.core._stream import Stream_accept - stream = default_stream() + if stream is not None: + Stream_accept(stream) if size: err, ptr = driver.cuMemAllocHost(size) raise_if_driver_error(err) @@ -53,7 +59,7 @@ def allocate(self, size, stream=None) -> Buffer: ptr = 0 return Buffer._init(ptr, size, self) - def deallocate(self, ptr: DevicePointerType, size, stream): + def deallocate(self, ptr: DevicePointerType, size, *, stream: Stream | None = None): """Deallocate a buffer previously allocated by this resource. Parameters @@ -62,11 +68,14 @@ def deallocate(self, ptr: DevicePointerType, size, stream): The pointer or handle to the buffer to deallocate. size : int The size of the buffer to deallocate, in bytes. - stream : Stream - The stream on which to perform the deallocation synchronously. + stream : Stream, optional + Keyword-only. If provided, ``stream.sync()`` is called before the + host allocation is freed. ``None`` skips the sync. """ + from cuda.core._stream import Stream_accept + if stream is not None: - stream.sync() + Stream_accept(stream).sync() if size: (err,) = driver.cuMemFreeHost(ptr) @@ -96,11 +105,13 @@ def __init__(self, device_id): self._device_id = Device(device_id).device_id - def allocate(self, size, stream=None) -> Buffer: - if stream is None: - from cuda.core._stream import default_stream + def allocate(self, size, *, stream: Stream | None = None) -> Buffer: + # cuMemAlloc is synchronous; stream is accepted (and validated) + # for interface conformance but not used. + from cuda.core._stream import Stream_accept - stream = default_stream() + if stream is not None: + Stream_accept(stream) if size: err, ptr = driver.cuMemAlloc(size) raise_if_driver_error(err) @@ -108,9 +119,11 @@ def allocate(self, size, stream=None) -> Buffer: ptr = 0 return Buffer._init(ptr, size, self) - def deallocate(self, ptr, size, stream): + def deallocate(self, ptr, size, *, stream: Stream | None = None): + from cuda.core._stream import Stream_accept + if stream is not None: - stream.sync() + Stream_accept(stream).sync() if size: (err,) = driver.cuMemFree(ptr) raise_if_driver_error(err) diff --git a/cuda_core/cuda/core/_memory/_memory_pool.pyx b/cuda_core/cuda/core/_memory/_memory_pool.pyx index 4e0f99d4529..4da5e26ea92 100644 --- a/cuda_core/cuda/core/_memory/_memory_pool.pyx +++ b/cuda_core/cuda/core/_memory/_memory_pool.pyx @@ -11,7 +11,7 @@ from libc.string cimport memset from cuda.bindings cimport cydriver from cuda.core._memory._buffer cimport Buffer, Buffer_from_deviceptr_handle, MemoryResource from cuda.core._memory cimport _ipc -from cuda.core._stream cimport default_stream, Stream_accept, Stream +from cuda.core._stream cimport Stream_accept, Stream from cuda.core._resource_handles cimport ( MemoryPoolHandle, DevicePtrHandle, @@ -122,16 +122,17 @@ cdef class _MemPool(MemoryResource): """ _MP_close(self) - def allocate(self, size_t size, stream: Stream | GraphBuilder | None = None) -> Buffer: + def allocate(self, size_t size, *, stream: Stream | GraphBuilder) -> Buffer: """Allocate a buffer of the requested size. Parameters ---------- size : int The size of the buffer to allocate, in bytes. - stream : :obj:`~_stream.Stream` | :obj:`~graph.GraphBuilder`, optional - The stream on which to perform the allocation asynchronously. - If None, an internal stream is used. + stream : :obj:`~_stream.Stream` | :obj:`~graph.GraphBuilder` + Keyword-only. The stream on which to perform the allocation + asynchronously. Must be passed explicitly; pass + ``device.default_stream`` to use the default stream. Returns ------- @@ -141,10 +142,10 @@ cdef class _MemPool(MemoryResource): """ if self.is_mapped: raise TypeError("Cannot allocate from a mapped IPC-enabled memory resource") - stream = Stream_accept(stream) if stream is not None else default_stream() - return _MP_allocate(self, size, stream) + cdef Stream s = Stream_accept(stream) + return _MP_allocate(self, size, s) - def deallocate(self, ptr: "DevicePointerType", size_t size, stream: Stream | GraphBuilder | None = None): + def deallocate(self, ptr: "DevicePointerType", size_t size, *, stream: Stream | GraphBuilder): """Deallocate a buffer previously allocated by this resource. Parameters @@ -153,13 +154,13 @@ cdef class _MemPool(MemoryResource): The pointer or handle to the buffer to deallocate. size : int The size of the buffer to deallocate, in bytes. - stream : :obj:`~_stream.Stream` | :obj:`~graph.GraphBuilder`, optional - The stream on which to perform the deallocation asynchronously. - If the buffer is deallocated without an explicit stream, the allocation stream - is used. + stream : :obj:`~_stream.Stream` | :obj:`~graph.GraphBuilder` + Keyword-only. The stream on which to perform the deallocation + asynchronously. Must be passed explicitly; pass + ``device.default_stream`` to use the default stream. """ - stream = Stream_accept(stream) if stream is not None else default_stream() - _MP_deallocate(self, ptr, size, stream) + cdef Stream s = Stream_accept(stream) + _MP_deallocate(self, ptr, size, s) @property def attributes(self) -> _MemPoolAttributes: diff --git a/cuda_core/cuda/core/_memory/_virtual_memory_resource.py b/cuda_core/cuda/core/_memory/_virtual_memory_resource.py index a60436a4305..78a35e850fb 100644 --- a/cuda_core/cuda/core/_memory/_virtual_memory_resource.py +++ b/cuda_core/cuda/core/_memory/_virtual_memory_resource.py @@ -474,7 +474,7 @@ def _build_access_descriptors(self, prop: driver.CUmemAllocationProp) -> list: return descs - def allocate(self, size: int, stream: Stream | None = None) -> Buffer: + def allocate(self, size: int, *, stream: Stream | None = None) -> Buffer: """ Allocate a buffer of the given size using CUDA virtual memory. @@ -483,7 +483,8 @@ def allocate(self, size: int, stream: Stream | None = None) -> Buffer: size : int The size in bytes of the buffer to allocate. stream : Stream, optional - CUDA stream to associate with the allocation (not currently supported). + Keyword-only. Unused because virtual memory operations are + synchronous. Returns ------- @@ -492,8 +493,6 @@ def allocate(self, size: int, stream: Stream | None = None) -> Buffer: Raises ------ - NotImplementedError - If a stream is provided or if the location type is not device memory. CUDAError If any CUDA driver API call fails during allocation. @@ -505,7 +504,9 @@ def allocate(self, size: int, stream: Stream | None = None) -> Buffer: specified in the resource's configuration. """ if stream is not None: - raise NotImplementedError("Stream is not supported with VirtualMemoryResource") + from cuda.core._stream import Stream_accept + + Stream_accept(stream) config = self.config # ---- Build allocation properties ---- @@ -558,10 +559,24 @@ def allocate(self, size: int, stream: Stream | None = None) -> Buffer: buf = Buffer.from_handle(ptr=ptr, size=aligned_size, mr=self) return buf - def deallocate(self, ptr: int, size: int, stream: Stream | None = None) -> None: # noqa: ARG002 + def deallocate(self, ptr: int, size: int, *, stream: Stream | None = None) -> None: """ Deallocate memory on the device using CUDA VMM APIs. + + Parameters + ---------- + ptr : int + The pointer to the memory to deallocate. + size : int + The size in bytes of the memory to deallocate. + stream : Stream, optional + Keyword-only. Unused because virtual memory operations are + synchronous. """ + if stream is not None: + from cuda.core._stream import Stream_accept + + Stream_accept(stream) result, handle = driver.cuMemRetainAllocationHandle(ptr) raise_if_driver_error(result) (result,) = driver.cuMemUnmap(ptr, size) diff --git a/cuda_core/cuda/core/_module.pyx b/cuda_core/cuda/core/_module.pyx index fee979b6130..4a8601f8573 100644 --- a/cuda_core/cuda/core/_module.pyx +++ b/cuda_core/cuda/core/_module.pyx @@ -11,7 +11,7 @@ from collections import namedtuple from cuda.core._device import Device from cuda.core._launch_config cimport LaunchConfig from cuda.core._launch_config import LaunchConfig -from cuda.core._stream cimport Stream +from cuda.core._stream cimport Stream, Stream_accept from cuda.core._program import ObjectCodeFormatType from cuda.core._resource_handles cimport ( LibraryHandle, @@ -368,7 +368,7 @@ cdef class KernelOccupancy: )) return dynamic_smem_size - def max_potential_cluster_size(self, config: LaunchConfig, stream: Stream | None = None) -> int: + def max_potential_cluster_size(self, config: LaunchConfig, *, stream: Stream) -> int: """Maximum potential cluster size. The maximum potential cluster size for this kernel and given launch configuration. @@ -377,8 +377,10 @@ cdef class KernelOccupancy: ---------- config: :obj:`~_launch_config.LaunchConfig` Kernel launch configuration. Cluster dimensions in the configuration are ignored. - stream: :obj:`~Stream`, optional - The stream on which this kernel is to be launched. + stream: :obj:`~Stream` + Keyword-only. The stream on which this kernel is to be launched. + Must be passed explicitly; pass ``device.default_stream`` to + use the default stream. Returns ------- @@ -386,17 +388,15 @@ cdef class KernelOccupancy: The maximum cluster size that can be launched for this kernel and launch configuration. """ cdef cydriver.CUlaunchConfig drv_cfg = (config)._to_native_launch_config() - cdef Stream s - if stream is not None: - s = stream - drv_cfg.hStream = as_cu(s._h_stream) + cdef Stream s = Stream_accept(stream) + drv_cfg.hStream = as_cu(s._h_stream) cdef int cluster_size cdef cydriver.CUfunction func = as_cu(self._h_kernel) with nogil: HANDLE_RETURN(cydriver.cuOccupancyMaxPotentialClusterSize(&cluster_size, func, &drv_cfg)) return cluster_size - def max_active_clusters(self, config: LaunchConfig, stream: Stream | None = None) -> int: + def max_active_clusters(self, config: LaunchConfig, *, stream: Stream) -> int: """Maximum number of active clusters on the target device. The maximum number of clusters that could concurrently execute on the target device. @@ -405,8 +405,10 @@ cdef class KernelOccupancy: ---------- config: :obj:`~_launch_config.LaunchConfig` Kernel launch configuration. - stream: :obj:`~Stream`, optional - The stream on which this kernel is to be launched. + stream: :obj:`~Stream` + Keyword-only. The stream on which this kernel is to be launched. + Must be passed explicitly; pass ``device.default_stream`` to + use the default stream. Returns ------- @@ -414,10 +416,8 @@ cdef class KernelOccupancy: The maximum number of clusters that could co-exist on the target device. """ cdef cydriver.CUlaunchConfig drv_cfg = (config)._to_native_launch_config() - cdef Stream s - if stream is not None: - s = stream - drv_cfg.hStream = as_cu(s._h_stream) + cdef Stream s = Stream_accept(stream) + drv_cfg.hStream = as_cu(s._h_stream) cdef int num_clusters cdef cydriver.CUfunction func = as_cu(self._h_kernel) with nogil: diff --git a/cuda_core/cuda/core/_stream.pxd b/cuda_core/cuda/core/_stream.pxd index c47ff1ea289..c9ffb4c80a7 100644 --- a/cuda_core/cuda/core/_stream.pxd +++ b/cuda_core/cuda/core/_stream.pxd @@ -22,4 +22,4 @@ cdef class Stream: cpdef Stream default_stream() -cdef Stream Stream_accept(arg, bint allow_stream_protocol=*) +cpdef Stream Stream_accept(arg, bint allow_stream_protocol=*) diff --git a/cuda_core/cuda/core/_stream.pyx b/cuda_core/cuda/core/_stream.pyx index a93f8906969..f487a0a53e5 100644 --- a/cuda_core/cuda/core/_stream.pyx +++ b/cuda_core/cuda/core/_stream.pyx @@ -515,10 +515,17 @@ cdef cydriver.CUstream _handle_from_stream_protocol(obj) except*: return (info[1]) # Helper for API functions that accept either Stream or GraphBuilder. Performs -# needed checks and returns the relevant stream. -cdef Stream Stream_accept(arg, bint allow_stream_protocol=False): +# needed checks and returns the relevant stream. Rejects None so that callers +# cannot rely on an implicit fallback to the default stream; if the default +# stream is wanted, pass `device.default_stream` explicitly. +cpdef Stream Stream_accept(arg, bint allow_stream_protocol=False): from cuda.core.graph._graph_builder import GraphBuilder + if arg is None: + raise TypeError( + "stream is required and must not be None; " + "pass device.default_stream explicitly to use the default stream." + ) if isinstance(arg, Stream): return (arg) elif isinstance(arg, GraphBuilder): diff --git a/cuda_core/cuda/core/graph/_graph_builder.pyx b/cuda_core/cuda/core/graph/_graph_builder.pyx index 526c95e04ad..b745598abab 100644 --- a/cuda_core/cuda/core/graph/_graph_builder.pyx +++ b/cuda_core/cuda/core/graph/_graph_builder.pyx @@ -868,7 +868,7 @@ class Graph: Parameters ---------- stream : :obj:`~_stream.Stream` - The stream in which to launch the graph + The stream in which to launch the graph. """ handle_return(driver.cuGraphLaunch(self._mnff.graph, stream.handle)) diff --git a/cuda_core/docs/source/release/1.0.0-notes.rst b/cuda_core/docs/source/release/1.0.0-notes.rst index 7f0ced8c10b..bacd5c3a4be 100644 --- a/cuda_core/docs/source/release/1.0.0-notes.rst +++ b/cuda_core/docs/source/release/1.0.0-notes.rst @@ -120,6 +120,28 @@ Breaking changes ``CUgraphConditionalHandle`` value. Previously, ``.handle`` had to be extracted explicitly. +- ``stream`` is now a required keyword-only argument on APIs that schedule + work on a stream + (`#2001 `__). + Pass ``device.default_stream`` (or any :class:`Stream`) explicitly to + retain the previous behavior. Affected APIs: + + - :meth:`MemoryResource.allocate` / :meth:`MemoryResource.deallocate` + and overrides on :class:`DeviceMemoryResource`, + :class:`PinnedMemoryResource`, :class:`ManagedMemoryResource`, + and :class:`graph.GraphMemoryResource`. + - :meth:`Device.allocate`. + - :meth:`GraphicsResource.map`. + - :meth:`KernelOccupancy.max_potential_cluster_size` and + :meth:`KernelOccupancy.max_active_clusters`. + - :meth:`Buffer.from_ipc_descriptor` (no longer falls back to the default + stream when ``stream=None`` is passed). + + Synchronous memory resources are exempt: their allocate/deallocate + methods accept an optional ``stream`` (validated when non-``None``) + but do not use it. This applies to :class:`LegacyPinnedMemoryResource` + and :class:`VirtualMemoryResource`. + - Consistent naming of types annotation helpers (`#2016 `__): - :obj:`cuda.core.typing.DevicePointerT` -> :obj:`cuda.core.typing.DevicePointerType` diff --git a/cuda_core/examples/memory_ops.py b/cuda_core/examples/memory_ops.py index 438c40b333d..549686b466a 100644 --- a/cuda_core/examples/memory_ops.py +++ b/cuda_core/examples/memory_ops.py @@ -82,7 +82,7 @@ def main(): device_array = cp.from_dlpack(device_buffer).view(dtype=dtype) # 2. Pinned Memory (CPU memory, GPU accessible) - pinned_buffer = pinned_mr.allocate(total_size, stream=stream) + pinned_buffer = pinned_mr.allocate(total_size) pinned_array = np.from_dlpack(pinned_buffer).view(dtype=dtype) # Initialize data diff --git a/cuda_core/tests/graph/test_device_launch.py b/cuda_core/tests/graph/test_device_launch.py index 0bd4ba12634..cb143a17328 100644 --- a/cuda_core/tests/graph/test_device_launch.py +++ b/cuda_core/tests/graph/test_device_launch.py @@ -95,7 +95,7 @@ def test_device_launch_basic(init_cuda): # Allocate and initialize memory mr = LegacyPinnedMemoryResource() - buf = mr.allocate(4, stream=stream) + buf = mr.allocate(4) arr = np.from_dlpack(buf).view(np.int32) arr[0] = 0 stream.sync() @@ -145,7 +145,7 @@ def test_device_launch_multiple(init_cuda): # Allocate and initialize memory mr = LegacyPinnedMemoryResource() - buf = mr.allocate(4, stream=stream) + buf = mr.allocate(4) arr = np.from_dlpack(buf).view(np.int32) arr[0] = 0 stream.sync() diff --git a/cuda_core/tests/graph/test_graph_memory_resource.py b/cuda_core/tests/graph/test_graph_memory_resource.py index 13e6745d748..7f71fc95852 100644 --- a/cuda_core/tests/graph/test_graph_memory_resource.py +++ b/cuda_core/tests/graph/test_graph_memory_resource.py @@ -166,7 +166,7 @@ def test_graph_alloc_with_output(mempool_device, mode): # buffer allocated within the graph. The auto_free_on_launch option # is required to properly use the output buffer. gb = device.create_graph_builder().begin_building(mode) - out = gmr.allocate(NBYTES, gb) + out = gmr.allocate(NBYTES, stream=gb) out.copy_from(in_, stream=gb) launch(gb, LaunchConfig(grid=1, block=1), add_one, out, NBYTES) options = GraphCompleteOptions(auto_free_on_launch=True) diff --git a/cuda_core/tests/helpers/buffers.py b/cuda_core/tests/helpers/buffers.py index fbd5428c28b..44f84693089 100644 --- a/cuda_core/tests/helpers/buffers.py +++ b/cuda_core/tests/helpers/buffers.py @@ -19,14 +19,16 @@ class DummyUnifiedMemoryResource(MemoryResource): + # cuMemAllocManaged / cuMemFree are synchronous; stream is accepted + # for interface conformance with stream-ordered MRs but ignored. def __init__(self, device): self.device = device - def allocate(self, size, stream=None) -> Buffer: + def allocate(self, size, *, stream=None) -> Buffer: ptr = handle_return(driver.cuMemAllocManaged(size, driver.CUmemAttach_flags.CU_MEM_ATTACH_GLOBAL.value)) return Buffer.from_handle(ptr=ptr, size=size, mr=self) - def deallocate(self, ptr, size, stream=None): + def deallocate(self, ptr, size, *, stream=None): handle_return(driver.cuMemFree(ptr)) @property @@ -51,12 +53,14 @@ class TrackingMR(MemoryResource): def __init__(self): self.active = {} - def allocate(self, size, stream=None): + # cuMemAlloc / cuMemFree are synchronous; stream is accepted for + # interface conformance but ignored. + def allocate(self, size, *, stream=None): ptr = handle_return(driver.cuMemAlloc(size)) self.active[int(ptr)] = size return Buffer.from_handle(ptr=ptr, size=size, mr=self) - def deallocate(self, ptr, size, stream=None): + def deallocate(self, ptr, size, *, stream=None): handle_return(driver.cuMemFree(ptr)) del self.active[int(ptr)] diff --git a/cuda_core/tests/memory_ipc/test_errors.py b/cuda_core/tests/memory_ipc/test_errors.py index f82164ca37c..d17e63dc90a 100644 --- a/cuda_core/tests/memory_ipc/test_errors.py +++ b/cuda_core/tests/memory_ipc/test_errors.py @@ -70,7 +70,7 @@ def PARENT_ACTION(self, queue): def CHILD_ACTION(self, queue): mr = queue.get(timeout=CHILD_TIMEOUT_SEC) - mr.allocate(NBYTES) + mr.allocate(NBYTES, stream=self.device.default_stream) def ASSERT(self, exc_type, exc_msg): assert exc_type is TypeError @@ -84,12 +84,12 @@ def PARENT_ACTION(self, queue): options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) mr2 = DeviceMemoryResource(self.device, options=options) self._extra_mrs.append(mr2) - buffer = mr2.allocate(NBYTES) + buffer = mr2.allocate(NBYTES, stream=self.device.default_stream) queue.put([self.mr, buffer.ipc_descriptor]) # Note: mr does not own this buffer def CHILD_ACTION(self, queue): mr, buffer_desc = queue.get(timeout=CHILD_TIMEOUT_SEC) - Buffer.from_ipc_descriptor(mr, buffer_desc) + Buffer.from_ipc_descriptor(mr, buffer_desc, stream=self.device.default_stream) def ASSERT(self, exc_type, exc_msg): assert exc_type is CUDAError @@ -102,12 +102,12 @@ class TestImportBuffer(ChildErrorHarness): def PARENT_ACTION(self, queue): # Note: if the buffer is not attached to something to prolong its life, # CUDA_ERROR_INVALID_CONTEXT is raised from Buffer.__del__ - self.buffer = self.mr.allocate(NBYTES) + self.buffer = self.mr.allocate(NBYTES, stream=self.device.default_stream) queue.put(self.buffer) def CHILD_ACTION(self, queue): buffer = queue.get(timeout=CHILD_TIMEOUT_SEC) - Buffer.from_ipc_descriptor(self.mr, buffer) + Buffer.from_ipc_descriptor(self.mr, buffer, stream=self.device.default_stream) def ASSERT(self, exc_type, exc_msg): assert exc_type is TypeError @@ -124,7 +124,7 @@ def PARENT_ACTION(self, queue): options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) mr2 = DeviceMemoryResource(self.device, options=options) self._extra_mrs.append(mr2) - self.buffer = mr2.allocate(NBYTES) + self.buffer = mr2.allocate(NBYTES, stream=self.device.default_stream) buffer_s = pickle.dumps(self.buffer) queue.put(buffer_s) # Note: mr2 not sent diff --git a/cuda_core/tests/memory_ipc/test_ipc_duplicate_import.py b/cuda_core/tests/memory_ipc/test_ipc_duplicate_import.py index cab6b44aa3f..f0c4951e8e6 100644 --- a/cuda_core/tests/memory_ipc/test_ipc_duplicate_import.py +++ b/cuda_core/tests/memory_ipc/test_ipc_duplicate_import.py @@ -34,8 +34,8 @@ def child_main(log, queue): buffer_desc2 = queue.get() # Import the same buffer twice - should return same handle due to cache - buffer1 = Buffer.from_ipc_descriptor(mr, buffer_desc1) - buffer2 = Buffer.from_ipc_descriptor(mr, buffer_desc2) + buffer1 = Buffer.from_ipc_descriptor(mr, buffer_desc1, stream=device.default_stream) + buffer2 = Buffer.from_ipc_descriptor(mr, buffer_desc2, stream=device.default_stream) log(f"buffer1.handle = {buffer1.handle}") log(f"buffer2.handle = {buffer2.handle}") @@ -68,7 +68,7 @@ def test_main(self, ipc_device, ipc_memory_resource): mr = ipc_memory_resource log("allocating buffer") - buffer = mr.allocate(NBYTES) + buffer = mr.allocate(NBYTES, stream=ipc_device.default_stream) # Start the child process. log("starting child") diff --git a/cuda_core/tests/memory_ipc/test_leaks.py b/cuda_core/tests/memory_ipc/test_leaks.py index 8debd71c3f9..e2a7e8d096b 100644 --- a/cuda_core/tests/memory_ipc/test_leaks.py +++ b/cuda_core/tests/memory_ipc/test_leaks.py @@ -84,19 +84,19 @@ def __reduce__(self): @pytest.mark.parametrize( "getobject", [ - lambda mr: mr.allocation_handle, - lambda mr: mr, - lambda mr: mr.allocate(NBYTES), - lambda mr: mr.allocate(NBYTES).ipc_descriptor, + lambda mr, _stream: mr.allocation_handle, + lambda mr, _stream: mr, + lambda mr, stream: mr.allocate(NBYTES, stream=stream), + lambda mr, stream: mr.allocate(NBYTES, stream=stream).ipc_descriptor, ], ids=["alloc_handle", "mr", "buffer", "buffer_desc"], ) @pytest.mark.parametrize("launcher", [exec_success, exec_launch_failure, exec_reduce_failure]) -def test_pass_object(ipc_memory_resource, launcher, getobject): +def test_pass_object(ipc_device, ipc_memory_resource, launcher, getobject): """Check for fd leaks when an object is sent as a subprocess argument.""" mr = ipc_memory_resource with CheckFDLeaks(): - obj = getobject(mr) + obj = getobject(mr, ipc_device.default_stream) try: launcher(obj, number=2) finally: diff --git a/cuda_core/tests/memory_ipc/test_memory_ipc.py b/cuda_core/tests/memory_ipc/test_memory_ipc.py index 0996c71d2cc..54159d81130 100644 --- a/cuda_core/tests/memory_ipc/test_memory_ipc.py +++ b/cuda_core/tests/memory_ipc/test_memory_ipc.py @@ -30,7 +30,7 @@ def test_main(self, ipc_device, ipc_memory_resource): process.start() # Allocate and fill memory. - buffer = mr.allocate(NBYTES) + buffer = mr.allocate(NBYTES, stream=device.default_stream) assert not buffer.is_mapped pgen.fill_buffer(buffer, seed=False) @@ -66,10 +66,10 @@ def test_main(self, ipc_device, ipc_memory_resource): q1, q2 = (mp.Queue() for _ in range(2)) # Allocate memory buffers and export them to each child. - buffer1 = mr.allocate(NBYTES) + buffer1 = mr.allocate(NBYTES, stream=device.default_stream) q1.put(buffer1) q2.put(buffer1) - buffer2 = mr.allocate(NBYTES) + buffer2 = mr.allocate(NBYTES, stream=device.default_stream) q1.put(buffer2) q2.put(buffer2) @@ -127,8 +127,8 @@ def test_main(self, ipc_device, ipc_memory_resource): p2.start() # Allocate and share memory. - buffer1 = mr.allocate(NBYTES) - buffer2 = mr.allocate(NBYTES) + buffer1 = mr.allocate(NBYTES, stream=device.default_stream) + buffer2 = mr.allocate(NBYTES, stream=device.default_stream) q1.put(buffer1.ipc_descriptor) q2.put(buffer2.ipc_descriptor) @@ -152,7 +152,7 @@ def child_main(self, device, alloc_handle, seed, queue): device.set_current() mr = DeviceMemoryResource.from_allocation_handle(device, alloc_handle) buffer_descriptor = queue.get(timeout=CHILD_TIMEOUT_SEC) - buffer = Buffer.from_ipc_descriptor(mr, buffer_descriptor) + buffer = Buffer.from_ipc_descriptor(mr, buffer_descriptor, stream=device.default_stream) pgen = PatternGen(device, NBYTES) pgen.fill_buffer(buffer, seed=seed) buffer.close() @@ -177,8 +177,8 @@ def test_main(self, ipc_device, ipc_memory_resource): p2.start() # Allocate and share memory. - buffer1 = mr.allocate(NBYTES) - buffer2 = mr.allocate(NBYTES) + buffer1 = mr.allocate(NBYTES, stream=device.default_stream) + buffer2 = mr.allocate(NBYTES, stream=device.default_stream) q1.put(buffer1) q2.put(buffer2) diff --git a/cuda_core/tests/memory_ipc/test_peer_access.py b/cuda_core/tests/memory_ipc/test_peer_access.py index a3f83986701..de0baef9b8b 100644 --- a/cuda_core/tests/memory_ipc/test_peer_access.py +++ b/cuda_core/tests/memory_ipc/test_peer_access.py @@ -73,7 +73,7 @@ def test_main(self, mempool_device_x2, grant_access_in_parent): assert mr.peer_accessible_by == (0,) else: assert mr.peer_accessible_by == () - buffer = mr.allocate(NBYTES) + buffer = mr.allocate(NBYTES, stream=dev1.default_stream) pgen = PatternGen(dev1, NBYTES) pgen.fill_buffer(buffer, seed=False) diff --git a/cuda_core/tests/memory_ipc/test_send_buffers.py b/cuda_core/tests/memory_ipc/test_send_buffers.py index 6c9f1769142..041a8539da3 100644 --- a/cuda_core/tests/memory_ipc/test_send_buffers.py +++ b/cuda_core/tests/memory_ipc/test_send_buffers.py @@ -28,7 +28,7 @@ def test_main(self, ipc_device, nmrs): try: # Allocate and fill memory. - buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))] + buffers = [mr.allocate(NBYTES, stream=device.default_stream) for mr, _ in zip(cycle(mrs), range(NTASKS))] pgen = PatternGen(device, NBYTES) for buffer in buffers: pgen.fill_buffer(buffer, seed=False) @@ -82,7 +82,7 @@ def test_main(self, ipc_device, ipc_memory_resource): # Allocate, fill a buffer. mr = ipc_memory_resource pgen = PatternGen(device, NBYTES) - buffer = mr.allocate(NBYTES) + buffer = mr.allocate(NBYTES, stream=device.default_stream) pgen.fill_buffer(buffer, seed=0) # Set up communication. diff --git a/cuda_core/tests/memory_ipc/test_serialize.py b/cuda_core/tests/memory_ipc/test_serialize.py index 63e6ccf1dfd..78d26387c8a 100644 --- a/cuda_core/tests/memory_ipc/test_serialize.py +++ b/cuda_core/tests/memory_ipc/test_serialize.py @@ -38,10 +38,10 @@ def test_main(self, ipc_device, ipc_memory_resource): mp.reduction.send_handle(parent_conn, alloc_handle.handle, process.pid) # Send a buffer. - buffer1 = mr.allocate(NBYTES) + buffer1 = mr.allocate(NBYTES, stream=device.default_stream) parent_conn.send(buffer1) # directly - buffer2 = mr.allocate(NBYTES) + buffer2 = mr.allocate(NBYTES, stream=device.default_stream) parent_conn.send(buffer2.ipc_descriptor) # by descriptor # Wait for the child process. @@ -68,7 +68,7 @@ def child_main(self, conn): # Receive the buffers. buffer1 = conn.recv() # directly buffer_desc = conn.recv() - buffer2 = Buffer.from_ipc_descriptor(mr, buffer_desc) # by descriptor + buffer2 = Buffer.from_ipc_descriptor(mr, buffer_desc, stream=device.default_stream) # by descriptor # Modify the buffers. pgen = PatternGen(device, NBYTES) @@ -98,7 +98,7 @@ def test_main(self, ipc_device, ipc_memory_resource): assert uuid == mr.uuid # Send a buffer. - buffer = mr.allocate(NBYTES) + buffer = mr.allocate(NBYTES, stream=device.default_stream) pipe[0].put(buffer) # Wait for the child process. @@ -141,7 +141,7 @@ def test_main(self, ipc_device, ipc_memory_resource): device = ipc_device mr = ipc_memory_resource alloc_handle = mr.allocation_handle - buffer = mr.allocate(NBYTES) + buffer = mr.allocate(NBYTES, stream=device.default_stream) buffer_desc = buffer.ipc_descriptor pgen = PatternGen(device, NBYTES) diff --git a/cuda_core/tests/memory_ipc/test_workerpool.py b/cuda_core/tests/memory_ipc/test_workerpool.py index cfaa776ac9e..5d0c7b1a0f5 100644 --- a/cuda_core/tests/memory_ipc/test_workerpool.py +++ b/cuda_core/tests/memory_ipc/test_workerpool.py @@ -35,7 +35,7 @@ def test_main(self, ipc_device, nmrs): mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)] try: - buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))] + buffers = [mr.allocate(NBYTES, stream=device.default_stream) for mr, _ in zip(cycle(mrs), range(NTASKS))] with mp.Pool(NWORKERS) as pool: pool.map(self.process_buffer, buffers) @@ -77,7 +77,7 @@ def test_main(self, ipc_device, nmrs): mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)] try: - buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))] + buffers = [mr.allocate(NBYTES, stream=device.default_stream) for mr, _ in zip(cycle(mrs), range(NTASKS))] with mp.Pool(NWORKERS, initializer=self.init_worker, initargs=(mrs,)) as pool: pool.starmap( @@ -97,7 +97,7 @@ def process_buffer(self, mr_idx, buffer_desc): mr = self.mrs[mr_idx] device = Device(mr.device_id) device.set_current() - buffer = Buffer.from_ipc_descriptor(mr, buffer_desc) + buffer = Buffer.from_ipc_descriptor(mr, buffer_desc, stream=device.default_stream) pgen = PatternGen(device, NBYTES) pgen.fill_buffer(buffer, seed=True) buffer.close() @@ -127,7 +127,7 @@ def test_main(self, ipc_device, nmrs): mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)] try: - buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))] + buffers = [mr.allocate(NBYTES, stream=device.default_stream) for mr, _ in zip(cycle(mrs), range(NTASKS))] with mp.Pool(NWORKERS, initializer=self.init_worker, initargs=(mrs,)) as pool: pool.starmap(self.process_buffer, [(device, pickle.dumps(buffer)) for buffer in buffers]) diff --git a/cuda_core/tests/test_device.py b/cuda_core/tests/test_device.py index 56a97f5185c..26f5c772ea4 100644 --- a/cuda_core/tests/test_device.py +++ b/cuda_core/tests/test_device.py @@ -63,7 +63,7 @@ def test_device_repr(deinit_cuda): def test_device_alloc(deinit_cuda): device = Device() device.set_current() - buffer = device.allocate(1024) + buffer = device.allocate(1024, stream=device.default_stream) device.sync() assert buffer.handle != 0 assert buffer.size == 1024 @@ -73,7 +73,7 @@ def test_device_alloc(deinit_cuda): def test_device_alloc_zero_bytes(deinit_cuda): device = Device() device.set_current() - buffer = device.allocate(0) + buffer = device.allocate(0, stream=device.default_stream) device.sync() assert buffer.handle >= 0 assert buffer.size == 0 diff --git a/cuda_core/tests/test_graphics.py b/cuda_core/tests/test_graphics.py index 12184815b02..49d4744bfc5 100644 --- a/cuda_core/tests/test_graphics.py +++ b/cuda_core/tests/test_graphics.py @@ -337,14 +337,14 @@ def test_map_with_stream(self): assert buf.size > 0 resource.close() - def test_map_with_default_stream(self): + def test_map_requires_explicit_stream(self): with _gl_context_and_buffer(nbytes=4096) as (gl_buf, _): resource = GraphicsResource.from_gl_buffer(gl_buf, flags="write_discard") - with resource.map() as buf: - assert isinstance(buf, Buffer) - assert buf.size > 0 - assert not resource.is_mapped - resource.close() + try: + with pytest.raises(TypeError, match=r"keyword-only argument"): + resource.map() + finally: + resource.close() # --------------------------------------------------------------------------- diff --git a/cuda_core/tests/test_memory.py b/cuda_core/tests/test_memory.py index 3ae6960fc29..b3740ef2f51 100644 --- a/cuda_core/tests/test_memory.py +++ b/cuda_core/tests/test_memory.py @@ -60,14 +60,16 @@ class DummyDeviceMemoryResource(MemoryResource): + # cuMemAlloc / cuMemFree are synchronous; stream is accepted for + # interface conformance but ignored. def __init__(self, device): self.device = device - def allocate(self, size, stream=None) -> Buffer: + def allocate(self, size, *, stream=None) -> Buffer: ptr = handle_return(driver.cuMemAlloc(size)) return Buffer.from_handle(ptr=ptr, size=size, mr=self) - def deallocate(self, ptr, size, stream=None): + def deallocate(self, ptr, size, *, stream=None): handle_return(driver.cuMemFree(ptr)) @property @@ -84,16 +86,18 @@ def device_id(self) -> int: class DummyHostMemoryResource(MemoryResource): + # Pure-host ctypes allocation; stream is accepted for interface + # conformance but ignored. def __init__(self): pass - def allocate(self, size, stream=None) -> Buffer: + def allocate(self, size, *, stream=None) -> Buffer: # Allocate a ctypes buffer of size `size` ptr = (ctypes.c_byte * size)() self._ptr = ptr return Buffer.from_handle(ptr=ctypes.addressof(ptr), size=size, mr=self) - def deallocate(self, ptr, size, stream=None): + def deallocate(self, ptr, size, *, stream=None): del self._ptr @property @@ -110,14 +114,16 @@ def device_id(self) -> int: class DummyPinnedMemoryResource(MemoryResource): + # cuMemAllocHost / cuMemFreeHost are synchronous; stream is accepted + # for interface conformance but ignored. def __init__(self, device): self.device = device - def allocate(self, size, stream=None) -> Buffer: + def allocate(self, size, *, stream=None) -> Buffer: ptr = handle_return(driver.cuMemAllocHost(size)) return Buffer.from_handle(ptr=ptr, size=size, mr=self) - def deallocate(self, ptr, size, stream=None): + def deallocate(self, ptr, size, *, stream=None): handle_return(driver.cuMemFreeHost(ptr)) @property @@ -381,7 +387,7 @@ def test_buffer_external_device(change_device): dev_id = n - 1 d = Device(dev_id) d.set_current() - buffer_ = d.allocate(size=32) + buffer_ = d.allocate(size=32, stream=d.default_stream) if change_device: # let's switch to a different device if possibe @@ -513,9 +519,9 @@ def test_mr_deallocate_receives_stream(): received = {} class StreamCaptureMR(TrackingMR): - def deallocate(self, ptr, size, stream=None): + def deallocate(self, ptr, size, *, stream=None): received["stream"] = stream - super().deallocate(ptr, size, stream) + super().deallocate(ptr, size, stream=stream) mr = StreamCaptureMR() buf = mr.allocate(1024) @@ -523,6 +529,56 @@ def deallocate(self, ptr, size, stream=None): assert received["stream"].handle == stream.handle +def test_mr_dealloc_callback_falls_back_to_default_stream(): + """When a Buffer's device-pointer handle has no attached deallocation + stream (e.g. buffers minted via :meth:`Buffer.from_handle` from DLPack + import, IPC import, or third-party adapters), the C++ deleter callback + must fall back to the default stream rather than passing ``stream=None`` + to ``mr.deallocate``. Stream-ordered MRs validate the stream and would + otherwise raise ``TypeError`` from inside the ``noexcept`` callback, + which only logs a warning and silently leaks the allocation. See + `#2001 `__. + """ + import gc + + from cuda.core._stream import Stream_accept, default_stream + + device = Device() + device.set_current() + captured = {} + + class StrictCapturingMR(MemoryResource): + # Models a stream-ordered MR: deallocate validates the stream + # the same way DeviceMemoryResource.deallocate does. + @property + def is_device_accessible(self): + return True + + @property + def is_host_accessible(self): + return False + + @property + def device_id(self): + return device.device_id + + def allocate(self, size, *, stream): + raise NotImplementedError # not used; we use from_handle below + + def deallocate(self, ptr, size, *, stream): + captured["stream"] = Stream_accept(stream) + + mr = StrictCapturingMR() + # Buffer.from_handle binds mr but does not attach a deallocation stream. + # ptr=1 is fine because StrictCapturingMR.deallocate does not free. + buf = Buffer.from_handle(1, 1024, mr=mr) + del buf + gc.collect() + + assert "stream" in captured, "deallocate was not invoked (callback raised and leaked)" + assert captured["stream"].handle == default_stream().handle + + def test_memory_resource_and_owner_disallowed(): with pytest.raises(ValueError, match="cannot be both specified together"): a = (ctypes.c_byte * 20)() @@ -627,7 +683,7 @@ def test_managed_memory_resource_buffer_dlpack_device_type(): device.set_current() skip_if_managed_memory_unsupported(device) mr = create_managed_memory_resource_or_skip(ManagedMemoryResourceOptions(preferred_location=device.device_id)) - buf = mr.allocate(1024) + buf = mr.allocate(1024, stream=device.default_stream) assert mr.is_managed assert buf.is_managed @@ -650,7 +706,7 @@ def test_non_managed_resources_report_not_managed(mr_kind): skip_if_pinned_memory_unsupported(device) mr = create_pinned_memory_resource_or_xfail(xfail_device=device) assert mr.is_managed is False - buf = mr.allocate(1024) + buf = mr.allocate(1024, stream=device.default_stream) assert buf.is_managed is False buf.close() @@ -681,7 +737,7 @@ def test_device_memory_resource_initialization(use_device_object): assert not mr.is_ipc_enabled # Test allocation/deallocation works - buffer = mr.allocate(1024) + buffer = mr.allocate(1024, stream=device.default_stream) assert buffer.size == 1024 assert buffer.device_id == device.device_id buffer.close() @@ -699,7 +755,7 @@ def test_pinned_memory_resource_initialization(init_cuda): # Test allocation/deallocation works try: - buffer = mr.allocate(1024) + buffer = mr.allocate(1024, stream=device.default_stream) except CUDAError as exc: msg = str(exc) if "CUDA_ERROR_OUT_OF_MEMORY" in msg: @@ -727,7 +783,7 @@ def test_managed_memory_resource_initialization(init_cuda): assert mr.is_host_accessible # Test allocation/deallocation works - buffer = mr.allocate(1024) + buffer = mr.allocate(1024, stream=device.default_stream) assert buffer.size == 1024 assert buffer.is_host_accessible # But accessible from host assert buffer.memory_resource == mr @@ -952,15 +1008,15 @@ def test_device_memory_resource_with_options(init_cuda): assert not mr.is_ipc_enabled # Test allocation and deallocation - buffer1 = mr.allocate(1024) + buffer1 = mr.allocate(1024, stream=device.default_stream) assert buffer1.handle != 0 assert buffer1.size == 1024 assert buffer1.memory_resource == mr buffer1.close() # Test multiple allocations - buffer1 = mr.allocate(1024) - buffer2 = mr.allocate(2048) + buffer1 = mr.allocate(1024, stream=device.default_stream) + buffer2 = mr.allocate(2048, stream=device.default_stream) assert buffer1.handle != buffer2.handle assert buffer1.size == 1024 assert buffer2.size == 2048 @@ -974,8 +1030,8 @@ def test_device_memory_resource_with_options(init_cuda): buffer.close(stream) # Test memory copying between buffers from same pool - src_buffer = mr.allocate(64) - dst_buffer = mr.allocate(64) + src_buffer = mr.allocate(64, stream=device.default_stream) + dst_buffer = mr.allocate(64, stream=device.default_stream) stream = device.create_stream() src_buffer.copy_to(dst_buffer, stream=stream) device.sync() @@ -998,15 +1054,15 @@ def test_pinned_memory_resource_with_options(init_cuda): assert not mr.is_ipc_enabled # Test allocation and deallocation - buffer1 = mr.allocate(1024) + buffer1 = mr.allocate(1024, stream=device.default_stream) assert buffer1.handle != 0 assert buffer1.size == 1024 assert buffer1.memory_resource == mr buffer1.close() # Test multiple allocations - buffer1 = mr.allocate(1024) - buffer2 = mr.allocate(2048) + buffer1 = mr.allocate(1024, stream=device.default_stream) + buffer2 = mr.allocate(2048, stream=device.default_stream) assert buffer1.handle != buffer2.handle assert buffer1.size == 1024 assert buffer2.size == 2048 @@ -1020,8 +1076,8 @@ def test_pinned_memory_resource_with_options(init_cuda): buffer.close(stream) # Test memory copying between buffers from same pool - src_buffer = mr.allocate(64) - dst_buffer = mr.allocate(64) + src_buffer = mr.allocate(64, stream=device.default_stream) + dst_buffer = mr.allocate(64, stream=device.default_stream) stream = device.create_stream() src_buffer.copy_to(dst_buffer, stream=stream) device.sync() @@ -1043,15 +1099,15 @@ def test_managed_memory_resource_with_options(init_cuda): assert not mr.is_ipc_enabled # Test allocation and deallocation - buffer1 = mr.allocate(1024) + buffer1 = mr.allocate(1024, stream=device.default_stream) assert buffer1.handle != 0 assert buffer1.size == 1024 assert buffer1.memory_resource == mr buffer1.close() # Test multiple allocations - buffer1 = mr.allocate(1024) - buffer2 = mr.allocate(2048) + buffer1 = mr.allocate(1024, stream=device.default_stream) + buffer2 = mr.allocate(2048, stream=device.default_stream) assert buffer1.handle != buffer2.handle assert buffer1.size == 1024 assert buffer2.size == 2048 @@ -1065,8 +1121,8 @@ def test_managed_memory_resource_with_options(init_cuda): buffer.close(stream) # Test memory copying between buffers from same pool - src_buffer = mr.allocate(64) - dst_buffer = mr.allocate(64) + src_buffer = mr.allocate(64, stream=device.default_stream) + dst_buffer = mr.allocate(64, stream=device.default_stream) stream = device.create_stream() src_buffer.copy_to(dst_buffer, stream=stream) device.sync() @@ -1228,7 +1284,7 @@ def test_mempool_ipc_errors(mempool_device): device = mempool_device options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=False) mr = DeviceMemoryResource(device, options=options) - buffer = mr.allocate(64) + buffer = mr.allocate(64, stream=device.default_stream) ipc_error_msg = "Memory resource is not IPC-enabled" with pytest.raises(RuntimeError, match=ipc_error_msg): @@ -1239,7 +1295,7 @@ def test_mempool_ipc_errors(mempool_device): with pytest.raises(RuntimeError, match=ipc_error_msg): handle = IPCBufferDescriptor._init(b"", 0) - Buffer.from_ipc_descriptor(mr, handle) + Buffer.from_ipc_descriptor(mr, handle, stream=device.default_stream) buffer.close() @@ -1271,7 +1327,7 @@ def test_pinned_mempool_ipc_basic(): assert alloc_handle is not None # Test buffer allocation - buffer = mr.allocate(1024) + buffer = mr.allocate(1024, stream=device.default_stream) assert buffer.size == 1024 assert buffer.is_device_accessible assert buffer.is_host_accessible @@ -1299,7 +1355,7 @@ def test_pinned_mempool_ipc_errors(): assert mr.device_id == -1 assert mr.numa_id == -1 # Non-IPC uses OS-managed placement - buffer = mr.allocate(64) + buffer = mr.allocate(64, stream=device.default_stream) ipc_error_msg = "Memory resource is not IPC-enabled" with pytest.raises(RuntimeError, match=ipc_error_msg): @@ -1310,7 +1366,7 @@ def test_pinned_mempool_ipc_errors(): with pytest.raises(RuntimeError, match=ipc_error_msg): handle = IPCBufferDescriptor._init(b"", 0) - Buffer.from_ipc_descriptor(mr, handle) + Buffer.from_ipc_descriptor(mr, handle, stream=device.default_stream) buffer.close() mr.close() @@ -1451,7 +1507,7 @@ def test_mempool_attributes(ipc_enabled, memory_resource_factory, property_name, initial_value = value buffer = None try: - buffer = mr.allocate(1024) + buffer = mr.allocate(1024, stream=device.default_stream) new_value = getattr(mr.attributes, property_name) assert new_value >= initial_value, f"{property_name} should increase or stay same after allocation" finally: @@ -1487,8 +1543,8 @@ def test_mempool_attributes_repr(memory_resource_factory): elif MR is ManagedMemoryResource: mr = create_managed_memory_resource_or_skip(options={}) - buffer1 = mr.allocate(64) - buffer2 = mr.allocate(64) + buffer1 = mr.allocate(64, stream=device.default_stream) + buffer2 = mr.allocate(64, stream=device.default_stream) buffer1.close() assert re.match( r".*Attributes\(release_threshold=\d+, reserved_mem_current=\d+, reserved_mem_high=\d+, " @@ -1598,7 +1654,7 @@ def test_memory_resource_alloc_zero_bytes(init_cuda, memory_resource_factory): assert MR is DeviceMemoryResource mr = MR(device) - buffer = mr.allocate(0) + buffer = mr.allocate(0, stream=device.default_stream) device.sync() assert buffer.handle >= 0 assert buffer.size == 0 diff --git a/cuda_core/tests/test_memory_peer_access.py b/cuda_core/tests/test_memory_peer_access.py index 1a790645864..04324ceec81 100644 --- a/cuda_core/tests/test_memory_peer_access.py +++ b/cuda_core/tests/test_memory_peer_access.py @@ -18,7 +18,7 @@ def test_peer_access_basic(mempool_device_x2): stream_on_dev0 = dev0.create_stream() # Use owned pool to ensure clean initial state (no stale peer access). dmr_on_dev1 = DeviceMemoryResource(dev1, DeviceMemoryResourceOptions()) - buf_on_dev1 = dmr_on_dev1.allocate(NBYTES) + buf_on_dev1 = dmr_on_dev1.allocate(NBYTES, stream=dev1.default_stream) # No access at first. assert 0 not in dmr_on_dev1.peer_accessible_by @@ -69,7 +69,7 @@ def test_peer_access_transitions(mempool_device_x3): # Use owned pools (with options) to ensure clean initial state. # Default pools are shared and may have stale peer access from prior tests. dmrs = [DeviceMemoryResource(dev, DeviceMemoryResourceOptions()) for dev in devs] - bufs = [dmr.allocate(NBYTES) for dmr in dmrs] + bufs = [dmr.allocate(NBYTES, stream=dev.default_stream) for dmr, dev in zip(dmrs, devs)] def verify_state(state, pattern_seed): """ diff --git a/cuda_core/tests/test_module.py b/cuda_core/tests/test_module.py index 58f09564971..5c994b6f5e3 100644 --- a/cuda_core/tests/test_module.py +++ b/cuda_core/tests/test_module.py @@ -488,9 +488,8 @@ def test_occupancy_max_active_clusters(get_saxpy_kernel_cubin, cluster): pytest.skip("Device with compute capability 90 or higher is required for cluster support") launch_config = cuda.core.LaunchConfig(grid=128, block=64, cluster=cluster) query_fn = kernel.occupancy.max_active_clusters - max_active_clusters = query_fn(launch_config) - assert isinstance(max_active_clusters, int) - assert max_active_clusters >= 0 + with pytest.raises(TypeError, match=r"keyword-only argument"): + query_fn(launch_config) max_active_clusters = query_fn(launch_config, stream=dev.default_stream) assert isinstance(max_active_clusters, int) assert max_active_clusters >= 0 @@ -503,9 +502,8 @@ def test_occupancy_max_potential_cluster_size(get_saxpy_kernel_cubin): pytest.skip("Device with compute capability 90 or higher is required for cluster support") launch_config = cuda.core.LaunchConfig(grid=128, block=64) query_fn = kernel.occupancy.max_potential_cluster_size - max_potential_cluster_size = query_fn(launch_config) - assert isinstance(max_potential_cluster_size, int) - assert max_potential_cluster_size >= 0 + with pytest.raises(TypeError, match=r"keyword-only argument"): + query_fn(launch_config) max_potential_cluster_size = query_fn(launch_config, stream=dev.default_stream) assert isinstance(max_potential_cluster_size, int) assert max_potential_cluster_size >= 0 @@ -685,7 +683,7 @@ def get_kernel_only(): result = np.from_dlpack(host_buf).view(np.int32) result[:] = 0 - dev_buf = device.memory_resource.allocate(4) + dev_buf = device.memory_resource.allocate(4, stream=device.default_stream) # Launch kernel config = cuda.core.LaunchConfig(grid=1, block=1) diff --git a/cuda_core/tests/test_object_protocols.py b/cuda_core/tests/test_object_protocols.py index 457debc0903..72f7891a711 100644 --- a/cuda_core/tests/test_object_protocols.py +++ b/cuda_core/tests/test_object_protocols.py @@ -68,7 +68,7 @@ def sample_context(sample_device): @pytest.fixture def sample_buffer(sample_device): """A sample Buffer object.""" - return sample_device.allocate(64) + return sample_device.allocate(64, stream=sample_device.default_stream) @pytest.fixture @@ -197,7 +197,7 @@ def sample_context_alt(sample_device_alt): @pytest.fixture def sample_buffer_alt(sample_device): """An alternate Buffer object.""" - return sample_device.allocate(1024) + return sample_device.allocate(1024, stream=sample_device.default_stream) @pytest.fixture @@ -231,7 +231,7 @@ def sample_ipc_buffer_descriptor(ipc_device): """An IPCBufferDescriptor.""" options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) mr = DeviceMemoryResource(ipc_device, options=options) - buf = mr.allocate(64) + buf = mr.allocate(64, stream=ipc_device.default_stream) return buf.ipc_descriptor diff --git a/cuda_core/tests/test_stream.py b/cuda_core/tests/test_stream.py index 6c843bca004..4e5813ee226 100644 --- a/cuda_core/tests/test_stream.py +++ b/cuda_core/tests/test_stream.py @@ -6,7 +6,7 @@ from cuda.core import Device, Stream, StreamOptions from cuda.core._event import Event -from cuda.core._stream import LEGACY_DEFAULT_STREAM, PER_THREAD_DEFAULT_STREAM +from cuda.core._stream import LEGACY_DEFAULT_STREAM, PER_THREAD_DEFAULT_STREAM, Stream_accept from cuda.core._utils.cuda_utils import driver @@ -15,6 +15,13 @@ def test_stream_init_disabled(): Stream() # Reject at front door. +def test_stream_accept_rejects_none(): + """Stream_accept(None) raises TypeError so APIs cannot silently fall back + to the default stream (issue #2001).""" + with pytest.raises(TypeError, match=r"stream is required and must not be None"): + Stream_accept(None) + + def test_stream_init_with_options(init_cuda): stream = Device().create_stream(options=StreamOptions(nonblocking=True, priority=0)) assert stream.is_nonblocking is True diff --git a/cuda_core/tests/test_tensor_map.py b/cuda_core/tests/test_tensor_map.py index 9ca8790d2b8..68b6da0a56f 100644 --- a/cuda_core/tests/test_tensor_map.py +++ b/cuda_core/tests/test_tensor_map.py @@ -112,7 +112,7 @@ def test_cannot_instantiate_directly(self): TensorMapDescriptor() def test_from_tiled_1d(self, dev, skip_if_no_tma): - buf = dev.allocate(1024 * 4) # 1024 float32 elements + buf = dev.allocate(1024 * 4, stream=dev.default_stream) # 1024 float32 elements desc = _as_view(buf).as_tensor_map( box_dim=(64,), data_type=TensorMapDataType.FLOAT32, @@ -121,7 +121,7 @@ def test_from_tiled_1d(self, dev, skip_if_no_tma): assert repr(desc) == "TensorMapDescriptor(tiled, rank=1, dtype=FLOAT32, swizzle=NONE)" def test_device_property(self, dev, skip_if_no_tma): - buf = dev.allocate(1024 * 4) + buf = dev.allocate(1024 * 4, stream=dev.default_stream) desc = _as_view(buf).as_tensor_map( box_dim=(64,), data_type=TensorMapDataType.FLOAT32, @@ -129,7 +129,7 @@ def test_device_property(self, dev, skip_if_no_tma): assert desc.device.device_id == dev.device_id def test_from_tiled_2d(self, dev, skip_if_no_tma): - buf = dev.allocate(64 * 64 * 4) # 64x64 float32 + buf = dev.allocate(64 * 64 * 4, stream=dev.default_stream) # 64x64 float32 tensor = _DeviceArray(buf, (64, 64)) desc = _as_view(tensor).as_tensor_map( box_dim=(32, 32), @@ -138,7 +138,7 @@ def test_from_tiled_2d(self, dev, skip_if_no_tma): assert desc is not None def test_strided_memory_view_as_tensor_map(self, dev, skip_if_no_tma): - buf = dev.allocate(64 * 64 * 4) + buf = dev.allocate(64 * 64 * 4, stream=dev.default_stream) tensor = _DeviceArray(buf, (64, 64)) view = StridedMemoryView.from_any_interface(tensor, stream_ptr=-1) desc = view.as_tensor_map( @@ -148,7 +148,7 @@ def test_strided_memory_view_as_tensor_map(self, dev, skip_if_no_tma): assert desc is not None def test_strided_memory_view_as_tensor_map_options(self, dev, skip_if_no_tma): - buf = dev.allocate(64 * 64 * 4) + buf = dev.allocate(64 * 64 * 4, stream=dev.default_stream) tensor = _DeviceArray(buf, (64, 64)) view = StridedMemoryView.from_any_interface(tensor, stream_ptr=-1) desc = view.as_tensor_map( @@ -161,7 +161,7 @@ def test_strided_memory_view_as_tensor_map_options(self, dev, skip_if_no_tma): assert desc is not None def test_strided_memory_view_as_tensor_map_options_dict(self, dev, skip_if_no_tma): - buf = dev.allocate(1024 * 4) + buf = dev.allocate(1024 * 4, stream=dev.default_stream) desc = _as_view(buf).as_tensor_map( options={ "box_dim": (64,), @@ -172,7 +172,7 @@ def test_strided_memory_view_as_tensor_map_options_dict(self, dev, skip_if_no_tm assert desc is not None def test_strided_memory_view_as_tensor_map_rejects_options_with_kwargs(self, dev, skip_if_no_tma): - buf = dev.allocate(1024 * 4) + buf = dev.allocate(1024 * 4, stream=dev.default_stream) with pytest.raises(TypeError, match="Specify either options or the individual tensor map arguments"): _as_view(buf).as_tensor_map( box_dim=(64,), @@ -180,7 +180,7 @@ def test_strided_memory_view_as_tensor_map_rejects_options_with_kwargs(self, dev ) def test_from_tiled_3d(self, dev, skip_if_no_tma): - buf = dev.allocate(16 * 16 * 16 * 4) # 16x16x16 float32 + buf = dev.allocate(16 * 16 * 16 * 4, stream=dev.default_stream) # 16x16x16 float32 tensor = _DeviceArray(buf, (16, 16, 16)) desc = _as_view(tensor).as_tensor_map( box_dim=(8, 8, 8), @@ -192,7 +192,7 @@ def test_from_tiled_5d(self, dev, skip_if_no_tma): # 5D: exercises all 5 c_global_dim / 4 c_global_strides slots shape = (2, 4, 4, 4, 8) n_bytes = 2 * 4 * 4 * 4 * 8 * 4 # float32 - buf = dev.allocate(n_bytes) + buf = dev.allocate(n_bytes, stream=dev.default_stream) tensor = _DeviceArray(buf, shape) desc = _as_view(tensor).as_tensor_map( box_dim=(1, 2, 2, 2, 8), @@ -201,7 +201,7 @@ def test_from_tiled_5d(self, dev, skip_if_no_tma): def test_from_tiled_with_element_strides_buffer(self, dev, skip_if_no_tma): # Use a Buffer input (DLPack path) and explicit element_strides. - buf = dev.allocate(1024 * 4) + buf = dev.allocate(1024 * 4, stream=dev.default_stream) desc = _as_view(buf).as_tensor_map( box_dim=(64,), element_strides=(2,), @@ -211,7 +211,7 @@ def test_from_tiled_with_element_strides_buffer(self, dev, skip_if_no_tma): def test_from_tiled_with_element_strides_cai(self, dev, skip_if_no_tma): # Use a CAI-style tensor wrapper and explicit element_strides. - buf = dev.allocate(64 * 64 * 4) + buf = dev.allocate(64 * 64 * 4, stream=dev.default_stream) tensor = _DeviceArray(buf, (64, 64)) desc = _as_view(tensor).as_tensor_map( box_dim=(32, 32), @@ -221,7 +221,7 @@ def test_from_tiled_with_element_strides_cai(self, dev, skip_if_no_tma): assert desc is not None def test_from_tiled_with_swizzle(self, dev, skip_if_no_tma): - buf = dev.allocate(64 * 64 * 4) + buf = dev.allocate(64 * 64 * 4, stream=dev.default_stream) tensor = _DeviceArray(buf, (64, 64)) desc = _as_view(tensor).as_tensor_map( box_dim=(32, 32), @@ -231,7 +231,7 @@ def test_from_tiled_with_swizzle(self, dev, skip_if_no_tma): assert desc is not None def test_from_tiled_with_l2_promotion(self, dev, skip_if_no_tma): - buf = dev.allocate(64 * 64 * 4) + buf = dev.allocate(64 * 64 * 4, stream=dev.default_stream) tensor = _DeviceArray(buf, (64, 64)) desc = _as_view(tensor).as_tensor_map( box_dim=(32, 32), @@ -241,7 +241,7 @@ def test_from_tiled_with_l2_promotion(self, dev, skip_if_no_tma): assert desc is not None def test_from_tiled_with_oob_fill(self, dev, skip_if_no_tma): - buf = dev.allocate(64 * 64 * 4) + buf = dev.allocate(64 * 64 * 4, stream=dev.default_stream) tensor = _DeviceArray(buf, (64, 64)) desc = _as_view(tensor).as_tensor_map( box_dim=(32, 32), @@ -255,7 +255,7 @@ class TestTensorMapDescriptorValidation: """Test validation in TensorMapDescriptor factory methods.""" def test_invalid_rank_zero(self, dev, skip_if_no_tma): - buf = dev.allocate(64) + buf = dev.allocate(64, stream=dev.default_stream) tensor = _DeviceArray(buf, ()) # 0-dim tensor with pytest.raises(ValueError, match="rank must be between 1 and 5"): _as_view(tensor).as_tensor_map( @@ -268,7 +268,7 @@ def test_invalid_rank_six(self, dev, skip_if_no_tma): n_elements = 1 for s in shape: n_elements *= s - buf = dev.allocate(n_elements * 4) + buf = dev.allocate(n_elements * 4, stream=dev.default_stream) arr = _DeviceArray(buf, shape) with pytest.raises(ValueError, match="rank must be between 1 and 5"): _as_view(arr).as_tensor_map( @@ -276,7 +276,7 @@ def test_invalid_rank_six(self, dev, skip_if_no_tma): ) def test_box_dim_rank_mismatch(self, dev, skip_if_no_tma): - buf = dev.allocate(1024 * 4) + buf = dev.allocate(1024 * 4, stream=dev.default_stream) with pytest.raises(ValueError, match="box_dim must have 1 elements"): _as_view(buf).as_tensor_map( box_dim=(32, 32), @@ -284,7 +284,7 @@ def test_box_dim_rank_mismatch(self, dev, skip_if_no_tma): ) def test_box_dim_out_of_range(self, dev, skip_if_no_tma): - buf = dev.allocate(1024 * 4) + buf = dev.allocate(1024 * 4, stream=dev.default_stream) with pytest.raises(ValueError, match=r"box_dim\[0\] must be in \[1, 256\]"): _as_view(buf).as_tensor_map( box_dim=(512,), @@ -292,7 +292,7 @@ def test_box_dim_out_of_range(self, dev, skip_if_no_tma): ) def test_element_strides_rank_mismatch(self, dev, skip_if_no_tma): - buf = dev.allocate(1024 * 4) + buf = dev.allocate(1024 * 4, stream=dev.default_stream) with pytest.raises(ValueError, match="element_strides must have 1 elements"): _as_view(buf).as_tensor_map( box_dim=(64,), @@ -301,7 +301,7 @@ def test_element_strides_rank_mismatch(self, dev, skip_if_no_tma): ) def test_invalid_data_type(self, dev, skip_if_no_tma): - buf = dev.allocate(1024 * 4) + buf = dev.allocate(1024 * 4, stream=dev.default_stream) with pytest.raises(TypeError, match="data_type must be"): _as_view(buf).as_tensor_map( box_dim=(64,), @@ -346,18 +346,18 @@ class TestTensorMapReplaceAddress: """Test replace_address functionality.""" def test_replace_address(self, dev, skip_if_no_tma): - buf1 = dev.allocate(1024 * 4) + buf1 = dev.allocate(1024 * 4, stream=dev.default_stream) desc = _as_view(buf1).as_tensor_map( box_dim=(64,), data_type=TensorMapDataType.FLOAT32, ) - buf2 = dev.allocate(1024 * 4) + buf2 = dev.allocate(1024 * 4, stream=dev.default_stream) desc.replace_address(buf2) # No exception means success def test_replace_address_requires_device_accessible(self, dev, skip_if_no_tma): - buf1 = dev.allocate(1024 * 4) + buf1 = dev.allocate(1024 * 4, stream=dev.default_stream) desc = _as_view(buf1).as_tensor_map( box_dim=(64,), data_type=TensorMapDataType.FLOAT32, @@ -375,14 +375,14 @@ def test_replace_address_rejects_tensor_from_other_device(self, dev, skip_if_no_ dev1 = Device(1) dev0.set_current() - buf0 = dev0.allocate(1024 * 4) + buf0 = dev0.allocate(1024 * 4, stream=dev0.default_stream) desc = _as_view(buf0).as_tensor_map( box_dim=(64,), data_type=TensorMapDataType.FLOAT32, ) dev1.set_current() - buf1 = dev1.allocate(1024 * 4) + buf1 = dev1.allocate(1024 * 4, stream=dev1.default_stream) dev0.set_current() with pytest.raises(ValueError, match=r"replace_address expects tensor on device 0, got 1"): @@ -398,13 +398,13 @@ def test_replace_address_accepts_managed_buffer_on_nonzero_device(self, init_cud skip_if_managed_memory_unsupported(dev1) dev1.set_current() - desc = _as_view(dev1.allocate(1024 * 4)).as_tensor_map( + desc = _as_view(dev1.allocate(1024 * 4, stream=dev1.default_stream)).as_tensor_map( box_dim=(64,), data_type=TensorMapDataType.FLOAT32, ) mr = create_managed_memory_resource_or_skip(ManagedMemoryResourceOptions(preferred_location=dev1.device_id)) - managed_buf = mr.allocate(1024 * 4) + managed_buf = mr.allocate(1024 * 4, stream=dev1.default_stream) desc.replace_address(managed_buf) @@ -420,7 +420,7 @@ def test_from_tiled_rejects_tensor_from_other_device(self, init_cuda): dev1 = Device(1) dev1.set_current() - buf1 = dev1.allocate(1024 * 4) + buf1 = dev1.allocate(1024 * 4, stream=dev1.default_stream) dev0.set_current() with pytest.raises( @@ -443,7 +443,7 @@ def test_from_tiled_accepts_managed_buffer_on_nonzero_device(self, init_cuda): dev1.set_current() mr = create_managed_memory_resource_or_skip(ManagedMemoryResourceOptions(preferred_location=dev1.device_id)) - managed_buf = mr.allocate(1024 * 4) + managed_buf = mr.allocate(1024 * 4, stream=dev1.default_stream) desc = _as_view(managed_buf).as_tensor_map( box_dim=(64,), @@ -474,7 +474,7 @@ class TestTensorMapIm2col: def test_from_im2col_3d(self, dev, skip_if_no_tma): # 3D tensor: batch=1, height=32, channels=64 - buf = dev.allocate(1 * 32 * 64 * 4) + buf = dev.allocate(1 * 32 * 64 * 4, stream=dev.default_stream) tensor = _DeviceArray(buf, (1, 32, 64)) desc = TensorMapDescriptor._from_im2col( _as_view(tensor), @@ -487,7 +487,7 @@ def test_from_im2col_3d(self, dev, skip_if_no_tma): assert desc is not None def test_from_im2col_rank_validation(self, dev, skip_if_no_tma): - buf = dev.allocate(1024 * 4) + buf = dev.allocate(1024 * 4, stream=dev.default_stream) with pytest.raises(ValueError, match="Im2col tensor rank must be between 3 and 5"): TensorMapDescriptor._from_im2col( _as_view(buf), @@ -499,7 +499,7 @@ def test_from_im2col_rank_validation(self, dev, skip_if_no_tma): ) def test_from_im2col_corner_rank_mismatch(self, dev, skip_if_no_tma): - buf = dev.allocate(1 * 32 * 64 * 4) + buf = dev.allocate(1 * 32 * 64 * 4, stream=dev.default_stream) tensor = _DeviceArray(buf, (1, 32, 64)) # 3D: n_spatial = 1 with pytest.raises(ValueError, match="pixel_box_lower_corner must have 1 elements"): TensorMapDescriptor._from_im2col( @@ -516,7 +516,7 @@ def test_from_im2col_4d(self, dev, skip_if_no_tma): # Exercises spatial corner reversal with n_spatial=2: # Python [H_lower, W_lower] -> driver [W_lower, H_lower] shape = (1, 8, 8, 64) - buf = dev.allocate(1 * 8 * 8 * 64 * 4) + buf = dev.allocate(1 * 8 * 8 * 64 * 4, stream=dev.default_stream) tensor = _DeviceArray(buf, shape) desc = TensorMapDescriptor._from_im2col( _as_view(tensor), @@ -532,7 +532,7 @@ def test_from_im2col_5d(self, dev, skip_if_no_tma): # Exercises the full spatial corner reversal: # Python [D, H, W] -> driver [W, H, D] shape = (1, 4, 8, 8, 64) - buf = dev.allocate(1 * 4 * 8 * 8 * 64 * 4) + buf = dev.allocate(1 * 4 * 8 * 8 * 64 * 4, stream=dev.default_stream) tensor = _DeviceArray(buf, shape) desc = TensorMapDescriptor._from_im2col( _as_view(tensor), @@ -558,7 +558,7 @@ def skip_if_no_im2col_wide(self, dev): # or with driver/GPU combinations that reject im2col-wide descriptor # encoding for otherwise valid inputs. Probe once per test invocation # and skip only for those known unsupported cases. - buf = dev.allocate(1 * 32 * 64 * 4) + buf = dev.allocate(1 * 32 * 64 * 4, stream=dev.default_stream) tensor = _DeviceArray(buf, (1, 32, 64)) try: TensorMapDescriptor._from_im2col_wide( @@ -580,7 +580,7 @@ def skip_if_no_im2col_wide(self, dev): def test_from_im2col_wide_3d(self, dev, skip_if_no_im2col_wide): # 3D tensor: batch=1, width=32, channels=64 - buf = dev.allocate(1 * 32 * 64 * 4) + buf = dev.allocate(1 * 32 * 64 * 4, stream=dev.default_stream) tensor = _DeviceArray(buf, (1, 32, 64)) desc = TensorMapDescriptor._from_im2col_wide( _as_view(tensor), @@ -596,7 +596,7 @@ def test_from_im2col_wide_4d(self, dev, skip_if_no_im2col_wide): # NHWC layout: N=1, H=8, W=8, C=64 # Wide mode only uses scalar W corners, even with higher rank shape = (1, 8, 8, 64) - buf = dev.allocate(1 * 8 * 8 * 64 * 4) + buf = dev.allocate(1 * 8 * 8 * 64 * 4, stream=dev.default_stream) tensor = _DeviceArray(buf, shape) desc = TensorMapDescriptor._from_im2col_wide( _as_view(tensor), @@ -611,7 +611,7 @@ def test_from_im2col_wide_5d(self, dev, skip_if_no_im2col_wide): # NDHWC layout: N=1, D=4, H=8, W=8, C=64 # Max rank boundary — verifies all 5 dim/stride slots are filled shape = (1, 4, 8, 8, 64) - buf = dev.allocate(1 * 4 * 8 * 8 * 64 * 4) + buf = dev.allocate(1 * 4 * 8 * 8 * 64 * 4, stream=dev.default_stream) tensor = _DeviceArray(buf, shape) desc = TensorMapDescriptor._from_im2col_wide( _as_view(tensor), @@ -623,7 +623,7 @@ def test_from_im2col_wide_5d(self, dev, skip_if_no_im2col_wide): assert desc is not None def test_from_im2col_wide_rank_validation(self, dev, skip_if_no_im2col_wide): - buf = dev.allocate(1024 * 4) + buf = dev.allocate(1024 * 4, stream=dev.default_stream) with pytest.raises(ValueError, match="Im2col-wide tensor rank must be between 3 and 5"): TensorMapDescriptor._from_im2col_wide( _as_view(buf), diff --git a/cuda_core/tests/test_utils.py b/cuda_core/tests/test_utils.py index 5bcdead92c6..18379cd7a24 100644 --- a/cuda_core/tests/test_utils.py +++ b/cuda_core/tests/test_utils.py @@ -316,7 +316,7 @@ def test_strided_memory_view_dlpack_export_cupy_roundtrip(init_cuda): def test_strided_memory_view_dlpack_export_requires_dtype(init_cuda): - buffer = init_cuda.memory_resource.allocate(16) + buffer = init_cuda.memory_resource.allocate(16, stream=init_cuda.default_stream) view = StridedMemoryView.from_buffer( buffer, shape=(16,), @@ -393,7 +393,7 @@ def test_from_buffer(shape, dtype, stride_order, readonly): layout = _StridedLayout.dense(shape=shape, itemsize=dtype.itemsize, stride_order=stride_order) required_size = layout.required_size_in_bytes() assert required_size == math.prod(shape) * dtype.itemsize - buffer = dev.memory_resource.allocate(required_size) + buffer = dev.memory_resource.allocate(required_size, stream=dev.default_stream) view = StridedMemoryView.from_buffer(buffer, shape=shape, strides=layout.strides, dtype=dtype, is_readonly=readonly) assert view.exporting_obj is buffer assert view._layout == layout @@ -417,7 +417,7 @@ def test_from_buffer_incompatible_dtype_and_itemsize(dtype, itemsize, msg): layout = _StridedLayout.dense((5,), 2) device = Device() device.set_current() - buffer = device.memory_resource.allocate(layout.required_size_in_bytes()) + buffer = device.memory_resource.allocate(layout.required_size_in_bytes(), stream=device.default_stream) with pytest.raises(ValueError, match=msg): StridedMemoryView.from_buffer(buffer, (5,), dtype=dtype, itemsize=itemsize) @@ -427,7 +427,7 @@ def test_from_buffer_sliced(stride_order): layout = _StridedLayout.dense((5, 7), 2, stride_order=stride_order) device = Device() device.set_current() - buffer = device.memory_resource.allocate(layout.required_size_in_bytes()) + buffer = device.memory_resource.allocate(layout.required_size_in_bytes(), stream=device.default_stream) view = StridedMemoryView.from_buffer(buffer, (5, 7), dtype=np.dtype(np.int16)) assert view.shape == (5, 7) assert int(buffer.handle) == view.ptr @@ -445,7 +445,7 @@ def test_from_buffer_too_small(): layout = _StridedLayout.dense((5, 4), 2) d = Device() d.set_current() - buffer = d.memory_resource.allocate(20) + buffer = d.memory_resource.allocate(20, stream=d.default_stream) with pytest.raises(ValueError, match="Expected at least 40 bytes, got 20 bytes."): StridedMemoryView.from_buffer( buffer, @@ -459,7 +459,7 @@ def test_from_buffer_disallowed_negative_offset(): layout = _StridedLayout((5, 4), (-4, 1), 1) d = Device() d.set_current() - buffer = d.memory_resource.allocate(20) + buffer = d.memory_resource.allocate(20, stream=d.default_stream) with pytest.raises(ValueError): StridedMemoryView.from_buffer( buffer, @@ -591,7 +591,7 @@ def test_from_buffer_with_non_power_of_two_itemsize(): layout = _StridedLayout(shape=shape, strides=None, itemsize=dtype.itemsize) required_size = layout.required_size_in_bytes() assert required_size == math.prod(shape) * dtype.itemsize - buffer = dev.memory_resource.allocate(required_size) + buffer = dev.memory_resource.allocate(required_size, stream=dev.default_stream) view = StridedMemoryView.from_buffer(buffer, shape=shape, strides=layout.strides, dtype=dtype, is_readonly=True) assert view.dtype == dtype