-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathemb_coverage.py
More file actions
375 lines (330 loc) · 19.3 KB
/
emb_coverage.py
File metadata and controls
375 lines (330 loc) · 19.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
import json
import subprocess
from json import JSONDecodeError
from typing import List, Tuple, Any
from cldk.analysis.java import JavaAnalysis
from reachability_emb import EMBReachability
import re
class EMBCoverage:
def __init__(self, analysis: JavaAnalysis, jacoco_port_number):
self.analysis = analysis
self.jacoco_port_number = jacoco_port_number
@staticmethod
def execute_http_requests(http_commands: List[str]) -> List[Tuple[int, Any]]:
"""
Execute a list of curl commands and return their responses with status codes.
Args:
http_commands (List[str]): List of curl commands to execute
Returns:
List[Tuple[int, Any]]: List of tuples containing (status_code, response)
"""
responses = []
for cmd in http_commands:
# Append --write-out for status code and separate response
cmd_with_status = f"{cmd} --silent --write-out 'STATUS:%{{http_code}}'"
# Execute the command
process = subprocess.Popen(
cmd_with_status,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
try:
stdout, stderr = process.communicate(timeout=50)
# Split response and status code using a unique delimiter
if "STATUS:" in stdout:
response_text, status_code = stdout.rsplit("STATUS:", 1)
response_text = response_text.strip()
status_code = int(status_code.strip())
else:
# Handle cases where "STATUS:" is missing
response_text = stdout.strip() or stderr.strip()
status_code = 0 # Default to 0 if no status code is captured
responses.append((status_code, response_text))
except Exception as e:
print(f"Error executing http request: {cmd}: {e}")
responses.append((-100, e))
continue
return responses
def get_db_coverage(self):
"""
Computes the database coverage for the entire app
Returns:
"""
processed_uncovered_lines = {}
crud_operations = self.analysis.get_all_crud_operations()
# Get all the uncovered lines
uncovered_lines = self.execute_http_requests(
[f"curl -X GET http://localhost:{self.jacoco_port_number}/uncovered"]
)[0]
try:
coverage_details = json.loads(uncovered_lines[1])
# Go through each of the class
for klazz in coverage_details:
# Remove Test classes
class_name = klazz.split('.')[-1]
if (not class_name.startswith('Test') and
not class_name.startswith('Tests') and
not class_name.endswith('Test') and
not klazz.endswith('Tests')):
methods_in_class = {}
# Get all the methods in the class
methods = self.analysis.get_methods_in_class(qualified_class_name=klazz)
# Store the method details
for method in methods:
db_lines_per_method = [crud_operation.line_number
for crud_operation in methods[method].crud_operations]
# Add all the method call on Entity objects
for call_site in methods[method].call_sites:
if call_site.receiver_type != None:
receiver_class = self.analysis.get_class(call_site.receiver_type)
if receiver_class:
# Check if receiver class has any of the JPA ORM annotations
if any(annotation in ['@Entity']
for annotation in receiver_class.annotations):
# Remove any method within the entity class
if receiver_class != klazz:
db_lines_per_method.append(call_site.start_line)
# Add method calls where the callee method has annotation @Transactional (applicable for mybatis)
for call_site in methods[method].call_sites:
callee_method = self.analysis.get_method(
qualified_class_name=call_site.receiver_type,
qualified_method_name=self.__process_callee_signature(call_site.callee_signature))
if callee_method:
if any(annotation in ['@Transactional']
for annotation in callee_method.annotations):
db_lines_per_method.append(call_site.start_line)
db_lines_per_method = list(set(db_lines_per_method))
methods_in_class[method] = [methods[method].start_line,
methods[method].end_line,
db_lines_per_method]
# Get uncovered line details
for method in coverage_details[klazz]:
method_name = method.split(':')[0]
line_number = method.split(':')[-1]
for existing_methods in methods_in_class:
# JaCoCo returns the covered line, which may not be start line of the method
if methods_in_class[existing_methods][0] <= int(line_number) <= \
methods_in_class[existing_methods][1]:
uncovered_lines = coverage_details[klazz][method]
# Capture database uncovered lines
db_uncovered_lines = []
db_covered_lines = []
db_lines_per_method = methods_in_class[existing_methods][2]
db_line_coverage = 0
# Go through each database interaction point
for line in db_lines_per_method:
if line not in uncovered_lines:
db_line_coverage += 1
db_covered_lines.append(line)
else:
db_uncovered_lines.append(line)
method_dict = {
"method_signature": existing_methods,
"total_db_line_count": len(db_lines_per_method),
"db_line_coverage": db_line_coverage / len(db_lines_per_method) * 100.0 if
len(db_lines_per_method) > 0 else -100.0,
"db_uncovered_lines": db_uncovered_lines,
"db_covered_lines": db_covered_lines
}
if len(db_lines_per_method) > 0:
if klazz not in processed_uncovered_lines:
processed_uncovered_lines[klazz] = [method_dict]
else:
processed_uncovered_lines[klazz].append(method_dict)
return processed_uncovered_lines
except JSONDecodeError:
return []
def get_coverage(self) -> dict:
"""
Computes and returns the coverage using coverage monitoring agent
Args:
port: port number where the agent is running
Returns:
str: string representation of the json format response
"""
uncovered_lines = self.execute_http_requests(
[f"curl -X GET http://localhost:{self.jacoco_port_number}/coverage"]
)[0]
try:
current_coverage_details = json.loads(uncovered_lines[1])
return current_coverage_details
except JSONDecodeError:
return dict()
def get_app_coverage(self) -> dict:
"""
Computes and returns the application coverage using coverage monitoring agent
Args:
Returns:
CoverageEvaluation: application coverage
"""
uncovered_lines = self.execute_http_requests(
[f"curl -X GET http://localhost:{self.jacoco_port_number}/appcoverage"]
)[0]
try:
total_db_line = 0
total_covered_db_line = 0
current_coverage_details = json.loads(uncovered_lines[1])
keys = list(current_coverage_details.keys())
current_coverage_details = current_coverage_details[keys[0]]
db_coverage = self.get_db_coverage()
for klazz in db_coverage:
for method in db_coverage[klazz]:
total_db_line += method["total_db_line_count"]
total_covered_db_line += method["total_db_line_count"] - len(method["db_uncovered_lines"])
coverage = {"line_coverage": current_coverage_details["line"],
"branch_coverage": current_coverage_details["branch"],
"instruction_coverage": current_coverage_details["instruction"],
"database_interaction_line_coverage":
total_covered_db_line / total_db_line * 100.0 if
total_db_line > 0 else -100.0}
return coverage
except JSONDecodeError:
return {}
def get_method_coverage(self) -> str:
"""
Computes and returns the method-level coverage using coverage monitoring agent
Args:
port: port number where the agent is running
Returns:
str: string representation of the json format response
"""
uncovered_lines = self.execute_http_requests(
[f"curl -X GET http://localhost:{self.jacoco_port_number}/methodcoverage"]
)[0]
coverage_details = json.loads(uncovered_lines[1])
return coverage_details
def get_reachability_coverage(self) -> dict:
"""
Computes and returns the reachable coverage using coverage monitoring agent
Args:
class_method_pairs:
Returns:
"""
# Get coverage details from the coverage monitor
coverage_details = self.get_method_coverage()
db_coverage = self.get_db_coverage()
covered_db_interaction_lines = 0
total_db_interaction_lines = 0
total_lines = 0
total_branches = 0
total_inst = 0
covered_lines = 0
covered_branches = 0
covered_inst = 0
processed_methods = []
coverage_dict = {}
db_uncovered_lines_app = {}
db_covered_lines_app = {}
class_method_pairs = []
reachability = EMBReachability(self.analysis)
# gather all endpoints
for cls in self.analysis.get_classes():
all_method = self.analysis.get_methods_in_class(cls)
# Get lines that are uncovered
endpoint_methods = [method for method in all_method if all_method[method].is_entrypoint]
for endpoint_method in endpoint_methods:
class_method_pairs.append([cls, endpoint_method])
# Go through each endpoint class and method
for class_method_pair in class_method_pairs:
qualified_class_name = class_method_pair[0]
method_signature = class_method_pair[1]
# Get all the reachable methods
all_reachable_methods = reachability.get_reachable_methods(qualified_class_name,
method_signature,
depth=1000)
# Go through each reachable method
for method in all_reachable_methods:
if method not in processed_methods:
# class name is present
if method["qualified_class_name"] in coverage_details:
# Go through each method from the jacoco agent report
for method_jacoco in coverage_details[method["qualified_class_name"]]:
method_name = method_jacoco.split(':')[0]
start_line = int(method_jacoco.split(':')[1])
# Match name and the start line. Since start line is off by 1, we used the range
if (method_name == method["method_signature"].split('(')[0] and
method["end_line"] >= start_line >= method["start_line"]):
method_coverage_details = coverage_details[method["qualified_class_name"]][
method_jacoco]
covered_db_interaction_lines_per_method = 0
total_db_interaction_lines_per_method = 0
db_uncovered_lines = []
db_covered_lines = []
# Get database interaction coverage
if method["qualified_class_name"] in db_coverage:
for method_db in db_coverage[method["qualified_class_name"]]:
if method["method_signature"] == method_db["method_signature"]:
total_db_interaction_lines_per_method = method_db["total_db_line_count"]
covered_db_interaction_lines_per_method = (
total_db_interaction_lines_per_method - len(
method_db["db_uncovered_lines"]))
if total_db_interaction_lines_per_method > 0:
db_uncovered_lines = method_db["db_uncovered_lines"]
db_covered_lines = method_db["db_covered_lines"]
db_uncovered_lines_app[str(Tuple[method["qualified_class_name"],
method["method_signature"]])] = method_db["db_uncovered_lines"]
db_covered_lines_app[str(Tuple[method["qualified_class_name"],
method["method_signature"]])] = method_db["db_covered_lines"]
# Collect data for overall coverage
total_lines += method_coverage_details["totalLines"]
total_branches += method_coverage_details["totalBranches"]
total_inst += method_coverage_details["totalInsts"]
covered_lines += method_coverage_details["coveredLines"]
covered_branches += method_coverage_details["fullyCoveredBranches"]
covered_inst += method_coverage_details["coveredInsts"]
total_db_interaction_lines += total_db_interaction_lines_per_method
covered_db_interaction_lines += covered_db_interaction_lines_per_method
# Store method wise coverage
coverage = {"method_signature": method["method_signature"],
"line_coverage": (method_coverage_details["coveredLines"] /
method_coverage_details["totalLines"]) * 100.0 if
method_coverage_details["totalLines"] > 0 else -100.0,
"branch_coverage": (method_coverage_details["fullyCoveredBranches"] /
method_coverage_details["totalBranches"]) * 100.0 if
method_coverage_details["totalBranches"] > 0 else -100.0,
"instruction_coverage": (method_coverage_details["coveredInsts"] /
method_coverage_details["totalInsts"]) * 100.0 if
method_coverage_details["totalInsts"] > 0 else -100.0,
"database_interaction_coverage": (covered_db_interaction_lines_per_method /
total_db_interaction_lines_per_method) * 100.0 if
total_db_interaction_lines_per_method > 0 else -100.0,
"database_uncovered_lines": db_uncovered_lines if
len(db_uncovered_lines) > 0 else None,
"database_covered_lines": db_covered_lines if
len(db_covered_lines) > 0 else None,
}
if method["qualified_class_name"] not in coverage_dict:
coverage_dict[method["qualified_class_name"]] = [coverage]
else:
coverage_dict[method["qualified_class_name"]].append(coverage)
processed_methods.append(method)
# Add overall coverage
coverage_dict["overall_coverage"] = [
{"line_coverage": (covered_lines / total_lines) * 100.0 if total_lines > 0 else -100,
"branch_coverage": (covered_branches / total_branches) * 100.0 if total_branches > 0 else -100,
"instruction_coverage": (covered_inst / total_inst) * 100.0 if total_inst > 0 else -100,
"database_interaction_coverage": (covered_db_interaction_lines / total_db_interaction_lines)
* 100.0 if total_db_interaction_lines > 0 else -100,
"database_uncovered_lines": json.dumps(db_uncovered_lines_app),
"database_covered_lines": json.dumps(db_covered_lines_app)}, ]
return coverage_dict
@staticmethod
def __process_callee_signature(callee_signature: str) -> str:
"""
Processes callee signature
Args:
callee_signature:
Returns:
"""
pattern = r"\b(?:[a-zA-Z_][\w\.]*\.)+([a-zA-Z_][\w]*)\b|<[^>]*>"
# Find the part within the parentheses
start = callee_signature.find("(") + 1
end = callee_signature.rfind(")")
# Extract the elements inside the parentheses
elements = callee_signature[start:end].split(",")
# Apply the regex to each element
simplified_elements = [re.sub(pattern, r"\1", element.strip()) for element in elements]
# Reconstruct the string with simplified elements
return f"{callee_signature[:start]}{', '.join(simplified_elements)}{callee_signature[end:]}"