Framework-agnostic Python toolkit to build backend applications with:

- Auto-CRUD — a full REST surface from a model declaration in two lines of code
- typed use cases (`msgspec.Struct`) with rules, computes, and dependency injection
- repositories decoupled from infrastructure
- REST/FastAPI adapters with OpenAPI generation
- first-class background jobs and Celery workers
- declarative ETL — compile-time-validated pipelines for Polars and Spark
- testing utilities for business workflows and ETL steps
loom-kernel helps you ship production APIs faster without sacrificing clean
architecture. Declare your domain model, describe your business rules, and let the
framework handle the infrastructure plumbing — DB wiring, DI, routing, serialization.
The library separates core contracts from concrete adapters so you can swap infrastructure (DB, cache, transport) without breaking business logic.
- Usage guides and architecture docs are available in the `docs/` site.
- API reference is autogenerated from public docstrings.
- End-to-end REST demo: `dummy-loom`.
- End-to-end ETL demo: `dummy-loom-etl` — full Polars and Spark pipeline examples.
| Subpath | What it is for |
|---|---|
| `src/loom/core/use_case` | `UseCase` definition, rules (`Rule`), and compute steps (`Compute`). |
| `src/loom/core/engine` | Compilation and runtime execution of a use-case plan. |
| `src/loom/core/repository/abc` | Repository contracts, pagination, and typed query spec. |
| `src/loom/core/repository/sqlalchemy` | Concrete async SQLAlchemy repository implementation. |
| `src/loom/core/model` | Base model, fields, relations, and entity introspection. |
| `src/loom/core/cache` | Decorators and cached repository with dependency invalidation. |
| `src/loom/core/config` | YAML config loader with cloud URI support and pluggable resolvers. |
| `src/loom/rest` | Framework-agnostic REST model and route compiler. |
| `src/loom/rest/fastapi` | Direct FastAPI integration (auto wiring and runtime router). |
| `src/loom/prometheus` | Middleware and adapter for runtime metrics. |
| `src/loom/testing` | Harnesses for unit/integration tests and golden tests. |
| `src/loom/etl` | Declarative ETL subsystem — pipelines, Polars/Spark backends, observability. |
| `src/loom/etl/pipeline` | `ETLStep`, `ETLProcess`, `ETLPipeline`, `StepSQL`, and `ETLParams`. |
| `src/loom/etl/declarative` | `FromTable`, `FromFile`, `IntoTable`, `IntoFile`, predicate DSL, and params proxy. |
| `src/loom/etl/schema` | Backend-agnostic schema model (`LoomDtype`, `ColumnSchema`, `TableRef`). |
| `src/loom/etl/storage` | Storage config, table/file locators, and route resolution. |
| `src/loom/etl/compiler` | Compile-time validation and execution plan builder. |
| `src/loom/etl/runner` | `ETLRunner` entry point and YAML config loader. |
| `src/loom/etl/checkpoint` | Step-level checkpoint store for incremental re-runs. |
| `src/loom/etl/observability` | Run/step observers, OTEL sink, structlog sink, and execution records. |
| `src/loom/etl/backends/polars` | Polars + Delta Lake reader, writer, and schema aligner. |
| `src/loom/etl/backends/spark` | Spark + Delta reader, writer, and schema aligner. |
| `src/loom/etl/testing` | `PolarsStepRunner`, `SparkStepRunner`, `ETLScenario`, and test stubs. |
```python
from loom.core.model import ColumnField, OnDelete, TimestampedModel

class User(TimestampedModel):
    __tablename__ = "users"

    id: int = ColumnField(primary_key=True, autoincrement=True)
    full_name: str = ColumnField(length=120)
    email: str = ColumnField(length=255, unique=True, index=True)

class Address(TimestampedModel):
    __tablename__ = "addresses"

    id: int = ColumnField(primary_key=True, autoincrement=True)
    user_id: int = ColumnField(foreign_key="users.id", on_delete=OnDelete.CASCADE, index=True)
    city: str = ColumnField(length=120)
    country: str = ColumnField(length=120)
```

Use cases declare their inputs and invariants declaratively. The engine resolves them before `execute()` runs.
```python
import re

from loom.core.command import Command, Patch
from loom.core.errors import NotFound
from loom.core.use_case import Exists, F, Input, LoadById, OnMissing, Rule
from loom.core.use_case.use_case import UseCase

_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

class CreateUser(Command, frozen=True):
    full_name: str
    email: str

class UpdateUser(Command, frozen=True):
    full_name: Patch[str] = None
    email: Patch[str] = None

def _name_must_not_be_blank(full_name: str) -> str | None:
    return None if full_name.strip() else "full_name must not be blank"

def _email_must_be_valid(email: str) -> str | None:
    return None if _EMAIL_RE.fullmatch(email) else "email must be valid"

class CreateUserUseCase(UseCase[User, User]):
    rules = [
        Rule.check(F(CreateUser).full_name, via=_name_must_not_be_blank),
        Rule.check(F(CreateUser).email, via=_email_must_be_valid),
        Rule.forbid(lambda _, __, exists: exists, message="email already exists").from_params("email_exists"),
    ]

    async def execute(
        self,
        cmd: CreateUser = Input(),
        email_exists: bool = Exists(User, from_command="email", against="email"),
    ) -> User:
        return await self.main_repo.create(cmd)

class UpdateUserUseCase(UseCase[User, User | None]):
    rules = [Rule.check(F(UpdateUser).full_name, via=_name_must_not_be_blank).when_present(F(UpdateUser).full_name)]

    async def execute(
        self,
        user_id: int,
        cmd: UpdateUser = Input(),
        current_user: User = LoadById(User, by="user_id"),  # loaded automatically
    ) -> User | None:
        return await self.main_repo.update(user_id, cmd)
```

- `Exists` checks a DB condition before `execute` runs — no boilerplate in the body.
- `LoadById` fetches an entity by a path/command parameter, available in rules and the body.
- `Patch[T]` marks a field as optional in partial updates; `.when_present(...)` gates rules on whether the field was sent.
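One common way to implement such partial-update semantics — shown here as a generic sketch, not loom's actual `Patch` internals — is a sentinel object that distinguishes "field omitted" from "field explicitly set":

```python
# Sentinel-based sketch of Patch-like semantics (hypothetical; loom's
# real implementation may track field presence differently).
_UNSET = object()

class UpdateUser:
    def __init__(self, full_name=_UNSET, email=_UNSET):
        self.full_name = full_name
        self.email = email

    def present_fields(self) -> dict:
        # Only fields the caller actually sent participate in the update;
        # this is also the kind of check a `.when_present(...)` gate performs.
        return {k: v for k, v in vars(self).items() if v is not _UNSET}

partial = UpdateUser(full_name="Ada")
```

Note that an explicit `email=None` is still "present" under this scheme, which is what lets a PATCH clear a nullable field without touching the others.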
Use `from_param` to guard nested routes (e.g. `/users/{user_id}/addresses/{address_id}`):

```python
from loom.core.use_case import Exists, Input, OnMissing

class CreateAddressUseCase(UseCase[Address, Address]):
    async def execute(
        self,
        user_id: int,
        cmd: CreateUserAddress = Input(),
        _user_exists: bool = Exists(User, from_param="user_id", against="id", on_missing=OnMissing.RAISE),
    ) -> Address:
        return await self.main_repo.create(CreateAddressRecord(user_id=user_id, **cmd.__dict__))
```

`OnMissing.RAISE` returns a structured 404 automatically — no `if` in the body.
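Conceptually, the guard collapses the usual lookup-and-raise boilerplate into a declaration. A plain-Python equivalent (hypothetical helper names; `NotFound` stands in for loom's error type, which the REST adapter maps to 404):

```python
class NotFound(Exception):
    """Stand-in for loom.core.errors.NotFound; the REST adapter
    turns it into a structured HTTP 404 response."""

def ensure_exists(store: dict, key: int) -> None:
    # Roughly what Exists(..., on_missing=OnMissing.RAISE) amounts to:
    # the check runs before the use-case body, so the body needs no `if`.
    if key not in store:
        raise NotFound(f"resource {key!r} not found")

users = {1: "ada"}
ensure_exists(users, 1)  # parent exists: the body would run

try:
    ensure_exists(users, 42)
    reached_body = True
except NotFound:
    reached_body = False
```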
Build explicit queries without raw SQL:

```python
from loom.core.repository.abc.query import (
    FilterGroup, FilterOp, FilterSpec, PageResult, PaginationMode, QuerySpec, SortSpec,
)

class ListLowStockProductsUseCase(UseCase[Product, PageResult[Product]]):
    async def execute(self, profile: str = "default") -> PageResult[Product]:
        query = QuerySpec(
            filters=FilterGroup(filters=(FilterSpec(field="stock", op=FilterOp.LTE, value=5),)),
            sort=(SortSpec(field="stock", direction="ASC"),),
            pagination=PaginationMode.OFFSET,
            limit=20,
            page=1,
        )
        result = await self.main_repo.list_with_query(query, profile=profile)
        if not isinstance(result, PageResult):
            raise RuntimeError("expected offset result")
        return result
```

Jobs are use-case-like executors that run in a queue. `LoadById` works the same way:
```python
from loom.core.job.job import Job
from loom.core.use_case import Input, LoadById

class SendRestockEmailJob(Job[bool]):
    __queue__ = "notifications"

    async def execute(
        self,
        product_id: int,
        cmd: SendRestockEmailCommand = Input(),
        product: Product = LoadById(Product, by="product_id"),
    ) -> bool:
        if product.stock > 0:
            return False
        # send email to cmd.recipient_email ...
        return True
```

```python
from loom.core.job.service import JobService
from loom.core.use_case.use_case import UseCase

class DispatchRestockEmailUseCase(UseCase[Product, DispatchRestockEmailResponse]):
    def __init__(self, job_service: JobService) -> None:
        self._jobs = job_service

    async def execute(self, product_id: str, cmd: DispatchRestockEmailCommand = Input()) -> DispatchRestockEmailResponse:
        handle = self._jobs.dispatch(
            SendRestockEmailJob,
            params={"product_id": int(product_id)},
            payload={"product_id": int(product_id), "recipient_email": cmd.recipient_email},
            on_success=RestockEmailSuccessCallback,
            on_failure=RestockEmailFailureCallback,
        )
        return DispatchRestockEmailResponse(job_id=handle.job_id, queue=handle.queue)
```

Callbacks are resolved by the DI container and receive the job result + context:
```python
class RestockEmailSuccessCallback:
    def __init__(self, app: ApplicationInvoker) -> None:
        self._app = app

    async def on_success(self, job_id: str, result: Any, **context: Any) -> None:
        if not result:
            return
        entity = self._app.entity(Product)
        product = await entity.get(params={"id": context["product_id"]})
        if product:
            await entity.update(params={"id": product.id}, payload={"category": f"{product.category}-notified"})
```

`ApplicationInvoker` lets a use case call another use case by type — no tight coupling:
```python
from loom.core.use_case.invoker import ApplicationInvoker

class RestockWorkflowUseCase(UseCase[Product, RestockWorkflowResponse]):
    def __init__(self, app: ApplicationInvoker, job_service: JobService) -> None:
        self._app = app
        self._jobs = job_service

    async def execute(self, product_id: str, cmd: DispatchRestockEmailCommand = Input()) -> RestockWorkflowResponse:
        summary = await self._app.invoke(BuildProductSummaryUseCase, params={"product_id": int(product_id)})
        handle = self._jobs.dispatch(SendRestockEmailJob, params={"product_id": int(product_id)}, payload={...})
        return RestockWorkflowResponse(summary=summary.summary, restock_job_id=handle.job_id, queue=handle.queue)
```

```python
from loom.rest.autocrud import build_auto_routes
from loom.rest.model import PaginationMode, RestInterface, RestRoute

class ProductRestInterface(RestInterface[Product]):
    prefix = "/products"
    tags = ("Products",)
    pagination_mode = PaginationMode.CURSOR

    routes = (
        RestRoute(use_case=ListLowStockProductsUseCase, method="GET", path="/low-stock",
                  summary="List low stock products"),
        RestRoute(use_case=DispatchRestockEmailUseCase, method="POST",
                  path="/{product_id}/jobs/restock-email", status_code=202,
                  summary="Dispatch restock email"),
        RestRoute(use_case=RestockWorkflowUseCase, method="POST",
                  path="/{product_id}/workflows/restock", status_code=202,
                  summary="Run restock workflow"),
        *build_auto_routes(Product, ()),  # adds GET, POST, PATCH, DELETE automatically
    )
```

Nested resource interfaces work the same way — routes mirror the URL hierarchy:
```python
class AddressRestInterface(RestInterface[Address]):
    prefix = "/users"
    tags = ("UserAddresses",)

    routes = (
        RestRoute(use_case=CreateAddressUseCase, method="POST", path="/{user_id}/addresses/", status_code=201),
        RestRoute(use_case=ListAddressesUseCase, method="GET", path="/{user_id}/addresses/"),
        RestRoute(use_case=GetAddressUseCase, method="GET", path="/{user_id}/addresses/{address_id}"),
        RestRoute(use_case=UpdateAddressUseCase, method="PATCH", path="/{user_id}/addresses/{address_id}"),
        RestRoute(use_case=DeleteAddressUseCase, method="DELETE", path="/{user_id}/addresses/{address_id}"),
    )
```

The `create_app()` factory wires everything — DB, cache, DI, routes — from a YAML config:
```yaml
# config/api.yaml
app:
  name: my_store
  code_path: src
discovery:
  mode: modules
  modules:
    include:
      - app.user.model
      - app.user.interface
      - app.product.model
      - app.product.interface
rest:
  backend: fastapi
  title: My Store API
  version: 0.1.0
database:
  url: ${oc.env:DATABASE_URL,sqlite+aiosqlite:///store.db}
metrics:
  enabled: false
```

```python
# main.py — 3 lines
from loom.rest.fastapi.auto import create_app

app = create_app("config/api.yaml")
```

For larger projects, use `mode: manifest` and a manifest module:
```python
# app/manifest.py
from app.user.model import User
from app.user.interface import UserRestInterface

MODELS = [User, ...]
INTERFACES = [UserRestInterface, ...]
```

```yaml
discovery:
  mode: manifest
  manifest:
    module: app.manifest
```

For compute-heavy write flows, declare field derivations and run them before rules:
```python
from loom.core.use_case import Compute, F

class PricingPreviewUseCase(UseCase[Record, PricingPreviewResponse]):
    computes = (
        Compute.set(F(PricingCommand).normalized_email).from_command(
            F(PricingCommand).email, via=lambda v: v.strip().lower(),
        ),
        Compute.set(F(PricingCommand).subtotal).from_command(
            F(PricingCommand).unit_price, F(PricingCommand).quantity,
            via=lambda price, qty: price * qty,
        ),
        Compute.set(F(PricingCommand).tax_amount).from_command(
            F(PricingCommand).subtotal, F(PricingCommand).tax_rate,
            via=lambda sub, rate: sub * rate,
        ),
    )

    rules = (
        Rule.check(F(PricingCommand).unit_price, via=lambda v: v <= 0, message="unit_price must be > 0"),
        Rule.check(F(PricingCommand).country, via=lambda v: v not in TAX_RATES, message="Unsupported country"),
    )

    async def execute(self, record_id: int, cmd: PricingCommand = Input()) -> PricingPreviewResponse:
        ...
```

Computes run in declaration order — later computes can reference fields set by earlier ones.
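The ordering guarantee can be illustrated with a plain-dict reduction (field names are illustrative; the real engine compiles computes into an execution plan rather than looping like this):

```python
# Each compute may read fields written by earlier computes -- here
# "tax_amount" reads "subtotal", which the previous step produced.
computes = [
    ("normalized_email", lambda c: c["email"].strip().lower()),
    ("subtotal", lambda c: c["unit_price"] * c["quantity"]),
    ("tax_amount", lambda c: c["subtotal"] * c["tax_rate"]),
]

cmd = {"email": "  Ada@Example.COM  ", "unit_price": 10.0,
       "quantity": 3, "tax_rate": 0.2}

for field, fn in computes:
    cmd[field] = fn(cmd)  # declaration order: later steps see earlier results
```

Reordering the list so `tax_amount` came before `subtotal` would fail, which is exactly the kind of dependency error a compile-time plan can surface before any data is touched.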
For deeper references, review the integration examples under `tests/integration/fake_repo`.
For a runnable full-stack sample with all patterns combined, check the companion repository `dummy-loom`.
Install a backend:

```shell
pip install "loom-kernel[etl-polars]"
# or
pip install "loom-kernel[etl-spark]"
```

Declare a pipeline — sources, targets, and transformation logic are explicit and compile-time validated:
```python
from datetime import date

import polars as pl

from loom.etl import ETLParams, ETLStep, ETLProcess, ETLPipeline, ETLRunner, FromTable, IntoTable

class DailyParams(ETLParams):
    run_date: date

class CleanOrders(ETLStep[DailyParams]):
    orders = FromTable("raw.orders").columns("id", "amount", "run_date")
    target = IntoTable("staging.orders").replace()

    def execute(self, params: DailyParams, *, orders: pl.LazyFrame) -> pl.LazyFrame:
        return orders.filter(pl.col("amount") > 0)

class DailyProcess(ETLProcess[DailyParams]):
    steps = [CleanOrders]

class DailyPipeline(ETLPipeline[DailyParams]):
    processes = [DailyProcess]

runner = ETLRunner.from_yaml("config/etl.yaml")
runner.run(DailyPipeline, DailyParams(run_date=date.today()))
```

Write modes — `replace`, `append`, `replace_partition`, `replace_partitions`, `replace_where`, `upsert` — are declared on the target and validated at compile time. Partition predicates use the params proxy, so no values are hard-coded:
```python
from loom.etl import params

target = IntoTable("staging.orders").replace_partition(
    year=params.run_date.year,
    month=params.run_date.month,
)
```

File aliases decouple paths from pipelines — declare once in YAML, reference by name:
```python
events = FromFile.alias("events_raw", format=Format.CSV)
target = IntoFile.alias("exports_daily", format=Format.PARQUET)
```

For a full Polars + Spark + Delta Lake example see `dummy-loom-etl`.
For the complete API reference and write-mode guide see the ETL docs.
loom-kernel adds no measurable overhead at the concurrency levels typical of
production REST APIs. The benchmark below compares loompy (loom-kernel + SQLAlchemy)
against a hand-written FastAPI application hitting the same PostgreSQL database.

Methodology: 3 independent runs × 3 concurrency levels (20 / 100 / 300 workers), with the median RPS reported. Dataset: 1,200 records with 3 notes each. Infrastructure: each target runs in its own isolated Docker container with a dedicated PostgreSQL instance.
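For clarity, "median RPS reported" means each table cell is the median of the three runs at that concurrency level — illustrated below with made-up numbers, not the actual benchmark data:

```python
from statistics import median

# Made-up throughput samples (req/s) for one scenario at one
# concurrency level, one value per independent run.
runs_rps = [4210.0, 4312.5, 4290.1]

reported = median(runs_rps)  # the value a table cell would report
```

Using the median rather than the mean keeps a single noisy run from skewing the reported figure.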
loompy vs hand-written FastAPI (relative difference in median RPS, 3 repeats):

| Scenario | c=20 | c=100 | c=300 |
|---|---|---|---|
| `GET /:id` with joins | ≈ tied | +8.9 % | −11.7 % |
| `LIST` cursor | −2.8 % | +3.8 % | −19.4 % |
| `LIST` offset + `COUNT(*)` | ≈ tied | +6.2 % | −25.4 % |
| `PATCH` (`UPDATE … RETURNING`) | +2.1 % | ≈ tied | +12.7 % |
| `GET /ping` (no DB) | ≈ tied | −5.1 % | +16.5 % |
At the production sweet spot (moderate concurrency, c=100), loom-kernel matches or
outperforms the baseline in 4 out of 5 scenarios — without a single line of hand-tuned
SQL. The GET and LIST advantages come from the compiled single-pass SQL read path.
The PATCH advantage at high concurrency comes from the UPDATE … RETURNING pattern
(one round-trip vs three in a naive implementation).
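The round-trip saving can be sketched against SQLite, which also supports `RETURNING` (since SQLite 3.35; the benchmark itself targets PostgreSQL, and the table and column names here are illustrative):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, stock INTEGER)")
con.execute("INSERT INTO products (id, stock) VALUES (1, 5)")

# Naive PATCH: SELECT (load) -> UPDATE -> SELECT (re-read) = 3 round-trips.
# Compiled PATCH: one statement that mutates the row AND returns its new state.
row = con.execute(
    "UPDATE products SET stock = stock - 1 WHERE id = ? RETURNING id, stock",
    (1,),
).fetchone()
```

At high concurrency, shaving two round-trips per write is exactly where the +12.7 % PATCH advantage in the table above would come from.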
The full benchmark suite is available in `dummy-loom`.
Project under active development.
Designed and built by Massive Data Scope.
For questions, feedback, or collaboration: massivedatascope@gmail.com
