3 changes: 2 additions & 1 deletion ingestify/infra/event_log/__init__.py
@@ -1,4 +1,5 @@
 from .consumer import EventLogConsumer
+from .event_log import EventLog
 from .subscriber import EventLogSubscriber
 
-__all__ = ["EventLogConsumer", "EventLogSubscriber"]
+__all__ = ["EventLog", "EventLogConsumer", "EventLogSubscriber"]
48 changes: 15 additions & 33 deletions ingestify/infra/event_log/consumer.py
@@ -1,10 +1,10 @@
-import json
 import logging
 import time
 from typing import Callable, Optional
 
 from sqlalchemy import create_engine, select
 
+from .event_log import EventLog
 from .tables import get_tables
 
 logger = logging.getLogger(__name__)
@@ -14,35 +14,34 @@ class EventLogConsumer:
"""Cursor-based consumer for the event_log table.

Usage (run once, e.g. cron):
EventLogConsumer.from_config("ingestify.yaml", reader_name="importer").run(on_event)
EventLogConsumer.from_config("ingestify.yaml", reader_name="default").run(on_event)

Usage (keep running, poll every 5 seconds):
EventLogConsumer.from_config("ingestify.yaml", reader_name="importer").run(on_event, poll_interval=5)
EventLogConsumer.from_config("ingestify.yaml", reader_name="default").run(on_event, poll_interval=5)

Exit codes (returned by run):
0 Batch processed successfully (or nothing new).
1 A processing error occurred; cursor was NOT advanced.
"""

def __init__(self, database_url: str, reader_name: str, table_prefix: str = ""):
engine = create_engine(database_url)
self._event_log = EventLog(engine, table_prefix)
self._reader_name = reader_name
self._engine = engine
tables = get_tables(table_prefix)
self._metadata = tables["metadata"]
self._event_log_table = tables["event_log_table"]
self._reader_state_table = tables["reader_state_table"]
self._engine = create_engine(database_url)
self._metadata.create_all(self._engine, checkfirst=True)
self._reader_state_table.create(engine, checkfirst=True)

@classmethod
def from_config(cls, config_file: str, reader_name: str) -> "EventLogConsumer":
from pyaml_env import parse_config

config = parse_config(config_file, default_value="")
main = config["main"]
database_url = main["metadata_url"]
table_prefix = main.get("metadata_options", {}).get("table_prefix", "")
return cls(
database_url=database_url,
database_url=main["metadata_url"],
reader_name=reader_name,
table_prefix=table_prefix,
)
@@ -70,18 +69,6 @@ def _get_last_event_id(self, conn) -> int:
         ).fetchone()
         return row[0] if row else 0
 
-    def _fetch_batch(self, conn, last_event_id: int, batch_size: int) -> list:
-        return conn.execute(
-            select(
-                self._event_log_table.c.id,
-                self._event_log_table.c.event_type,
-                self._event_log_table.c.payload_json,
-            )
-            .where(self._event_log_table.c.id > last_event_id)
-            .order_by(self._event_log_table.c.id)
-            .limit(batch_size)
-        ).fetchall()
-
     def _update_cursor(self, conn, event_id: int) -> None:
         conn.execute(
             self._reader_state_table.update()
@@ -94,27 +81,22 @@ def _run_once(self, on_event: Callable, batch_size: int = 100) -> int:
         with self._engine.connect() as conn:
             self._ensure_reader_state(conn)
             last_id = self._get_last_event_id(conn)
-            rows = self._fetch_batch(conn, last_id, batch_size)
-
-            if not rows:
-                return 0
+        rows = self._event_log.fetch_batch(last_id, batch_size)
+        if not rows:
+            return 0
 
-            for event_id, event_type, payload_json in rows:
+        with self._engine.connect() as conn:
+            for event_id, event in rows:
                 try:
-                    payload = (
-                        payload_json
-                        if isinstance(payload_json, dict)
-                        else json.loads(payload_json)
-                    )
-                    on_event(event_type, payload)
+                    on_event(event)
                 except Exception:
                     logger.exception(
                         "Failed to process event id=%d type=%r — cursor NOT advanced",
                         event_id,
-                        event_type,
+                        type(event).event_type,
                     )
                     return 1
 
                 self._update_cursor(conn, event_id)
 
         return 0
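For context, a minimal sketch of how a consumer might be wired up after this change — `handle_event` is a hypothetical callback; `from_config` and `run` follow the docstring above, and the callback now receives a single typed domain event instead of an `(event_type, payload)` pair:

```python
from ingestify.infra.event_log import EventLogConsumer


def handle_event(event):
    # event is a rehydrated domain event (DatasetCreated, RevisionAdded,
    # or MetadataUpdated); the string name is exposed on the class.
    print(type(event).event_type, event.dataset.dataset_id)


# Run one batch (cron style). Returns 0 when the batch was processed or
# nothing was new, 1 when a handler failed and the cursor stayed put.
exit_code = EventLogConsumer.from_config(
    "ingestify.yaml", reader_name="default"
).run(handle_event)
```

Because a failed handler leaves the cursor untouched, the same event is redelivered on the next run, so handlers should be idempotent.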
68 changes: 68 additions & 0 deletions ingestify/infra/event_log/event_log.py
@@ -0,0 +1,68 @@
+import logging
+
+from sqlalchemy import select
+
+from ingestify.domain.models.dataset.events import (
+    DatasetCreated,
+    MetadataUpdated,
+    RevisionAdded,
+)
+from ingestify.domain.models.event.domain_event import DomainEvent
+from ingestify.utils import utcnow
+
+from .tables import get_tables
+
+logger = logging.getLogger(__name__)
+
+_EVENT_TYPE_MAP = {
+    "dataset_created": DatasetCreated,
+    "revision_added": RevisionAdded,
+    "metadata_updated": MetadataUpdated,
+}
+
+
+class EventLog:
+    def __init__(self, engine, table_prefix: str = ""):
+        tables = get_tables(table_prefix)
+        self._engine = engine
+        self._table = tables["event_log_table"]
+        self._table.create(engine, checkfirst=True)
+
+    def write(self, event: DomainEvent) -> None:
+        with self._engine.connect() as conn:
+            conn.execute(
+                self._table.insert().values(
+                    event_type=type(event).event_type,
+                    payload_json=event.model_dump(mode="json"),
+                    source=event.dataset.provider,
+                    dataset_id=event.dataset.dataset_id,
+                    created_at=utcnow(),
+                )
+            )
+            conn.commit()
+
+    def fetch_batch(self, last_event_id: int, batch_size: int) -> list:
+        """Returns a list of (event_id, domain_event) tuples."""
+        with self._engine.connect() as conn:
+            rows = conn.execute(
+                select(
+                    self._table.c.id,
+                    self._table.c.event_type,
+                    self._table.c.payload_json,
+                )
+                .where(self._table.c.id > last_event_id)
+                .order_by(self._table.c.id)
+                .limit(batch_size)
+            ).fetchall()
+
+        result = []
+        for event_id, event_type, payload_json in rows:
+            event_cls = _EVENT_TYPE_MAP.get(event_type)
+            if event_cls is None:
+                logger.debug(
+                    "Skipping unknown event_type=%r (id=%d)", event_type, event_id
+                )
+                continue
+            result.append((event_id, event_cls.model_validate(payload_json)))
+
+        return result
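A sketch of the new `EventLog` round trip; the SQLite URL is an assumption for illustration (any SQLAlchemy URL works). `fetch_batch` rehydrates each row into a typed domain event via `_EVENT_TYPE_MAP`, skipping unknown `event_type` values instead of raising:

```python
from sqlalchemy import create_engine

from ingestify.infra.event_log import EventLog

# table_prefix defaults to ""; the table is created on first use (checkfirst).
event_log = EventLog(create_engine("sqlite:///ingestify.db"))

# In practice, writes come from EventLogSubscriber and reads from
# EventLogConsumer, which tracks a per-reader cursor on top of this.
for event_id, event in event_log.fetch_batch(last_event_id=0, batch_size=100):
    print(event_id, type(event).event_type)
```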
36 changes: 11 additions & 25 deletions ingestify/infra/event_log/subscriber.py
@@ -1,9 +1,8 @@
 import logging
 
 from ingestify.domain.models.event import Subscriber
-from ingestify.utils import utcnow
 
-from .tables import get_tables
+from .event_log import EventLog
 
 logger = logging.getLogger(__name__)
 
@@ -21,38 +20,25 @@ class EventLogSubscriber(Subscriber):
     def __init__(self, store):
         super().__init__(store)
         session_provider = store.dataset_repository.session_provider
-        tables = get_tables(session_provider.table_prefix)
-        tables["metadata"].create_all(session_provider.engine, checkfirst=True)
-        self._engine = session_provider.engine
-        self._event_log_table = tables["event_log_table"]
+        self._event_log = EventLog(
+            session_provider.engine, session_provider.table_prefix
+        )
 
-    def _write(self, event_type: str, dataset) -> None:
+    def _write(self, event) -> None:
         try:
-            with self._engine.connect() as conn:
-                conn.execute(
-                    self._event_log_table.insert().values(
-                        event_type=event_type,
-                        payload_json=dataset.model_dump(
-                            mode="json", exclude={"revisions"}
-                        ),
-                        source=dataset.provider,
-                        dataset_id=dataset.dataset_id,
-                        created_at=utcnow(),
-                    )
-                )
-                conn.commit()
+            self._event_log.write(event)
         except Exception:
             logger.exception(
                 "EventLogSubscriber: failed to write event_type=%r dataset_id=%r",
-                event_type,
-                dataset.dataset_id,
+                type(event).event_type,
+                event.dataset.dataset_id,
             )
 
     def on_dataset_created(self, event) -> None:
-        self._write(type(event).event_type, event.dataset)
+        self._write(event)
 
     def on_metadata_updated(self, event) -> None:
-        self._write(type(event).event_type, event.dataset)
+        self._write(event)
 
     def on_revision_added(self, event) -> None:
-        self._write(type(event).event_type, event.dataset)
+        self._write(event)
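One behavioral change worth noting: `payload_json` previously held only the dataset (minus revisions), while `EventLog.write` now serializes the whole event. A hypothetical side-by-side, with `event` standing for any of the three domain events:

```python
def old_payload(event):
    # before this PR: dataset only, revisions excluded
    return event.dataset.model_dump(mode="json", exclude={"revisions"})


def new_payload(event):
    # after this PR: the full event, which fetch_batch can feed straight
    # back into event_cls.model_validate()
    return event.model_dump(mode="json")
```

Rows written before this change therefore won't validate against the event classes in `_EVENT_TYPE_MAP`.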