1- import json
21import logging
32import time
43from typing import Callable , Optional
54
65from sqlalchemy import create_engine , select
76
7+ from .event_log import EventLog
88from .tables import get_tables
99
1010logger = logging .getLogger (__name__ )
@@ -14,35 +14,34 @@ class EventLogConsumer:
1414 """Cursor-based consumer for the event_log table.
1515
1616 Usage (run once, e.g. cron):
17- EventLogConsumer.from_config("ingestify.yaml", reader_name="importer ").run(on_event)
17+ EventLogConsumer.from_config("ingestify.yaml", reader_name="default ").run(on_event)
1818
1919 Usage (keep running, poll every 5 seconds):
20- EventLogConsumer.from_config("ingestify.yaml", reader_name="importer ").run(on_event, poll_interval=5)
20+ EventLogConsumer.from_config("ingestify.yaml", reader_name="default ").run(on_event, poll_interval=5)
2121
2222 Exit codes (returned by run):
2323 0 Batch processed successfully (or nothing new).
2424 1 A processing error occurred; cursor was NOT advanced.
2525 """
2626
2727 def __init__ (self , database_url : str , reader_name : str , table_prefix : str = "" ):
28+ engine = create_engine (database_url )
29+ self ._event_log = EventLog (engine , table_prefix )
2830 self ._reader_name = reader_name
31+ self ._engine = engine
2932 tables = get_tables (table_prefix )
30- self ._metadata = tables ["metadata" ]
31- self ._event_log_table = tables ["event_log_table" ]
3233 self ._reader_state_table = tables ["reader_state_table" ]
33- self ._engine = create_engine (database_url )
34- self ._metadata .create_all (self ._engine , checkfirst = True )
34+ self ._reader_state_table .create (engine , checkfirst = True )
3535
3636 @classmethod
3737 def from_config (cls , config_file : str , reader_name : str ) -> "EventLogConsumer" :
3838 from pyaml_env import parse_config
3939
4040 config = parse_config (config_file , default_value = "" )
4141 main = config ["main" ]
42- database_url = main ["metadata_url" ]
4342 table_prefix = main .get ("metadata_options" , {}).get ("table_prefix" , "" )
4443 return cls (
45- database_url = database_url ,
44+ database_url = main [ "metadata_url" ] ,
4645 reader_name = reader_name ,
4746 table_prefix = table_prefix ,
4847 )
@@ -70,18 +69,6 @@ def _get_last_event_id(self, conn) -> int:
7069 ).fetchone ()
7170 return row [0 ] if row else 0
7271
73- def _fetch_batch (self , conn , last_event_id : int , batch_size : int ) -> list :
74- return conn .execute (
75- select (
76- self ._event_log_table .c .id ,
77- self ._event_log_table .c .event_type ,
78- self ._event_log_table .c .payload_json ,
79- )
80- .where (self ._event_log_table .c .id > last_event_id )
81- .order_by (self ._event_log_table .c .id )
82- .limit (batch_size )
83- ).fetchall ()
84-
8572 def _update_cursor (self , conn , event_id : int ) -> None :
8673 conn .execute (
8774 self ._reader_state_table .update ()
@@ -94,27 +81,22 @@ def _run_once(self, on_event: Callable, batch_size: int = 100) -> int:
9481 with self ._engine .connect () as conn :
9582 self ._ensure_reader_state (conn )
9683 last_id = self ._get_last_event_id (conn )
97- rows = self ._fetch_batch (conn , last_id , batch_size )
9884
99- if not rows :
100- return 0
85+ rows = self ._event_log .fetch_batch (last_id , batch_size )
86+ if not rows :
87+ return 0
10188
102- for event_id , event_type , payload_json in rows :
89+ with self ._engine .connect () as conn :
90+ for event_id , event in rows :
10391 try :
104- payload = (
105- payload_json
106- if isinstance (payload_json , dict )
107- else json .loads (payload_json )
108- )
109- on_event (event_type , payload )
92+ on_event (event )
11093 except Exception :
11194 logger .exception (
11295 "Failed to process event id=%d type=%r — cursor NOT advanced" ,
11396 event_id ,
114- event_type ,
97+ type ( event ). event_type ,
11598 )
11699 return 1
117-
118100 self ._update_cursor (conn , event_id )
119101
120102 return 0
0 commit comments