Last Updated: January 9, 2026
Status: Comprehensive Design Archive
LTSeq is a hybrid Python-Rust library for high-performance sequential data processing. It combines a Pythonic lambda-based DSL with the raw speed of Rust's DataFusion engine. The core philosophy is lazy evaluation with strict ordering guarantees, enabling complex time-series and sequential operations (shift, rolling, diff, cum_sum) that are difficult or slow in standard SQL/pandas.
This document consolidates design decisions, architectural patterns, and lessons learned throughout development.
- Core Architecture
- Expression System
- Relational Operations
- Sequence Operators
- Linking & Joins
- Optimization Strategy
- Grouping Operations
- API Design Patterns
- Lessons Learned
- Phase 10: Performance Optimization
- Python Layer: Handles API surface, DSL parsing, schema tracking, and high-level logic. Uses a `_schema` dict to track columns without querying the engine.
- Rust Layer (py-ltseq): Exposes a `RustTable` class via PyO3. Wraps Apache DataFusion for execution.
- Interaction: Python builds a logical plan; Rust executes it only when `.show()`, `.to_pandas()`, or `.count()` is called.
- Operations like `filter()`, `select()`, `with_columns()` return a new `LTSeq` instance with an updated plan.
- No data is processed until terminal actions.
- Benefit: Allows query optimization (predicate pushdown, projection pushdown) by DataFusion.
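The lazy pattern can be sketched in a few lines of plain Python (a hypothetical minimal version, not LTSeq's actual internals): transformations append plan nodes and return fresh instances, and nothing executes until a terminal action.

```python
class LazyTable:
    """Minimal sketch of the lazy-plan pattern: transformations append
    plan nodes; nothing runs until a terminal action."""

    def __init__(self, plan=None):
        self.plan = plan or []

    def filter(self, predicate):
        # Returns a NEW instance; the original plan is untouched.
        return LazyTable(self.plan + [("filter", predicate)])

    def select(self, *cols):
        return LazyTable(self.plan + [("select", cols)])

    def count(self, rows):
        # Terminal action: only now does the plan execute.
        for op, arg in self.plan:
            if op == "filter":
                rows = [r for r in rows if arg(r)]
            elif op == "select":
                rows = [{c: r[c] for c in arg} for r in rows]
        return len(rows)

t = LazyTable()
q = t.filter(lambda r: r["age"] > 18).select("age")
# q.plan now holds two nodes; no data has been touched yet.
```

Because each call only extends a plan, an optimizer (DataFusion, in LTSeq's case) can still reorder or fuse the steps before any data moves.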
- Crucial Feature: Unlike standard SQL engines which treat tables as unordered sets, LTSeq respects order.
- Mechanism: `RustTable` maintains a `sort_exprs` vector.
- Invariant: Window functions (shift, rolling) require explicit sorting. If `.sort()` hasn't been called, these operations raise an error.
- Problem: Python lambdas are opaque. We need to convert `lambda r: r.age > 18` into a DataFusion logical expression.
- Solution: A `SchemaProxy` object intercepts attribute access (`r.age`).
- Mechanism: `r` is a `SchemaProxy`. `r.age` returns a `CallExpr` representing column "age". `r.age > 18` returns a `CallExpr` representing "binary op >".
- Result: A serializable expression tree (AST) built at runtime.
- Instead of complex enum variants for every operation type, we use a generic `CallExpr`.
- Structure: `{ func: "gt", args: [col_expr, lit_expr] }`
- Benefit: Reduced boilerplate by ~40% compared to explicit expression classes.
- Expressions are serialized to JSON-compatible dictionaries before being passed to Rust.
- Format: `{"type": "Column", "name": "age"}` or `{"type": "Literal", "value": 18, "dtype": "Int64"}`.
- Automatic inference for literals: `bool`, `int`, `float`, `string`, `null`.
- Rust side deserializes these into DataFusion `ScalarValue` types.
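A sketch of the literal inference step (the dict format follows the examples above; the `serialize` helper name and the exact dtype strings are illustrative assumptions):

```python
def serialize(value):
    """Sketch of literal-to-dict serialization with dtype inference.
    Helper name and dtype strings are illustrative, not LTSeq's actual code."""
    if value is None:
        return {"type": "Literal", "value": None, "dtype": "Null"}
    # Check bool BEFORE int: bool is a subclass of int in Python.
    if isinstance(value, bool):
        return {"type": "Literal", "value": value, "dtype": "Boolean"}
    if isinstance(value, int):
        return {"type": "Literal", "value": value, "dtype": "Int64"}
    if isinstance(value, float):
        return {"type": "Literal", "value": value, "dtype": "Float64"}
    if isinstance(value, str):
        return {"type": "Literal", "value": value, "dtype": "Utf8"}
    raise TypeError(f"unsupported literal: {value!r}")

serialize(18)  # {'type': 'Literal', 'value': 18, 'dtype': 'Int64'}
```

The bool-before-int ordering is the one subtle point: `isinstance(True, int)` is `True`, so checking `int` first would mis-tag booleans.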
- Constraint: Ascending only (in early phases); multi-column keys supported.
- Requirement: Must specify at least one key.
- State: Updates the internal `sort_exprs` state, enabling sequence operations.
- Implementation: Hash-based deduplication using DataFusion's `LogicalPlan::Distinct`.
- Scope: Global by default (deduplicates across the entire dataset).
- Implementation: Zero-copy logical operation mapping to SQL `LIMIT`/`OFFSET`.
- Performance: Extremely fast, as it modifies the plan limit, not the data.
- `shift()`, `diff()`, `rolling()`, `cum_sum()` fail hard if `.sort()` was not called previously.
- Rationale: Sequential operations are undefined without order. Explicit is better than implicit.
- "Safe" operations (filter, derive, slice) preserve the `sort_exprs` metadata.
- "Unsafe" operations (group by, join) might reset or invalidate sort requirements.
- shift(n): Introduces NULLs at boundaries (first `n` rows).
- diff(): `val - shift(1)`. First row is NULL.
- rolling(): Configurable `min_periods`. Defaults to window size (result is NULL until the window is full).
- cum_sum(): Always has a value (running total).
- Sequence ops are implemented via pattern matching on the `CallExpr` function string in the transpiler.
- They translate to DataFusion window functions (e.g., `LEAD`/`LAG`, `SUM() OVER (...)`).
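As an illustration of that pattern match, a toy transpiler table might look like this (the `__sort__` placeholder and the exact clause text are assumptions, not the real transpiler's output):

```python
def to_window_sql(func, col, n=1, window=3):
    """Toy pattern match from CallExpr function names to SQL window
    expressions; `__sort__` stands in for the tracked sort expression."""
    mapping = {
        "shift": f"LAG({col}, {n}) OVER (ORDER BY __sort__)",
        "diff": f"{col} - LAG({col}, 1) OVER (ORDER BY __sort__)",
        "cum_sum": (f"SUM({col}) OVER (ORDER BY __sort__ "
                    f"ROWS UNBOUNDED PRECEDING)"),
        "rolling_sum": (f"SUM({col}) OVER (ORDER BY __sort__ "
                        f"ROWS {window - 1} PRECEDING)"),
    }
    if func not in mapping:
        raise ValueError(f"not a sequence op: {func}")
    return mapping[func]

to_window_sql("shift", "price")  # "LAG(price, 1) OVER (ORDER BY __sort__)"
```

Keeping the op names as plain strings is what makes the generic `CallExpr` design cheap to extend: adding a sequence op is one new table entry, not a new AST variant.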
- Concept: `left.link(right, on=...)` returns a `LinkedTable`.
- State: Stores references to the left/right tables and join keys. Does not execute the join immediately.
- API: Allows chaining operations on the "virtual" joined result.
- Design Decision: When a user filters or selects a column belonging to the linked (right) table, the system automatically materializes the join.
- User Experience: "It just works". `linked.filter(lambda r: r.right_col > 5)` triggers the join.
- Return Types: `LinkedTable` if no materialization happened (operations were on the left table only); `LTSeq` if materialization occurred (result is a flat table).
- Critical Issue: DataFusion joins fail or behave unpredictably with duplicate column names.
- Solution: ALL columns from the right table are renamed with unique temporary identifiers before joining.
- Restoration: After join, columns are aliased back to their expected names (or user-provided prefixes).
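The rename-then-restore strategy might be planned like this (the helper name and temp-name scheme are illustrative; the `_other_` prefix matches the convention used elsewhere in this document):

```python
import uuid

def plan_right_renames(left_cols, right_cols, prefix="_other_"):
    """Sketch of the collision strategy: map every right column to a
    unique temporary name for the join, plus the alias used to restore it."""
    temp_map, restore_map = {}, {}
    for col in right_cols:
        tmp = f"__r_{uuid.uuid4().hex[:8]}_{col}"
        temp_map[col] = tmp
        # Restore under a prefixed name only if it would collide with the left side.
        final = prefix + col if col in left_cols else col
        restore_map[tmp] = final
    return temp_map, restore_map

temp_map, restore_map = plan_right_renames(["id", "name"], ["id", "price"])
# "id" collides -> restored as "_other_id"; "price" keeps its name
```

Renaming every right column, not just the colliding ones, is what makes the join itself immune to duplicate-name behavior in the engine.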
- Supported: `inner` (default), `left`, `right`, `full`.
- Defaults to `inner` to match SQL intuition.
- Bug Fix: Fixed a critical bug where chaining multiple links caused schema mismatches.
- Solution: Ensure schema synchronization between the Python `_schema` and the Rust Arrow `Schema` after every materialization.
- join_merge: Moved join logic to Rust. 5-10x speedup.
- search_first: Implemented as `Limit(1)` + `Filter` in Rust. 10-87x speedup (dataset dependent).
- pivot: Analysis showed Pandas pivot is sufficient for typical result set sizes (hundreds/thousands of rows). Kept in Python for now.
- Window Functions: Already optimized by DataFusion's execution planner. No custom Rust needed.
- Problem: Standard `GROUP BY` destroys order. We need "group by X, but keep rows ordered by time within groups".
- Solution: Window functions.
- Assign `__group_id__` using dense rank or hashing.
- Maintain sort order within partitions.
- `group_ordered()` returns a `NestedTable`.
- Metadata: Tracks `__group_id__` and `__group_count__`.
- Operations: `aggregate()` reduces to 1 row per group. `filter()`/`derive()` maintain group structure.
- A `_group_assignments` dictionary preserves original group IDs even after filtering.
- Allows operations like "filter out the first 2 rows of each group" while keeping group integrity.
- Fluent interface: `df.sort().filter().derive()`.
- Immutability: Each call returns a new instance; the original is untouched.
- String Columns: `df.select("a", "b")`
- Lambda Expressions: `df.select(lambda r: r.a + r.b)`
- Mixed: Supported in `with_columns`/`derive`.
- The type of object returned tells the user what happened: `LinkedTable` for a virtual join, `LTSeq` for a physical table, `NestedTable` for a grouped context.
- Philosophy: Helpful & specific.
- Example: If column missing, list available columns.
- Example: If sort missing for window op, explain why sort is needed.
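A sketch of this "helpful & specific" style for the missing-column case (hypothetical helper, not LTSeq's actual code):

```python
import difflib

def missing_column_error(name, available):
    """Build an error message that names the missing column, lists what
    exists, and suggests a near-miss (hypothetical helper)."""
    msg = f"Column '{name}' not found. Available columns: {sorted(available)}."
    close = difflib.get_close_matches(name, available, n=1)
    if close:
        msg += f" Did you mean '{close[0]}'?"
    return msg

missing_column_error("agee", ["age", "name", "city"])
# "Column 'agee' not found. Available columns: ['age', 'city', 'name']. Did you mean 'age'?"
```

The point is that every error carries enough context to fix the call without re-running with print statements.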
- Lesson: Never trust default name handling in joins.
- Fix: Aggressively rename everything on the right side of a join to UUIDs/temps, then alias back.
- Lesson: `inspect.getsource()` is messy. It returns the whole line/block.
- Fix: Use the `ast` module to parse the source, then walk the tree to find the specific lambda corresponding to the argument.
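The `ast`-based approach can be sketched as follows (requires Python 3.9+ for `ast.unparse`; the helper name is illustrative):

```python
import ast

def find_lambda(source_line, arg_index=0):
    """Parse a whole source line with `ast` and walk the tree to pick out
    the Lambda node at a given argument position (illustrative helper)."""
    tree = ast.parse(source_line.strip())
    lambdas = [n for n in ast.walk(tree) if isinstance(n, ast.Lambda)]
    node = lambdas[arg_index]
    # ast.unparse (Python 3.9+) turns the node back into clean source.
    return ast.unparse(node)

line = 'df.derive(lambda r: r.a + r.b, lambda r: r.a * 2)'
find_lambda(line, 0)  # 'lambda r: r.a + r.b'
find_lambda(line, 1)  # 'lambda r: r.a * 2'
```

Walking the tree cleanly separates multiple lambdas on one line, which is exactly the case where slicing `inspect.getsource()` output falls apart.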
- Strategy: Start narrow, expand later.
- Example: `cum_sum` started as "column name only" and was later expanded to arbitrary expressions. This kept momentum high.
- Lesson: `ast.NameConstant` is deprecated.
- Fix: Migrated to `ast.Constant` proactively to ensure future-proofing.
Key Finding: DataFusion and Arrow already handle low-level optimizations.
- SIMD: Arrow arrays use SIMD-optimized compute kernels automatically.
- Multi-threading: DataFusion parallelizes partition-level operations; Tokio runtime is multi-threaded.
- Batch processing: Default batch size (8192 rows) is cache-optimized.
Actual Bottleneck: The materialization pattern used for complex operations.
```rust
// This pattern appears in 15+ places:
let batches = df.collect().await?;               // Full materialization
let temp = MemTable::try_new(schema, batches)?;
session.register_table("__temp", temp)?;
session.sql("SELECT ... FROM __temp").await?;    // SQL execution
```

Operations using this pattern: window functions, joins, group_by, pivot.
| Decision | Rationale |
|---|---|
| No custom SIMD code | Arrow/DataFusion already optimized; custom SIMD adds maintenance burden with minimal gain |
| No custom threading | DataFusion + Tokio handle parallelization; adding custom threading risks contention |
| Focus on configuration | SessionContext uses bare defaults; proper tuning gives easy wins |
| Focus on materialization | Reducing collect() → MemTable round-trips has highest impact |
| Smart defaults, no config API | 80% of users won't tune; add Python config API later if requested |
| Benchmark suite required | Can't optimize what we don't measure |
| # | Task | Description | Effort |
|---|---|---|---|
| 1 | Benchmark suite | 5-10 key benchmarks (filter, join, window, group, chain) | 0.5 day |
| 2 | DataFusion configuration | Configure SessionContext with batch_size, target_partitions, memory settings | 1 day |
| 3 | Reduce join materialization | Refactor join_impl to avoid double-collection where possible | 1-2 days |
| 4 | Reduce window materialization | Optimize derive_with_window_functions_impl to batch operations | 1 day |
| 5 | Expression optimization | Constant folding, predicate combining in transpiler | 1 day |
SessionContext (to be implemented in src/lib.rs):

```rust
let config = SessionConfig::new()
    .with_target_partitions(num_cpus::get())  // Match CPU cores
    .with_batch_size(8192)                    // DataFusion default, tunable
    .with_information_schema(false);          // Disable unused feature
let session = SessionContext::new_with_config(config);
```

Tokio Runtime: already multi-threaded; minimal changes needed.
- Filter: 10K, 1M, 10M rows with simple and complex predicates
- Join: Small×Small, Large×Small, Large×Large
- Window: LAG/LEAD, running sum, rolling average
- Group: group_ordered + aggregate
- Chain: filter → derive → select (typical workflow)
As-of join is a specialized join operation for time-series data that matches each row from the left table with the "nearest" row from the right table based on a time/key column. This is commonly used in financial applications (e.g., matching trades with quotes).
```python
def asof_join(
    self,
    other: LTSeq,
    on: Callable,
    direction: str = "backward",
    is_sorted: bool = False
) -> LTSeq
```

Parameters:
- `other`: Right table to join with
- `on`: Lambda with an inequality condition, e.g., `lambda t, q: t.time >= q.time`
- `direction`: One of `"backward"` (default), `"forward"`, `"nearest"`
- `is_sorted`: If `True`, trust that both tables are sorted by the time column (skip verification/sorting)
Direction Semantics:
- backward: Find the largest `right.time` where `right.time <= left.time` (most recent quote before the trade)
- forward: Find the smallest `right.time` where `right.time >= left.time` (next quote after the trade)
- nearest: Find the closest `right.time` (backward bias on ties)
Example:

```python
# Auto-sort (safe, default)
result = trades.asof_join(quotes, lambda t, q: t.time >= q.time)

# Skip sort verification (faster, requires pre-sorted data)
trades_sorted = trades.sort("time")
quotes_sorted = quotes.sort("time")
result = trades_sorted.asof_join(
    quotes_sorted,
    lambda t, q: t.time >= q.time,
    is_sorted=True
)
```

| Decision | Rationale |
|---|---|
| Pure Rust binary search | O(N log M) complexity; more efficient than SQL window function workarounds |
| NULL for unmatched rows | LEFT JOIN semantics - keep all left rows, NULL for missing right columns |
| `_other_` column prefix | Consistent with `join_merge()` for right table column naming |
| Optional `is_sorted` parameter | Allow users who know their data is sorted to skip sort overhead |
| Default `is_sorted=False` | Safe default - auto-sort tables if not explicitly marked as sorted |
| Backward bias on ties | For direction="nearest" with equal distances, prefer earlier timestamp |
| Inequality-based `on` condition | Parse `>=` or `<=` from the lambda to extract time columns (not equality like regular joins) |
Binary Search Implementation (Rust):

```rust
// For direction="backward": find largest index where right[i] <= left_time
fn find_asof_backward(left_time: i64, right_times: &[i64]) -> Option<usize> {
    let idx = right_times.partition_point(|&t| t <= left_time);
    if idx == 0 { None } else { Some(idx - 1) }
}

// For direction="forward": find smallest index where right[i] >= left_time
fn find_asof_forward(left_time: i64, right_times: &[i64]) -> Option<usize> {
    let idx = right_times.partition_point(|&t| t < left_time);
    if idx >= right_times.len() { None } else { Some(idx) }
}

// For direction="nearest": compute both, pick the closer (backward bias on ties)
fn find_asof_nearest(left_time: i64, right_times: &[i64]) -> Option<usize> {
    match (find_asof_backward(left_time, right_times),
           find_asof_forward(left_time, right_times)) {
        (None, None) => None,
        (Some(b), None) => Some(b),
        (None, Some(f)) => Some(f),
        (Some(b), Some(f)) => {
            let diff_back = left_time - right_times[b];
            let diff_fwd = right_times[f] - left_time;
            if diff_back <= diff_fwd { Some(b) } else { Some(f) }
        }
    }
}
```

Complexity: O(N log M), where N = left rows and M = right rows.
Unlike regular joins, which use equality (`==`), `asof_join` uses inequality operators (`>=`, `<=`). The helper `_extract_asof_keys()` parses:

```python
lambda t, q: t.time >= q.time              # -> ("time", "time", "Gte")
lambda t, q: t.trade_time <= q.quote_time  # -> ("trade_time", "quote_time", "Lte")
```

- All columns from the left table (unchanged)
- All columns from the right table with the `_other_` prefix
- For unmatched rows (no right match): right columns are NULL
DataFusion does not support:
- `ASOF JOIN` syntax (like DuckDB)
- `LATERAL JOIN` (PostgreSQL-style)
- `RANGE` frames in window functions for this use case

A custom Rust implementation with binary search is the most efficient approach.
Sort order tracking is a metadata system that tracks which columns an LTSeq table is sorted by. This enables:
- Efficient `join_sorted()` with validation
- Smart auto-detection in `asof_join()` (skip unnecessary sorting)
- Clear user feedback about table state
```python
class LTSeq:
    _sort_keys: Optional[List[Tuple[str, bool]]] = None  # [(col_name, is_desc), ...]
```

- `None`: Sort order unknown (unsorted or invalidated)
- `[(col, False), ...]`: Sorted ascending by columns in order
- `[(col, True), ...]`: Sorted descending
Property: `sort_keys`

```python
@property
def sort_keys(self) -> Optional[List[Tuple[str, bool]]]:
    """Return current sort keys or None if unknown."""
```

Method: `is_sorted_by()`
```python
def is_sorted_by(self, *keys: str, desc: Union[bool, List[bool]] = False) -> bool:
    """
    Check if table is sorted by specified keys using prefix matching.

    Example:
        t.sort("a", "b").is_sorted_by("a")       # True (prefix match)
        t.sort("a", "b").is_sorted_by("a", "b")  # True (exact match)
        t.sort("a", "b").is_sorted_by("b")       # False (wrong order)
    """
```

Method: `join_sorted()`
```python
def join_sorted(self, other: LTSeq, on: Callable, how: str = "inner") -> LTSeq:
    """
    Merge join with sort validation. Raises ValueError if either table
    is not sorted by join keys.
    """
```

| Operation | Sort Behavior | Rationale |
|---|---|---|
| `filter()` | Preserves | Row subset, order unchanged |
| `derive()` | Preserves | Adds columns, order unchanged |
| `slice()` | Preserves | Contiguous row subset |
| `select()` | Conditional | Preserves if sort key columns included |
| `cum_sum()` | Preserves | Adds columns, order unchanged |
| `sort()` | Sets new | Explicitly sets sort order |
| `distinct()` | Invalidates | Row reordering may occur |
| `union()` | Invalidates | Combines tables, no guaranteed order |
| `join()` | Invalidates | Hash join doesn't preserve order |
| `agg()` | Invalidates | Creates new rows |
| `pivot()` | Invalidates | Creates new structure |
| Decision | Rationale |
|---|---|
| Prefix matching for `is_sorted_by()` | `t.sort("a", "b")` is sorted by `"a"` (prefix), enabling flexible checks |
| Descending support | Both ascending and descending sorts are tracked; merge join works with either direction as long as both tables match |
| Strict validation in `join_sorted()` | Raises `ValueError` rather than silently falling back; users should know if their data isn't sorted |
| Lambda-only sort keys not tracked | Complex expressions like lambda r: r.a + r.b result in _sort_keys = None (can't reliably track computed sorts) |
| Direction matching required | Both tables in join_sorted() must have same direction (both ASC or both DESC) |
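The prefix-matching check can be sketched as a standalone function over the tracked metadata (illustrative; the real method lives on `LTSeq`):

```python
def is_sorted_by(sort_keys, *keys, desc=False):
    """Prefix-match requested keys against tracked sort metadata.
    sort_keys: None or [(col_name, is_desc), ...], as tracked on the table."""
    if sort_keys is None or not keys or len(keys) > len(sort_keys):
        return False
    # Broadcast a scalar desc flag across all requested keys.
    descs = desc if isinstance(desc, list) else [desc] * len(keys)
    return sort_keys[:len(keys)] == list(zip(keys, descs))

tracked = [("a", False), ("b", False)]  # as after t.sort("a", "b")
is_sorted_by(tracked, "a")       # True  (prefix match)
is_sorted_by(tracked, "a", "b")  # True  (exact match)
is_sorted_by(tracked, "b")       # False (wrong position)
```

Comparing a slice of the tracked keys against the requested keys is what makes prefix matching fall out naturally: a table sorted by `(a, b)` is, by definition, sorted by `a` alone.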
`asof_join()` now uses `is_sorted_by()` for smart auto-detection:
```python
# Before: Always sorted when is_sorted=False
if not is_sorted:
    left_table = self.sort(left_time_col)
    right_table = other.sort(right_time_col)

# After: Check sort tracking, only sort if needed
if is_sorted:
    pass  # Trust user
else:
    # Auto-detect using sort tracking
    if not self.is_sorted_by(left_time_col):
        left_table = self.sort(left_time_col)
    if not other.is_sorted_by(right_time_col):
        right_table = other.sort(right_time_col)
```

Benefit: If the user has already sorted the tables, `asof_join()` skips redundant sorting automatically.
```python
# Basic sort tracking
t = LTSeq.read_csv("data.csv")
print(t.sort_keys)  # None (unsorted)

t_sorted = t.sort("id", "date")
print(t_sorted.sort_keys)  # [('id', False), ('date', False)]

# Prefix matching
t_sorted.is_sorted_by("id")          # True
t_sorted.is_sorted_by("id", "date")  # True
t_sorted.is_sorted_by("date")        # False (wrong position)

# Sort-preserving operations
t_filtered = t_sorted.filter(lambda r: r.value > 100)
print(t_filtered.sort_keys)  # [('id', False), ('date', False)] - preserved

# Sort-invalidating operations
t_distinct = t_sorted.distinct("id")
print(t_distinct.sort_keys)  # None - invalidated

# join_sorted() with validation
users = LTSeq.read_csv("users.csv").sort("user_id")
orders = LTSeq.read_csv("orders.csv").sort("user_id")
result = users.join_sorted(orders, on=lambda u, o: u.user_id == o.user_id)

# This would raise ValueError:
# users_unsorted = LTSeq.read_csv("users.csv")  # Not sorted
# result = users_unsorted.join_sorted(orders, on=lambda u, o: u.user_id == o.user_id)
# ValueError: Left table is not sorted by join keys: ['user_id']
```