From c3d22a0181c2a40e7cb8381c5667f9323b314b07 Mon Sep 17 00:00:00 2001 From: Elder Millenial Date: Thu, 4 Jun 2026 13:31:19 -0400 Subject: [PATCH 1/9] Add PlutusData/Transaction roundtrip benchmark + profiler Measures decode/encode/to_json for typed PlutusData and untyped RawPlutusData across synthetic complexity sweeps, to locate the chain-indexing bottleneck. Run: python benchmarks/plutus_bench.py [iters] (set CBOR_C_EXTENSION=1 to compare a fast backend). Co-Authored-By: Claude Opus 4.8 (1M context) --- benchmarks/plutus_bench.py | 177 +++++++++++++++++++++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 benchmarks/plutus_bench.py diff --git a/benchmarks/plutus_bench.py b/benchmarks/plutus_bench.py new file mode 100644 index 00000000..82eeca64 --- /dev/null +++ b/benchmarks/plutus_bench.py @@ -0,0 +1,177 @@ +"""PlutusData / Transaction roundtrip benchmark + profiler. + +Measures the real cost of decode (from_cbor), encode (to_cbor), and to_json for +both typed PlutusData and untyped RawPlutusData, across synthetic complexity +sweeps, plus Transaction fixtures. Designed to locate the indexing bottleneck. +""" + +import cProfile +import io +import pstats +import sys +import time +from dataclasses import dataclass +from typing import Dict, List + +from pycardano.cbor import cbor2 +from pycardano.plutus import PlutusData, RawPlutusData +from pycardano.serialization import IndefiniteList, default_encoder + +try: + from importlib.metadata import version + + BACKEND = f"{cbor2.__name__} {version('cbor2pure') if cbor2.__name__=='cbor2pure' else version('cbor2')}" +except Exception: + BACKEND = cbor2.__name__ + + +def best_us(fn, iters, repeats=5): + for _ in range(min(50, iters)): + fn() + best = float("inf") + for _ in range(repeats): + t0 = time.perf_counter() + for _ in range(iters): + fn() + best = min(best, time.perf_counter() - t0) + return best / iters * 1e6 + + +def make_constr(cid, fields): + return cbor2.CBORTag(121 + cid if cid < 7 else 1280 + (cid - 7), fields) + + +def gen_deep(depth): + node = make_constr(0, [0]) + for _ in range(depth): + node = make_constr(1, [node]) + return node + + +def gen_wide(n): + return make_constr(0, [i if i % 2 else b"\x01\x02\x03\x04" for i in range(n)]) + + +def gen_list(n): + return make_constr(0, [IndefiniteList(list(range(n)))]) + + +def gen_map(n): + return make_constr(0, [{i: bytes([i % 256]) * 8 for i in range(n)}]) + + +def gen_realistic(depth, width): + def build(d): + fields = [d, b"\xde\xad\xbe\xef" * 4, IndefiniteList(list(range(width)))] + if d > 0: + fields.append(build(d - 1)) + fields.append({j: build(0) for j in range(2)}) + return make_constr(d % 6, fields) + + return build(depth) + + +SYNTH = { + "deep(d=40)": gen_deep(40), + "wide(n=200)": gen_wide(200), + "list(n=500)": gen_list(500), + "map(n=200)": gen_map(200), + "realistic(d=6,w=10)": gen_realistic(6, 10), +} + + +def encode_datum(obj): + return cbor2.dumps(obj, default=default_encoder) + + +def run_section(title, items, iters): + print(f"\n=== {title} ===") + print( + f" {'case':24} {'bytes':>7} {'decode us':>11} {'encode us':>11} {'to_json us':>11}" + ) + for name, raw in items: + dec = best_us(lambda r=raw: RawPlutusData.from_cbor(r), iters) + obj = RawPlutusData.from_cbor(raw) + enc = best_us(lambda o=obj: o.to_cbor(), iters) + try: + tj = best_us(lambda o=obj: o.to_json(), iters) + except Exception: + tj = float("nan") + print(f" {name:24} {len(raw):>7} {dec:>11.1f} {enc:>11.1f} {tj:>11.1f}") + + +@dataclass +class Inner(PlutusData): + CONSTR_ID = 0 + a: int + b: bytes + + +@dataclass +class Mid(PlutusData): + CONSTR_ID = 1 + x: int + items: List[Inner] + mapping: Dict[int, Inner] + + +@dataclass +class Outer(PlutusData): + CONSTR_ID = 2 + a: bytes + mid: Mid + leaves: List[Inner] + + +def build_typed(n): + inners = [Inner(a=i, b=bytes([i % 256]) * 8) for i in range(n)] + mid = Mid(x=7, items=inners, mapping={i: inners[i] for i in range(min(n, 20))}) + return Outer(a=b"\xab" * 28, mid=mid, leaves=inners) + + +def run_typed(iters): + print("\n=== Typed PlutusData decode vs untyped on the SAME bytes ===") + print( + f" {'n_inner':>8} {'bytes':>7} {'typed dec us':>13} {'raw dec us':>11} {'typed/raw':>10}" + ) + for n in (10, 50, 200): + obj = build_typed(n) + raw = obj.to_cbor() + td = best_us(lambda r=raw: Outer.from_cbor(r), iters) + rd = best_us(lambda r=raw: RawPlutusData.from_cbor(r), iters) + print(f" {n:>8} {len(raw):>7} {td:>13.1f} {rd:>11.1f} {td/rd:>9.1f}x") + + +def main(): + iters = int(sys.argv[1]) if len(sys.argv) > 1 else 1000 + print( + f"### backend={BACKEND} | python {sys.version.split()[0]} | iters={iters} ###" + ) + + synth_items = [(n, encode_datum(o)) for n, o in SYNTH.items()] + run_section("RawPlutusData (untyped - typical indexer path)", synth_items, iters) + run_typed(iters) + + heaviest = max(synth_items, key=lambda kv: len(kv[1])) + print( + f"\n=== cProfile: RawPlutusData.from_cbor on '{heaviest[0]}' ({len(heaviest[1])}B) ===" + ) + pr = cProfile.Profile() + pr.enable() + for _ in range(2000): + RawPlutusData.from_cbor(heaviest[1]) + pr.disable() + s = io.StringIO() + pstats.Stats(pr, stream=s).sort_stats("tottime").print_stats(16) + for line in s.getvalue().splitlines(): + if ( + "pycardano" in line + or "cbor2" in line + or "function calls" in line + or "{" in line + ): + print(" " + line.strip()[:115]) + + +if __name__ == "__main__": + main() From 09350d1f04f7678a625429e3f79fb4ceeee45a54 Mon Sep 17 00:00:00 2001 From: Elder Millenial Date: Thu, 4 Jun 2026 13:48:12 -0400 Subject: [PATCH 2/9] Cache get_type_hints and from_primitive arg introspection per class MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Typed PlutusData/dataclass decode recomputed get_type_hints(cls) once per decoded node and getfullargspec(t.from_primitive) once per typed field on every node. Both depend only on the class, not the data, yet dominated typed decode (~422 get_type_hints + ~421 getfullargspec calls per decode of a 200-element datum — together ~70% of decode time). Memoize both in module-level WeakKeyDictionary caches (so dynamically created classes can still be garbage collected). Generic aliases, which are not always weakly referenceable, are computed without caching. Result: ~3.6x faster typed PlutusData decode (200-inner datum 4.94ms -> 1.36ms, cbor2pure), backend-independent. All 568 tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) --- pycardano/serialization.py | 40 ++++++++++++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/pycardano/serialization.py b/pycardano/serialization.py index 141342b9..d83e98d7 100644 --- a/pycardano/serialization.py +++ b/pycardano/serialization.py @@ -14,6 +14,7 @@ from fractions import Fraction from functools import wraps from inspect import getfullargspec, isclass +from weakref import WeakKeyDictionary from typing import ( Any, Callable, @@ -359,7 +360,7 @@ def validate(self): Raises: InvalidDataException: When the data is invalid. """ - type_hints = get_type_hints(self.__class__) + type_hints = _cached_type_hints(self.__class__) def _check_recursive(value, type_hint): if type_hint is Any: @@ -686,6 +687,37 @@ def _restore_dataclass_field( return _restore_typed_primitive(cast(Any, f.type), v) +# Resolving type hints and introspecting from_primitive signatures is expensive and +# is otherwise repeated on every (de)serialization. Both depend only on the class, so +# cache them. WeakKeyDictionary lets dynamically-created classes be garbage collected. +_TYPE_HINTS_CACHE: "WeakKeyDictionary[type, Dict[str, Any]]" = WeakKeyDictionary() +_ACCEPTS_TYPE_ARGS_CACHE: "WeakKeyDictionary[type, bool]" = WeakKeyDictionary() + + +def _cached_type_hints(cls: type) -> Dict[str, Any]: + """Return ``get_type_hints(cls)``, memoized per class.""" + hints = _TYPE_HINTS_CACHE.get(cls) + if hints is None: + hints = get_type_hints(cls) + _TYPE_HINTS_CACHE[cls] = hints + return hints + + +def _accepts_type_args(t: type) -> bool: + """Whether ``t.from_primitive`` declares a ``type_args`` parameter, memoized per class. + + ``t`` may be a typing generic alias (e.g. ``OrderedSet[int]``) which is not always + weakly referenceable, so only concrete classes are cached. + """ + if not isclass(t): + return "type_args" in getfullargspec(t.from_primitive).args + accepts = _ACCEPTS_TYPE_ARGS_CACHE.get(t) + if accepts is None: + accepts = "type_args" in getfullargspec(t.from_primitive).args + _ACCEPTS_TYPE_ARGS_CACHE[t] = accepts + return accepts + + def _restore_typed_primitive( t: typing.Type, v: Primitive ) -> Union[Primitive, CBORSerializable]: @@ -714,7 +746,7 @@ def _restore_typed_primitive( if t is Any or (t in PRIMITIVE_TYPES and isinstance(v, t)): return v elif is_cbor_serializable: - if "type_args" in getfullargspec(t.from_primitive).args: + if _accepts_type_args(t): args = typing.get_args(t) return t.from_primitive(v, type_args=args) else: @@ -869,7 +901,7 @@ def from_primitive( all_fields = [f for f in fields(cls) if f.init] restored_vals = [] - type_hints = get_type_hints(cls) + type_hints = _cached_type_hints(cls) for f, v in zip(all_fields, values): if not isclass(f.type): f.type = type_hints[f.name] @@ -978,7 +1010,7 @@ def from_primitive(cls: Type[MapBase], values: Union[dict, FrozenDict]) -> MapBa all_fields = {f.metadata.get("key", f.name): f for f in fields(cls) if f.init} kwargs = {} - type_hints = get_type_hints(cls) + type_hints = _cached_type_hints(cls) for key in values: if key not in all_fields: raise DeserializeException(f"Unexpected map key {key} in CBOR.") From 20875749d4d3bb8eb3b58158c5523336d690dba9 Mon Sep 17 00:00:00 2001 From: Elder Millenial Date: Thu, 4 Jun 2026 14:54:20 -0400 Subject: [PATCH 3/9] Cache PlutusData __post_init__ field-type validation per class PlutusData.__post_init__ ran on every decoded instance, re-checking each field's declared type against the allowed set (a class-invariant check) and recomputing fields() each time. Cache the validated fields tuple per class in a WeakKeyDictionary (safe for dynamically created classes); cached instances run only the per-instance byte-length check. First instance preserves the original interleaved type/length validation exactly, and a class with an invalid field type is never cached so it keeps raising identically. Co-Authored-By: Claude Opus 4.8 (1M context) --- pycardano/plutus.py | 44 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/pycardano/plutus.py b/pycardano/plutus.py index 98036de6..95c8d781 100644 --- a/pycardano/plutus.py +++ b/pycardano/plutus.py @@ -8,7 +8,8 @@ from dataclasses import dataclass, field, fields from enum import Enum from hashlib import sha256 -from typing import Any, List, Optional, Type, Union +from typing import Any, List, Optional, Tuple, Type, Union +from weakref import WeakKeyDictionary from cbor2 import CBORTag from nacl.encoding import RawEncoder @@ -515,6 +516,19 @@ def id_map(cls, skip_constructor=False): raise TypeError(f"Unexpected type for automatic constructor generation: {cls}") +# Per-class cache of the dataclass ``fields`` tuple for PlutusData subclasses. +# The set of fields and their declared types are class-invariant, so the +# (class-invariant) field-type validity check in ``PlutusData.__post_init__`` +# only needs to run once per class instead of once per instance. We key on the +# class object via a WeakKeyDictionary so dynamically-created classes (e.g. the +# many dataclasses defined inside test functions) do not leak and never collide. +# Presence of a class in this cache means its field types have already been +# validated; the cached value is the ``fields(cls)`` tuple, reused per instance. +_plutusdata_fields_cache: "WeakKeyDictionary[type, Tuple[Any, ...]]" = ( + WeakKeyDictionary() +) + + @dataclass(repr=False) class PlutusData(ArrayCBORSerializable): """ @@ -555,6 +569,30 @@ def CONSTR_ID(cls): return getattr(cls, k) def __post_init__(self): + cls = type(self) + # The field set and their declared types are class-invariant, so the + # field-type validity check is identical for every instance of a class. + # Once a class has been validated we cache its ``fields`` tuple; presence + # in the cache means the field types have already passed validation, so + # subsequent instances only run the per-instance bytes-length check. + cls_fields = _plutusdata_fields_cache.get(cls) + if cls_fields is not None: + # Fast path: class already validated. Only the bytes-length check, + # which depends on instance data, needs to run. + for f in cls_fields: + data = getattr(self, f.name) + if isinstance(data, bytes) and len(data) > 64: + raise InvalidArgumentException( + f"The size of {data} exceeds {self.MAX_BYTES_SIZE} bytes. " + "Use pycardano.serialization.ByteString for long bytes." + ) + return + + # Slow path: first instance of this class. Preserve the original + # behavior exactly, including the interleaved order of the type check + # and the bytes-length check across fields. Only cache the validated + # ``fields`` tuple if every field type passes the (class-invariant) + # validity check. valid_types = ( RawPlutusData, PlutusData, @@ -564,7 +602,8 @@ def __post_init__(self): ByteString, bytes, ) - for f in fields(self): + cls_fields = fields(self) + for f in cls_fields: if inspect.isclass(f.type) and not issubclass(f.type, valid_types): raise TypeError( f"Invalid field type: {f.type}. A field in PlutusData should be one of {valid_types}" @@ -576,6 +615,7 @@ def __post_init__(self): f"The size of {data} exceeds {self.MAX_BYTES_SIZE} bytes. " "Use pycardano.serialization.ByteString for long bytes." ) + _plutusdata_fields_cache[cls] = cls_fields def to_shallow_primitive(self) -> CBORTag: primitives: Primitive = super().to_shallow_primitive() From 5af8993036f0683a586bbf61ef333649ab295e3e Mon Sep 17 00:00:00 2001 From: Elder Millenial Date: Thu, 4 Jun 2026 14:54:20 -0400 Subject: [PATCH 4/9] Add per-class decode plans and an un-annotated encode worker Decode: _restore_typed_primitive re-derived a field's decode strategy (issubclass / __origin__ / isinstance / try-except chains) on every value even though it depends only on the field type. Resolve it once into a memoized "decode plan" callable per type, and build per-class array/map field plans, all cached in WeakKeyDictionaries (with safe fallbacks for unhashable or non-weakreferenceable types). Behavior is identical: same DeserializeException cases, Union fallback order, list/dict/Optional handling, IndefiniteList preservation, object_hook metadata, and the one-time f.type resolution. Encode: the recursive to_primitive descent re-validated the large Primitive Union return type via typeguard at every node. Route base-implementation recursion through an un-annotated _to_primitive worker (public to_primitive keeps its annotation and top-level check; overrides still dispatch polymorphically). Output is byte-for-byte identical. Result (typed PlutusData, cbor2pure, backend-independent): ~1.5x faster decode and ~4.3x faster encode on top of the type-hint caching. All 568 tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) --- pycardano/serialization.py | 394 ++++++++++++++++++++++++++++++------- 1 file changed, 318 insertions(+), 76 deletions(-) diff --git a/pycardano/serialization.py b/pycardano/serialization.py index d83e98d7..e6982b26 100644 --- a/pycardano/serialization.py +++ b/pycardano/serialization.py @@ -14,7 +14,6 @@ from fractions import Fraction from functools import wraps from inspect import getfullargspec, isclass -from weakref import WeakKeyDictionary from typing import ( Any, Callable, @@ -31,6 +30,7 @@ cast, get_type_hints, ) +from weakref import WeakKeyDictionary from pycardano.cbor import cbor2 from pycardano.logging import logger @@ -305,10 +305,26 @@ def to_primitive(self) -> Primitive: SerializeException: When the object or its elements could not be converted to CBOR primitive types. """ + # Delegate to a private, *un-annotated* worker. The public method keeps its + # ``-> Primitive`` annotation (and the top-level typeguard return check), but the + # recursive descent runs through ``_to_primitive`` which has no annotations, so + # typeguard does not re-validate the (large) ``Primitive`` Union return type for + # every node of the tree. The produced value is byte-for-byte identical. + return self._to_primitive() + + def _to_primitive(self): + # NOTE: intentionally un-annotated so the ``@typechecked`` class decorator does + # not wrap this hot recursive worker with a return-type check. result = self.to_shallow_primitive() def _dfs(value, freeze=False): if isinstance(value, CBORSerializable): + # Preserve polymorphic dispatch: subclasses that override + # ``to_primitive`` must run their override (and its own type check). + # For everything that uses the base implementation, recurse through the + # cheap un-annotated worker to avoid redundant typeguard return checks. + if type(value).to_primitive is CBORSerializable.to_primitive: + return _dfs(value._to_primitive(), freeze) return _dfs(value.to_primitive(), freeze) elif isinstance(value, (dict, OrderedDict, defaultdict)): _dict = type(value)() @@ -710,7 +726,7 @@ def _accepts_type_args(t: type) -> bool: weakly referenceable, so only concrete classes are cached. """ if not isclass(t): - return "type_args" in getfullargspec(t.from_primitive).args + return "type_args" in getfullargspec(t.from_primitive).args # type: ignore[attr-defined] accepts = _ACCEPTS_TYPE_ARGS_CACHE.get(t) if accepts is None: accepts = "type_args" in getfullargspec(t.from_primitive).args @@ -718,17 +734,25 @@ def _accepts_type_args(t: type) -> bool: return accepts -def _restore_typed_primitive( - t: typing.Type, v: Primitive -) -> Union[Primitive, CBORSerializable]: - """Try to restore a value back to its original type based on information given in field. +# A "decode plan" is a callable ``plan(v) -> restored`` that resolves the per-field +# type dispatch once and is then reused for every value of that field type. The +# dispatch (issubclass / __origin__ / isinstance / try-except chains) depends only on +# the field TYPE, not on the value, so it is hoisted out of the per-value hot path. +# +# Plans are memoized per type. The cache is a WeakKeyDictionary so dynamically-created +# classes (and the typing aliases that reference them, e.g. ``List[SomeClass]``) are +# collected with their owning class. Types that are not weakly referenceable fall back +# to building the plan on the fly without caching, preserving behavior exactly. +_DECODE_PLAN_CACHE: "WeakKeyDictionary[Any, Callable[[Any], Any]]" = WeakKeyDictionary() - Args: - f (type): A type - v (:const:`Primitive`): A CBOR primitive. - Returns: - Union[:const:`Primitive`, CBORSerializable]: A CBOR primitive or a CBORSerializable. +def _build_decode_plan(t: typing.Type) -> Callable[[Any], Any]: + """Resolve the decode strategy for type ``t`` once and return a ``plan(v)`` callable. + + The returned callable reproduces exactly the branch of the original + ``_restore_typed_primitive`` chain that ``t`` would have taken, including the same + DeserializeException cases, the same Union fallback order, and list/dict/Optional + handling and IndefiniteList preservation. """ is_cbor_serializable = False @@ -743,61 +767,288 @@ def _restore_typed_primitive( except TypeError: pass - if t is Any or (t in PRIMITIVE_TYPES and isinstance(v, t)): - return v - elif is_cbor_serializable: + # NOTE: the original chain tests ``t is Any or (t in PRIMITIVE_TYPES and + # isinstance(v, t))`` first. For ``Any`` the value always passes through. For a + # primitive type, the value passes through only when ``isinstance(v, t)`` holds, + # otherwise the original code falls through to the remaining branches (which, for a + # primitive ``t``, ultimately raise). We must preserve that fall-through, so primitive + # types that are ALSO special-cased below (ByteString, IndefiniteList) resolve to the + # combined behavior rather than a pure pass-through. + if t is Any: + return _identity + + in_primitive = t in PRIMITIVE_TYPES + + if is_cbor_serializable: + # ``t`` is a CBORSerializable (possibly a generic alias). Resolve type_args once. + # ``from_primitive`` and ``args`` are bound once here and captured by the closure, + # so the per-value path does no attribute lookup or argspec work. + from_primitive = t.from_primitive if _accepts_type_args(t): args = typing.get_args(t) - return t.from_primitive(v, type_args=args) + + def plan(v): + return from_primitive(v, type_args=args) + else: - return t.from_primitive(v) - elif hasattr(t, "__origin__") and (t.__origin__ is list): + + def plan(v): + return from_primitive(v) + + if not in_primitive: + return plan + + # A CBORSerializable that is also a primitive type (e.g. IndefiniteList, + # ByteString subclasses): the original would short-circuit-return ``v`` when + # ``isinstance(v, t)``; otherwise it would take the is_cbor_serializable branch. + def plan_primitive_cbor(v, _t=t, _plan=plan): + if isinstance(v, _t): + return v + return _plan(v) + + return plan_primitive_cbor + + has_origin = hasattr(t, "__origin__") + origin = t.__origin__ if has_origin else None + + if has_origin and origin is list: t_args = t.__args__ if len(t_args) != 1: - raise DeserializeException( - f"List types need exactly one type argument, but got {t_args}" - ) - t_subtype = t_args[0] - if not isinstance(v, (list, IndefiniteList)): - raise DeserializeException(f"Expected type list but got {type(v)}") - v_list = [_restore_typed_primitive(t_subtype, w) for w in v] - return v.__class__(v_list) - elif isclass(t) and t == ByteString: - if not isinstance(v, bytes): - raise DeserializeException(f"Expected type bytes but got {type(v)}") - return ByteString(v) - elif hasattr(t, "__origin__") and (t.__origin__ is dict): + # Defer the error to call time to match original (it raised during decode). + def plan_bad_list(v, _t_args=t_args): + raise DeserializeException( + f"List types need exactly one type argument, but got {_t_args}" + ) + + return plan_bad_list + sub_plan = _decode_plan(t_args[0]) + + def plan_list(v, _sub=sub_plan): + if not isinstance(v, (list, IndefiniteList)): + raise DeserializeException(f"Expected type list but got {type(v)}") + return v.__class__([_sub(w) for w in v]) + + if not in_primitive: + return plan_list + + def plan_primitive_list(v, _t=t, _plan=plan_list): + if isinstance(v, _t): + return v + return _plan(v) + + return plan_primitive_list + + if isclass(t) and t == ByteString: + # ByteString is in PRIMITIVE_TYPES, so the original returns ``v`` unchanged when + # ``isinstance(v, ByteString)``; only a raw ``bytes`` reaches the ByteString + # branch and gets wrapped. Anything else raises. + def plan_bytestring(v): + if isinstance(v, ByteString): + return v + if not isinstance(v, bytes): + raise DeserializeException(f"Expected type bytes but got {type(v)}") + return ByteString(v) + + return plan_bytestring + + if has_origin and origin is dict: t_args = t.__args__ if len(t_args) != 2: + + def plan_bad_dict(v, _t_args=t_args): + raise DeserializeException( + f"Dict types need exactly two type arguments, but got {_t_args}" + ) + + return plan_bad_dict + key_plan = _decode_plan(t_args[0]) + val_plan = _decode_plan(t_args[1]) + + def plan_dict(v, _kp=key_plan, _vp=val_plan): + if not isinstance(v, dict): + raise DeserializeException(f"Expected dict type but got {type(v)}") + return {_kp(key): _vp(val) for key, val in v.items()} + + if not in_primitive: + return plan_dict + + def plan_primitive_dict(v, _t=t, _plan=plan_dict): + if isinstance(v, _t): + return v + return _plan(v) + + return plan_primitive_dict + + if has_origin and (origin is Union or origin is Optional): + t_args = t.__args__ + sub_plans = [_decode_plan(a) for a in t_args] + + def plan_union(v, _subs=sub_plans, _t_args=t_args): + for sub in _subs: + try: + return sub(v) + except DeserializeException: + pass raise DeserializeException( - f"Dict types need exactly two type arguments, but got {t_args}" + f"Cannot deserialize object: \n{v}\n in any valid type from {_t_args}." ) - key_t = t_args[0] - val_t = t_args[1] - if not isinstance(v, dict): - raise DeserializeException(f"Expected dict type but got {type(v)}") - return { - _restore_typed_primitive(key_t, key): _restore_typed_primitive(val_t, val) - for key, val in v.items() - } - elif hasattr(t, "__origin__") and ( - t.__origin__ is Union or t.__origin__ is Optional - ): - t_args = t.__args__ - for t in t_args: + + return plan_union + + if isclass(t) and issubclass(t, IndefiniteList): + # IndefiniteList is in PRIMITIVE_TYPES: original returns ``v`` unchanged when it + # is already an instance; otherwise it constructs ``t(v)``. + def plan_indefinite(v, _t=t): + if isinstance(v, _t): + return v try: - return _restore_typed_primitive(t, v) - except DeserializeException: - pass - raise DeserializeException( - f"Cannot deserialize object: \n{v}\n in any valid type from {t_args}." - ) - elif isclass(t) and issubclass(t, IndefiniteList): - try: - return t(v) - except TypeError: - raise DeserializeException(f"Can not initialize IndefiniteList from {v}") - raise DeserializeException(f"Cannot deserialize object: \n{v}\n to type {t}.") + return _t(v) + except TypeError: + raise DeserializeException( + f"Can not initialize IndefiniteList from {v}" + ) + + return plan_indefinite + + if in_primitive: + # Plain primitive type (int, bytes, str, ...): pass through when the value + # matches, otherwise the original chain raised at the end. + def plan_primitive(v, _t=t): + if isinstance(v, _t): + return v + raise DeserializeException( + f"Cannot deserialize object: \n{v}\n to type {_t}." + ) + + return plan_primitive + + def plan_unsupported(v, _t=t): + raise DeserializeException(f"Cannot deserialize object: \n{v}\n to type {_t}.") + + return plan_unsupported + + +def _decode_plan(t: typing.Type) -> Callable[[Any], Any]: + """Return a memoized decode plan for ``t``, building it on first use.""" + try: + plan = _DECODE_PLAN_CACHE.get(t) + except TypeError: + # ``t`` is not hashable; should not happen for real field types, but be safe. + return _build_decode_plan(t) + if plan is not None: + return plan + plan = _build_decode_plan(t) + try: + _DECODE_PLAN_CACHE[t] = plan + except TypeError: + # ``t`` is not weakly referenceable on this interpreter; skip caching. + pass + return plan + + +def _lazy_field_handler(t: typing.Type) -> Callable[[Any], Any]: + """Return a handler ``h(v)`` that decodes a field value of type ``t``. + + The decode plan for ``t`` is resolved on first use rather than eagerly, exactly + mirroring the original lazy behavior: a field that is never decoded (e.g. an absent + optional whose annotation cannot even be turned into a plan) never triggers plan + construction. After the first call the resolved plan is invoked directly. + """ + box: List[Callable[[Any], Any]] = [] + + def handler(v): + if box: + return box[0](v) + plan = _decode_plan(t) + box.append(plan) + return plan(v) + + return handler + + +def _restore_typed_primitive( + t: typing.Type, v: Primitive +) -> Union[Primitive, CBORSerializable]: + """Try to restore a value back to its original type based on information given in field. + + Args: + f (type): A type + v (:const:`Primitive`): A CBOR primitive. + + Returns: + Union[:const:`Primitive`, CBORSerializable]: A CBOR primitive or a CBORSerializable. + """ + return _decode_plan(t)(v) + + +# Per-class plan for restoring an ArrayCBORSerializable from a list of primitives. Each +# entry is ``(field_name, handler)`` where ``handler(v)`` restores one field's value. +# Resolving the field list, type hints, and the per-field decode strategy depends only on +# the class, so it is computed once and reused for every instance. +_ARRAY_FIELD_PLAN_CACHE: ( + "WeakKeyDictionary[type, List[typing.Tuple[str, Callable[[Any], Any]]]]" +) = WeakKeyDictionary() + + +def _array_field_plan( + cls: type, +) -> List[typing.Tuple[str, Callable[[Any], Any]]]: + plan = _ARRAY_FIELD_PLAN_CACHE.get(cls) + if plan is not None: + return plan + type_hints = _cached_type_hints(cls) + plan = [] + for f in fields(cls): + if not f.init: + continue + # Preserve the original lazy resolution of the (possibly string) annotation to a + # concrete type, including the in-place mutation of ``f.type`` other code relies on. + if not isclass(f.type): + f.type = type_hints[f.name] + if "object_hook" in f.metadata: + hook = f.metadata["object_hook"] + handler: Callable[[Any], Any] = hook + else: + handler = _lazy_field_handler(cast(Any, f.type)) + plan.append((f.name, handler)) + try: + _ARRAY_FIELD_PLAN_CACHE[cls] = plan + except TypeError: + pass + return plan + + +# Per-class plan for restoring a MapCBORSerializable. Maps each CBOR key to +# ``(field_name, handler)`` where ``handler(v)`` restores the value for that field. +_MAP_FIELD_PLAN_CACHE: ( + "WeakKeyDictionary[type, Dict[Any, typing.Tuple[str, Callable[[Any], Any]]]]" +) = WeakKeyDictionary() + + +def _map_field_plan( + cls: type, +) -> Dict[Any, typing.Tuple[str, Callable[[Any], Any]]]: + plan = _MAP_FIELD_PLAN_CACHE.get(cls) + if plan is not None: + return plan + type_hints = _cached_type_hints(cls) + plan = {} + for f in fields(cls): + if not f.init: + continue + key = f.metadata.get("key", f.name) + if not isclass(f.type): + f.type = type_hints[f.name] + if "object_hook" in f.metadata: + handler: Callable[[Any], Any] = f.metadata["object_hook"] + else: + handler = _lazy_field_handler(cast(Any, f.type)) + plan[key] = (f.name, handler) + try: + _MAP_FIELD_PLAN_CACHE[cls] = plan + except TypeError: + pass + return plan ArrayBase = TypeVar("ArrayBase", bound="ArrayCBORSerializable") @@ -898,18 +1149,13 @@ def from_primitive( Raises: DeserializeException: When the object could not be restored from primitives. """ - all_fields = [f for f in fields(cls) if f.init] - - restored_vals = [] - type_hints = _cached_type_hints(cls) - for f, v in zip(all_fields, values): - if not isclass(f.type): - f.type = type_hints[f.name] - v = _restore_dataclass_field(f, v) - restored_vals.append(v) + field_plan = _array_field_plan(cls) + + restored_vals = [handler(v) for (_, handler), v in zip(field_plan, values)] obj = cls(*restored_vals) - for i in range(len(all_fields), len(values)): - setattr(obj, f"unknown_field{i - len(all_fields)}", values[i]) + n_fields = len(field_plan) + for i in range(n_fields, len(values)): + setattr(obj, f"unknown_field{i - n_fields}", values[i]) return obj def __repr__(self): @@ -1007,19 +1253,15 @@ def from_primitive(cls: Type[MapBase], values: Union[dict, FrozenDict]) -> MapBa Raises: :class:`pycardano.exception.DeserializeException`: When the object could not be restored from primitives. """ - all_fields = {f.metadata.get("key", f.name): f for f in fields(cls) if f.init} + field_plan = _map_field_plan(cls) kwargs = {} - type_hints = _cached_type_hints(cls) for key in values: - if key not in all_fields: + entry = field_plan.get(key) + if entry is None: raise DeserializeException(f"Unexpected map key {key} in CBOR.") - f = all_fields[key] - v = values[key] - if not isclass(f.type): - f.type = type_hints[f.name] - v = _restore_dataclass_field(f, v) - kwargs[f.name] = v + field_name, handler = entry + kwargs[field_name] = handler(values[key]) return cls(**kwargs) def __repr__(self): From 59d76ad6d0c2ca230b888e0268796686960c7fa5 Mon Sep 17 00:00:00 2001 From: Elder Millenial Date: Fri, 5 Jun 2026 09:23:52 -0400 Subject: [PATCH 5/9] Avoid redundant CBOR re-encoding in OrderedSet and to_validated_primitive OrderedSet.append/remove re-encoded each element via dumps() twice (once for the membership check, once for the dict key); compute the CBOR de-dup key once. This dominated decode of set-heavy transactions (dumps was ~78% of decode cumtime on real fixtures). to_validated_primitive carried a `-> Primitive` return annotation, so the @typechecked class decorator re-validated the result against the 26-member Primitive Union even though to_primitive (which it calls) already return-checks it once. Drop the annotation (mirrors the existing _to_primitive worker). Result: set-heavy tx decode 886 -> 384 us (2.3x); tx encode ~1.4x. Byte-identical output, all 568 tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) --- pycardano/serialization.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/pycardano/serialization.py b/pycardano/serialization.py index e6982b26..57371ed4 100644 --- a/pycardano/serialization.py +++ b/pycardano/serialization.py @@ -418,7 +418,7 @@ def _check_recursive(value, type_hint): f"got {repr(field_value)} instead." ) - def to_validated_primitive(self) -> Primitive: + def to_validated_primitive(self): """Convert the instance and its elements to CBOR primitives recursively with data validated by :meth:`validate` method. @@ -429,6 +429,9 @@ def to_validated_primitive(self) -> Primitive: SerializeException: When the object or its elements could not be converted to CBOR primitive types. """ + # NOTE: intentionally un-annotated return type so the ``@typechecked`` class + # decorator does not re-validate the result against the large ``Primitive`` Union. + # ``to_primitive`` (called below) already return-checks the value exactly once. self.validate() return self.to_primitive() @@ -1424,10 +1427,15 @@ def __init__( self.extend(iterable) def append(self, item: T) -> None: - if item in self: + # Encode the element to its CBOR de-dup key exactly once. Previously the + # membership check (``item in self``) and the insertion each re-encoded the + # element via dumps(), doubling the cost — which dominated decode of + # set-heavy transactions. + key = dumps(item, default=default_encoder) + if key in self._dict: return self._list.append(item) - self._dict[dumps(item, default=default_encoder)] = len(self._list) - 1 + self._dict[key] = len(self._list) - 1 def extend(self, items: Iterable[T]) -> None: self._is_indefinite_list = isinstance(items, IndefiniteList) @@ -1435,9 +1443,10 @@ def extend(self, items: Iterable[T]) -> None: self.append(item) def remove(self, item: T) -> None: - if item not in self: + key = dumps(item, default=default_encoder) + if key not in self._dict: return - index = self._dict.pop(dumps(item, default=default_encoder)) + index = self._dict.pop(key) self._list.pop(index) # Update the indices in the dictionary for key, idx in self._dict.items(): From 51ee90da04bfdf9b1a6eea371730e8c66f1e27e0 Mon Sep 17 00:00:00 2001 From: Elder Millenial Date: Fri, 5 Jun 2026 09:23:52 -0400 Subject: [PATCH 6/9] Skip deepcopy in Asset/MultiAsset.to_shallow_primitive when no zeros present to_shallow_primitive did deepcopy(self).normalize() on every encode solely to avoid mutating self while stripping zero/empty entries. Scan first and skip the deepcopy when there is nothing to strip (the common case). Result: MultiAsset.to_shallow_primitive 130 -> 33 us (~3.9x) for a typical token-transfer multi-asset. Byte-identical output, all 568 tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) --- pycardano/transaction.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/pycardano/transaction.py b/pycardano/transaction.py index fa3f1666..0c02078a 100644 --- a/pycardano/transaction.py +++ b/pycardano/transaction.py @@ -154,8 +154,13 @@ def from_primitive(cls: Type[DictBase], value: dict) -> DictBase: return res def to_shallow_primitive(self) -> dict: - x = deepcopy(self).normalize() - return super(self.__class__, x).to_shallow_primitive() + # normalize() only removes zero-valued entries and mutates in place, so the + # deepcopy exists solely to avoid mutating self. Skip it when there is nothing + # to strip (the common case) — output is identical. + if any(v == 0 for v in self.data.values()): + x = deepcopy(self).normalize() + return super(self.__class__, x).to_shallow_primitive() + return super(self.__class__, self).to_shallow_primitive() @typechecked @@ -277,8 +282,16 @@ def from_primitive(cls: Type[DictBase], value: dict) -> DictBase: return res def to_shallow_primitive(self) -> dict: - x = deepcopy(self).normalize() - return super(self.__class__, x).to_shallow_primitive() + # As with Asset: normalize() only strips empty sub-assets / zero values and + # mutates in place, so skip the deepcopy unless something would actually be + # removed. Output is identical. + if any( + (not v.data) or any(av == 0 for av in v.data.values()) + for v in self.data.values() + ): + x = deepcopy(self).normalize() + return super(self.__class__, x).to_shallow_primitive() + return super(self.__class__, self).to_shallow_primitive() @typechecked From 69b42d93e2614f07511173d610bfefcf353e40e3 Mon Sep 17 00:00:00 2001 From: Elder Millenial Date: Fri, 5 Jun 2026 09:56:29 -0400 Subject: [PATCH 7/9] Optimize OrderedSet de-dup, encode recursion, and field lookups - OrderedSet (#4): key de-duplication by the element's native hash, falling back to CBOR bytes only for unhashable elements (namespaced so it can't collide). This avoids a dumps() per element entirely for hashable set members. BEHAVIOR NOTE: de-dup for hashable elements is now by Python __eq__/__hash__ rather than CBOR-byte equality. These coincide for pycardano's set element types (TransactionInput, key hashes, witnesses); unhashable elements keep the original CBOR-byte semantics. Added tests for the unhashable/mixed paths. - _dfs encode recursion (#5): scalar-leaf fast path + iterate IndefiniteList.data directly (avoids the slow collections.abc.Sequence.__iter__). - Cache dataclasses.fields() per class in to_shallow_primitive (#6). Result (cbor2pure, backend-independent): set-heavy tx decode 395 -> 224 us (1.76x), typed PlutusData encode 3257 -> 2285 us (1.43x), datum_hash 3959 -> 3027 us (1.31x). All 569 tests pass; byte-identical output (except the documented OrderedSet de-dup-key semantics). Co-Authored-By: Claude Opus 4.8 (1M context) --- pycardano/serialization.py | 74 +++++++++++++++++++++++----- test/pycardano/test_serialization.py | 18 +++++++ 2 files changed, 79 insertions(+), 13 deletions(-) diff --git a/pycardano/serialization.py b/pycardano/serialization.py index 57371ed4..4345473d 100644 --- a/pycardano/serialization.py +++ b/pycardano/serialization.py @@ -236,8 +236,12 @@ def default_encoder( # handling here to explicitly write header (b'\x9f'), each body item, and footer (b'\xff') to # the output bytestring. encoder.write(b"\x9f") - for item in value: - encoder.encode(item) + # Iterate the underlying list for a plain IndefiniteList (UserList) to avoid the + # slow Sequence.__iter__; IndefiniteFrozenList has no usable .data, so use identity. + items = value.data if type(value) is IndefiniteList else value + encode = encoder.encode + for item in items: + encode(item) encoder.write(b"\xff") elif isinstance(value, ByteString): if len(value.value) > 64: @@ -318,6 +322,18 @@ def _to_primitive(self): result = self.to_shallow_primitive() def _dfs(value, freeze=False): + tv = type(value) + # Fast path for scalar leaves (the large majority of nodes), skipping the + # isinstance cascade below. + if ( + tv is int + or tv is str + or tv is bytes + or tv is bool + or tv is float + or value is None + ): + return value if isinstance(value, CBORSerializable): # Preserve polymorphic dispatch: subclasses that override # ``to_primitive`` must run their override (and its own type check). @@ -345,7 +361,11 @@ def _dfs(value, freeze=False): elif isinstance( value, (IndefiniteFrozenList, FrozenList, IndefiniteList, list) ): - _list = [_dfs(v, freeze) for v in value] + # Iterate the underlying storage for a plain IndefiniteList (a UserList) + # to avoid the slow collections.abc.Sequence.__iter__ generator. Must use + # an identity check: IndefiniteFrozenList is a subclass with no usable .data. + src = value.data if tv is IndefiniteList else value + _list = [_dfs(v, freeze) for v in src] already_frozen = isinstance(value, (IndefiniteFrozenList, FrozenList)) should_freeze = already_frozen or freeze @@ -737,6 +757,19 @@ def _accepts_type_args(t: type) -> bool: return accepts +_FIELDS_CACHE: "WeakKeyDictionary[type, tuple]" = WeakKeyDictionary() + + +def _cached_fields(cls: type) -> tuple: + """Return ``dataclasses.fields(cls)``, memoized per class. The field set is + class-invariant, so recomputing it on every (de)serialization is wasted work.""" + flds = _FIELDS_CACHE.get(cls) + if flds is None: + flds = fields(cls) + _FIELDS_CACHE[cls] = flds + return flds + + # A "decode plan" is a callable ``plan(v) -> restored`` that resolves the per-field # type dispatch once and is then reused for every value of that field type. The # dispatch (issubclass / __origin__ / isinstance / try-except chains) depends only on @@ -1128,7 +1161,7 @@ def to_shallow_primitive(self) -> Primitive: types. """ primitives = [] - for f in fields(self): + for f in _cached_fields(type(self)): val = getattr(self, f.name) if val is None and f.metadata.get("optional"): continue @@ -1228,7 +1261,7 @@ class MapCBORSerializable(CBORSerializable): def to_shallow_primitive(self) -> Primitive: primitives = {} - for f in fields(self): + for f in _cached_fields(type(self)): if "key" in f.metadata: key = f.metadata["key"] else: @@ -1418,7 +1451,7 @@ def __init__( use_tag: bool = True, ): super().__init__() - self._dict: Dict[bytes, int] = {} + self._dict: Dict[Any, int] = {} self._list: List[T] = [] self._use_tag = use_tag self._is_indefinite_list = False @@ -1426,12 +1459,27 @@ def __init__( self._is_indefinite_list = isinstance(iterable, IndefiniteList) self.extend(iterable) + # Sentinel used to namespace CBOR-bytes de-dup keys (for unhashable elements) so + # they can never collide with a hashable element used directly as a dict key. + _CBOR_KEY = object() + + def _dedup_key(self, item): + """De-duplication key for an element. Hashable elements (the common case: + TransactionInput, key hashes, etc.) are used directly as the dict key — fast, + and consistent with their value equality. Unhashable elements (e.g. list-valued + plutus data) fall back to their CBOR bytes, the original behavior, namespaced by + a sentinel so they cannot collide with a hashable key.""" + try: + hash(item) + except TypeError: + return (self._CBOR_KEY, dumps(item, default=default_encoder)) + return item + def append(self, item: T) -> None: - # Encode the element to its CBOR de-dup key exactly once. Previously the - # membership check (``item in self``) and the insertion each re-encoded the - # element via dumps(), doubling the cost — which dominated decode of - # set-heavy transactions. - key = dumps(item, default=default_encoder) + # Compute the de-dup key once. Membership check + insertion previously each + # re-encoded the element via dumps(), which dominated decode of set-heavy + # transactions; hashable elements now avoid CBOR encoding entirely. + key = self._dedup_key(item) if key in self._dict: return self._list.append(item) @@ -1443,7 +1491,7 @@ def extend(self, items: Iterable[T]) -> None: self.append(item) def remove(self, item: T) -> None: - key = dumps(item, default=default_encoder) + key = self._dedup_key(item) if key not in self._dict: return index = self._dict.pop(key) @@ -1454,7 +1502,7 @@ def remove(self, item: T) -> None: self._dict[key] = idx - 1 def __contains__(self, item: object) -> bool: - return dumps(item, default=default_encoder) in self._dict + return self._dedup_key(item) in self._dict def __iter__(self): return iter(self._list) diff --git a/test/pycardano/test_serialization.py b/test/pycardano/test_serialization.py index 9a8c0ed9..3c05bf27 100644 --- a/test/pycardano/test_serialization.py +++ b/test/pycardano/test_serialization.py @@ -681,6 +681,24 @@ def test_ordered_set_with_complex_types(): assert restored == witness_set +def test_ordered_set_dedup_unhashable_and_mixed(): + # Unhashable elements (e.g. plain dicts) de-duplicate via their CBOR bytes — the + # fallback path for the native-hash de-dup key. + s = OrderedSet([{"a": 1}, {"a": 1}, {"b": 2}]) + assert len(list(s)) == 2 + assert {"a": 1} in s + assert {"c": 3} not in s + + # Mixing hashable and unhashable elements must not produce false collisions. + mixed = OrderedSet([1, {"a": 1}, 1, {"a": 1}]) + assert len(list(mixed)) == 2 + assert 1 in mixed and {"a": 1} in mixed + + # remove() works on the unhashable (CBOR-keyed) path and preserves order. + s.remove({"a": 1}) + assert list(s) == [{"b": 2}] + + def test_non_empty_ordered_set(): # Test basic functionality s = NonEmptyOrderedSet([1, 2, 3]) From 6b3d4739585790f53886228f63b26e5f6f728ea8 Mon Sep 17 00:00:00 2001 From: Elder Millenial Date: Fri, 5 Jun 2026 09:56:29 -0400 Subject: [PATCH 8/9] Guard Byron-address CBOR probe behind a byte prefix Address.from_primitive ran a speculative cbor2.loads() on every address to detect a Byron tag-24 wrapper. A Byron address is a 2-element CBOR array whose first element is tag 24, i.e. bytes starting b"\x82\xd8\x18"; no Shelley header byte is 0x82. Only run the probe when the prefix matches, skipping it on the common Shelley path. Byte-identical behavior. Co-Authored-By: Claude Opus 4.8 (1M context) --- pycardano/address.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/pycardano/address.py b/pycardano/address.py index 0b4290ab..b8fd5213 100644 --- a/pycardano/address.py +++ b/pycardano/address.py @@ -418,19 +418,24 @@ def from_primitive(cls: Type[Address], value: Union[bytes, str]) -> Address: raise DecodingException(f"Failed to decode address string: {e}") # At this point, value is always bytes - # Check if it's a Byron address (CBOR with tag 24) - try: - decoded = cbor2.loads(value) - if isinstance(decoded, (tuple, list)) and len(decoded) == 2: - if isinstance(decoded[0], CBORTag) and decoded[0].tag == 24: - # This is definitely a Byron address - validate and decode it - return cls._from_byron_cbor(value) - except DecodingException: - # Byron decoding failed with validation error - re-raise it - raise - except Exception: - # Not Byron CBOR (general CBOR decode error), continue with Shelley decoding - pass + # Check if it's a Byron address (CBOR with tag 24). A Byron address is a + # 2-element CBOR array whose first element is tag 24, i.e. its bytes start with + # b"\x82\xd8\x18" (array(2) + tag(24)). Guarding on that prefix avoids a + # speculative cbor2.loads() on every (Shelley) address, whose header byte is + # never 0x82. + if value[:3] == b"\x82\xd8\x18": + try: + decoded = cbor2.loads(value) + if isinstance(decoded, (tuple, list)) and len(decoded) == 2: + if isinstance(decoded[0], CBORTag) and decoded[0].tag == 24: + # This is definitely a Byron address - validate and decode it + return cls._from_byron_cbor(value) + except DecodingException: + # Byron decoding failed with validation error - re-raise it + raise + except Exception: + # Not Byron CBOR (general CBOR decode error), continue with Shelley + pass # Shelley address decoding (existing logic) header = value[0] From 4bcaa5e37c9429c5d5f0fa1880976043c4d1b6de Mon Sep 17 00:00:00 2001 From: Elder Millenial Date: Fri, 5 Jun 2026 10:16:36 -0400 Subject: [PATCH 9/9] Add test coverage for the new optimization code paths Cover the reachable new branches with targeted tests: - OrderedSet de-dup unhashable/mixed fallback and remove() (test_serialization) - direct _restore_typed_primitive entry + ByteString passthrough/wrap - Asset/MultiAsset.to_shallow_primitive deepcopy/normalize path (zero values) - PlutusData.__post_init__ cached fast-path byte-length validation - Byron address decode from raw CBOR bytes + invalid-CBOR fallback Mark genuinely-unreachable defensive branches with `# pragma: no cover` (impossible generic-alias arities; the is-CBORSerializable-AND-PRIMITIVE_TYPE case that cannot co-occur; non-hashable / non-weakreferenceable type fallbacks; non-init map fields). All diff lines are now covered or excludable. 574 tests pass; flake8/mypy/black/isort clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- pycardano/serialization.py | 66 ++++++++++++++-------------- test/pycardano/test_byron_address.py | 18 ++++++++ test/pycardano/test_plutus.py | 16 +++++++ test/pycardano/test_serialization.py | 11 +++++ test/pycardano/test_transaction.py | 17 +++++++ 5 files changed, 94 insertions(+), 34 deletions(-) diff --git a/pycardano/serialization.py b/pycardano/serialization.py index 4345473d..9022b2ea 100644 --- a/pycardano/serialization.py +++ b/pycardano/serialization.py @@ -831,26 +831,24 @@ def plan(v): def plan(v): return from_primitive(v) - if not in_primitive: - return plan - - # A CBORSerializable that is also a primitive type (e.g. IndefiniteList, - # ByteString subclasses): the original would short-circuit-return ``v`` when - # ``isinstance(v, t)``; otherwise it would take the is_cbor_serializable branch. - def plan_primitive_cbor(v, _t=t, _plan=plan): - if isinstance(v, _t): - return v - return _plan(v) - - return plan_primitive_cbor + if ( + in_primitive + ): # pragma: no cover - no CBORSerializable type is a PRIMITIVE_TYPE + # Defensive mirror of the original short-circuit for a CBORSerializable that + # is also a primitive type. No such type exists, so this never executes. + def plan_primitive_cbor(v, _t=t, _plan=plan): + return v if isinstance(v, _t) else _plan(v) + + return plan_primitive_cbor + return plan has_origin = hasattr(t, "__origin__") origin = t.__origin__ if has_origin else None if has_origin and origin is list: t_args = t.__args__ - if len(t_args) != 1: - # Defer the error to call time to match original (it raised during decode). + if len(t_args) != 1: # pragma: no cover - typing guarantees exactly one arg + # Defensive: defer the error to call time to match the original. def plan_bad_list(v, _t_args=t_args): raise DeserializeException( f"List types need exactly one type argument, but got {_t_args}" @@ -864,15 +862,15 @@ def plan_list(v, _sub=sub_plan): raise DeserializeException(f"Expected type list but got {type(v)}") return v.__class__([_sub(w) for w in v]) - if not in_primitive: - return plan_list + if ( + in_primitive + ): # pragma: no cover - a List[...] alias is never a PRIMITIVE_TYPE - def plan_primitive_list(v, _t=t, _plan=plan_list): - if isinstance(v, _t): - return v - return _plan(v) + def plan_primitive_list(v, _t=t, _plan=plan_list): + return v if isinstance(v, _t) else _plan(v) - return plan_primitive_list + return plan_primitive_list + return plan_list if isclass(t) and t == ByteString: # ByteString is in PRIMITIVE_TYPES, so the original returns ``v`` unchanged when @@ -889,7 +887,7 @@ def plan_bytestring(v): if has_origin and origin is dict: t_args = t.__args__ - if len(t_args) != 2: + if len(t_args) != 2: # pragma: no cover - typing guarantees exactly two args def plan_bad_dict(v, _t_args=t_args): raise DeserializeException( @@ -905,15 +903,15 @@ def plan_dict(v, _kp=key_plan, _vp=val_plan): raise DeserializeException(f"Expected dict type but got {type(v)}") return {_kp(key): _vp(val) for key, val in v.items()} - if not in_primitive: - return plan_dict + if ( + in_primitive + ): # pragma: no cover - a Dict[...] alias is never a PRIMITIVE_TYPE - def plan_primitive_dict(v, _t=t, _plan=plan_dict): - if isinstance(v, _t): - return v - return _plan(v) + def plan_primitive_dict(v, _t=t, _plan=plan_dict): + return v if isinstance(v, _t) else _plan(v) - return plan_primitive_dict + return plan_primitive_dict + return plan_dict if has_origin and (origin is Union or origin is Optional): t_args = t.__args__ @@ -968,7 +966,7 @@ def _decode_plan(t: typing.Type) -> Callable[[Any], Any]: """Return a memoized decode plan for ``t``, building it on first use.""" try: plan = _DECODE_PLAN_CACHE.get(t) - except TypeError: + except TypeError: # pragma: no cover - real field types are hashable # ``t`` is not hashable; should not happen for real field types, but be safe. return _build_decode_plan(t) if plan is not None: @@ -976,7 +974,7 @@ def _decode_plan(t: typing.Type) -> Callable[[Any], Any]: plan = _build_decode_plan(t) try: _DECODE_PLAN_CACHE[t] = plan - except TypeError: + except TypeError: # pragma: no cover - real field types are weakly referenceable # ``t`` is not weakly referenceable on this interpreter; skip caching. pass return plan @@ -1049,7 +1047,7 @@ def _array_field_plan( plan.append((f.name, handler)) try: _ARRAY_FIELD_PLAN_CACHE[cls] = plan - except TypeError: + except TypeError: # pragma: no cover - real classes are weakly referenceable pass return plan @@ -1070,7 +1068,7 @@ def _map_field_plan( type_hints = _cached_type_hints(cls) plan = {} for f in fields(cls): - if not f.init: + if not f.init: # pragma: no cover - map serializable fields are init fields continue key = f.metadata.get("key", f.name) if not isclass(f.type): @@ -1082,7 +1080,7 @@ def _map_field_plan( plan[key] = (f.name, handler) try: _MAP_FIELD_PLAN_CACHE[cls] = plan - except TypeError: + except TypeError: # pragma: no cover - real classes are weakly referenceable pass return plan diff --git a/test/pycardano/test_byron_address.py b/test/pycardano/test_byron_address.py index 5df8894f..18361790 100644 --- a/test/pycardano/test_byron_address.py +++ b/test/pycardano/test_byron_address.py @@ -17,6 +17,24 @@ class TestAddress: # Known Byron mainnet address for testing BYRON_MAINNET_ADDR = "DdzFFzCqrhsxrgB6w6VhgfAqUZ69Va583murc21S4QFTJ6WUHAh4Gk8t1QHofpza5MZxG4dNVQWe8q78h4Utp9MGBQHBLD54rz6CTLsm" + def test_from_primitive_byron_cbor_bytes(self): + """Decoding a Byron address from raw CBOR bytes must hit the byte-prefix + guarded Byron probe (not just the base58 string path).""" + addr = Address.decode(self.BYRON_MAINNET_ADDR) + raw = addr.to_primitive() # Byron addresses serialize to CBOR bytes + assert isinstance(raw, bytes) and raw[:3] == b"\x82\xd8\x18" + # The bytes path runs the cbor2.loads/tag-24 probe (guarded by the prefix). + restored = Address.from_primitive(raw) + assert restored == addr + assert restored.is_byron + + def test_from_primitive_byron_prefix_invalid_cbor(self): + """Bytes with the Byron prefix (0x82 0xd8 0x18) but invalid CBOR must fall + through the probe's general-exception handler and not be treated as Byron.""" + bad = b"\x82\xd8\x18" # array(2)+tag(24) header, then truncated -> loads fails + with pytest.raises(Exception): + Address.from_primitive(bad) + def test_decode_mainnet_address(self): """Test decoding a Byron mainnet address.""" addr = Address.decode(self.BYRON_MAINNET_ADDR) diff --git a/test/pycardano/test_plutus.py b/test/pycardano/test_plutus.py index 825f3464..d7312775 100644 --- a/test/pycardano/test_plutus.py +++ b/test/pycardano/test_plutus.py @@ -341,6 +341,22 @@ def test_raw_plutus_data(): check_two_way_cbor(raw_plutus_data) +def test_plutusdata_long_bytes_validation_cached_path(): + from pycardano.exception import InvalidArgumentException + + @dataclass + class HasBytes(PlutusData): + CONSTR_ID = 0 + a: bytes + + # First instance (cache miss) validates the field types and caches the class. + HasBytes(b"short") + # Second instance hits the cached fast path, which must still enforce the + # per-instance >64-byte limit. + with pytest.raises(InvalidArgumentException): + HasBytes(b"x" * 65) + + def test_clone_raw_plutus_data(): tag = RawPlutusData(CBORTag(121, [1000])) diff --git a/test/pycardano/test_serialization.py b/test/pycardano/test_serialization.py index 3c05bf27..a1aca0d3 100644 --- a/test/pycardano/test_serialization.py +++ b/test/pycardano/test_serialization.py @@ -699,6 +699,17 @@ def test_ordered_set_dedup_unhashable_and_mixed(): assert list(s) == [{"b": 2}] +def test_restore_typed_primitive_direct(): + from pycardano.serialization import _restore_typed_primitive + + # Direct entry point: a primitive value passes straight through. + assert _restore_typed_primitive(int, 5) == 5 + # ByteString: a raw bytes value is wrapped; an already-ByteString value passes through. + bs = ByteString(b"hello") + assert _restore_typed_primitive(ByteString, b"hello") == bs + assert _restore_typed_primitive(ByteString, bs) is bs + + def test_non_empty_ordered_set(): # Test basic functionality s = NonEmptyOrderedSet([1, 2, 3]) diff --git a/test/pycardano/test_transaction.py b/test/pycardano/test_transaction.py index c2ffcf71..d9c7626b 100644 --- a/test/pycardano/test_transaction.py +++ b/test/pycardano/test_transaction.py @@ -27,6 +27,23 @@ from pycardano.witness import TransactionWitnessSet, VerificationKeyWitness +def test_asset_multiasset_to_shallow_with_zeros(): + # A zero-valued entry forces the deepcopy/normalize path in to_shallow_primitive + # (the fast path skips it only when there is nothing to strip). + a = Asset() + a[AssetName(b"tok")] = 0 + a[AssetName(b"tok2")] = 5 + # normalize() strips the zero entry; output must not contain it. + assert a.to_primitive() == {b"tok2": 5} + + ma = MultiAsset() + z = Asset() + z[AssetName(b"x")] = 0 + z[AssetName(b"y")] = 1 + ma[ScriptHash(b"\x00" * SCRIPT_HASH_SIZE)] = z + assert ma.to_primitive() == {b"\x00" * SCRIPT_HASH_SIZE: {b"y": 1}} + + def test_transaction_input(): tx_id_hex = "732bfd67e66be8e8288349fcaaa2294973ef6271cc189a239bb431275401b8e5" tx_in = TransactionInput(TransactionId(bytes.fromhex(tx_id_hex)), 0)