3 changes: 1 addition & 2 deletions .guix/modules/python-blosc2-package.scm
@@ -94,8 +94,7 @@
(when tests?
(invoke "env" "PYTHONPATH=." "pytest")))))))
(inputs (list c-blosc2))
(propagated-inputs (list python-msgpack python-ndindex python-numpy
python-py-cpuinfo))
(propagated-inputs (list python-msgpack python-ndindex python-numpy))
(native-inputs (list cmake-minimal pkg-config python-cython-3
python-pytest python-scikit-build))
(home-page "https://github.com/blosc/python-blosc2")
2 changes: 0 additions & 2 deletions pyproject.toml
@@ -36,9 +36,7 @@ dependencies = [
"numpy>=1.26",
"ndindex",
"msgpack",
"platformdirs",
"numexpr>=2.14.1; platform_machine != 'wasm32'",
"py-cpuinfo; platform_machine != 'wasm32'",
"requests",
]
version = "4.0.0-b2.dev0"
219 changes: 145 additions & 74 deletions src/blosc2/core.py
@@ -8,7 +8,6 @@
# Avoid checking the name of type annotations at run time
from __future__ import annotations

import contextlib
import copy
import ctypes
import ctypes.util
@@ -22,31 +21,21 @@
import sys
from dataclasses import asdict
from functools import lru_cache
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, ClassVar

import numpy as np
import platformdirs
import requests

import blosc2
from blosc2 import blosc2_ext

if not blosc2.IS_WASM:
import cpuinfo

if TYPE_CHECKING:
from collections.abc import Callable

import tensorflow
import torch


_USER_CACHE_DIR: pathlib.Path = platformdirs.user_cache_path(
appname="python-blosc2",
appauthor="blosc",
)


def _check_typesize(typesize):
if not 1 <= typesize <= blosc2_ext.MAX_TYPESIZE:
raise ValueError(f"typesize can only be in the 1-{blosc2_ext.MAX_TYPESIZE} range.")
@@ -1151,11 +1140,12 @@ def print_versions():
print("-=" * 38)


def apple_silicon_cache_size(cache_level: int) -> int:
def apple_silicon_cache_size(cache_level: int) -> int | None:
"""Get the data cache_level size in bytes for Apple Silicon in MacOS.

Apple Silicon has two clusters, Performance (0) and Efficiency (1).
This function returns the data cache size for the Performance cluster.
Returns None if the cache size cannot be determined.
"""
libc = ctypes.CDLL(ctypes.util.find_library("c"))
size = ctypes.c_size_t()
@@ -1166,7 +1156,87 @@ def apple_silicon_cache_size(cache_level: int) -> int:
hwcachesize = f"hw.perflevel0.l{cache_level}cachesize"
hwcachesize = hwcachesize.encode("ascii")
libc.sysctlbyname(hwcachesize, ctypes.byref(size), ctypes.byref(ctypes.c_size_t(8)), None, 0)
return size.value
return size.value if size.value > 0 else None
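
For context, a minimal standalone sketch of the sysctl call used above. It assumes a macOS host; the hw.perflevel0.l1dcachesize key exists only on Apple Silicon:

import ctypes
import ctypes.util

libc = ctypes.CDLL(ctypes.util.find_library("c"))
size = ctypes.c_size_t(0)
length = ctypes.c_size_t(ctypes.sizeof(size))
# L1 data cache of the Performance cluster; sysctlbyname returns 0 on success.
if libc.sysctlbyname(b"hw.perflevel0.l1dcachesize",
                     ctypes.byref(size), ctypes.byref(length), None, 0) == 0:
    print(size.value, "bytes")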


def windows_cache_size(cache_level: int) -> int | None:
"""Get the data cache size in bytes for Windows.

Semantics:
- L1: data cache only
- L2/L3: unified cache (data + instruction), as no split exists

Returns None if the cache size cannot be determined.
"""
from ctypes import wintypes

if cache_level not in (1, 2, 3):
return None

# Windows constants
RelationCache = 2

# PROCESSOR_CACHE_TYPE enum values
CacheUnified = 0
CacheData = 2

# Header structure to read Relationship and Size first
class PROCESSOR_INFO_HEADER(ctypes.Structure):
_fields_: ClassVar[list] = [
("Relationship", ctypes.c_int),
("Size", ctypes.c_uint),
]

# Only the fields we need from CACHE_RELATIONSHIP (first 12 bytes)
class CACHE_RELATIONSHIP(ctypes.Structure):
_fields_: ClassVar[list] = [
("Level", ctypes.c_ubyte),
("Associativity", ctypes.c_ubyte),
("LineSize", ctypes.c_ushort),
("CacheSize", ctypes.c_uint),
("Type", ctypes.c_uint),
]

kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

size = wintypes.DWORD(0)

# Query buffer size
kernel32.GetLogicalProcessorInformationEx(
RelationCache,
None,
ctypes.byref(size),
)

buffer = ctypes.create_string_buffer(size.value)

# Retrieve cache info
kernel32.GetLogicalProcessorInformationEx(
RelationCache,
buffer,
ctypes.byref(size),
)

offset = 0
header_size = ctypes.sizeof(PROCESSOR_INFO_HEADER)

while offset < size.value:
# Read header to get Size for advancing offset
header = PROCESSOR_INFO_HEADER.from_buffer_copy(buffer[offset : offset + header_size])

if header.Relationship == RelationCache:
# Read cache info starting after the header
cache = CACHE_RELATIONSHIP.from_buffer_copy(buffer[offset + header_size :])

if cache.Level == cache_level and (
(cache_level == 1 and cache.Type == CacheData)
or (cache_level > 1 and cache.Type == CacheUnified)
):
return cache.CacheSize

offset += header.Size

return None
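
A small, hypothetical way to exercise this helper (Windows only; printed sizes vary by CPU):

import platform

# Hypothetical usage; windows_cache_size() returns None when a
# matching cache level cannot be found.
if platform.system() == "Windows":
    for level in (1, 2, 3):
        size = windows_cache_size(level)
        print(f"L{level}: {size if size is not None else 'unknown'} bytes")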


def get_cache_info(cache_level: int) -> tuple:
@@ -1197,19 +1267,21 @@ def get_cache_info(cache_level: int) -> tuple:
raise ValueError(f"L{cache_level} cache not found in lscpu output")


def linux_cache_size(cache_level: int, default_size: int) -> int:
"""Get the data cache_level size in bytes for Linux."""
cache_size = default_size
def linux_cache_size(cache_level: int) -> int | None:
"""Get the data cache_level size in bytes for Linux.

Returns None if the cache size cannot be determined.
"""
try:
# Try to read the cache size from sysfs
with open(f"/sys/devices/system/cpu/cpu0/cache/index{cache_level}/size") as f:
size = f.read()
if size.endswith("K\n"):
cache_size = int(size[:-2]) * 2**10
return int(size[:-2]) * 2**10
elif size.endswith("M\n"):
cache_size = int(size[:-2]) * 2**20
return int(size[:-2]) * 2**20
elif size.endswith("G\n"):
cache_size = int(size[:-2]) * 2**30
return int(size[:-2]) * 2**30
except FileNotFoundError:
# Try with lscpu, if available.
try:
@@ -1219,70 +1291,69 @@ def linux_cache_size(cache_level: int, default_size: int) -> int:
# In general, dividing the cache size by the number of instances would bring
# best performance for private caches (L1 and L2). For shared caches (L3),
# this should be the case as well, but more experimentation is needed.
cache_size //= cache_instances
return cache_size
return cache_size // cache_instances
except (FileNotFoundError, ValueError):
# If lscpu is not available or its output cannot be parsed,
# fall through and return None.
pass
return cache_size
return None
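
A minimal standalone sketch of the sysfs read performed above (Linux only; index0 is usually the L1 data cache, and sizes read like "32K" or "1M"):

from pathlib import Path

path = Path("/sys/devices/system/cpu/cpu0/cache/index0/size")
try:
    text = path.read_text().strip()  # e.g. "32K"
    unit = {"K": 2**10, "M": 2**20, "G": 2**30}[text[-1]]
    print(int(text[:-1]) * unit, "bytes")
except (FileNotFoundError, KeyError, ValueError):
    print("cache size not available via sysfs")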


def _get_cpu_info():
if blosc2.IS_WASM:
# Emscripten/wasm32 does not have access to CPU information.
# Populate it with some reasonable defaults.
return {
"brand": "Emscripten",
"arch": "wasm32",
"count": 1,
"l1_data_cache_size": 32 * 1024,
"l2_cache_size": 256 * 1024,
"l3_cache_size": 1024 * 1024,
}
cpu_info = cpuinfo.get_cpu_info()
# cpuinfo does not correctly retrieve the cache sizes for Apple Silicon, so do it manually
if platform.system() == "Darwin":
cpu_info["l1_data_cache_size"] = apple_silicon_cache_size(1)
cpu_info["l2_cache_size"] = apple_silicon_cache_size(2)
cpu_info["l3_cache_size"] = apple_silicon_cache_size(3)
# cpuinfo does not correctly retrieve the cache sizes for all CPUs on Linux, so ask the kernel
if platform.system() == "Linux":
l1_data_cache_size = cpu_info.get("l1_data_cache_size", 32 * 1024)
# Cache level 0 is typically the L1 data cache, and level 1 is the L1 instruction cache
cpu_info["l1_data_cache_size"] = linux_cache_size(0, l1_data_cache_size)
l2_cache_size = cpu_info.get("l2_cache_size", 256 * 1024)
cpu_info["l2_cache_size"] = linux_cache_size(2, l2_cache_size)
l3_cache_size = cpu_info.get("l3_cache_size", 1024 * 1024)
cpu_info["l3_cache_size"] = linux_cache_size(3, l3_cache_size)
return cpu_info

def _available_cpus() -> int:
try:
# On Linux, this returns the number of CPUs available to the process,
# which may be less than os.cpu_count() due to CPU affinity settings.
return len(os.sched_getaffinity(0))
except AttributeError:
# os.sched_getaffinity is not available on all platforms
return os.cpu_count() or 1
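
The distinction matters under CPU affinity limits; a Linux-only demonstration (the taskset invocation is an example, not part of the PR):

import os

# Run as: taskset -c 0 python this_script.py
# os.cpu_count() reports installed CPUs; sched_getaffinity(0) reports
# the CPUs this process may actually use (here, 1).
print("installed:", os.cpu_count())
print("usable:", len(os.sched_getaffinity(0)))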

def write_cached_cpu_info(cpu_info_dict: dict[str, Any]) -> None:
_USER_CACHE_DIR.mkdir(parents=True, exist_ok=True)
with (_USER_CACHE_DIR / "cpuinfo.json").open("w") as f:
json.dump(cpu_info_dict, f, indent=4)

def _update_cache_sizes(
cpu_info: dict, cache_size_func: Callable[[int], int | None], levels: tuple[int, int, int]
) -> None:
"""Update cpu_info with cache sizes from the given function.

def read_cached_cpu_info() -> dict[str, Any]:
try:
with (_USER_CACHE_DIR / "cpuinfo.json").open() as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {}
Args:
cpu_info: Dictionary to update with cache sizes.
cache_size_func: Function that takes a cache level and returns size or None.
levels: Tuple of (l1_level, l2_level, l3_level) to pass to cache_size_func.
"""
l1_level, l2_level, l3_level = levels
if (l1_data_cache_size := cache_size_func(l1_level)) is not None:
cpu_info["l1_data_cache_size"] = l1_data_cache_size
if (l2_cache_size := cache_size_func(l2_level)) is not None:
cpu_info["l2_cache_size"] = l2_cache_size
if (l3_cache_size := cache_size_func(l3_level)) is not None:
cpu_info["l3_cache_size"] = l3_cache_size


@lru_cache(maxsize=1)
def get_cpu_info() -> dict:
cached_info = read_cached_cpu_info()
if cached_info:
return cached_info

cpu_info_dict = _get_cpu_info()
with contextlib.suppress(OSError):
# In case cpu info cannot be stored, will need to be recomputed in the next process
write_cached_cpu_info(cpu_info_dict)
return cpu_info_dict
def get_cpu_info():
"""
Construct a result equivalent to cpuinfo.get_cpu_info() without calling it,
since that call takes about 1 s and this function is run at import time.
"""
cpu_info = {
"count": _available_cpus(),
"l1_data_cache_size": 32 * 1024,
"l2_cache_size": 256 * 1024,
"l3_cache_size": 1024 * 1024,
}

if blosc2.IS_WASM:
# Emscripten/wasm32 does not have access to CPU information.
# Return defaults.
return cpu_info

if platform.system() == "Darwin":
_update_cache_sizes(cpu_info, apple_silicon_cache_size, (1, 2, 3))
elif platform.system() == "Linux":
# Cache level 0 is typically the L1 data cache, and level 1 is the L1 instruction cache
_update_cache_sizes(cpu_info, linux_cache_size, (0, 2, 3))
elif platform.system() == "Windows":
_update_cache_sizes(cpu_info, windows_cache_size, (1, 2, 3))

return cpu_info
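
A hedged example of what a caller now sees (values depend on the machine; on unsupported platforms the defaults survive):

info = get_cpu_info()
print(info["count"])               # e.g. 8
print(info["l1_data_cache_size"])  # e.g. 65536, or the 32 KiB default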


def get_blocksize() -> int: