Package: pyfly.data.relational.sqlalchemy
Commons: pyfly.data (shared ports, pagination, query parsing, entity mapping)

This guide covers the SQLAlchemy adapter for relational databases. For generic data concepts shared across all adapters (repository ports, Page/Pageable/Sort, QueryMethodParser, Mapper, extensibility), see the Data Module Guide. For document databases, see the Data Document Guide.

Hexagonal by design: your services depend on RepositoryPort[T, ID] (the port), never on Repository[T, ID] (the adapter). SQLAlchemy is the default relational adapter today, but the layer is designed so that any relational backend (Tortoise ORM, Django ORM, etc.) can be added by implementing the same ports.
PyFly Data Relational implements the Repository pattern with Spring Data-style derived query methods, composable specifications, pagination, entity mapping, and declarative transaction management — backed by SQLAlchemy's async ORM.
- Architecture Overview
- Entity Definition
- Repository Pattern
- Derived Query Methods
- Custom Queries with @query
- Specifications
- FilterOperator
- FilterUtils: Query by Example
- Pagination
- Transaction Management
- Run Migrations on Startup (Flyway-Style)
- Read/Write Routing (Read Replicas)
- Multiple Named Datasources
- Data Auditing
- RepositoryBeanPostProcessor
- QueryMethodCompiler
- Complete CRUD Example
- See Also
All concrete types live in the SQLAlchemy adapter package. The namespace pyfly.data.relational is a pass-through and does not re-export anything.
from pyfly.data.relational.sqlalchemy import (
    Base, BaseEntity,              # SQLAlchemy entity base classes
    Repository,                    # Repository[T, ID] implementation
    Specification,                 # Composable query predicates
    FilterOperator, FilterUtils,   # Query-by-example utilities
    QueryExecutor, query,          # Custom @query decorator
    QueryMethodCompiler,           # Derived query → SQLAlchemy compiler
    RepositoryBeanPostProcessor,   # Auto-wires query methods
    reactive_transactional,        # Declarative transaction management
)

Note: Always import concrete types from pyfly.data.relational.sqlalchemy. Commons types (Page, Pageable, RepositoryPort, etc.) are imported from pyfly.data; see the Data Module Guide.
PyFly exports a pre-configured SQLAlchemy DeclarativeBase:
from pyfly.data.relational.sqlalchemy import Base

Use Base directly when you need SQLAlchemy entities without the built-in audit trail fields.

BaseEntity extends Base and provides a UUID primary key plus four audit trail columns. All domain entities should inherit from this class:

from pyfly.data.relational.sqlalchemy import BaseEntity

Inherited fields:
| Field | Type | Column Type | Description |
|---|---|---|---|
| id | Mapped[UUID] | Primary key | Auto-generated UUID v4 |
| created_at | Mapped[datetime] | DateTime(timezone=True) | Set automatically on insert |
| updated_at | Mapped[datetime] | DateTime(timezone=True) | Set on insert, updated on every save |
| created_by | Mapped[str \| None] | String(255) | Creator identifier (default None) |
| updated_by | Mapped[str \| None] | String(255) | Updater identifier (default None) |
BaseEntity is declared with __abstract__ = True, so it does not create its own database table.
Extend BaseEntity and declare your domain columns:
from pyfly.data.relational.sqlalchemy import BaseEntity
from sqlalchemy import String, Float, Boolean
from sqlalchemy.orm import Mapped, mapped_column


class Order(BaseEntity):
    __tablename__ = "orders"

    customer_id: Mapped[str] = mapped_column(String(255))
    status: Mapped[str] = mapped_column(String(50), default="PENDING")
    total: Mapped[float] = mapped_column(Float, default=0.0)
    active: Mapped[bool] = mapped_column(Boolean, default=True)

This entity will have all five inherited fields (id, created_at, updated_at, created_by, updated_by) plus your four custom columns.
The Repository[T, ID] class provides generic async CRUD operations for any SQLAlchemy model. The two type parameters are:
- T: The entity type (any SQLAlchemy model, including BaseEntity subclasses or plain Base subclasses)
- ID: The primary key type (e.g. UUID, int, str)
When you subclass Repository[T, ID] with concrete type parameters, the framework automatically extracts the entity type and ID type via __init_subclass__. The AsyncSession is injected by the DI container from the auto-configured async_sessionmaker. No explicit __init__ is needed:
from uuid import UUID

from pyfly.container import repository as repo_stereotype
from pyfly.data.relational.sqlalchemy import Repository


@repo_stereotype
class OrderRepository(Repository[Order, UUID]):
    pass


# Usage (repo is an OrderRepository whose session was injected by the container):
order = await repo.save(Order(customer_id="abc", status="PENDING"))
found = await repo.find_by_id(order.id)

Repository[T, ID] satisfies the RepositoryPort[T, ID] protocol, enabling hexagonal architecture where your service layer depends on the port, not the adapter. The port hierarchy is CrudRepository[T, ID] -> ReactiveSortingRepository[T, ID] -> PagingAndSortingRepository[T, ID] (mirroring Spring Data WebFlux's ReactiveCrudRepository -> ReactiveSortingRepository + paging), and RepositoryPort is an alias of CrudRepository.
Subclass Repository[T, ID] with concrete type parameters and register it with the @repository stereotype:
from uuid import UUID

from pyfly.container import repository as repo_stereotype
from pyfly.data.relational.sqlalchemy import Repository


@repo_stereotype
class OrderRepository(Repository[Order, UUID]):
    pass

For entities with integer primary keys:

@repo_stereotype
class ProductRepository(Repository[Product, int]):
    pass

How it works:

- __init_subclass__ inspects __orig_bases__ to extract the entity type (Order) and ID type (UUID) from the generic parameters at class definition time.
- The AsyncSession is provided as an auto-configured bean by RelationalAutoConfiguration and injected by the container into the repository's constructor.
- The entity type is used internally for all query operations, so there is no need to pass it manually.
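The generic-parameter extraction described above can be illustrated with the standard library alone. The following is a minimal sketch, not PyFly's actual implementation: RepositorySketch, OrderSketch, and the _id_type attribute are hypothetical names used only for illustration (the guide itself only mentions _model).

```python
from typing import Generic, TypeVar, get_args, get_origin
from uuid import UUID

T = TypeVar("T")
ID = TypeVar("ID")


class RepositorySketch(Generic[T, ID]):
    """Hypothetical stand-in for Repository[T, ID]."""

    _model = None    # entity type, filled in per subclass
    _id_type = None  # primary key type, filled in per subclass

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # __orig_bases__ keeps the parameterized bases, e.g. RepositorySketch[OrderSketch, UUID]
        for base in getattr(cls, "__orig_bases__", ()):
            if get_origin(base) is RepositorySketch:
                model, id_type = get_args(base)
                # Ignore bases that are still generic (unbound TypeVars)
                if not isinstance(model, TypeVar):
                    cls._model, cls._id_type = model, id_type


class OrderSketch:
    """Hypothetical entity class."""


class OrderRepositorySketch(RepositorySketch[OrderSketch, UUID]):
    pass
```

Because the types are captured at class definition time, an instance never needs the entity class passed to its constructor.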
| Method | Return Type | Description |
|---|---|---|
| save(entity) | T | Insert or update; flushes and refreshes |
| find_by_id(id: ID) | T \| None | Find by primary key |
| find_all(**filters) | list[T] | Find all, optionally filtered by column values |
| find_all(sort: Sort) | list[T] | Fetch all, applying the Sort order |
| find_all(pageable: Pageable) | Page[T] | Paginated query: counts total, applies sort, slices |
| find_all_by_id(ids) | list[T] | Find all entities whose IDs are in ids |
| stream_all(criteria: Sort \| None, **filters) | AsyncIterator[T] | Stream all (the Flux[T] analogue); optional Sort |
| delete(entity: T) | None | Delete the given entity |
| delete_by_id(id: ID) | None | Delete by primary key (no-op if not found) |
| delete_all(entities=None) | None | Delete the given entities; with no args, truncate all |
| delete_all_by_id(ids) | None | Delete all entities whose IDs are in ids |
| count() | int | Count all entities in the table |
| exists_by_id(id: ID) | bool | Check if an entity with this ID exists |
| find_all_by_spec(spec) | list[T] | Find all matching a Specification |
| find_all_by_spec_paged(spec, pageable) | Page[T] | Paginated query with Specification + sorting |
save() calls session.add(), then session.flush() and session.refresh() to ensure the returned entity has all database-generated values (ID, defaults, etc.).
find_all() accepts keyword arguments that are translated into equality filters:
orders = await repo.find_all(status="PENDING", customer_id="abc")
# Equivalent to: SELECT * FROM orders WHERE status = 'PENDING' AND customer_id = 'abc'

delete_by_id() looks up the entity first and deletes it if found. If not found, it is a no-op. delete(entity) removes the given entity directly. The delete_all(entities) form deletes each given entity, while delete_all() with no arguments truncates the whole table; both return None. delete_all_by_id(ids) deletes every entity whose ID is in ids.
find_all(pageable) counts the total, applies the Pageable's sort, slices with LIMIT/OFFSET, and returns a Page[T]. find_all(sort) fetches every row in the given Sort order, and stream_all(criteria=Sort.by(...)) yields entities one at a time as an AsyncIterator[T] (the Flux[T] analogue):
from pyfly.data import Sort

# Order has no "name" column, so sort by an inherited BaseEntity field instead
async for order in repo.stream_all(Sort.by("created_at")):
    process(order)

PyFly automatically generates query implementations from method names using the Spring Data naming convention. You define stub methods on your repository and the RepositoryBeanPostProcessor compiles them into real SQLAlchemy queries at startup.
For the full naming convention reference (prefixes, operators, connectors, ordering), see the Data Module Guide — Derived Query Methods.
@repo_stereotype
class OrderRepository(Repository[Order, UUID]):
    # Equals (default operator)
    async def find_by_status(self, status: str) -> list[Order]: ...

    # Multiple conditions with AND
    async def find_by_customer_id_and_status(
        self, customer_id: str, status: str
    ) -> list[Order]: ...

    # Greater than
    async def find_by_total_greater_than(self, min_total: float) -> list[Order]: ...

    # Between (takes 2 arguments)
    async def find_by_total_between(self, low: float, high: float) -> list[Order]: ...

    # LIKE pattern
    async def find_by_customer_id_like(self, pattern: str) -> list[Order]: ...

    # Contains (wraps value in %)
    async def find_by_customer_id_containing(self, fragment: str) -> list[Order]: ...

    # IN a list
    async def find_by_status_in(self, statuses: list[str]) -> list[Order]: ...

    # IS NULL / IS NOT NULL (zero arguments consumed)
    async def find_by_deleted_at_is_null(self) -> list[Order]: ...
    async def find_by_updated_by_is_not_null(self) -> list[Order]: ...

    # COUNT prefix
    async def count_by_status(self, status: str) -> int: ...

    # EXISTS prefix
    async def exists_by_customer_id(self, customer_id: str) -> bool: ...

    # DELETE prefix (returns number of rows deleted)
    async def delete_by_status(self, status: str) -> int: ...

    # With ordering
    async def find_by_status_order_by_created_at_desc(
        self, status: str
    ) -> list[Order]: ...

    # Complex: AND + ordering
    async def find_by_status_and_customer_id_order_by_total_desc(
        self, status: str, customer_id: str
    ) -> list[Order]: ...

Each method body should be a stub (... or pass). The RepositoryBeanPostProcessor detects them and replaces them with real implementations at startup.
For complex queries that cannot be expressed through method naming conventions, use the @query decorator:
from pyfly.data.relational.sqlalchemy import query

By default, @query accepts a JPQL-like query string that is transpiled to SQL at startup:

@repo_stereotype
class OrderRepository(Repository[Order, UUID]):
    @query("SELECT o FROM Order o WHERE o.status = :status AND o.total > :min_total")
    async def find_expensive_orders(
        self, status: str, min_total: float
    ) -> list[Order]: ...

    # Order has no "role" column, so this example counts by status instead
    @query("SELECT COUNT(o) FROM Order o WHERE o.status = :status")
    async def count_orders_with_status(self, status: str) -> int: ...

Named parameters (:param_name) are bound from the method's keyword arguments.
Set native=True for raw SQL queries:
@query("SELECT * FROM orders WHERE status = :status", native=True)
async def find_by_status_native(self, status: str) -> list[Order]: ...

The QueryExecutor infers the return type from the query shape:
| Query Pattern | Return Type |
|---|---|
| SELECT COUNT(...) | int |
| Query containing EXISTS | bool |
| All other SELECT queries | list[entity] |
The lightweight JPQL-to-SQL transpiler performs these transformations:
- FROM Entity alias becomes FROM <tablename> (alias is removed)
- SELECT alias becomes SELECT *
- COUNT(alias) becomes COUNT(*)
- alias.field references become just field (alias prefix stripped)
- Boolean literals = true / = false become = 1 / = 0
Example transpilation:
JPQL: SELECT u FROM User u WHERE u.email LIKE :pattern AND u.active = true
SQL: SELECT * FROM users WHERE email LIKE :pattern AND active = 1
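The transformations listed above can be approximated with a few regular expressions. This is only a sketch under stated assumptions, not PyFly's transpiler: transpile_sketch and its tables mapping (entity name to table name) are hypothetical, and the query is assumed to always declare an alias after the entity name.

```python
import re


def transpile_sketch(jpql: str, tables: dict[str, str]) -> str:
    """Rewrite a JPQL-like query into SQL (illustrative only)."""
    # Locate "FROM Entity alias"; assumes an alias is always present
    match = re.search(r"\bFROM\s+(\w+)\s+(\w+)", jpql, flags=re.IGNORECASE)
    if match is None:
        return jpql
    entity, alias = match.group(1), match.group(2)
    a = re.escape(alias)

    # FROM Entity alias -> FROM <tablename>
    sql = jpql[: match.start()] + f"FROM {tables[entity]}" + jpql[match.end():]
    # COUNT(alias) -> COUNT(*)
    sql = re.sub(rf"\bCOUNT\(\s*{a}\s*\)", "COUNT(*)", sql, flags=re.IGNORECASE)
    # SELECT alias -> SELECT *
    sql = re.sub(rf"\bSELECT\s+{a}\b(?!\.)", "SELECT *", sql, flags=re.IGNORECASE)
    # alias.field -> field
    sql = re.sub(rf"\b{a}\.(\w+)", r"\1", sql)
    # Boolean literals -> 1 / 0
    sql = re.sub(r"=\s*true\b", "= 1", sql, flags=re.IGNORECASE)
    sql = re.sub(r"=\s*false\b", "= 0", sql, flags=re.IGNORECASE)
    return sql


example = transpile_sketch(
    "SELECT u FROM User u WHERE u.email LIKE :pattern AND u.active = true",
    {"User": "users"},
)
```

Named parameters such as :pattern pass through untouched, which is why they can still be bound at execution time.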
Specifications provide composable, type-safe query predicates inspired by Spring Data's Specification pattern. They let you build arbitrarily complex WHERE clauses from small, reusable building blocks.
Commons port: The SQLAlchemy Specification[T] subclasses the generic Specification[T, Q] ABC from pyfly.data.specification. This means SQLAlchemy specifications are polymorphic with the commons port: code that accepts pyfly.data.Specification will work with the SQLAlchemy adapter. See the Specification Port section in the Data Commons guide.
A Specification[T] wraps a callable that takes an entity class (root) and a SQLAlchemy Select statement, and returns a modified Select:
from pyfly.data.relational.sqlalchemy import Specification

# Inline specification
active = Specification(lambda root, q: q.where(root.active == True))
admin = Specification(lambda root, q: q.where(root.role == "admin"))

Specifications support Python's standard operators for composition:

# AND: both conditions must match
active_admins = active & admin

# OR: either condition may match
active_or_admin = active | admin

# NOT: negate a specification
inactive = ~active

# Complex combinations with parentheses
complex_spec = (active & admin) | ~admin

How combination works internally:

- & (AND): Chains the two predicates sequentially. SQLAlchemy naturally combines successive .where() calls with AND.
- | (OR): Applies each predicate independently, extracts the whereclause from each, and combines them using sqlalchemy.or_().
- ~ (NOT): Applies the predicate, extracts the whereclause, and wraps it with sqlalchemy.not_().
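The operator semantics can be tried out without a database. The sketch below is an in-memory analogue, not the SQLAlchemy adapter: the hypothetical PredicateSpec wraps a plain Python predicate instead of a function over a Select statement, but it composes with &, |, and ~ in the same way.

```python
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class PredicateSpec(Generic[T]):
    """Hypothetical in-memory specification wrapping a boolean predicate."""

    test: Callable[[T], bool]

    def __and__(self, other: "PredicateSpec[T]") -> "PredicateSpec[T]":
        # AND: both predicates must hold
        return PredicateSpec(lambda x: self.test(x) and other.test(x))

    def __or__(self, other: "PredicateSpec[T]") -> "PredicateSpec[T]":
        # OR: either predicate may hold
        return PredicateSpec(lambda x: self.test(x) or other.test(x))

    def __invert__(self) -> "PredicateSpec[T]":
        # NOT: negate the predicate
        return PredicateSpec(lambda x: not self.test(x))


users = [
    {"name": "a", "active": True, "role": "admin"},
    {"name": "b", "active": True, "role": "user"},
    {"name": "c", "active": False, "role": "admin"},
]
active = PredicateSpec(lambda u: u["active"])
admin = PredicateSpec(lambda u: u["role"] == "admin")


def names(spec: PredicateSpec) -> list[str]:
    """Names of the users that satisfy the specification."""
    return [u["name"] for u in users if spec.test(u)]
```

Each operator returns a new specification, so small building blocks stay reusable after being combined.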
from pyfly.data import Pageable, Sort

# Find all matching a specification
orders = await repo.find_all_by_spec(active & admin)

# Find with pagination
pageable = Pageable.of(page=1, size=20, sort=Sort.by("created_at").descending())
page = await repo.find_all_by_spec_paged(active & admin, pageable)

FilterOperator provides a library of static factory methods for creating common Specification predicates without writing lambdas.
| Method | SQL Equivalent | Arguments |
|---|---|---|
| eq(field, value) | field = value | field, value |
| neq(field, value) | field != value | field, value |
| gt(field, value) | field > value | field, value |
| gte(field, value) | field >= value | field, value |
| lt(field, value) | field < value | field, value |
| lte(field, value) | field <= value | field, value |
| like(field, pattern) | field LIKE pattern | field, pattern |
| contains(field, value) | field LIKE '%value%' | field, value |
| in_list(field, values) | field IN (values) | field, list |
| is_null(field) | field IS NULL | field |
| is_not_null(field) | field IS NOT NULL | field |
| between(field, low, high) | field BETWEEN low AND high | field, low, high |
Every FilterOperator method returns a Specification, so they can be composed with &, |, and ~:
from pyfly.data.relational.sqlalchemy import FilterOperator

# Ages from 18 (inclusive) up to 65 (exclusive)
age_filter = FilterOperator.gte("age", 18) & FilterOperator.lt("age", 65)

# Active users with a verified email
user_filter = FilterOperator.eq("active", True) & FilterOperator.is_not_null("email_verified_at")

# Premium or VIP customers
tier_filter = FilterOperator.in_list("tier", ["PREMIUM", "VIP"])

# Combine everything
final_spec = age_filter & user_filter & tier_filter
results = await repo.find_all_by_spec(final_spec)

Commons port: FilterUtils extends the BaseFilterUtils ABC from pyfly.data.filter. The by(), from_dict(), and from_example() algorithms are inherited from the base class; FilterUtils only implements the adapter-specific hooks _create_eq() and _create_noop(). See the BaseFilterUtils Port section in the Data Commons guide.
FilterUtils generates Specification objects from various input formats, providing a Pythonic take on Spring Data's Query by Example pattern.
from dataclasses import dataclass

from pyfly.data.relational.sqlalchemy import FilterUtils

# From keyword arguments (all eq, ANDed together)
spec = FilterUtils.by(name="Alice", active=True)
results = await repo.find_all_by_spec(spec)

# From a dictionary (None values are automatically skipped)
filters = {"role": "admin", "name": None, "active": True}
spec = FilterUtils.from_dict(filters)
# Produces: role = 'admin' AND active = True (name is skipped)

# From an example object (dataclass or plain object)
# Non-None fields become equality predicates
@dataclass
class UserFilter:
    role: str | None = None
    active: bool | None = None

example = UserFilter(role="admin")
spec = FilterUtils.from_example(example)
# Produces: role = 'admin' (active is None, so skipped)

FilterUtils methods:
| Method | Input | Behavior |
|---|---|---|
| by(**kwargs) | Keyword arguments | All eq, ANDed together |
| from_dict(filters) | dict[str, Any] | All eq, ANDed; None values skipped |
| from_example(example) | Dataclass or object | Non-None fields become eq predicates |
For the full Pageable, Sort, Order, and Page[T] API reference, see the Data Module Guide — Pagination & Sorting.
from pyfly.data import Pageable, Sort

# Basic pagination (page and size only)
page = await repo.find_all(Pageable.of(page=1, size=20))

# Pagination with sorting
pageable = Pageable.of(page=2, size=10, sort=Sort.by("name"))
page = await repo.find_all(pageable)

find_all(pageable) counts the total, applies the Pageable's sort, slices with LIMIT/OFFSET, and returns a Page[T]. Pageable is 1-based, so page=1 is the first page.
spec = FilterOperator.eq("status", "ACTIVE")
pageable = Pageable.of(page=1, size=20, sort=Sort.by("created_at").descending())
page = await repo.find_all_by_spec_paged(spec, pageable)
# Returns Page[Order] with filtered, sorted, paginated results

The implementation:

- Applies the specification's predicate to get the filtered query.
- Counts total matching rows via a subquery.
- Applies sort orders from Pageable.sort.
- Applies offset and limit for pagination.
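Since Pageable is 1-based, the offset for a page is (page - 1) * size. The following sketch shows that arithmetic on an in-memory list; slice_page_sketch and the dictionary it returns are hypothetical and only mirror the count, offset, and limit steps above, not the actual Page[T] type.

```python
import math


def slice_page_sketch(rows: list, page: int, size: int) -> dict:
    """Count, then apply a 1-based offset/limit slice (illustrative only)."""
    total = len(rows)                      # step: count total matching rows
    offset = (page - 1) * size             # 1-based page -> 0-based offset
    items = rows[offset : offset + size]   # step: OFFSET / LIMIT
    return {
        "items": items,
        "total": total,
        "total_pages": math.ceil(total / size),
    }


third_page = slice_page_sketch(list(range(45)), page=3, size=20)
```

With 45 rows and a page size of 20, the third page holds only the last five rows.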
The @reactive_transactional decorator provides declarative async transaction management:
from pyfly.data.relational.sqlalchemy import reactive_transactional
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

session_factory: async_sessionmaker[AsyncSession] = ...


@reactive_transactional(session_factory)
async def create_order(session: AsyncSession, customer_id: str) -> Order:
    order = Order(customer_id=customer_id, status="PENDING")
    session.add(order)
    return order

# Transaction is automatically committed on success
# Transaction is automatically rolled back on exception

How it works:

- Opens a new AsyncSession from the session_factory.
- Begins a transaction with session.begin().
- Calls the wrapped function with the session as the first argument.
- On success: the transaction is committed (via the async with context manager).
- On exception: the transaction is rolled back and the exception is re-raised.
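The steps above can be sketched as a small decorator. This is an assumption-laden illustration rather than PyFly's source: transactional_sketch is a hypothetical name, and FakeSession / FakeTransaction are stand-ins that mimic only the async context manager behavior of an AsyncSession so the example runs without a database.

```python
import asyncio
import functools


def transactional_sketch(session_factory):
    """Hypothetical decorator: open a session, begin, inject it, commit or roll back."""
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            async with session_factory() as session:
                async with session.begin():  # commits on success, rolls back on error
                    return await fn(session, *args, **kwargs)
        return wrapper
    return decorator


class FakeTransaction:
    """Stand-in for the object returned by session.begin()."""
    def __init__(self, log): self.log = log
    async def __aenter__(self):
        self.log.append("begin")
        return self
    async def __aexit__(self, exc_type, exc, tb):
        self.log.append("rollback" if exc_type else "commit")
        return False  # re-raise any exception


class FakeSession:
    """Stand-in for AsyncSession (illustration only)."""
    def __init__(self, log): self.log = log
    def begin(self): return FakeTransaction(self.log)
    async def __aenter__(self): return self
    async def __aexit__(self, *exc):
        self.log.append("close")
        return False


log: list[str] = []


@transactional_sketch(lambda: FakeSession(log))
async def create(session, name: str) -> str:
    return f"created {name}"


@transactional_sketch(lambda: FakeSession(log))
async def fail(session) -> None:
    raise ValueError("boom")


async def demo() -> str:
    result = await create("order-1")  # caller does not pass the session
    try:
        await fail()
    except ValueError:
        pass                          # exception is re-raised after rollback
    return result


result = asyncio.run(demo())
```

The log records one committed and one rolled-back transaction, matching the success and exception paths described above.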
The decorated function's original arguments are passed after the injected session:
@reactive_transactional(session_factory)
async def transfer_funds(session: AsyncSession, from_id: str, to_id: str, amount: float):
    # session is injected; from_id, to_id, amount are passed through
    ...

# Call it without the session argument:
await transfer_funds("acc-1", "acc-2", 100.0)

By default schema migrations are applied with the pyfly db CLI commands. PyFly can also apply them automatically on application startup, the equivalent of Spring Boot's Flyway/Liquibase auto-migrate. This is opt-in and reuses the existing Alembic environment created by pyfly db init; the CLI commands keep working exactly as before.
Enable it with pyfly.data.relational.migrations.enabled:
pyfly:
  data:
    relational:
      url: postgresql+asyncpg://user:pass@primary:5432/app
      migrations:
        enabled: true        # apply `alembic upgrade head` on startup
        config: alembic.ini  # path to the Alembic config (default: alembic.ini)
        revision: head       # target revision (default: head)

When enabled, MigrationAutoConfiguration registers a MigrationRunner bean. MigrationRunner implements the start() / stop() lifecycle, so the ApplicationContext auto-discovers it as an infrastructure adapter and calls start() once during startup. On start() it runs alembic upgrade <revision> against the same datasource the app uses (it forwards pyfly.data.relational.url into Alembic's sqlalchemy.url, so there is a single source of truth for the connection string).
The upgrade runs in a worker thread (asyncio.to_thread) because the generated async alembic/env.py calls asyncio.run internally, which must not be nested inside the running event loop.
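The threading detail can be reproduced in isolation. In the sketch below, run_upgrade_sketch is a hypothetical stand-in for the blocking Alembic call: like the generated env.py, it calls asyncio.run internally, which would raise RuntimeError if invoked directly from a running event loop. Offloading it with asyncio.to_thread gives it a thread with no running loop.

```python
import asyncio


def run_upgrade_sketch() -> str:
    """Hypothetical blocking migration call that starts its own event loop."""
    async def _upgrade() -> str:
        await asyncio.sleep(0)  # placeholder for async migration work
        return "upgraded"

    # Fine in a worker thread; not allowed inside an already running loop
    return asyncio.run(_upgrade())


async def start_sketch() -> str:
    """Lifecycle hook analogue: offload the blocking call to a worker thread."""
    return await asyncio.to_thread(run_upgrade_sketch)
```

Awaiting start_sketch() from application code keeps the main event loop free while the nested loop runs in the worker thread.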
If the Alembic config file is not found, startup migration is skipped with a warning (rather than failing) telling you to run pyfly db init first:
pyfly.data.relational.migrations.enabled is true but alembic.ini was not found —
run 'pyfly db init' to create the Alembic environment; skipping migrations.
| Config key | Default | Description |
|---|---|---|
| pyfly.data.relational.migrations.enabled | false (absent) | Apply migrations on startup when true. |
| pyfly.data.relational.migrations.config | alembic.ini | Path to the Alembic config file. |
| pyfly.data.relational.migrations.revision | head | Target revision passed to alembic upgrade. |
Migrations vs. ddl-auto: startup migrations are independent of the engine_lifecycle ddl-auto schema strategy. For an Alembic-managed database, set pyfly.data.relational.ddl-auto: none so the engine does not also create tables from Base.metadata, and let migrations own the schema.
Source: src/pyfly/data/relational/migrations.py (MigrationRunner) · src/pyfly/data/relational/auto_configuration.py (MigrationAutoConfiguration)
PyFly can route read-only work to a database read replica while keeping writes on the primary — the equivalent of Spring's AbstractRoutingDataSource driven by @Transactional(readOnly = true). Routing is opt-in: with no replica configured, every session goes to the primary, so behavior is unchanged for existing apps.
from pyfly.data.relational.routing import RoutingSessionFactory, read_only, is_read_only

Set the replica URL under pyfly.data.relational.read-replica.url. RelationalAutoConfiguration then builds a separate engine + async_sessionmaker for the replica and wires it into the routing_session_factory bean:

pyfly:
  data:
    relational:
      url: postgresql+asyncpg://user:pass@primary:5432/app
      read-replica:
        url: postgresql+asyncpg://user:pass@replica:5432/app

When read-replica.url is absent, routing_session_factory is still registered but has no replica, so it always returns a primary session.
RoutingSessionFactory is a drop-in replacement for an async_sessionmaker call site: calling the factory (factory()) returns an AsyncSession, routed by context.
| Member | Returns | Description |
|---|---|---|
| factory() (__call__) | AsyncSession | Routes by context: the replica when inside a read_only() block and a replica is configured; otherwise the primary. |
| factory.primary() | AsyncSession | Forces a primary (read/write) session regardless of context. |
| factory.replica() | AsyncSession | Forces a replica session; falls back to the primary when none is configured. |
| factory.has_replica | bool | Whether a replica session maker is configured. |
The read_only() context manager marks the enclosed block read-only so the factory routes to the replica (the @Transactional(readOnly = true) analogue). It is backed by a ContextVar, so it is safe across async/await and supports nesting — the prior value is restored on exit. is_read_only() reports whether the current context is marked read-only.
from pyfly.container import service
from pyfly.data.relational.routing import RoutingSessionFactory, read_only
from sqlalchemy import select


@service
class UserService:
    def __init__(self, sessions: RoutingSessionFactory) -> None:
        self._sessions = sessions

    async def list_users(self) -> list[User]:
        with read_only():  # routes to the replica when one is configured
            # async with closes the session when the block exits
            async with self._sessions() as session:  # AsyncSession bound to the replica
                result = await session.execute(select(User))
                return list(result.scalars())

    async def create_user(self, name: str) -> User:
        async with self._sessions() as session:  # no read_only() -> primary (read/write)
            user = User(name=name)
            session.add(user)
            await session.commit()
            return user

Outside any read_only() block, factory() always returns a primary session. Inside one, it returns a replica session only if a replica is configured; otherwise it falls back to the primary, so the same code runs unchanged in environments without a replica.
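A ContextVar-backed marker of this kind can be sketched in a few lines. The names below (read_only_sketch, is_read_only_sketch, route_sketch) are hypothetical re-implementations for illustration, not the functions exported by pyfly.data.relational.routing.

```python
from contextlib import contextmanager
from contextvars import ContextVar

# Per-context flag; each asyncio task sees its own value
_read_only_flag: ContextVar[bool] = ContextVar("read_only_flag", default=False)


@contextmanager
def read_only_sketch():
    """Mark the enclosed block read-only and restore the prior value on exit."""
    token = _read_only_flag.set(True)
    try:
        yield
    finally:
        _read_only_flag.reset(token)  # restoring the token is what makes nesting safe


def is_read_only_sketch() -> bool:
    return _read_only_flag.get()


def route_sketch(has_replica: bool) -> str:
    """Routing rule from the text: replica only when read-only AND a replica exists."""
    return "replica" if has_replica and is_read_only_sketch() else "primary"
```

Resetting with the token rather than setting False means an inner block leaves the outer block's read-only state intact.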
Source: src/pyfly/data/relational/routing.py · bean: RelationalAutoConfiguration.routing_session_factory
In addition to the primary datasource, PyFly can configure any number of secondary datasources — the equivalent of Spring declaring multiple DataSource beans. Each named datasource gets its own engine and async_sessionmaker, kept separate from the primary's dedicated beans.
Declare each one under pyfly.data.relational.datasources.<name>; only url is required (echo is optional and defaults to false):
pyfly:
  data:
    relational:
      url: postgresql+asyncpg://user:pass@primary:5432/app  # primary (unchanged)
      datasources:
        reporting:
          url: postgresql+asyncpg://user:pass@reporting:5432/reports
          echo: false
        analytics:
          url: postgresql+asyncpg://user:pass@analytics:5432/warehouse

RelationalAutoConfiguration builds a NamedDataSources registry bean from this config. Inject it and call .get("<name>") to retrieve that datasource's async_sessionmaker:
from pyfly.container import service
from pyfly.data.relational.named_datasources import NamedDataSources
from sqlalchemy import text


@service
class ReportingService:
    def __init__(self, datasources: NamedDataSources) -> None:
        # async_sessionmaker for the "reporting" datasource
        self._reporting = datasources.get("reporting")

    async def daily_total(self) -> int:
        async with self._reporting() as session:  # AsyncSession on the reporting DB
            result = await session.execute(text("SELECT COUNT(*) FROM orders"))
            return int(result.scalar_one())

| Member | Returns | Description |
|---|---|---|
| get(name) | async_sessionmaker[AsyncSession] | Session factory for name; raises KeyError if unknown. |
| names() | list[str] | Sorted names of all configured secondary datasources. |
| dispose() | None (await) | Disposes every secondary engine; call on shutdown. |
| name in datasources | bool | Whether a datasource is configured (__contains__). |
| len(datasources) | int | Number of configured secondary datasources. |
The primary datasource keeps its own async_session_factory / async_session beans and is not part of this registry. When no datasources are configured, the bean is still registered but empty (len(...) == 0), so existing apps are unaffected.
Source: src/pyfly/data/relational/named_datasources.py · bean: RelationalAutoConfiguration.named_data_sources (v26.06.48)
PyFly provides automatic entity auditing through the AuditingEntityListener. It auto-populates the created_at, updated_at, created_by, and updated_by fields on BaseEntity subclasses via SQLAlchemy ORM events, so you never need to set these fields manually.
from pyfly.data.relational.sqlalchemy.auditing import AuditingEntityListener

The AuditingEntityListener registers SQLAlchemy before_insert and before_update event listeners on BaseEntity. Because the listeners use propagate=True, they automatically apply to all subclasses of BaseEntity.
| Event | Fields Set | Behavior |
|---|---|---|
| before_insert | created_at, updated_at, created_by, updated_by | Sets both timestamps to datetime.now(UTC). Sets both user fields to the current authenticated user (if available). |
| before_update | updated_at, updated_by | Sets updated_at to datetime.now(UTC). Sets updated_by to the current authenticated user (if available). |
Example:
from pyfly.data.relational.sqlalchemy import BaseEntity
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column


class Order(BaseEntity):
    __tablename__ = "orders"

    customer_id: Mapped[str] = mapped_column(String(255))
    status: Mapped[str] = mapped_column(String(50))


# When you save a new Order, the audit fields are populated automatically:
order = Order(customer_id="abc", status="PENDING")
saved = await repo.save(order)
# saved.created_at = 2026-02-20T10:30:00+00:00
# saved.updated_at = 2026-02-20T10:30:00+00:00
# saved.created_by = "user-123" (from SecurityContext)
# saved.updated_by = "user-123"

# On subsequent updates, only updated_at and updated_by change:
saved.status = "SHIPPED"
updated = await repo.save(saved)
# updated.created_at = 2026-02-20T10:30:00+00:00 (unchanged)
# updated.updated_at = 2026-02-20T10:35:00+00:00 (new timestamp)
# updated.created_by = "user-123" (unchanged)
# updated.updated_by = "admin-456" (new user)

The AuditingEntityListener resolves the current authenticated user from the RequestContext:
- Calls RequestContext.current() to get the current request context.
- If a RequestContext is available, reads the security_context attribute.
- If the SecurityContext is authenticated (is_authenticated is True), uses user_id as the value for created_by / updated_by.
- If there is no RequestContext or the user is not authenticated, the user fields are left unchanged (they remain None for new entities).
This means auditing works transparently in HTTP request handlers (where the SecurityMiddleware or SecurityFilter populates the SecurityContext) and degrades gracefully in background tasks or CLI scripts where no request context is available.
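The resolution order can be expressed as a small function. The classes below are hypothetical stand-ins that carry only the attributes named in the steps above (security_context, is_authenticated, user_id); they are not PyFly's RequestContext or SecurityContext.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SecurityContextSketch:
    """Hypothetical stand-in with the two attributes the listener reads."""
    user_id: Optional[str] = None
    is_authenticated: bool = False


@dataclass
class RequestContextSketch:
    """Hypothetical stand-in for the current request context."""
    security_context: Optional[SecurityContextSketch] = None


def resolve_auditor_sketch(ctx: Optional[RequestContextSketch]) -> Optional[str]:
    """Return the user id to record, or None to leave the audit fields unchanged."""
    if ctx is None:                       # background task or CLI script
        return None
    security = ctx.security_context
    if security is None or not security.is_authenticated:
        return None                       # anonymous request
    return security.user_id
```

Returning None in both fallback cases is what lets the same entity code run inside and outside HTTP request handlers.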
The AuditingEntityListener must be registered once at application startup. Call register() to attach the ORM event listeners:
from pyfly.data.relational.sqlalchemy.auditing import AuditingEntityListener

listener = AuditingEntityListener()
listener.register()

When using auto-configuration, the listener is registered automatically by RelationalAutoConfiguration when pyfly.data.relational.enabled is true. No manual registration is needed in that case.
You can also register it in a custom @configuration class:
from pyfly.container import configuration, bean
from pyfly.data.relational.sqlalchemy.auditing import AuditingEntityListener


@configuration
class DataConfig:
    @bean
    def auditing_listener(self) -> AuditingEntityListener:
        listener = AuditingEntityListener()
        listener.register()
        return listener

Source: src/pyfly/data/relational/sqlalchemy/auditing.py
The @transactional decorator provides Spring-style declarative transaction management with propagation and isolation support.
from pyfly.container import service
from pyfly.data.relational.sqlalchemy import transactional, Propagation, Isolation
from sqlalchemy.ext.asyncio import async_sessionmaker


@service
class OrderService:
    _session_factory: async_sessionmaker  # injected by DI
    repo: OrderRepository                 # injected by DI (assumed here so self.repo resolves)

    @transactional()
    async def create_order(self, order: Order) -> Order:
        return await self.repo.save(order)

    @transactional(propagation=Propagation.REQUIRES_NEW)
    async def audit_log(self, message: str) -> None:
        # Always opens a new transaction, even if called from within another
        ...

    @transactional(isolation=Isolation.SERIALIZABLE, read_only=True)
    async def generate_report(self) -> Report:
        ...

| Propagation | Behavior |
|---|---|
| REQUIRED (default) | Join existing transaction or start new |
| REQUIRES_NEW | Always start a new transaction, suspending existing |
| SUPPORTS | Run within transaction if one exists, or without |
| NOT_SUPPORTED | Suspend any existing transaction |
| MANDATORY | Require existing transaction, raise if none |
| NEVER | Raise if a transaction exists |

Isolation levels: DEFAULT, READ_UNCOMMITTED, READ_COMMITTED, REPEATABLE_READ, SERIALIZABLE.
The decorator resolves async_sessionmaker from self._session_factory and automatically patches Repository instances on the service with the transaction-scoped session.
PyFly provides opt-in mixins for soft delete and optimistic locking, mirroring Hibernate's @SoftDelete and JPA's @Version annotations.
from pyfly.data.relational.sqlalchemy import BaseEntity, SoftDeleteMixin
class Order(BaseEntity, SoftDeleteMixin):
__tablename__ = "orders"
name: Mapped[str] = mapped_column(String(255))This adds a deleted_at column. Use SoftDeleteRepository for automatic soft-delete-aware CRUD:

    from uuid import UUID

    from pyfly.data.relational.sqlalchemy import SoftDeleteRepository

    class OrderRepository(SoftDeleteRepository[Order, UUID]):
        pass  # delete_by_id()/delete() set deleted_at; find methods exclude deleted entities

| Method | Behavior |
|---|---|
| `delete_by_id(id)` | Sets `deleted_at` (soft delete); returns `None` |
| `delete(entity)` | Sets `deleted_at` on the given entity (soft delete); returns `None` |
| `find_by_id(id)` | Excludes soft-deleted entities |
| `find_all()` | Excludes soft-deleted entities |
| `find_all(pageable)` | Excludes soft-deleted entities; counts the total, applies the `Pageable`'s sort, slices with `LIMIT`/`OFFSET`, returns `Page[T]` |
| `find_all_by_id(ids)` | Excludes soft-deleted entities |
| `find_all_by_spec(spec)` | Applies the spec predicate AND excludes soft-deleted entities |
| `find_all_by_spec_paged(spec, pageable)` | Applies the spec predicate AND excludes soft-deleted entities |
| `find_all_including_deleted()` | Includes soft-deleted entities |
| `restore(id)` | Clears `deleted_at` |
| `hard_delete(id)` | Permanently removes the row from the database |
| `count()` | Counts only non-deleted entities |
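
The behavior in the table reduces to a few SQL patterns on a nullable `deleted_at` column. The sketch below illustrates them with the standard-library `sqlite3` module instead of SQLAlchemy, so it runs without any dependencies. The function names mirror the table, but the bodies are assumptions about what such a repository does, not PyFly's code.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, name TEXT, deleted_at TEXT)"
)
conn.executemany("INSERT INTO orders (id, name) VALUES (?, ?)", [(1, "a"), (2, "b")])


def delete_by_id(order_id: int) -> None:
    """Soft delete: mark the row instead of removing it."""
    now = datetime.now(timezone.utc).isoformat()
    conn.execute("UPDATE orders SET deleted_at = ? WHERE id = ?", (now, order_id))


def find_all() -> list[int]:
    """Default reads filter out soft-deleted rows."""
    rows = conn.execute("SELECT id FROM orders WHERE deleted_at IS NULL ORDER BY id")
    return [row[0] for row in rows]


def find_all_including_deleted() -> list[int]:
    return [row[0] for row in conn.execute("SELECT id FROM orders ORDER BY id")]


def restore(order_id: int) -> None:
    conn.execute("UPDATE orders SET deleted_at = NULL WHERE id = ?", (order_id,))


def hard_delete(order_id: int) -> None:
    conn.execute("DELETE FROM orders WHERE id = ?", (order_id,))


delete_by_id(1)
visible_after_soft_delete = find_all()
all_after_soft_delete = find_all_including_deleted()
restore(1)
visible_after_restore = find_all()
hard_delete(2)
all_after_hard_delete = find_all_including_deleted()
```

A soft-deleted row stays in the table and can be restored; only `hard_delete` removes it for good.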

    from sqlalchemy import String
    from sqlalchemy.orm import Mapped, mapped_column

    from pyfly.data.relational.sqlalchemy import BaseEntity, VersionedMixin

    class Order(BaseEntity, VersionedMixin):
        __tablename__ = "orders"

        name: Mapped[str] = mapped_column(String(255))

This adds a `version` column. SQLAlchemy automatically appends `WHERE version = :old` to every `UPDATE` and raises `StaleDataError` on concurrent modification, which is the equivalent of JPA's `@Version`.
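
The versioned `UPDATE` can be illustrated without an ORM. The following sketch uses the standard-library `sqlite3` module; `StaleRowError` and `update_name` are hypothetical names that stand in for SQLAlchemy's `StaleDataError` and the flush it performs. The point is only the pattern: the update matches on the version the caller read, and zero affected rows means someone else changed the row first.

```python
import sqlite3


class StaleRowError(Exception):
    """Raised when the row was modified since it was read (hypothetical name)."""


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, name TEXT, version INTEGER NOT NULL)"
)
conn.execute("INSERT INTO orders VALUES (1, 'draft', 1)")


def update_name(order_id: int, new_name: str, expected_version: int) -> int:
    """Update only if the stored version still matches; return the new version."""
    cursor = conn.execute(
        "UPDATE orders SET name = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (new_name, order_id, expected_version),
    )
    if cursor.rowcount != 1:
        raise StaleRowError(f"order {order_id} changed since version {expected_version}")
    return expected_version + 1


# First writer read version 1 and succeeds.
new_version = update_name(1, "paid", expected_version=1)

# Second writer also read version 1, but the row is now at version 2.
try:
    update_name(1, "cancelled", expected_version=1)
    conflict_detected = False
except StaleRowError:
    conflict_detected = True

stored_name = conn.execute("SELECT name FROM orders WHERE id = 1").fetchone()[0]
```

The second update leaves the row untouched, so the first writer's change is never silently overwritten.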
The `RepositoryBeanPostProcessor` is a `BeanPostProcessor` that runs after each repository bean is initialized. It scans the repository class for stub methods and replaces them with real query implementations.

The `after_init(bean, bean_name)` method:

- Checks if the bean is an instance of `Repository`. If not, it is returned unchanged.
- Gets the entity type from `bean._model`.
- Iterates over all attributes defined on the bean's class (not inherited from `Repository`).
- For `@query`-decorated methods: compiles them via `QueryExecutor.compile_query_method()` and replaces the stub with a wrapper that injects `bean._session`.
- For derived query methods (`find_by_*`, `count_by_*`, `exists_by_*`, `delete_by_*`): checks if the method is a stub, parses the method name via `QueryMethodParser.parse()`, compiles it via `QueryMethodCompiler.compile()`, and replaces the stub with a wrapper.
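
The steps above can be sketched with stand-in classes. This is an illustration under assumptions, not the real post-processor: `Repository` here is a local dummy, `after_init` takes only the bean, `@query` handling is omitted, and the generated wrapper just returns a marker string instead of running a compiled query. The `is_stub` helper treats a method as a stub when its code object holds no constants other than `None` and `Ellipsis`.

```python
import asyncio

DERIVED_PREFIXES = ("find_by_", "count_by_", "exists_by_", "delete_by_")


def is_stub(func) -> bool:
    """True when the body carries no constants besides None and Ellipsis."""
    return all(c is None or c is Ellipsis for c in func.__code__.co_consts)


class Repository:
    """Local stand-in for the adapter's base repository."""

    async def find_by_id(self, id_):
        return "inherited"


class OrderRepository(Repository):
    async def find_by_status(self, status): ...

    async def count_by_status(self, status): pass

    async def find_by_vip(self):
        return "hand-written"  # real body, must be left alone


def after_init(bean):
    if not isinstance(bean, Repository):
        return bean  # non-repository beans pass through unchanged
    # vars() on the class sees only attributes defined there, not inherited ones.
    for name, attr in vars(type(bean)).items():
        if not (name.startswith(DERIVED_PREFIXES) and callable(attr)):
            continue
        if not is_stub(attr):
            continue

        async def generated(*args, _name=name):
            return f"compiled:{_name}"  # placeholder for the compiled query

        setattr(bean, name, generated)  # replace the stub on this bean
    return bean


repo = after_init(OrderRepository())
status_result = asyncio.run(repo.find_by_status("open"))
count_result = asyncio.run(repo.count_by_status("open"))
vip_result = asyncio.run(repo.find_by_vip())
inherited_result = asyncio.run(repo.find_by_id(1))
```

Both stub forms are replaced, while the hand-written method and the inherited one keep their original behavior.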
A method is considered a stub when its code object contains no meaningful constants beyond `None` and `Ellipsis`. This covers both forms:

    async def find_by_status(self, status: str) -> list[Order]: ...   # Ellipsis stub
    async def find_by_status(self, status: str) -> list[Order]: pass  # Pass stub

Register the post-processor in your application context:

    from pyfly.data.relational.sqlalchemy import RepositoryBeanPostProcessor

    context.register_post_processor(RepositoryBeanPostProcessor())

The SQLAlchemy `QueryMethodCompiler` implements the `QueryMethodCompilerPort` protocol. It takes `ParsedQuery` objects produced by the shared `QueryMethodParser` and compiles them into SQLAlchemy column expressions.

| Prefix | Generated Query Pattern |
|---|---|
| `find_by` | `SELECT entity WHERE ... ORDER BY ...` |
| `count_by` | `SELECT COUNT(*) FROM entity WHERE ...` |
| `exists_by` | `SELECT COUNT(*) FROM entity WHERE ...`, then checks that the count is `> 0` |
| `delete_by` | `DELETE FROM entity WHERE ...` (returns rowcount) |

The compiler builds SQLAlchemy column expressions from each FieldPredicate, combines them using the connectors, and applies ORDER BY clauses from OrderClause objects.
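
As a rough illustration of the name-to-query mapping, the toy function below turns a derived method name into a SQL string. It is deliberately simplified and is not the `QueryMethodCompiler`: the real compiler works on `ParsedQuery` objects and emits SQLAlchemy expressions, whereas `compile_name`, its three-entry operator table, and the `?` placeholders are assumptions made for this sketch.

```python
import re

PREFIXES = ("find_by_", "count_by_", "exists_by_", "delete_by_")
# A tiny, hypothetical subset of operator suffixes.
OPERATORS = {"_greater_than": ">", "_less_than": "<", "_containing": "LIKE"}


def compile_name(method_name: str, table: str) -> str:
    prefix = next((p for p in PREFIXES if method_name.startswith(p)), None)
    if prefix is None:
        raise ValueError(f"not a derived query method: {method_name}")
    body, _, order = method_name[len(prefix):].partition("_order_by_")

    parts = []
    for index, token in enumerate(re.split(r"_(and|or)_", body)):
        if index % 2 == 1:  # odd positions hold the captured connector
            parts.append(token.upper())
            continue
        for suffix, sql_op in OPERATORS.items():
            if token.endswith(suffix):
                parts.append(f"{token[:-len(suffix)]} {sql_op} ?")
                break
        else:
            parts.append(f"{token} = ?")  # no suffix means equality
    where = " ".join(parts)

    if prefix == "find_by_":
        sql = f"SELECT * FROM {table} WHERE {where}"
        if order:
            column, _, direction = order.rpartition("_")
            if direction not in ("asc", "desc"):
                column, direction = order, "asc"
            sql += f" ORDER BY {column} {direction.upper()}"
        return sql
    if prefix == "delete_by_":
        return f"DELETE FROM {table} WHERE {where}"
    # count_by_ and exists_by_ share the COUNT query; exists_by_ would then
    # compare the returned count with zero.
    return f"SELECT COUNT(*) FROM {table} WHERE {where}"
```

For example, `compile_name("find_by_active_and_category", "products")` yields a `SELECT` with two equality predicates joined by `AND`. Field names that themselves contain `_and_` or `_or_` would confuse this toy splitter.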
The following example demonstrates an entity, a repository with derived queries, specifications, pagination, and mapping.

    # --- Entity ---
    from pyfly.data.relational.sqlalchemy import BaseEntity
    from sqlalchemy import String, Float, Boolean
    from sqlalchemy.orm import Mapped, mapped_column

    class Product(BaseEntity):
        __tablename__ = "products"

        name: Mapped[str] = mapped_column(String(255))
        price: Mapped[float] = mapped_column(Float)
        category: Mapped[str] = mapped_column(String(100))
        active: Mapped[bool] = mapped_column(Boolean, default=True)

    # --- Repository ---
    from uuid import UUID

    from pyfly.data.relational.sqlalchemy import Repository, query
    from pyfly.container import repository as repo_stereotype
    from sqlalchemy.ext.asyncio import AsyncSession

    @repo_stereotype
    class ProductRepository(Repository[Product, UUID]):
        # Derived query methods (stubs, auto-compiled at startup)
        async def find_by_category(self, category: str) -> list[Product]: ...

        async def find_by_active_and_category(
            self, active: bool, category: str
        ) -> list[Product]: ...

        async def find_by_price_greater_than_order_by_price_desc(
            self, min_price: float
        ) -> list[Product]: ...

        async def find_by_name_containing(self, fragment: str) -> list[Product]: ...

        async def count_by_category(self, category: str) -> int: ...

        async def exists_by_name(self, name: str) -> bool: ...

        async def delete_by_active(self, active: bool) -> int: ...

        # Custom query method
        @query("SELECT p FROM Product p WHERE p.category = :category AND p.price > :min_price")
        async def find_expensive_in_category(
            self, category: str, min_price: float
        ) -> list[Product]: ...

    # --- Service ---
    from dataclasses import dataclass
    from uuid import UUID

    from pyfly.container import service
    from pyfly.data import Pageable, Sort, Page, Mapper
    from pyfly.data.relational.sqlalchemy import FilterOperator, FilterUtils, Specification

    @dataclass
    class ProductDTO:
        id: str
        name: str
        price: float
        category: str

    @service
    class ProductService:
        def __init__(self, repo: ProductRepository) -> None:
            self._repo = repo
            self._mapper = Mapper()

        async def find_all_active(self, category: str | None = None) -> list[ProductDTO]:
            spec: Specification = FilterOperator.eq("active", True)
            if category:
                spec = spec & FilterOperator.eq("category", category)
            products = await self._repo.find_all_by_spec(spec)
            return self._mapper.map_list(products, ProductDTO)

        async def find_paginated(
            self,
            page: int = 1,
            size: int = 20,
            category: str | None = None,
        ) -> Page[ProductDTO]:
            spec = FilterOperator.eq("active", True)
            if category:
                spec = spec & FilterOperator.eq("category", category)
            pageable = Pageable.of(
                page=page,
                size=size,
                sort=Sort.by("name"),
            )
            result = await self._repo.find_all_by_spec_paged(spec, pageable)
            return result.map(lambda p: self._mapper.map(p, ProductDTO))

        async def search_by_name(self, query: str) -> list[ProductDTO]:
            products = await self._repo.find_by_name_containing(query)
            return self._mapper.map_list(products, ProductDTO)

        async def find_by_id(self, product_id: str) -> ProductDTO | None:
            product = await self._repo.find_by_id(UUID(product_id))
            if product is None:
                return None
            return self._mapper.map(product, ProductDTO)

        async def create(self, name: str, price: float, category: str) -> ProductDTO:
            product = Product(name=name, price=price, category=category)
            saved = await self._repo.save(product)
            return self._mapper.map(saved, ProductDTO)

        async def delete(self, product_id: str) -> None:
            await self._repo.delete_by_id(UUID(product_id))

        async def count_in_category(self, category: str) -> int:
            return await self._repo.count_by_category(category)

- Data Module Guide: generic commons (repository ports, pagination, query parsing, entity mapping, extensibility)
- Data Document Guide: MongoDB adapter
- SQLAlchemy Adapter Reference: setup, configuration, adapter-specific features