Add nodes to convert between csp structs and arrow record batches#680

Open
arhamchopra wants to merge 1 commit into main from ac/arrow_nodes


@arhamchopra arhamchopra commented Feb 19, 2026

Add record_batches_to_struct and struct_to_record_batches nodes

Two new C++-backed nodes for bidirectional conversion between csp.Struct and Arrow RecordBatch:

  • struct_to_record_batches: ts[List[T]] → ts[List[pa.RecordBatch]]
  • record_batches_to_struct: ts[List[pa.RecordBatch]] → ts[List[T]]

Both use the Arrow C Data Interface (PyCapsule protocol) to cross the Python/C++ boundary without serialization overhead. Scalar and temporal fields are read/written entirely in C++. Numpy array fields use bulk memory transfers rather than per-element iteration.
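
To make the bulk-transfer point concrete, here is a minimal pure-Python sketch of the Arrow variable-size list layout (one offsets buffer plus one contiguous values buffer), which is the layout behind a list<float64> column. It is not the PR's C++ code: `encode_list_column` and `decode_list_column` are hypothetical helper names, and `array.array` stands in for numpy buffers. Each row is moved with a single `extend` or slice rather than element by element.

```python
from array import array


def encode_list_column(rows):
    """Pack per-row float arrays into Arrow-style (offsets, values) buffers."""
    offsets = array("i", [0])  # signed ints (typically 32-bit); one more entry than rows
    values = array("d")        # contiguous float64 child values shared by all rows
    for row in rows:
        values.extend(row)           # one bulk append per row
        offsets.append(len(values))  # end position of this row in the values buffer
    return offsets, values


def decode_list_column(offsets, values):
    """Slice the shared values buffer back into one array per row."""
    return [values[offsets[i]:offsets[i + 1]] for i in range(len(offsets) - 1)]


# Hypothetical data: three rows, the second one empty.
rows = [array("d", [1.0, 2.0]), array("d"), array("d", [3.0, 4.0, 5.0])]
offsets, values = encode_list_column(rows)
restored = decode_list_column(offsets, values)
```

An empty row simply repeats the previous offset, so row boundaries are always recoverable from adjacent offset pairs.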

Supported types

bool, int8/16/32/64, uint8/16/32/64, double, str, bytes, datetime, timedelta, date, time, csp.Enum, nested csp.Struct, Numpy1DArray[T], and NumpyNDArray[T].
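
For NumpyNDArray[T], the usage notes in this PR describe the storage as a flattened list column plus a list<int64> shape column (default name: <column>_csp_dimensions). The sketch below illustrates that flatten-and-reshape idea with plain nested lists. It assumes row-major (C) ordering, which the PR description does not state, and `flatten` and `unflatten` are hypothetical helpers, not functions from csp.

```python
from math import prod


def flatten(nested):
    """Return (flat_values, shape) for a regular, non-empty nested list (row-major)."""
    shape = []
    level = nested
    while isinstance(level, list):
        shape.append(len(level))
        level = level[0] if level else None
    flat = nested
    for _ in shape[1:]:
        # Peel off one nesting level per extra dimension.
        flat = [x for sub in flat for x in sub]
    return list(flat), shape


def unflatten(flat, shape):
    """Rebuild the nested list from flat row-major values and a shape."""
    if prod(shape) != len(flat):
        raise ValueError("shape does not match number of values")
    out = list(flat)
    for dim in reversed(shape[1:]):
        # Regroup from the innermost dimension outward.
        out = [out[i:i + dim] for i in range(0, len(out), dim)]
    return out


pixels = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
flat, shape = flatten(pixels)

# One row as it might appear in the two columns (names follow the default above).
row = {"pixels": flat, "pixels_csp_dimensions": shape}
restored = unflatten(row["pixels"], row["pixels_csp_dimensions"])
```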

Usage

import csp
import pyarrow as pa
from csp.adapters.arrow import record_batches_to_struct, struct_to_record_batches
from csp.typing import Numpy1DArray, NumpyNDArray


class SensorReading(csp.Struct):
    temperature: float
    humidity: float
    station: str


# --- Struct to RecordBatch ---

@csp.graph
def write_example(data: csp.ts[csp.typing.List[SensorReading]]):
    # All fields included by default with identity naming
    batches = struct_to_record_batches(data, SensorReading)

    # With explicit field mapping {struct_field: arrow_column}
    batches = struct_to_record_batches(
        data, SensorReading,
        field_map={
            "temperature": "temp_c",
            "humidity": "rel_humidity",
            "station": "station_id",
        },
    )

    # Control max rows per output batch (default 65536)
    batches = struct_to_record_batches(data, SensorReading, max_batch_size=10000)


# --- RecordBatch to Struct ---

@csp.graph
def read_example(data: csp.ts[csp.typing.List[pa.RecordBatch]], schema: pa.Schema):
    # field_map maps {arrow_column: struct_field}
    structs = record_batches_to_struct(
        data, SensorReading,
        field_map={
            "temp_c": "temperature",
            "rel_humidity": "humidity",
            "station_id": "station",
        },
        schema=schema,
    )


# --- Numpy array fields ---

class ImageBatch(csp.Struct):
    label: str
    embedding: Numpy1DArray[float]
    pixels: NumpyNDArray[float]

@csp.graph
def numpy_example(data: csp.ts[csp.typing.List[ImageBatch]]):
    # Numpy fields are auto-detected from struct metadata.
    # 1D arrays stored as list<float64>.
    # NDArrays stored as list<float64> + a list<int64> shape column
    # (default name: <column>_csp_dimensions).
    batches = struct_to_record_batches(data, ImageBatch)


# --- Round-trip ---

@csp.graph
def roundtrip_example(readings: csp.ts[csp.typing.List[SensorReading]]):
    field_map = {"temperature": "temperature", "humidity": "humidity", "station": "station"}
    schema = pa.schema([
        ("temperature", pa.float64()),
        ("humidity", pa.float64()),
        ("station", pa.utf8()),
    ])

    batches = struct_to_record_batches(readings, SensorReading)
    restored = record_batches_to_struct(batches, SensorReading, field_map, schema)
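
Note that the two nodes take field maps in opposite directions: {struct_field: arrow_column} when writing and {arrow_column: struct_field} when reading. For a round trip with renamed columns, the read-side map can be derived from the write-side one. A minimal sketch, where `invert_field_map` is a hypothetical helper and not part of this PR:

```python
def invert_field_map(write_map):
    """Turn {struct_field: arrow_column} into {arrow_column: struct_field}."""
    read_map = {}
    for struct_field, arrow_column in write_map.items():
        if arrow_column in read_map:
            # Two struct fields mapped to the same column cannot be inverted.
            raise ValueError(f"duplicate arrow column: {arrow_column!r}")
        read_map[arrow_column] = struct_field
    return read_map


write_map = {
    "temperature": "temp_c",
    "humidity": "rel_humidity",
    "station": "station_id",
}
read_map = invert_field_map(write_map)
```

Here `read_map` matches the mapping passed to record_batches_to_struct in read_example above.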

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
@arhamchopra arhamchopra marked this pull request as ready for review February 20, 2026 17:07
@arhamchopra arhamchopra added the type: feature Issues and PRs related to new features label Feb 21, 2026