Skip to content

Commit f99aa1f

Browse files
committed
master
1 parent 5c0ff70 commit f99aa1f

1 file changed

Lines changed: 95 additions & 56 deletions

File tree

reversibleai/core/annotations/manager.py

Lines changed: 95 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ def __init__(self, db_path: Optional[Path] = None) -> None:
5959

6060
def add_function_annotation(self, annotation: FunctionAnnotation) -> bool:
6161
"""Add a function annotation"""
62+
if not self.database:
63+
logger.warning("Database not available, cannot add annotation")
64+
return False
65+
6266
try:
6367
success = self.database.add_function_annotation(annotation)
6468
if success:
@@ -74,11 +78,27 @@ def get_function_annotation(self, address: int) -> Optional[FunctionAnnotation]:
7478

7579
def get_function_annotations_by_name(self, name: str) -> List[FunctionAnnotation]:
7680
"""Get function annotations by name"""
77-
return self.database.get_function_annotations_by_name(name)
81+
if not self.database:
82+
logger.warning("Database not available, cannot search functions")
83+
return []
84+
85+
try:
86+
return self.database.get_function_annotations_by_name(name)
87+
except Exception as e:
88+
logger.error(f"Failed to search functions: {e}")
89+
return []
7890

7991
def search_function_annotations(self, query: str) -> List[FunctionAnnotation]:
8092
"""Search function annotations"""
81-
return self.database.search_function_annotations(query)
93+
if not self.database:
94+
logger.warning("Database not available, cannot search functions")
95+
return []
96+
97+
try:
98+
return self.database.search_function_annotations(query)
99+
except Exception as e:
100+
logger.error(f"Failed to search functions: {e}")
101+
return []
82102

83103
def update_function_annotation(self, annotation: FunctionAnnotation) -> bool:
84104
"""Update a function annotation"""
@@ -104,6 +124,10 @@ def remove_function_annotation(self, address: int) -> bool:
104124

105125
def add_comment(self, comment: CommentAnnotation) -> bool:
106126
"""Add a comment annotation"""
127+
if not self.database:
128+
logger.warning("Database not available, cannot add comment")
129+
return False
130+
107131
try:
108132
success = self.database.add_comment(comment)
109133
if success:
@@ -115,88 +139,92 @@ def add_comment(self, comment: CommentAnnotation) -> bool:
115139

116140
def get_comments(self, address: Optional[int] = None) -> List[CommentAnnotation]:
117141
"""Get comments, optionally filtered by address"""
118-
return self.database.get_comments(address)
142+
if not self.database:
143+
logger.warning("Database not available, cannot get comments")
144+
return []
145+
146+
try:
147+
return self.database.get_comments(address)
148+
except Exception as e:
149+
logger.error(f"Failed to get comments: {e}")
150+
return []
119151

120152
def update_comment(self, comment: CommentAnnotation) -> bool:
121153
"""Update a comment"""
154+
if not self.database:
155+
logger.warning(" Database not available, cannot update comment")
156+
return False
157+
122158
try:
123159
success = self.database.update_comment(comment)
124160
if success:
125-
logger.debug(f"Updated comment at {hex(comment.address)}")
161+
logger.info(f"Updated comment at {hex(comment.address)}")
126162
return success
127163
except Exception as e:
128164
logger.error(f"Failed to update comment: {e}")
129165
return False
130166

131167
def remove_comment(self, address: int, comment_id: str) -> bool:
132168
"""Remove a comment"""
169+
if not self.database:
170+
logger.warning("Database not available, cannot remove comment")
171+
return False
172+
133173
try:
134174
success = self.database.remove_comment(address, comment_id)
135175
if success:
136-
logger.debug(f"Removed comment at {hex(address)}")
176+
logger.info(f"Removed comment at {hex(address)}")
137177
return success
138178
except Exception as e:
139179
logger.error(f"Failed to remove comment: {e}")
140180
return False
141181

142182
def auto_annotate_functions(self, functions: List[Dict[str, Any]]) -> int:
143183
"""Automatically annotate functions based on patterns and API info"""
184+
if not self.database:
185+
logger.warning("Database not available, cannot auto-annotate functions")
186+
return 0
187+
144188
annotated_count = 0
145189

146-
for func in functions:
147-
address = func.get('start_address', 0)
148-
name = func.get('name', '')
149-
150-
# Skip if already annotated
151-
if self.get_function_annotation(address):
152-
continue
153-
154-
# Try to get API information
155-
api_info = self.api_info.get_function_info(name)
156-
157-
if api_info:
158-
annotation = FunctionAnnotation(
159-
address=address,
160-
name=name,
161-
description=api_info.get('description', ''),
162-
parameters=api_info.get('parameters', []),
163-
return_value=api_info.get('return_value', {}),
164-
calling_convention=api_info.get('calling_convention', 'unknown'),
165-
tags=api_info.get('tags', []),
166-
confidence=api_info.get('confidence', 0.5),
167-
source='automatic',
168-
metadata={'api_source': api_info.get('source', 'unknown')}
169-
)
190+
try:
191+
for func in functions:
192+
if self._should_annotate_by_pattern(func):
193+
annotation = FunctionAnnotation(
194+
address=func.get('start_address', 0),
195+
name=func.get('name', ''),
196+
description=self.api_info.get_function_info(func.get('name', '')),
197+
parameters=func.get('parameters', []),
198+
return_value=func.get('return_value', {}),
199+
calling_convention=self.api_info.get_calling_convention(func.get('name', 'unknown')),
200+
tags=self.api_info.get_tags(func.get('name', [])),
201+
confidence=0.7, # Default confidence
202+
source='automatic',
203+
metadata={'auto_annotated': True}
204+
)
205+
206+
success = self.add_function_annotation(annotation)
207+
if success:
208+
annotated_count += 1
170209

171-
if self.add_function_annotation(annotation):
172-
annotated_count += 1
210+
logger.info(f"Auto-annotated {annotated_count} functions")
211+
return annotated_count
173212

174-
# Try pattern-based annotation
175-
elif self._should_annotate_by_pattern(func):
176-
annotation = self._create_pattern_annotation(func)
177-
if annotation and self.add_function_annotation(annotation):
178-
annotated_count += 1
179-
180-
logger.info(f"Auto-annotated {annotated_count} functions")
181-
return annotated_count
213+
except Exception as e:
214+
logger.error(f"Failed to auto-annotate functions: {e}")
215+
return 0
182216

183217
def _should_annotate_by_pattern(self, func: Dict[str, Any]) -> bool:
184218
"""Check if function should be annotated based on patterns"""
185219
name = func.get('name', '').lower()
186-
instructions = func.get('instructions', [])
187220

188221
# Check for common patterns
189-
if any(pattern in name for pattern in ['main', 'start', 'entry', 'dllmain']):
222+
if any(pattern in name for pattern in ['main', 'start', 'entry', 'dllmain', 'winmain', 'tls_callback', 'sub_', 'init', "start_"]):
190223
return True
191224

192-
# Check for API call patterns
193-
for insn in instructions:
194-
mnemonic = insn.get('mnemonic', '').lower()
195-
operands = insn.get('operands', '').lower()
196-
197-
if mnemonic == 'call':
198-
if any(api in operands for api in ['createfile', 'readfile', 'writefile', 'closehandle']):
199-
return True
225+
# Check for obfuscation patterns
226+
if any(pattern in name for pattern in ['sub_', 'xor', 'decode', 'encrypt', 'decrypt', 'hash', 'calc', 'compute']):
227+
return True
200228

201229
return False
202230

@@ -382,14 +410,25 @@ def suggest_annotations(self, func: Dict[str, Any]) -> List[Dict[str, Any]]:
382410
elif any(api in operands for api in ['socket', 'connect', 'send', 'recv']):
383411
api_calls.append('networking')
384412

385-
if api_calls:
386413
suggestions.append({
387-
'type': 'pattern_match',
388-
'confidence': 0.6,
389-
'annotation': {
390-
'description': f"Function performs {', '.join(set(api_calls))}",
391-
'tags': list(set(api_calls))
392-
}
414+
'type': 'api_match',
415+
'confidence': api_info.get('confidence', 0.5),
416+
'annotation': api_info
393417
})
394418

419+
# Check for instruction patterns
420+
api_calls = []
421+
for insn in instructions:
422+
mnemonic = insn.get('mnemonic', '').lower()
423+
operands = insn.get('operands', '').lower()
424+
425+
if mnemonic == 'call':
426+
if any(api in operands for api in ['createfile', 'readfile', 'writefile']):
427+
api_calls.append('file_operations')
428+
elif any(api in operands for api in ['createmutex', 'waitforsingleobject']):
429+
api_calls.append('synchronization')
430+
elif any(api in operands for api in ['socket', 'connect', 'send', 'recv']):
431+
api_calls.append('networking')
432+
return True
433+
395434
return suggestions

0 commit comments

Comments
 (0)