Skip to content

Commit 0b0139a

Browse files
committed
Update trigger-lite pattern to track the maximum GSN in a separate
table. This can dramatically improve performance for large numbers of changes.
1 parent 7e21b38 commit 0b0139a

2 files changed

Lines changed: 43 additions & 22 deletions

File tree

patterns/trigger-lite/README.md

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ The trigger-lite pattern provides automatic Change Data Capture (CDC) for SQLite
66

77
The pattern consists of three main components:
88

9-
### 1. Changes Table
9+
### 1. Changes Tables
1010

11-
A central table that tracks all modifications across your application tables:
11+
A changes table that tracks all modifications across your application tables:
1212

1313
```sql
1414
CREATE TABLE changes (
@@ -19,7 +19,15 @@ CREATE TABLE changes (
1919
) WITHOUT ROWID;
2020
```
2121

22-
See `setup_changes_table()` in [pattern.py] for the implementation.
22+
And a table used to track the maximum global sequence number:
23+
24+
```sql
25+
CREATE TABLE IF NOT EXISTS changes_gsn (
26+
max_gsn INTEGER NOT NULL -- Maximum GSN
27+
)
28+
```
29+
30+
See `setup_changes_tables()` in [pattern.py] for the implementation.
2331

2432
### 2. Database Triggers
2533

@@ -37,7 +45,7 @@ See `setup_triggers()` in [pattern.py] for the implementation.
3745

3846
A context manager that atomically:
3947

40-
1. Captures the current GSN
48+
1. Captures the current max GSN
4149
2. Generates changesets by joining application tables with the changes table
4250
3. Returns typed operations (`UpsertOp` for inserts/updates, `DeleteOp` for deletes)
4351
4. Automatically cleans up processed changes
@@ -48,7 +56,7 @@ See `changeset()` in [pattern.py] for the implementation.
4856

4957
Periodically a Checkpoint should be created to allow the history of changesets to restart. This ensures that the history doesn't get too long, as well as serving as a backup should something go wrong.
5058

51-
Before Checkpointing the database, you must truncate the `changes` table. This effectively resets the change history allowing it to start from scratch.
59+
Before Checkpointing the database, you must truncate the `changes` table. This effectively resets the change history allowing it to start from scratch. You may choose to reset the max_gsn to 0 in `changes_gsn` table as well, although that is not needed. It is safer to never reset the max_gsn.
5260

5361
After running compaction and generating a Checkpoint, the database history is fully reset. The resulting Checkpoint needs to be sent to any replicas in entirely before replicating changes can resume.
5462

patterns/trigger-lite/pattern.py

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,17 @@ class DeleteOp:
5151
Changeset = Dict[str, List[ChangesetOp]]
5252

5353

54-
def setup_changes_table(conn: sqlite3.Connection) -> None:
54+
def setup_changes_tables(conn: sqlite3.Connection) -> None:
5555
"""
56-
Create the changes table for tracking modifications.
56+
Create the changes and changes_gsn tables for tracking modifications.
5757
5858
The changes table stores:
5959
- tid: unique identifier for the application table
6060
- rid: row identifier of the changed row in the application table
6161
- gsn: Global Sequence Number for ordering changes
62+
63+
The changes_gsn table stores:
64+
- max_gsn: The maximum GSN seen so far, incremented by each change
6265
"""
6366
conn.execute("""
6467
CREATE TABLE IF NOT EXISTS changes (
@@ -75,6 +78,13 @@ def setup_changes_table(conn: sqlite3.Connection) -> None:
7578
PRIMARY KEY (tid, rid)
7679
) WITHOUT ROWID
7780
""")
81+
conn.execute("""
82+
CREATE TABLE IF NOT EXISTS changes_gsn (
83+
-- Global Sequence Number (GSN) is a monotonically increasing number used to sequence changes.
84+
max_gsn INTEGER NOT NULL
85+
)
86+
""")
87+
conn.execute("INSERT INTO changes_gsn (max_gsn) VALUES (0)")
7888

7989

8090
def setup_triggers(conn: sqlite3.Connection, table_name: str, table_id: int) -> None:
@@ -97,8 +107,9 @@ def setup_triggers(conn: sqlite3.Connection, table_name: str, table_id: int) ->
97107
CREATE TRIGGER IF NOT EXISTS trg_{table_name}_insert
98108
AFTER INSERT ON {table_name}
99109
BEGIN
110+
UPDATE changes_gsn SET max_gsn = max_gsn + 1;
100111
INSERT INTO changes(tid, rid, gsn)
101-
VALUES ({table_id}, new.rowid, (SELECT IFNULL(MAX(gsn), 0) + 1 FROM changes))
112+
VALUES ({table_id}, new.rowid, (SELECT max_gsn FROM changes_gsn))
102113
ON CONFLICT DO UPDATE SET gsn = excluded.gsn;
103114
END
104115
"""
@@ -109,8 +120,9 @@ def setup_triggers(conn: sqlite3.Connection, table_name: str, table_id: int) ->
109120
CREATE TRIGGER IF NOT EXISTS trg_{table_name}_update
110121
AFTER UPDATE ON {table_name}
111122
BEGIN
123+
UPDATE changes_gsn SET max_gsn = max_gsn + 1;
112124
INSERT INTO changes(tid, rid, gsn)
113-
VALUES ({table_id}, new.rowid, (SELECT IFNULL(MAX(gsn), 0) + 1 FROM changes))
125+
VALUES ({table_id}, new.rowid, (SELECT max_gsn FROM changes_gsn))
114126
ON CONFLICT DO UPDATE SET gsn = excluded.gsn;
115127
END
116128
"""
@@ -121,8 +133,9 @@ def setup_triggers(conn: sqlite3.Connection, table_name: str, table_id: int) ->
121133
CREATE TRIGGER IF NOT EXISTS trg_{table_name}_delete
122134
AFTER DELETE ON {table_name}
123135
BEGIN
136+
UPDATE changes_gsn SET max_gsn = max_gsn + 1;
124137
INSERT INTO changes(tid, rid, gsn)
125-
VALUES ({table_id}, old.rowid, (SELECT IFNULL(MAX(gsn), 0) + 1 FROM changes))
138+
VALUES ({table_id}, old.rowid, (SELECT max_gsn FROM changes_gsn))
126139
ON CONFLICT DO UPDATE SET gsn = excluded.gsn;
127140
END
128141
"""
@@ -153,7 +166,7 @@ def changeset(conn: sqlite3.Connection, table_mapping: Dict[str, int]) -> Iterat
153166
# Generate changeset in read transaction
154167
with conn:
155168
# Snapshot the global sequence number
156-
cursor = conn.execute("SELECT IFNULL(MAX(gsn), 0) FROM changes")
169+
cursor = conn.execute("SELECT max_gsn FROM changes_gsn")
157170
snapshot_gsn = cursor.fetchone()[0]
158171

159172
for table_name, table_id in table_mapping.items():
@@ -383,8 +396,8 @@ def test_random_workload(
383396
sqlite3_test_db() as regular_conn,
384397
):
385398
# Set up changes table for writer and replica
386-
setup_changes_table(writer_conn)
387-
setup_changes_table(replica_conn)
399+
setup_changes_tables(writer_conn)
400+
setup_changes_tables(replica_conn)
388401

389402
# Set up tables
390403
writer_table = setup_workload_table(writer_conn, "WorkloadTable")
@@ -461,7 +474,7 @@ def run_example() -> None:
461474
"""
462475
with sqlite3_test_db() as conn:
463476
# Set up the changes tracking infrastructure
464-
setup_changes_table(conn)
477+
setup_changes_tables(conn)
465478

466479
# Create example tables and get table mapping
467480
table_mapping = create_example_tables(conn)
@@ -536,7 +549,7 @@ def test_pattern():
536549
def test_basic_changeset():
537550
"""Test basic changeset generation with mixed operations."""
538551
with sqlite3_test_db() as conn:
539-
setup_changes_table(conn)
552+
setup_changes_tables(conn)
540553

541554
# Create test table and define table mapping
542555
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
@@ -589,7 +602,7 @@ def test_basic_changeset():
589602
def test_multiple_tables():
590603
"""Test changeset generation across multiple tables."""
591604
with sqlite3_test_db() as conn:
592-
setup_changes_table(conn)
605+
setup_changes_tables(conn)
593606

594607
# Create multiple tables and define table mapping
595608
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
@@ -634,7 +647,7 @@ def test_multiple_tables():
634647
def test_changeset_cleanup():
635648
"""Test that changeset context manager cleans up processed changes."""
636649
with sqlite3_test_db() as conn:
637-
setup_changes_table(conn)
650+
setup_changes_tables(conn)
638651

639652
# Create test table and define table mapping
640653
conn.execute("CREATE TABLE test_table (id INTEGER PRIMARY KEY, data TEXT)")
@@ -673,7 +686,7 @@ def test_changeset_cleanup():
673686
def test_explicit_rowid_table():
674687
"""Test changeset generation with explicit rowid tables."""
675688
with sqlite3_test_db() as conn:
676-
setup_changes_table(conn)
689+
setup_changes_tables(conn)
677690

678691
# Create table with explicit rowid and define table mapping
679692
conn.execute("""
@@ -730,8 +743,8 @@ def test_replication():
730743
# Create writer and replica databases
731744
with sqlite3_test_db() as writer_conn, sqlite3_test_db() as replica_conn:
732745
# Setup both databases with identical schema
733-
setup_changes_table(writer_conn)
734-
setup_changes_table(replica_conn)
746+
setup_changes_tables(writer_conn)
747+
setup_changes_tables(replica_conn)
735748

736749
writer_conn.execute("CREATE TABLE AppTable (id INTEGER PRIMARY KEY, data TEXT)")
737750
replica_conn.execute("CREATE TABLE AppTable (id INTEGER PRIMARY KEY, data TEXT)")
@@ -852,8 +865,8 @@ def test_foreign_key_cascade():
852865
replica_conn.execute("PRAGMA foreign_keys = ON")
853866

854867
# Setup both databases with identical schema
855-
setup_changes_table(writer_conn)
856-
setup_changes_table(replica_conn)
868+
setup_changes_tables(writer_conn)
869+
setup_changes_tables(replica_conn)
857870

858871
# Create parent and child tables with foreign key relationship
859872
schema_queries = [

0 commit comments

Comments
 (0)