Skip to content

Commit 4a5c56e

Browse files
committed
three-phase: read_all_latest and setup database flag
The read_all_latest function returns the latest version of every non-deleted row in a three-phase table. The setup database helper can be run as `uv run pattern.py --setup <path to db>` and initializes a new SQLite database at the provided path. The database contains a three-phase table + some sample data to quickly test out SQL queries.
1 parent 7a14318 commit 4a5c56e

1 file changed

Lines changed: 180 additions & 14 deletions

File tree

patterns/three-phase/pattern.py

Lines changed: 180 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@
99
the pattern and tests its correctness.
1010
"""
1111

12+
import argparse
1213
import random
1314
import sqlite3
1415
import sys
1516
import time
1617
from contextlib import contextmanager
1718
from dataclasses import dataclass
1819
from pathlib import Path
19-
from typing import Dict, Iterator, List, Literal, Union
20+
from typing import Dict, Iterator, List, Literal, Tuple, Union
2021

2122
import fossil_delta
2223

@@ -60,6 +61,20 @@ class DeleteOp:
6061
# Type alias for a changeset, mapping table names to lists of operations
6162
Changeset = Dict[str, List[ChangesetOp]]
6263

64+
SAMPLE_TABLE_NAME = "AppTable"
65+
SAMPLE_BASELINE_ROWS: List[Tuple[int, str]] = [
66+
(1, "order-100:status=pending"),
67+
(2, "order-200:status=pending"),
68+
(3, "order-300:status=paid"),
69+
(4, "order-400:status=pending"),
70+
(5, "order-500:status=cancelled"),
71+
]
72+
SAMPLE_PENDING_UPSERTS: List[Tuple[int, str]] = [
73+
(2, "order-200:status=shipped"),
74+
(6, "order-600:status=pending"),
75+
]
76+
SAMPLE_PENDING_DELETES: List[int] = [4]
77+
6378

6479
def setup_three_phase_table(conn: sqlite3.Connection, table_name: str) -> str:
6580
"""
@@ -128,6 +143,61 @@ def setup_regular_table(conn: sqlite3.Connection, table_name: str) -> str:
128143
return table_name
129144

130145

146+
def setup_sample_database(db_path: Path, table_name: str = SAMPLE_TABLE_NAME) -> Path:
147+
"""
148+
Create a new SQLite database with schema and sample three-phase data.
149+
150+
The database includes a phase-2 baseline plus a few pending phase-0 mutations
151+
so it is immediately useful for testing changeset generation.
152+
153+
Args:
154+
db_path: Filesystem path to create the database at
155+
table_name: Name of the sample table to create
156+
157+
Returns:
158+
The created database path.
159+
160+
Raises:
161+
FileExistsError: If the database path already exists
162+
FileNotFoundError: If the parent directory does not exist
163+
sqlite3.Error: If SQLite setup fails
164+
"""
165+
target_path = db_path.expanduser().resolve(strict=False)
166+
167+
if target_path.exists():
168+
raise FileExistsError(f"Database already exists: {target_path}")
169+
170+
if not target_path.parent.exists():
171+
raise FileNotFoundError(f"Parent directory does not exist: {target_path.parent}")
172+
173+
conn = sqlite3.connect(target_path)
174+
try:
175+
setup_three_phase_table(conn, table_name)
176+
177+
for row_id, data_value in SAMPLE_BASELINE_ROWS:
178+
insert_or_update(conn, table_name, row_id, data_value)
179+
180+
# Promote baseline rows to phase 2.
181+
with changeset(conn, table_name):
182+
pass
183+
184+
# Leave pending phase-0 writes to demonstrate a realistic in-flight state.
185+
for row_id, data_value in SAMPLE_PENDING_UPSERTS:
186+
insert_or_update(conn, table_name, row_id, data_value)
187+
188+
for row_id in SAMPLE_PENDING_DELETES:
189+
logical_delete(conn, table_name, row_id)
190+
191+
conn.commit()
192+
except Exception:
193+
conn.close()
194+
target_path.unlink(missing_ok=True)
195+
raise
196+
197+
conn.close()
198+
return target_path
199+
200+
131201
def insert_or_update(conn: sqlite3.Connection, table_name: str, row_id: int, data: str) -> None:
132202
"""
133203
Insert or update a row using upsert operation targeting phase 0.
@@ -202,6 +272,35 @@ def read_latest(conn: sqlite3.Connection, table_name: str, row_id: int) -> str |
202272
return row[0]
203273

204274

275+
def read_all_latest(conn: sqlite3.Connection, table_name: str) -> List[Tuple[str, str]]:
276+
"""
277+
Read the latest version of every row (lowest phase, not deleted).
278+
279+
Args:
280+
conn: SQLite database connection
281+
table_name: Name of the table
282+
283+
Returns:
284+
List of data values
285+
"""
286+
# This query takes advantage of how SQLite handles bare-columns in an
287+
# aggregate query to select the data and deleted columns corresponding to
288+
# the minimum phase for each unique id.
289+
# Documented here: https://www.sqlite.org/lang_select.html#bareagg
290+
sql = f"""
291+
SELECT id, data
292+
FROM (
293+
SELECT id, data, deleted, MIN(phase) AS min_phase
294+
FROM {table_name}
295+
GROUP BY id
296+
)
297+
WHERE deleted = 0
298+
"""
299+
cursor = conn.execute(sql)
300+
rows = cursor.fetchall()
301+
return rows
302+
303+
205304
@contextmanager
206305
def changeset(conn: sqlite3.Connection, table_name: str) -> Iterator[List[ChangesetOp]]:
207306
"""
@@ -526,7 +625,7 @@ def run_example() -> None:
526625
"""
527626
with sqlite3_test_db() as conn:
528627
# Create example table
529-
table_name = setup_three_phase_table(conn, "AppTable")
628+
table_name = setup_three_phase_table(conn, SAMPLE_TABLE_NAME)
530629

531630
# Insert initial data
532631
initial_data = [
@@ -597,7 +696,7 @@ def test_pattern():
597696
def test_basic_operations():
598697
"""Test basic insert, update, and delete operations."""
599698
with sqlite3_test_db() as conn:
600-
table_name = setup_three_phase_table(conn, "AppTable")
699+
table_name = setup_three_phase_table(conn, SAMPLE_TABLE_NAME)
601700

602701
# Test insert
603702
insert_or_update(conn, table_name, 1, "test data")
@@ -616,10 +715,45 @@ def test_basic_operations():
616715

617716
print("✓ Basic operations test passed")
618717

718+
def test_read_all_latest():
719+
"""Test read_all_latest."""
720+
with sqlite3_test_db() as conn:
721+
table_name = setup_three_phase_table(conn, SAMPLE_TABLE_NAME)
722+
723+
# insert two rows
724+
insert_or_update(conn, table_name, 1, "bob")
725+
insert_or_update(conn, table_name, 2, "alice")
726+
727+
rows = read_all_latest(conn, table_name)
728+
assert len(rows) == 2, f"Expected 2 rows, got {len(rows)}"
729+
assert rows[0] == (1, "bob"), f"Expected (1, 'bob'), got {rows[0]}"
730+
assert rows[1] == (2, "alice"), f"Expected (2, 'alice'), got {rows[1]}"
731+
732+
# compact them to phase = 2
733+
with changeset(conn, table_name):
734+
pass
735+
736+
rows = read_all_latest(conn, table_name)
737+
assert len(rows) == 2, f"Expected 2 rows, got {len(rows)}"
738+
assert rows[0] == (1, "bob"), f"Expected (1, 'bob'), got {rows[0]}"
739+
assert rows[1] == (2, "alice"), f"Expected (2, 'alice'), got {rows[1]}"
740+
741+
# delete row 1, update row 2, insert row 3
742+
logical_delete(conn, table_name, 1)
743+
insert_or_update(conn, table_name, 2, "alice++")
744+
insert_or_update(conn, table_name, 3, "jones")
745+
746+
rows = read_all_latest(conn, table_name)
747+
assert len(rows) == 2, f"Expected 2 rows, got {len(rows)}"
748+
assert rows[0] == (2, "alice++"), f"Expected (2, 'alice++'), got {rows[0]}"
749+
assert rows[1] == (3, "jones"), f"Expected (3, 'jones'), got {rows[1]}"
750+
751+
print("✓ read all latest test passed")
752+
619753
def test_changeset_generation():
620754
"""Test changeset generation with mixed operations."""
621755
with sqlite3_test_db() as conn:
622-
table_name = setup_three_phase_table(conn, "AppTable")
756+
table_name = setup_three_phase_table(conn, SAMPLE_TABLE_NAME)
623757

624758
# Insert initial data
625759
insert_or_update(conn, table_name, 1, "data1")
@@ -665,8 +799,8 @@ def test_replication():
665799
"""Test replication between writer and replica using changesets and compaction."""
666800
# Create writer and replica databases
667801
with sqlite3_test_db() as writer_conn, sqlite3_test_db() as replica_conn:
668-
writer_table = setup_three_phase_table(writer_conn, "AppTable")
669-
replica_table = setup_three_phase_table(replica_conn, "AppTable")
802+
writer_table = setup_three_phase_table(writer_conn, SAMPLE_TABLE_NAME)
803+
replica_table = setup_three_phase_table(replica_conn, SAMPLE_TABLE_NAME)
670804

671805
# === CHECKPOINT 1: Initial data ===
672806
print(" Checkpoint 1: Initial data")
@@ -765,7 +899,7 @@ def test_replication():
765899
def test_phase_isolation():
766900
"""Test that concurrent writes don't interfere with changeset generation."""
767901
with sqlite3_test_db() as conn:
768-
table_name = setup_three_phase_table(conn, "AppTable")
902+
table_name = setup_three_phase_table(conn, SAMPLE_TABLE_NAME)
769903

770904
# Insert initial data
771905
insert_or_update(conn, table_name, 1, "data1")
@@ -801,7 +935,7 @@ def test_phase_isolation():
801935
def test_compact():
802936
"""Test table compaction functionality."""
803937
with sqlite3_test_db() as conn:
804-
table_name = setup_three_phase_table(conn, "AppTable")
938+
table_name = setup_three_phase_table(conn, SAMPLE_TABLE_NAME)
805939

806940
# Insert initial data
807941
insert_or_update(conn, table_name, 1, "data1")
@@ -848,8 +982,8 @@ def test_compact():
848982
def test_crash_safety():
849983
"""Test that changes are not lost when application crashes during changeset generation."""
850984
with sqlite3_test_db() as writer_conn, sqlite3_test_db() as replica_conn:
851-
writer_table = setup_three_phase_table(writer_conn, "AppTable")
852-
replica_table = setup_three_phase_table(replica_conn, "AppTable")
985+
writer_table = setup_three_phase_table(writer_conn, SAMPLE_TABLE_NAME)
986+
replica_table = setup_three_phase_table(replica_conn, SAMPLE_TABLE_NAME)
853987

854988
# === SETUP: Initial data ===
855989
print(" Setup: Creating baseline data")
@@ -981,6 +1115,7 @@ def test_crash_safety():
9811115
# Run all tests
9821116
print("Running three-phase CDC pattern tests...")
9831117
test_basic_operations()
1118+
test_read_all_latest()
9841119
test_changeset_generation()
9851120
test_replication()
9861121
test_phase_isolation()
@@ -994,11 +1129,42 @@ def test_crash_safety():
9941129
print("✅ All tests passed!")
9951130

9961131

997-
if __name__ == "__main__":
998-
# Run tests first
1132+
def parse_args(argv: List[str]) -> argparse.Namespace:
1133+
"""Parse command-line arguments."""
1134+
parser = argparse.ArgumentParser(description="Three-phase CDC pattern demo and tests")
1135+
parser.add_argument(
1136+
"--setup",
1137+
type=Path,
1138+
metavar="DB_PATH",
1139+
help=(
1140+
"Create a sample SQLite database at DB_PATH with schema and seed data. "
1141+
"Fails if DB_PATH already exists."
1142+
),
1143+
)
1144+
return parser.parse_args(argv)
1145+
1146+
1147+
def main(argv: List[str] | None = None) -> int:
1148+
"""Main entrypoint for CLI behavior."""
1149+
args = parse_args(sys.argv[1:] if argv is None else argv)
1150+
1151+
if args.setup is not None:
1152+
try:
1153+
created_path = setup_sample_database(args.setup)
1154+
except (FileExistsError, FileNotFoundError, sqlite3.Error, OSError) as exc:
1155+
print(f"Error: {exc}", file=sys.stderr)
1156+
return 1
1157+
1158+
print(f"Sample database created at {created_path}")
1159+
return 0
1160+
1161+
# Default behavior: run full test suite and example.
9991162
test_pattern()
10001163
print("\n" + "=" * 50 + "\n")
1001-
1002-
# Then run example
10031164
print("Running example demonstration...")
10041165
run_example()
1166+
return 0
1167+
1168+
1169+
if __name__ == "__main__":
1170+
raise SystemExit(main())

0 commit comments

Comments
 (0)