From aee296dc35ff02f1004850dc45caf3139be5b4fc Mon Sep 17 00:00:00 2001
From: Koen Vossen
Date: Wed, 18 Mar 2026 10:11:45 +0100
Subject: [PATCH 1/2] Redesign event log with proper separation of concerns

- Add EventLog class owning all event_log table access (write + fetch)
- EventLog.write() accepts domain events directly
- EventLog.fetch_batch() reconstructs and returns domain events
- EventLogSubscriber delegates to EventLog
- EventLogConsumer owns reader_state, uses EventLog for events
- Simplify tests to 4 focused cases
---
 ingestify/infra/event_log/__init__.py   |   3 +-
 ingestify/infra/event_log/consumer.py   |  48 ++----
 ingestify/infra/event_log/event_log.py  |  78 +++++++++
 ingestify/infra/event_log/subscriber.py |  36 ++---
 ingestify/tests/test_event_log.py       | 205 ++++++------------------
 5 files changed, 158 insertions(+), 212 deletions(-)
 create mode 100644 ingestify/infra/event_log/event_log.py

diff --git a/ingestify/infra/event_log/__init__.py b/ingestify/infra/event_log/__init__.py
index 76b9acc..e8a491b 100644
--- a/ingestify/infra/event_log/__init__.py
+++ b/ingestify/infra/event_log/__init__.py
@@ -1,4 +1,5 @@
 from .consumer import EventLogConsumer
+from .event_log import EventLog
 from .subscriber import EventLogSubscriber
 
-__all__ = ["EventLogConsumer", "EventLogSubscriber"]
+__all__ = ["EventLog", "EventLogConsumer", "EventLogSubscriber"]
diff --git a/ingestify/infra/event_log/consumer.py b/ingestify/infra/event_log/consumer.py
index b7d54ec..fd6220c 100644
--- a/ingestify/infra/event_log/consumer.py
+++ b/ingestify/infra/event_log/consumer.py
@@ -1,10 +1,10 @@
-import json
 import logging
 import time
 from typing import Callable, Optional
 
 from sqlalchemy import create_engine, select
 
+from .event_log import EventLog
 from .tables import get_tables
 
 logger = logging.getLogger(__name__)
@@ -14,10 +14,10 @@ class EventLogConsumer:
     """Cursor-based consumer for the event_log table.
 
     Usage (run once, e.g. cron):
-        EventLogConsumer.from_config("ingestify.yaml", reader_name="importer").run(on_event)
+        EventLogConsumer.from_config("ingestify.yaml", reader_name="default").run(on_event)
 
     Usage (keep running, poll every 5 seconds):
-        EventLogConsumer.from_config("ingestify.yaml", reader_name="importer").run(on_event, poll_interval=5)
+        EventLogConsumer.from_config("ingestify.yaml", reader_name="default").run(on_event, poll_interval=5)
 
     Exit codes (returned by run):
         0  Batch processed successfully (or nothing new).
@@ -25,13 +25,13 @@ class EventLogConsumer:
     """
 
     def __init__(self, database_url: str, reader_name: str, table_prefix: str = ""):
+        engine = create_engine(database_url)
+        self._event_log = EventLog(engine, table_prefix)
         self._reader_name = reader_name
+        self._engine = engine
         tables = get_tables(table_prefix)
-        self._metadata = tables["metadata"]
-        self._event_log_table = tables["event_log_table"]
         self._reader_state_table = tables["reader_state_table"]
-        self._engine = create_engine(database_url)
-        self._metadata.create_all(self._engine, checkfirst=True)
+        self._reader_state_table.create(engine, checkfirst=True)
 
     @classmethod
     def from_config(cls, config_file: str, reader_name: str) -> "EventLogConsumer":
@@ -39,10 +39,9 @@ def from_config(cls, config_file: str, reader_name: str) -> "EventLogConsumer":
         config = parse_config(config_file, default_value="")
 
         main = config["main"]
-        database_url = main["metadata_url"]
         table_prefix = main.get("metadata_options", {}).get("table_prefix", "")
         return cls(
-            database_url=database_url,
+            database_url=main["metadata_url"],
             reader_name=reader_name,
             table_prefix=table_prefix,
         )
@@ -70,18 +69,6 @@ def _get_last_event_id(self, conn) -> int:
         ).fetchone()
         return row[0] if row else 0
 
-    def _fetch_batch(self, conn, last_event_id: int, batch_size: int) -> list:
-        return conn.execute(
-            select(
-                self._event_log_table.c.id,
-                self._event_log_table.c.event_type,
-                self._event_log_table.c.payload_json,
-            )
-            .where(self._event_log_table.c.id > last_event_id)
-            .order_by(self._event_log_table.c.id)
-            .limit(batch_size)
-        ).fetchall()
-
     def _update_cursor(self, conn, event_id: int) -> None:
         conn.execute(
             self._reader_state_table.update()
@@ -94,27 +81,23 @@ def _run_once(self, on_event: Callable, batch_size: int = 100) -> int:
         with self._engine.connect() as conn:
             self._ensure_reader_state(conn)
             last_id = self._get_last_event_id(conn)
-            rows = self._fetch_batch(conn, last_id, batch_size)
-            if not rows:
-                return 0
+        rows = self._event_log.fetch_batch(last_id, batch_size)
+        if not rows:
+            return 0
 
-            for event_id, event_type, payload_json in rows:
+        with self._engine.connect() as conn:
+            for event_id, event in rows:
                 try:
-                    payload = (
-                        payload_json
-                        if isinstance(payload_json, dict)
-                        else json.loads(payload_json)
-                    )
-                    on_event(event_type, payload)
+                    on_event(event)
                 except Exception:
                     logger.exception(
                         "Failed to process event id=%d type=%r — cursor NOT advanced",
                         event_id,
-                        event_type,
+                        type(event).event_type,
                     )
                     return 1
                 self._update_cursor(conn, event_id)
 
         return 0
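
[Reviewer sketch, not part of the patch: the callback shape changes here.
on_event now receives a reconstructed domain event instead of
(event_type, payload). A minimal run-once script under the new design;
the config path and handler body are illustrative, while run() and
from_config() are as documented in the docstring above.]

    from ingestify.domain.models.dataset.events import RevisionAdded
    from ingestify.infra.event_log import EventLogConsumer


    def on_event(event) -> None:
        # Each event carries the full Dataset aggregate.
        if isinstance(event, RevisionAdded):
            print(f"revision added: {event.dataset.dataset_id}")


    if __name__ == "__main__":
        # run() returns the exit codes documented on EventLogConsumer.
        consumer = EventLogConsumer.from_config("ingestify.yaml", reader_name="default")
        raise SystemExit(consumer.run(on_event))
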
diff --git a/ingestify/infra/event_log/event_log.py b/ingestify/infra/event_log/event_log.py
new file mode 100644
index 0000000..52cee24
--- /dev/null
+++ b/ingestify/infra/event_log/event_log.py
@@ -0,0 +1,78 @@
+import json
+import logging
+
+from sqlalchemy import select
+
+from ingestify.domain.models.dataset.dataset import Dataset
+from ingestify.domain.models.dataset.events import (
+    DatasetCreated,
+    MetadataUpdated,
+    RevisionAdded,
+)
+from ingestify.domain.models.event.domain_event import DomainEvent
+from ingestify.utils import utcnow
+
+from .tables import get_tables
+
+logger = logging.getLogger(__name__)
+
+_EVENT_TYPE_MAP = {
+    "dataset_created": DatasetCreated,
+    "revision_added": RevisionAdded,
+    "metadata_updated": MetadataUpdated,
+}
+
+
+class EventLog:
+    def __init__(self, engine, table_prefix: str = ""):
+        tables = get_tables(table_prefix)
+        self._engine = engine
+        self._table = tables["event_log_table"]
+        self._table.create(engine, checkfirst=True)
+
+    def write(self, event: DomainEvent) -> None:
+        dataset = event.dataset
+        with self._engine.connect() as conn:
+            conn.execute(
+                self._table.insert().values(
+                    event_type=type(event).event_type,
+                    payload_json=dataset.model_dump(mode="json"),
+                    source=dataset.provider,
+                    dataset_id=dataset.dataset_id,
+                    created_at=utcnow(),
+                )
+            )
+            conn.commit()
+
+    def fetch_batch(self, last_event_id: int, batch_size: int) -> list:
+        """Returns a list of (event_id, domain_event) tuples."""
+        with self._engine.connect() as conn:
+            rows = conn.execute(
+                select(
+                    self._table.c.id,
+                    self._table.c.event_type,
+                    self._table.c.payload_json,
+                )
+                .where(self._table.c.id > last_event_id)
+                .order_by(self._table.c.id)
+                .limit(batch_size)
+            ).fetchall()
+
+        result = []
+        for event_id, event_type, payload_json in rows:
+            event_cls = _EVENT_TYPE_MAP.get(event_type)
+            if event_cls is None:
+                logger.debug(
+                    "Skipping unknown event_type=%r (id=%d)", event_type, event_id
+                )
+                continue
+            payload = (
+                payload_json
+                if isinstance(payload_json, dict)
+                else json.loads(payload_json)
+            )
+            result.append(
+                (event_id, event_cls(dataset=Dataset.model_validate(payload)))
+            )
+
+        return result
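
[Reviewer sketch, not part of the patch: the write/fetch round-trip this new
file enables, runnable standalone against in-memory SQLite. The Dataset field
values mirror the test fixture added below.]

    from sqlalchemy import create_engine

    from ingestify.domain.models.dataset.dataset import Dataset, DatasetState
    from ingestify.domain.models.dataset.events import RevisionAdded
    from ingestify.infra.event_log import EventLog
    from ingestify.utils import utcnow

    event_log = EventLog(create_engine("sqlite:///:memory:"))
    dataset = Dataset(
        bucket="main",
        dataset_id="ds1",
        name="demo",
        state=DatasetState.COMPLETE,
        dataset_type="match",
        provider="demo",
        identifier={"match_id": "1"},
        metadata={},
        created_at=utcnow(),
        updated_at=utcnow(),
        last_modified_at=None,
    )
    event_log.write(RevisionAdded(dataset=dataset))

    # fetch_batch returns (event_id, domain_event) tuples in id order, so a
    # caller can resume from any stored cursor value.
    for event_id, event in event_log.fetch_batch(last_event_id=0, batch_size=10):
        assert isinstance(event, RevisionAdded)
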
diff --git a/ingestify/infra/event_log/subscriber.py b/ingestify/infra/event_log/subscriber.py
index 680462f..6eb413d 100644
--- a/ingestify/infra/event_log/subscriber.py
+++ b/ingestify/infra/event_log/subscriber.py
@@ -1,9 +1,8 @@
 import logging
 
 from ingestify.domain.models.event import Subscriber
-from ingestify.utils import utcnow
 
-from .tables import get_tables
+from .event_log import EventLog
 
 logger = logging.getLogger(__name__)
 
@@ -21,38 +20,25 @@ class EventLogSubscriber(Subscriber):
     def __init__(self, store):
         super().__init__(store)
         session_provider = store.dataset_repository.session_provider
-        tables = get_tables(session_provider.table_prefix)
-        tables["metadata"].create_all(session_provider.engine, checkfirst=True)
-        self._engine = session_provider.engine
-        self._event_log_table = tables["event_log_table"]
+        self._event_log = EventLog(
+            session_provider.engine, session_provider.table_prefix
+        )
 
-    def _write(self, event_type: str, dataset) -> None:
+    def _write(self, event) -> None:
         try:
-            with self._engine.connect() as conn:
-                conn.execute(
-                    self._event_log_table.insert().values(
-                        event_type=event_type,
-                        payload_json=dataset.model_dump(
-                            mode="json", exclude={"revisions"}
-                        ),
-                        source=dataset.provider,
-                        dataset_id=dataset.dataset_id,
-                        created_at=utcnow(),
-                    )
-                )
-                conn.commit()
+            self._event_log.write(event)
         except Exception:
             logger.exception(
                 "EventLogSubscriber: failed to write event_type=%r dataset_id=%r",
-                event_type,
-                dataset.dataset_id,
+                type(event).event_type,
+                event.dataset.dataset_id,
             )
 
     def on_dataset_created(self, event) -> None:
-        self._write(type(event).event_type, event.dataset)
+        self._write(event)
 
     def on_metadata_updated(self, event) -> None:
-        self._write(type(event).event_type, event.dataset)
+        self._write(event)
 
     def on_revision_added(self, event) -> None:
-        self._write(type(event).event_type, event.dataset)
+        self._write(event)
diff --git a/ingestify/tests/test_event_log.py b/ingestify/tests/test_event_log.py
index cbd4b34..024dd9c 100644
--- a/ingestify/tests/test_event_log.py
+++ b/ingestify/tests/test_event_log.py
@@ -3,177 +3,76 @@
 import pytest
 from sqlalchemy import create_engine
 
+from ingestify.domain.models.dataset.dataset import Dataset, DatasetState
+from ingestify.domain.models.dataset.events import DatasetCreated, RevisionAdded
 from ingestify.infra.event_log.consumer import EventLogConsumer
+from ingestify.infra.event_log.event_log import EventLog
 from ingestify.infra.event_log.subscriber import EventLogSubscriber
-from ingestify.infra.event_log.tables import get_tables
 from ingestify.utils import utcnow
 
-# ---------------------------------------------------------------------------
-# Helpers
-# ---------------------------------------------------------------------------
-
-
-def make_consumer() -> EventLogConsumer:
-    consumer = EventLogConsumer("sqlite:///:memory:", reader_name="test")
-    return consumer
-
-
-def insert_events(consumer: EventLogConsumer, *events):
-    """Insert (event_type, payload) tuples directly into the event_log table."""
-    with consumer._engine.connect() as conn:
-        for event_type, payload in events:
-            conn.execute(
-                consumer._event_log_table.insert().values(
-                    event_type=event_type,
-                    payload_json=payload,
-                    created_at=utcnow(),
-                )
-            )
-        conn.commit()
-
-
-def make_subscriber() -> EventLogSubscriber:
-    engine = create_engine("sqlite:///:memory:")
-    mock_store = MagicMock()
-    mock_store.dataset_repository.session_provider.table_prefix = ""
-    mock_store.dataset_repository.session_provider.engine = engine
-    return EventLogSubscriber(mock_store)
-
-
-# ---------------------------------------------------------------------------
-# Consumer tests
-# ---------------------------------------------------------------------------
-
-
-def test_processes_events_in_order():
-    consumer = make_consumer()
-    insert_events(
-        consumer,
-        ("dataset_created", {"dataset_id": "a"}),
-        ("revision_added", {"dataset_id": "b"}),
-        ("revision_added", {"dataset_id": "c"}),
+@pytest.fixture
+def dataset():
+    return Dataset(
+        bucket="main",
+        dataset_id="ds1",
+        name="test",
+        state=DatasetState.COMPLETE,
+        dataset_type="match",
+        provider="test",
+        identifier={"match_id": "1"},
+        metadata={},
+        created_at=utcnow(),
+        updated_at=utcnow(),
+        last_modified_at=None,
     )
 
-    processed = []
-    consumer._run_once(lambda et, p: processed.append(p["dataset_id"]))
-
-    assert processed == ["a", "b", "c"]
 
+@pytest.fixture
+def event_log():
+    return EventLog(create_engine("sqlite:///:memory:"))
 
-def test_cursor_advanced_after_each_event():
-    consumer = make_consumer()
-    insert_events(
-        consumer,
-        ("dataset_created", {"dataset_id": "a"}),
-        ("revision_added", {"dataset_id": "b"}),
-    )
-    cursors = []
-    original = consumer._update_cursor
 
+@pytest.fixture
+def consumer():
+    return EventLogConsumer("sqlite:///:memory:", reader_name="test")
 
-    def capture(conn, event_id):
-        cursors.append(event_id)
-        original(conn, event_id)
 
-    consumer._update_cursor = capture
-    consumer._run_once(lambda et, p: None)
+@pytest.fixture
+def subscriber():
+    engine = create_engine("sqlite:///:memory:")
+    store = MagicMock()
+    store.dataset_repository.session_provider.table_prefix = ""
+    store.dataset_repository.session_provider.engine = engine
+    return EventLogSubscriber(store)
 
-    assert len(cursors) == 2
-    assert cursors[0] < cursors[1]
 
+def test_event_log_write_and_fetch(event_log, dataset):
+    event_log.write(RevisionAdded(dataset=dataset))
+    _, event = event_log.fetch_batch(0, 10)[0]
+    assert isinstance(event, RevisionAdded)
+    assert event.dataset.dataset_id == "ds1"
 
-def test_cursor_not_advanced_on_error():
-    consumer = make_consumer()
-    insert_events(consumer, ("dataset_created", {"dataset_id": "a"}))
-    cursors = []
-    original = consumer._update_cursor
 
+def test_consumer_processes_events(consumer, dataset):
+    consumer._event_log.write(RevisionAdded(dataset=dataset))
+    received = []
+    consumer._run_once(lambda e: received.append(type(e)))
+    assert received == [RevisionAdded]
 
-    def capture(conn, event_id):
-        cursors.append(event_id)
-        original(conn, event_id)
-
-    consumer._update_cursor = capture
-    exit_code = consumer._run_once(
-        lambda et, p: (_ for _ in ()).throw(RuntimeError("boom"))
-    )
 
+def test_consumer_cursor_not_advanced_on_error(consumer, dataset):
+    consumer._event_log.write(RevisionAdded(dataset=dataset))
+    exit_code = consumer._run_once(lambda e: 1 / 0)
     assert exit_code == 1
-    assert cursors == []
-
-
-def test_no_events_returns_zero():
-    consumer = make_consumer()
-    exit_code = consumer._run_once(lambda et, p: None)
-    assert exit_code == 0
-
-
-def test_only_new_events_processed_after_cursor():
-    consumer = make_consumer()
-    insert_events(
-        consumer,
-        ("dataset_created", {"dataset_id": "a"}),
-        ("revision_added", {"dataset_id": "b"}),
-    )
-
-    # consume first batch
-    consumer._run_once(lambda et, p: None)
-
-    # insert a new event
-    insert_events(consumer, ("revision_added", {"dataset_id": "c"}))
-
-    processed = []
-    consumer._run_once(lambda et, p: processed.append(p["dataset_id"]))
-
-    assert processed == ["c"]
-
-
-# ---------------------------------------------------------------------------
-# Subscriber tests
-# ---------------------------------------------------------------------------
-
-
-def make_dataset(dataset_id="ds1", provider="test"):
-    dataset = MagicMock()
-    dataset.dataset_id = dataset_id
-    dataset.provider = provider
-    dataset.model_dump.return_value = {"dataset_id": dataset_id, "provider": provider}
-    return dataset
-
-
-def make_event(event_type, dataset):
-    event = MagicMock()
-    type(event).event_type = event_type
-    event.dataset = dataset
-    return event
-
-
-def test_subscriber_writes_event():
-    subscriber = make_subscriber()
-    subscriber.on_dataset_created(make_event("dataset_created", make_dataset()))
-
-    with subscriber._engine.connect() as conn:
-        rows = conn.execute(subscriber._event_log_table.select()).fetchall()
-
-    assert len(rows) == 1
-    assert rows[0].event_type == "dataset_created"
-    assert rows[0].dataset_id == "ds1"
-
-
-def test_subscriber_writes_all_event_types():
-    subscriber = make_subscriber()
-    dataset = make_dataset()
-
-    subscriber.on_dataset_created(make_event("dataset_created", dataset))
-    subscriber.on_revision_added(make_event("revision_added", dataset))
-    subscriber.on_metadata_updated(make_event("metadata_updated", dataset))
+    # Next run still sees the same event
+    received = []
+    consumer._run_once(lambda e: received.append(e))
+    assert len(received) == 1
 
-    with subscriber._engine.connect() as conn:
-        rows = conn.execute(subscriber._event_log_table.select()).fetchall()
 
-    assert [r.event_type for r in rows] == [
-        "dataset_created",
-        "revision_added",
-        "metadata_updated",
-    ]
+def test_subscriber_writes_event(subscriber, dataset):
+    subscriber.on_revision_added(RevisionAdded(dataset=dataset))
+    _, event = subscriber._event_log.fetch_batch(0, 10)[0]
+    assert isinstance(event, RevisionAdded)
+    assert event.dataset.dataset_id == "ds1"

From 8c6612148d53d3059d0e3d17b563d41fa8b293ea Mon Sep 17 00:00:00 2001
From: Koen Vossen
Date: Wed, 18 Mar 2026 10:17:59 +0100
Subject: [PATCH 2/2] Store full event payload, use Pydantic deserialization, add cursor test

- write() stores event.model_dump() so future event fields survive round-trip
- fetch_batch() uses event_cls.model_validate() directly
- Add test_consumer_only_processes_new_events
---
 ingestify/infra/event_log/event_log.py | 18 ++++--------------
 ingestify/tests/test_event_log.py      | 28 ++++++++++++++++++++++++++++
 2 files changed, 32 insertions(+), 14 deletions(-)

diff --git a/ingestify/infra/event_log/event_log.py b/ingestify/infra/event_log/event_log.py
index 52cee24..800128a 100644
--- a/ingestify/infra/event_log/event_log.py
+++ b/ingestify/infra/event_log/event_log.py
@@ -1,9 +1,7 @@
-import json
 import logging
 
 from sqlalchemy import select
 
-from ingestify.domain.models.dataset.dataset import Dataset
 from ingestify.domain.models.dataset.events import (
     DatasetCreated,
     MetadataUpdated,
@@ -31,14 +29,13 @@ def __init__(self, engine, table_prefix: str = ""):
         self._table.create(engine, checkfirst=True)
 
     def write(self, event: DomainEvent) -> None:
-        dataset = event.dataset
         with self._engine.connect() as conn:
             conn.execute(
                 self._table.insert().values(
                     event_type=type(event).event_type,
-                    payload_json=dataset.model_dump(mode="json"),
-                    source=dataset.provider,
-                    dataset_id=dataset.dataset_id,
+                    payload_json=event.model_dump(mode="json"),
+                    source=event.dataset.provider,
+                    dataset_id=event.dataset.dataset_id,
                     created_at=utcnow(),
                 )
             )
@@ -66,13 +63,6 @@ def fetch_batch(self, last_event_id: int, batch_size: int) -> list:
                     "Skipping unknown event_type=%r (id=%d)", event_type, event_id
                 )
                 continue
-            payload = (
-                payload_json
-                if isinstance(payload_json, dict)
-                else json.loads(payload_json)
-            )
-            result.append(
-                (event_id, event_cls(dataset=Dataset.model_validate(payload)))
-            )
+            result.append((event_id, event_cls.model_validate(payload_json)))
 
         return result
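
[Reviewer sketch, not part of the patch: what storing the full event buys.
DatasetArchived and its reason field are hypothetical, and the ClassVar
pattern for event_type is an assumption based on how the existing events
expose type(event).event_type on a Pydantic model.]

    from typing import ClassVar

    from ingestify.domain.models.event.domain_event import DomainEvent


    class DatasetArchived(DomainEvent):  # hypothetical future event
        event_type: ClassVar[str] = "dataset_archived"
        reason: str  # event-level field beyond the dataset itself

    # Once registered, write()/fetch_batch() round-trip `reason` unchanged,
    # because the whole event is dumped and revalidated:
    #     _EVENT_TYPE_MAP["dataset_archived"] = DatasetArchived
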
diff --git a/ingestify/tests/test_event_log.py b/ingestify/tests/test_event_log.py
index 024dd9c..466116c 100644
--- a/ingestify/tests/test_event_log.py
+++ b/ingestify/tests/test_event_log.py
@@ -28,6 +28,23 @@ def dataset():
     )
 
 
+@pytest.fixture
+def dataset2():
+    return Dataset(
+        bucket="main",
+        dataset_id="ds2",
+        name="test",
+        state=DatasetState.COMPLETE,
+        dataset_type="match",
+        provider="test",
+        identifier={"match_id": "2"},
+        metadata={},
+        created_at=utcnow(),
+        updated_at=utcnow(),
+        last_modified_at=None,
+    )
+
+
 @pytest.fixture
 def event_log():
     return EventLog(create_engine("sqlite:///:memory:"))
@@ -71,6 +88,17 @@ def test_consumer_cursor_not_advanced_on_error(consumer, dataset):
     assert len(received) == 1
 
 
+def test_consumer_only_processes_new_events(consumer, dataset, dataset2):
+    consumer._event_log.write(RevisionAdded(dataset=dataset))
+    consumer._run_once(lambda e: None)
+
+    consumer._event_log.write(RevisionAdded(dataset=dataset2))
+    received = []
+    consumer._run_once(lambda e: received.append(e.dataset.dataset_id))
+
+    assert received == ["ds2"]
+
+
 def test_subscriber_writes_event(subscriber, dataset):
     subscriber.on_revision_added(RevisionAdded(dataset=dataset))
     _, event = subscriber._event_log.fetch_batch(0, 10)[0]
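
[Reviewer sketch, not part of either patch: the long-running variant from the
consumer docstring, for completeness. The reader_name "worker" is
illustrative; poll_interval is the documented keyword.]

    from ingestify.infra.event_log import EventLogConsumer

    # Polls every 5 seconds; each processed event advances the per-reader
    # cursor, so a restart resumes where the previous run stopped.
    consumer = EventLogConsumer.from_config("ingestify.yaml", reader_name="worker")
    consumer.run(lambda event: print(type(event).event_type), poll_interval=5)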