Skip to content

Commit cb19234

Browse files
authored
Merge pull request #45 from tidesdb/updates100
column family commit hook implementation; updated pyproject
2 parents cdbb3e3 + b2ab2d8 commit cb19234

File tree

4 files changed

+271
-3
lines changed

4 files changed

+271
-3
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "tidesdb"
7-
version = "0.9.3"
7+
version = "0.9.4"
88
description = "Official Python bindings for TidesDB - A high-performance embedded key-value storage engine"
99
readme = "README.md"
1010
requires-python = ">=3.10"

src/tidesdb/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
default_column_family_config,
2626
save_config_to_ini,
2727
load_config_from_ini,
28+
CommitOp,
2829
COMPARATOR_FUNC,
30+
COMMIT_HOOK_FUNC,
2931
)
3032

3133
# Keep in sync with the `version` field in pyproject.toml.
__version__ = "0.9.4"
@@ -47,5 +49,7 @@
4749
"default_column_family_config",
4850
"save_config_to_ini",
4951
"load_config_from_ini",
52+
"CommitOp",
5053
"COMPARATOR_FUNC",
54+
"COMMIT_HOOK_FUNC",
5155
]

src/tidesdb/tidesdb.py

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@
2323
import os
2424
import sys
2525
from ctypes import (
26+
CFUNCTYPE,
2627
POINTER,
2728
Structure,
2829
c_char,
2930
c_char_p,
3031
c_double,
3132
c_float,
3233
c_int,
34+
c_int64,
3335
c_size_t,
3436
c_uint8,
3537
c_uint32,
@@ -198,9 +200,28 @@ class _CColumnFamilyConfig(Structure):
198200
("l1_file_count_trigger", c_int),
199201
("l0_queue_stall_threshold", c_int),
200202
("use_btree", c_int),
203+
("commit_hook_fn", c_void_p),
204+
("commit_hook_ctx", c_void_p),
201205
]
202206

203207

208+
class _CCommitOp(Structure):
    """ctypes mirror of the C ``tidesdb_commit_op_t`` record.

    Field order and types must match the C declaration exactly; do not
    reorder them.
    """

    _fields_ = [
        # Key buffer and its length in bytes.
        ("key", POINTER(c_uint8)),
        ("key_size", c_size_t),
        # Value buffer and its length in bytes (may be NULL / 0).
        ("value", POINTER(c_uint8)),
        ("value_size", c_size_t),
        # TTL as a signed 64-bit integer, passed through unchanged.
        ("ttl", c_int64),
        # Non-zero when the operation is a delete.
        ("is_delete", c_int),
    ]
219+
220+
221+
# Commit hook callback: int (*)(const tidesdb_commit_op_t*, int, uint64_t, void*)
222+
COMMIT_HOOK_FUNC = CFUNCTYPE(c_int, POINTER(_CCommitOp), c_int, c_uint64, c_void_p)
223+
224+
204225
class _CConfig(Structure):
205226
"""C structure for tidesdb_config_t."""
206227

@@ -290,7 +311,7 @@ class _CCacheStats(Structure):
290311
c_size_t,
291312
POINTER(c_uint8),
292313
c_size_t,
293-
c_int,
314+
c_int64,
294315
]
295316
_lib.tidesdb_txn_put.restype = c_int
296317

@@ -413,6 +434,9 @@ class _CCacheStats(Structure):
413434
_lib.tidesdb_cf_config_load_from_ini.argtypes = [c_char_p, c_char_p, POINTER(_CColumnFamilyConfig)]
414435
_lib.tidesdb_cf_config_load_from_ini.restype = c_int
415436

437+
_lib.tidesdb_cf_set_commit_hook.argtypes = [c_void_p, COMMIT_HOOK_FUNC, c_void_p]
438+
_lib.tidesdb_cf_set_commit_hook.restype = c_int
439+
416440
# Comparator function type: int (*)(const uint8_t*, size_t, const uint8_t*, size_t, void*)
417441
COMPARATOR_FUNC = ctypes.CFUNCTYPE(c_int, POINTER(c_uint8), c_size_t, POINTER(c_uint8), c_size_t, c_void_p)
418442
DESTROY_FUNC = ctypes.CFUNCTYPE(None, c_void_p)
@@ -533,6 +557,16 @@ class CacheStats:
533557
num_partitions: int
534558

535559

560+
@dataclass
561+
class CommitOp:
562+
"""A single operation from a committed transaction batch."""
563+
564+
key: bytes
565+
value: bytes | None
566+
ttl: int
567+
is_delete: bool
568+
569+
536570
def default_config() -> Config:
537571
"""Get default database configuration."""
538572
return Config(db_path="")
@@ -798,6 +832,60 @@ def update_runtime_config(self, config: ColumnFamilyConfig, persist_to_disk: boo
798832
if result != TDB_SUCCESS:
799833
raise TidesDBError.from_code(result, "failed to update runtime config")
800834

835+
def set_commit_hook(self, callback: callable) -> None:
    """
    Set a commit hook (change data capture) callback for this column family.

    The callback fires synchronously after every transaction commit on this
    column family. It receives the full batch of committed operations
    atomically, enabling real-time change data capture.

    The callback signature is:
        callback(ops: list[CommitOp], commit_seq: int) -> int

    Return 0 from the callback on success. A non-zero return is logged as a
    warning but does not roll back the commit. Returning ``None`` is treated
    as 0. If the callback raises, the exception is logged and -1 is reported
    to the library instead of propagating.

    Args:
        callback: Python callable with the signature above

    Raises:
        TypeError: If ``callback`` is not callable.
        TidesDBError: If the library rejects the hook registration.
    """
    import logging

    if not callable(callback):
        raise TypeError("callback must be callable")

    logger = logging.getLogger(__name__)

    def to_commit_op(c_op):
        # Copy the bytes out of the C buffers before building the Python
        # object; a NULL pointer or zero length means "no data".
        has_key = c_op.key and c_op.key_size > 0
        has_value = c_op.value and c_op.value_size > 0
        return CommitOp(
            key=ctypes.string_at(c_op.key, c_op.key_size) if has_key else b"",
            value=ctypes.string_at(c_op.value, c_op.value_size) if has_value else None,
            ttl=c_op.ttl,
            is_delete=bool(c_op.is_delete),
        )

    def c_hook(ops_ptr, num_ops, commit_seq, ctx_ptr):
        # Nothing may escape into the C caller, so the conversion of the
        # ops array is guarded together with the user callback.
        try:
            ops = [to_commit_op(ops_ptr[i]) for i in range(num_ops)]
            status = callback(ops, commit_seq)
            # A callback that forgets to return anything counts as success.
            return 0 if status is None else int(status)
        except Exception:
            logger.exception("commit hook callback raised")
            return -1

    c_func = COMMIT_HOOK_FUNC(c_hook)

    result = _lib.tidesdb_cf_set_commit_hook(self._cf, c_func, None)
    if result != TDB_SUCCESS:
        # Leave any previously stored reference untouched: the library may
        # still hold the old function pointer when registration fails.
        raise TidesDBError.from_code(result, "failed to set commit hook")

    # Keep the ctypes thunk alive for as long as the library may call it.
    # It is swapped in only after success so the old thunk is never freed
    # while it could still be registered.
    self._commit_hook_ref = c_func
880+
def clear_commit_hook(self) -> None:
    """Remove the commit hook from this column family.

    Registers a NULL function pointer with the library, then drops the
    Python-side reference to the previous callback.

    Raises:
        TidesDBError: If the library reports a failure.
    """
    null_hook = COMMIT_HOOK_FUNC(0)
    status = _lib.tidesdb_cf_set_commit_hook(self._cf, null_hook, None)
    if status != TDB_SUCCESS:
        raise TidesDBError.from_code(status, "failed to clear commit hook")

    # Release the stored thunk only after the library has let go of it.
    if hasattr(self, "_commit_hook_ref"):
        self._commit_hook_ref = None
801889
def range_cost(self, key_a: bytes, key_b: bytes) -> float:
802890
"""
803891
Estimate the computational cost of iterating between two keys.

tests/test_tidesdb.py

Lines changed: 177 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ def test_ttl_expiration(self, db, cf):
301301
txn.commit()
302302

303303
cf.flush_memtable()
304-
time.sleep(0.5)
304+
time.sleep(2)
305305

306306
with db.begin_txn() as txn:
307307
try:
@@ -711,5 +711,181 @@ def test_load_preserves_all_fields(self, temp_db_path):
711711
assert loaded.l0_queue_stall_threshold == original.l0_queue_stall_threshold
712712

713713

714+
class TestCommitHook:
    """Tests for commit hook (change data capture) API.

    Each test that installs a hook removes it in a ``finally`` block so a
    failing assertion or commit does not leave the hook registered on the
    column family.
    """

    def test_hook_fires_on_commit(self, db, cf):
        """Test that the commit hook fires when a transaction commits."""
        captured = []

        def hook(ops, commit_seq):
            captured.append({"ops": ops, "seq": commit_seq})
            return 0

        cf.set_commit_hook(hook)
        try:
            with db.begin_txn() as txn:
                txn.put(cf, b"key1", b"value1")
                txn.commit()

            assert len(captured) == 1
            assert len(captured[0]["ops"]) == 1
            assert captured[0]["ops"][0].key == b"key1"
            assert captured[0]["ops"][0].value == b"value1"
            assert captured[0]["ops"][0].is_delete is False
            assert captured[0]["seq"] > 0
        finally:
            cf.clear_commit_hook()

    def test_hook_captures_deletes(self, db, cf):
        """Test that the hook correctly reports delete operations."""
        captured = []

        # Write the key before the hook is installed so only the delete
        # is captured.
        with db.begin_txn() as txn:
            txn.put(cf, b"del_key", b"del_val")
            txn.commit()

        def hook(ops, commit_seq):
            captured.append(ops)
            return 0

        cf.set_commit_hook(hook)
        try:
            with db.begin_txn() as txn:
                txn.delete(cf, b"del_key")
                txn.commit()

            assert len(captured) == 1
            delete_ops = [op for op in captured[0] if op.is_delete]
            assert len(delete_ops) >= 1
            assert delete_ops[0].key == b"del_key"
            assert delete_ops[0].value is None
        finally:
            cf.clear_commit_hook()

    def test_hook_multi_op_batch(self, db, cf):
        """Test that the hook receives all operations in a batch."""
        captured = []

        def hook(ops, commit_seq):
            captured.append(ops)
            return 0

        cf.set_commit_hook(hook)
        try:
            with db.begin_txn() as txn:
                txn.put(cf, b"batch1", b"v1")
                txn.put(cf, b"batch2", b"v2")
                txn.put(cf, b"batch3", b"v3")
                txn.commit()

            assert len(captured) == 1
            assert len(captured[0]) == 3
            keys = {op.key for op in captured[0]}
            assert keys == {b"batch1", b"batch2", b"batch3"}
        finally:
            cf.clear_commit_hook()

    def test_hook_commit_seq_increases(self, db, cf):
        """Test that commit_seq is monotonically increasing."""
        seqs = []

        def hook(ops, commit_seq):
            seqs.append(commit_seq)
            return 0

        cf.set_commit_hook(hook)
        try:
            for i in range(3):
                with db.begin_txn() as txn:
                    txn.put(cf, f"seq_key_{i}".encode(), f"seq_val_{i}".encode())
                    txn.commit()

            assert len(seqs) == 3
            assert seqs[0] < seqs[1] < seqs[2]
        finally:
            cf.clear_commit_hook()

    def test_clear_hook_stops_firing(self, db, cf):
        """Test that clearing the hook stops callbacks."""
        captured = []

        def hook(ops, commit_seq):
            captured.append(ops)
            return 0

        cf.set_commit_hook(hook)
        try:
            with db.begin_txn() as txn:
                txn.put(cf, b"before_clear", b"v")
                txn.commit()

            assert len(captured) == 1
        finally:
            # Clearing is the behavior under test, and also the cleanup.
            cf.clear_commit_hook()

        with db.begin_txn() as txn:
            txn.put(cf, b"after_clear", b"v")
            txn.commit()

        # No new captures after clearing
        assert len(captured) == 1

    def test_hook_failure_does_not_rollback(self, db, cf):
        """Test that a hook returning non-zero does not affect the commit."""

        def failing_hook(ops, commit_seq):
            return -1  # Simulate failure

        cf.set_commit_hook(failing_hook)
        try:
            with db.begin_txn() as txn:
                txn.put(cf, b"survive", b"value")
                txn.commit()
        finally:
            cf.clear_commit_hook()

        # Data should still be committed despite hook failure
        with db.begin_txn() as txn:
            assert txn.get(cf, b"survive") == b"value"

    def test_hook_with_ttl(self, db, cf):
        """Test that the hook reports TTL values."""
        captured = []

        def hook(ops, commit_seq):
            captured.append(ops)
            return 0

        cf.set_commit_hook(hook)
        try:
            ttl_val = int(time.time()) + 3600  # 1 hour from now
            with db.begin_txn() as txn:
                txn.put(cf, b"ttl_key", b"ttl_val", ttl=ttl_val)
                txn.commit()

            assert len(captured) == 1
            assert captured[0][0].ttl == ttl_val
        finally:
            cf.clear_commit_hook()

    def test_hook_exception_handled(self, db, cf):
        """Test that Python exceptions in the hook don't crash the process."""

        def crashing_hook(ops, commit_seq):
            raise RuntimeError("hook crashed!")

        cf.set_commit_hook(crashing_hook)
        try:
            # Should not raise - exception is caught internally
            with db.begin_txn() as txn:
                txn.put(cf, b"crash_key", b"crash_val")
                txn.commit()
        finally:
            cf.clear_commit_hook()

        # Data should still be committed
        with db.begin_txn() as txn:
            assert txn.get(cf, b"crash_key") == b"crash_val"
888+
889+
714890
if __name__ == "__main__":
715891
pytest.main([__file__, "-v"])

0 commit comments

Comments
 (0)