def preprocess_log(self, log_message):
    """Extract timestamp, log level, and message text from a raw log line.

    Args:
        log_message: A single raw log line in one of several common formats
            (Android, Linux syslog, Spark/HDFS, ISO 8601, "YYYY-MM-DD HH:MM:SS").

    Returns:
        dict: Always contains "timestamp" (str or None) and "message" (str);
        contains "log_level" only when a known level token was found.
    """
    # Timestamp patterns for common log formats, tried in order.
    # All are anchored at the start of the line.
    timestamp_date_patterns = [
        r'^(\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})',        # Android: "03-17 16:13:38.811"
        r'^(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})',         # Linux: "May  1 12:34:56"
        r'^(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})',         # Spark/HDFS: "20/05/01 12:34:56"
        r'^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3})',  # ISO: "2023-05-01T12:34:56.789"
        r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})',         # syslog: "2023-05-01 12:34:56"
    ]

    # Try to extract a leading timestamp using the patterns above.
    timestamp = None
    remainder = log_message
    for pattern in timestamp_date_patterns:
        match = re.search(pattern, log_message)
        if match:
            timestamp = match.group(1)
            # Fix: strip the timestamp before message extraction. Timestamps
            # contain ':' characters, so the "Component: message" pattern
            # below would otherwise split inside the timestamp instead of
            # at the component name.
            remainder = log_message[match.end():].lstrip()
            break

    # Longer names first so e.g. "WARNING" is not reported as "WARN".
    log_levels = ['INFO', 'WARNING', 'WARN', 'ERROR', 'DEBUG', 'VERBOSE',
                  'FATAL', 'CRITICAL', 'TRACE', 'NOTICE', 'SEVERE', 'ALERT']

    log_level = None
    for level in log_levels:
        # Word boundaries avoid partial matches (e.g. "ERRORS" or "INFOS").
        if re.search(r'\b' + re.escape(level) + r'\b', log_message, re.IGNORECASE):
            # Report the canonical upper-case form, not the matched text.
            log_level = level
            break

    # Extract the actual message content — "Component: message" style lines.
    message_patterns = [
        # Component followed by colon and message.
        r'[^:]+:\s+(.+)$',
        # Log level followed by component and message.
        r'(?i)(?:WARNING|WARN|ERROR|VERBOSE|FATAL|CRITICAL|TRACE|NOTICE|SEVERE|ALERT)\s+[^:]+:\s+(.+)$',
    ]

    # Default to the line minus any matched timestamp if no pattern applies.
    message = remainder
    for pattern in message_patterns:
        match = re.search(pattern, remainder)
        if match:
            message = match.group(1)
            break

    result = {
        "timestamp": timestamp,
        "message": message,
    }

    # Only include log_level when one was found.
    if log_level:
        result["log_level"] = log_level

    return result
125111
126-
127112 def send_preprocessed_log (self , preprocessed_log ):
128113 """Send preprocessed log to output topic."""
129114 try :
130- # Add timestamp if not present
131115 if 'processing_timestamp' not in preprocessed_log :
132116 preprocessed_log ['processing_timestamp' ] = time .time ()
133117
134- # Send message to output topic
135118 future = self .producer .send (self .output_topic , preprocessed_log )
136119
137- # Block until message is sent (or timeout)
138120 record_metadata = future .get (timeout = 10 )
139121
140122 self .logger .info (f"Preprocessed log sent to { record_metadata .topic } :{ record_metadata .partition } :{ record_metadata .offset } " )
@@ -146,30 +128,24 @@ def send_preprocessed_log(self, preprocessed_log):
146128 def process_message (self , message ):
147129 """Process a message from Kafka."""
148130 try :
149- # Extract data from message
150131 topic = message .topic
151132 partition = message .partition
152133 offset = message .offset
153134 value = message .value
154135
155136 self .logger .info (f"Processing message from { topic } (partition={ partition } , offset={ offset } )" )
156137
157- # Get log message
158138 log_message = value .get ('message' , '' )
159139
160- # Preprocess log to extract timestamp, log level, and message
161140 preprocessed_log = self .preprocess_log (log_message )
162141
163- # Add processing timestamp
164142 preprocessed_log ['processing_timestamp' ] = time .time ()
165143
166- # Log preprocessing result
167144 log_info = f"Preprocessed log: timestamp={ preprocessed_log .get ('timestamp' )} "
168145 if 'log_level' in preprocessed_log :
169146 log_info += f", level={ preprocessed_log .get ('log_level' )} "
170147 self .logger .info (log_info )
171148
172- # Send preprocessed log to output topic
173149 self .send_preprocessed_log (preprocessed_log )
174150
175151 return True
@@ -183,7 +159,6 @@ def run(self):
183159 self .logger .info (f"Kafka consumer created successfully, listening to topic: { self .input_topic } " )
184160 self .logger .info (f"Preprocessed logs will be sent to topic: { self .output_topic } " )
185161
186- # Process messages continuously
187162 for message in self .consumer :
188163 self .process_message (message )
189164
@@ -204,6 +179,5 @@ def close(self):
204179 self .logger .info ("Producer closed" )
205180
if __name__ == "__main__":
    # Script entry point: build a consumer and start its processing loop.
    LogConsumer().run()
0 commit comments