11from base_kafka import KafkaBase , json , logging
22import time
33from kafka import KafkaConsumer , KafkaProducer
4+ import ollama
45
56class AnomalyDetector (KafkaBase ):
6- """Detects anomalies in logs using an LLM."""
7+ """Detects anomalies in logs using an LLM via Ollama ."""
78
89 def __init__ (self , bootstrap_servers = ['localhost:9092' ],
910 input_topic = 'preprocessed-logs' ,
1011 output_topic = 'anomaly-results' ,
11- group_id = 'anomaly-detector' ):
12+ group_id = 'anomaly-detector' ,
13+ model = "llama3.1:8b" ):
1214 super ().__init__ (bootstrap_servers )
1315 self .input_topic = input_topic
1416 self .output_topic = output_topic
1517 self .group_id = group_id
18+ self .model = model
1619 self .consumer = self ._create_consumer ()
1720 self .producer = self ._create_producer ()
1821
1922 def _create_consumer (self ):
2023 """Create and return a Kafka consumer."""
2124 try :
25+ from kafka import KafkaConsumer
2226 consumer = KafkaConsumer (
2327 self .input_topic ,
2428 bootstrap_servers = self .bootstrap_servers ,
@@ -37,6 +41,7 @@ def _create_consumer(self):
3741 def _create_producer (self ):
3842 """Create and return a Kafka producer."""
3943 try :
44+ from kafka import KafkaProducer
4045 producer = KafkaProducer (
4146 bootstrap_servers = self .bootstrap_servers ,
4247 value_serializer = lambda v : json .dumps (v ).encode ('utf-8' ),
@@ -50,68 +55,99 @@ def _create_producer(self):
5055
5156 def detect_anomaly (self , log_data ):
5257 """
53- Use LLM to detect anomalies in the log.
54- This is a placeholder - you would implement your LLM call here.
58+ Use LLM (Ollama with llama3.1:8b) to detect anomalies in the log.
5559 """
56- # PLACEHOLDER: In a real implementation, you would:
57- # 1. Call your LLM API with the log data
58- # 2. Process the response
59- # 3. Return structured anomaly information
60-
61- # For demonstration, we'll implement a simple rule-based detection
62- is_anomaly = False
63- explanation = "No anomaly detected"
64-
65- # Check if the log has already been identified as a potential issue
66- if log_data .get ('potential_issue' , False ):
67- is_anomaly = True
68- explanation = f"Detected potential { log_data .get ('issue_type' , 'unknown' )} issue"
69-
70- # Check log level
71- if log_data .get ('log_level' ) in ['ERROR' , 'FATAL' , 'WARNING' ]:
72- is_anomaly = True
73- explanation = f"{ log_data .get ('log_level' )} level log indicates anomaly"
74-
75- # Look for error-related keywords in the message
76- error_keywords = ['crash' , 'exception' , 'failure' , 'failed' , 'error' , 'timeout' ]
77- message = log_data .get ('message' , '' ).lower ()
78- if any (keyword in message for keyword in error_keywords ):
79- is_anomaly = True
80- explanation = "Error keywords detected in log message"
81-
82- return {
83- "is_anomaly" : is_anomaly ,
84- "explanation" : explanation
85- }
60+ try :
61+ # Construct the prompt for the LLM
62+ timestamp = log_data .get ('timestamp' , 'No timestamp' )
63+ message = log_data .get ('message' , '' )
64+
65+ # Add log level if available
66+ log_level_text = ""
67+ if 'log_level' in log_data :
68+ log_level_text = f"Log Level: { log_data ['log_level' ]} \n "
69+
70+ prompt = f"""Analyze this log message and determine if it indicates an anomaly or issue:
71+ Timestamp: { timestamp }
72+ { log_level_text } Message: { message }
73+
74+ Is this log message indicating a problem, error, or unusual behavior?
75+ Respond with a JSON object with the following fields:
76+ - is_anomaly: boolean (true if this is an anomaly, false otherwise)
77+ - explanation: short explanation of why this is or isn't an anomaly
78+
79+ Your response should be valid JSON with no additional text."""
80+
81+ self .logger .info (f"Calling LLM for log analysis" )
82+ response = ollama .chat (model = self .model , messages = [
83+ {
84+ "role" : "user" ,
85+ "content" : prompt
86+ }
87+ ])
88+
89+ response_text = response .get ('message' , {}).get ('content' , '' )
90+ self .logger .info (f"LLM response received, length: { len (response_text )} " )
91+
92+ try :
93+ json_start = response_text .find ('{' )
94+ json_end = response_text .rfind ('}' ) + 1
95+
96+ if json_start >= 0 and json_end > json_start :
97+ json_part = response_text [json_start :json_end ]
98+ result = json .loads (json_part )
99+ else :
100+ raise ValueError ("No JSON content found in response" )
101+
102+ if 'is_anomaly' not in result :
103+ result ['is_anomaly' ] = False
104+ if 'explanation' not in result :
105+ result ['explanation' ] = "No explanation provided by LLM"
106+
107+ return result
108+
109+ except json .JSONDecodeError as e :
110+ self .logger .error (f"Failed to parse LLM response as JSON: { e } " )
111+ self .logger .error (f"Raw response: { response_text } " )
112+ return {
113+ "is_anomaly" : False ,
114+ "explanation" : f"Error parsing LLM response: { str (e )} " ,
115+ "raw_response" : response_text
116+ }
117+
118+ except Exception as e :
119+ self .logger .error (f"Error in anomaly detection: { e } " )
120+ return {
121+ "is_anomaly" : False ,
122+ "explanation" : f"Error in anomaly detection: { str (e )} " ,
123+ "error" : str (e )
124+ }
86125
87126 def process_message (self , message ):
88127 """Process a preprocessed log message and detect anomalies."""
89128 try :
90- # Extract data from message
91129 topic = message .topic
92130 partition = message .partition
93131 offset = message .offset
94132 log_data = message .value
95133
96134 self .logger .info (f"Processing preprocessed log from { topic } (partition={ partition } , offset={ offset } )" )
97135
98- # Skip unparsed logs
99- if not log_data .get ('parsed' , False ) and 'raw_message' in log_data :
100- self .logger .warning (f"Skipping unparsed log: { log_data .get ('raw_message' )[:50 ]} ..." )
101- return False
102-
103- # Detect anomalies
104136 anomaly_result = self .detect_anomaly (log_data )
105137
106- # Create result message
107138 result = {
108139 "original_log" : log_data ,
109- "anomaly_detection" : anomaly_result
140+ "anomaly_detection" : anomaly_result ,
141+ "processing_timestamp" : time .time ()
110142 }
111143
112- # Send result to output topic
113144 self .send_result (result )
114145
146+ if anomaly_result .get ('is_anomaly' , False ):
147+ severity = anomaly_result .get ('severity' , 'UNKNOWN' )
148+ explanation = anomaly_result .get ('explanation' , 'No explanation provided' )
149+ self .logger .warning (f"ANOMALY DETECTED ({ severity } ): { explanation } " )
150+
115151 return True
116152 except Exception as e :
117153 self .logger .error (f"Error processing message: { e } " )
@@ -124,11 +160,6 @@ def send_result(self, result):
124160 record_metadata = future .get (timeout = 10 )
125161
126162 self .logger .info (f"Anomaly result sent to { record_metadata .topic } :{ record_metadata .partition } :{ record_metadata .offset } " )
127-
128- # Log the anomaly if detected
129- if result .get ('anomaly_detection' , {}).get ('is_anomaly' , False ):
130- self .logger .warning (f"ANOMALY DETECTED: { result ['anomaly_detection' ]['explanation' ]} " )
131-
132163 return True
133164 except Exception as e :
134165 self .logger .error (f"Error sending anomaly result to Kafka: { e } " )
@@ -137,10 +168,9 @@ def send_result(self, result):
137168 def run (self ):
138169 """Run the anomaly detector."""
139170 try :
140- self .logger .info (f"Anomaly detector started, listening to topic: { self .input_topic } " )
171+ self .logger .info (f"Anomaly detector started with model { self . model } , listening to topic: { self .input_topic } " )
141172 self .logger .info (f"Results will be sent to topic: { self .output_topic } " )
142173
143- # Process messages continuously
144174 for message in self .consumer :
145175 self .process_message (message )
146176
@@ -162,4 +192,4 @@ def close(self):
162192
if __name__ == "__main__":
    # Script entry point: build a detector with default settings and run
    # its consume loop until interrupted.
    AnomalyDetector().run()
0 commit comments